Stream Table
Stream tables contain immutable, timestamped rows of data. All tables imported from data sources are stream tables, and stream tables can be exported to data sinks.
This page explains how to convert between stream and state tables and describes how the semantics of certain queries differ over stream tables.
Convert Stream to State
Stream tables can be converted to state tables by deduplication or aggregation.
Deduplication
If a stream table represents a change stream of a state table, the state table can be defined with the DISTINCT query.
Products := DISTINCT Products ON id ORDER BY updated DESC;
For each unique key (as defined by the ON clause), the DISTINCT query picks the most recent version (as defined by the ORDER BY clause) from the stream.
In this example, we define the Products state table from the imported Products change stream by selecting, for each product id, the most recent stream record according to the updated timestamp.
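To make the deduplication semantics concrete, here is a minimal Python model of DISTINCT Products ON id ORDER BY updated DESC. It is not how SQRL executes the query: it assumes the change stream fits in a list of dicts, and the helper distinct_on and the sample records are hypothetical.

```python
from typing import Any

Record = dict[str, Any]

# Hypothetical change stream: each immutable record is a snapshot of a product
# at its `updated` timestamp.
products_stream: list[Record] = [
    {"id": 1, "name": "Mug", "price": 8.0, "updated": 100},
    {"id": 2, "name": "Tea", "price": 4.5, "updated": 105},
    {"id": 1, "name": "Mug", "price": 9.0, "updated": 120},
]


def distinct_on(stream: list[Record], key: str, order_by: str) -> dict[Any, Record]:
    """For each value of `key`, keep the record with the largest `order_by` value.

    Models `DISTINCT <stream> ON <key> ORDER BY <order_by> DESC`.
    """
    state: dict[Any, Record] = {}
    for record in stream:
        current = state.get(record[key])
        if current is None or record[order_by] > current[order_by]:
            state[record[key]] = record
    return state


products = distinct_on(products_stream, key="id", order_by="updated")
print(products[1]["price"])  # 9.0
```

The result holds one row per product id, and a newer record for an id replaces the older one, which is exactly what makes it a state table.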
Another way to deduplicate a stream table is to select a list of distinct columns.
Users := SELECT DISTINCT customerid AS id FROM Orders;
This statement defines the Users state table as the set of all unique customer ids from the Orders stream table.
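A Python model of the same idea, using hypothetical sample orders: SELECT DISTINCT keeps one row per unique value, no matter how many stream records share it.

```python
# Hypothetical Orders stream: customer 7 placed two orders.
orders = [
    {"id": 10, "customerid": 7},
    {"id": 11, "customerid": 8},
    {"id": 12, "customerid": 7},
]

# SELECT DISTINCT customerid AS id FROM Orders:
# one row per unique customer id, however many orders share it.
users = sorted({order["customerid"] for order in orders})
print(users)  # [7, 8]
```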
Aggregation
Aggregating over a stream table produces a state table.
Users.total_spend := SELECT sum(i.total) AS spend,
sum(i.discount) AS saved
FROM @ JOIN Orders o ON o.customerid = @.id JOIN o.items i;
This statement defines the nested total_spend state table, which aggregates the total spending and savings for each user across all of their orders from the Orders stream table.
The exception is time-window aggregation, which preserves time and produces a stream table.
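The following Python sketch models why aggregating a stream yields a state table: each user's row is created once and then updated in place as further orders arrive. The nested orders/items layout mirrors the example above, but the data and variable names are assumptions for illustration only.

```python
from collections import defaultdict

# Hypothetical Orders stream with nested items, mirroring `Orders o JOIN o.items i`.
orders = [
    {"customerid": 7, "items": [{"total": 30.0, "discount": 2.0}]},
    {"customerid": 8, "items": [{"total": 15.0, "discount": 0.0}]},
    {"customerid": 7, "items": [{"total": 50.0, "discount": 5.0},
                                {"total": 25.0, "discount": 0.0}]},
]

# One row per user; later orders update that row in place instead of
# appending a new immutable record, which is what makes the result a state table.
total_spend: dict[int, dict[str, float]] = defaultdict(
    lambda: {"spend": 0.0, "saved": 0.0}
)
for order in orders:
    row = total_spend[order["customerid"]]
    for item in order["items"]:
        row["spend"] += item["total"]
        row["saved"] += item["discount"]

print(dict(total_spend))  # {7: {'spend': 105.0, 'saved': 7.0}, 8: {'spend': 15.0, 'saved': 0.0}}
```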
Convert State to Stream
STREAM statements convert state tables to stream tables.
UserPromotion := STREAM ON ADD AS
SELECT u.id, u.first_name, u.last_name, u.email, s.spend
FROM Users u JOIN u.total_spend s WHERE s.spend >= 100;
This statement defines a new UserPromotion stream table that contains a stream record for every record that gets added to the state table defined by the SELECT query. The query selects all users who have spent at least $100 in aggregate. Whenever a user reaches this threshold and is added to the table, a stream record is produced in the UserPromotion stream table.
STREAM ON ADD: produces a stream record for every record that gets added to the state table defined by the SELECT query.
STREAM ON UPDATE: produces a stream record every time a record in the state table defined by the SELECT query is updated. This produces a change stream of the state table.
STREAM ON DELETE: produces a stream record for every record that gets deleted from the state table defined by the SELECT query.
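One way to picture the three STREAM modes is to compare two successive snapshots of the state table defined by the SELECT query and classify what changed. The Python sketch below does this with a hypothetical stream_events helper and a hypothetical promotion_state table. SQRL processes changes incrementally rather than by diffing snapshots, and this model treats add, update, and delete as disjoint, so read it as an illustration of the semantics, not of the implementation.

```python
from typing import Any

Row = dict[str, Any]


def stream_events(before: dict[Any, Row], after: dict[Any, Row]) -> list[tuple[str, Row]]:
    """Classify the differences between two keyed snapshots of a state table.

    "add":    key only in `after`       (emitted by STREAM ON ADD)
    "update": key in both, row changed  (emitted by STREAM ON UPDATE)
    "delete": key only in `before`      (emitted by STREAM ON DELETE)
    """
    events: list[tuple[str, Row]] = []
    for key, row in after.items():
        if key not in before:
            events.append(("add", row))
        elif before[key] != row:
            events.append(("update", row))
    for key, row in before.items():
        if key not in after:
            events.append(("delete", row))
    return events


def promotion_state(spend_by_user: dict[int, float]) -> dict[int, Row]:
    """Hypothetical state table from the SELECT: users whose aggregate spend is >= 100."""
    return {uid: {"id": uid, "spend": spend}
            for uid, spend in spend_by_user.items() if spend >= 100}


before = promotion_state({7: 80.0, 8: 120.0})
after = promotion_state({7: 105.0, 8: 130.0})
events = stream_events(before, after)
user_promotion = [row for kind, row in events if kind == "add"]  # STREAM ON ADD
print(user_promotion)  # [{'id': 7, 'spend': 105.0}]
```

User 8 was already in the table, so its higher spend shows up only as an "update" event, not as a new promotion record.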
Queries
Queries over stream tables differ in semantics from standard SQL queries over state tables in the following cases:
Time Window Aggregation
SQRL provides a number of time-preserving functions that aggregate timestamps into time windows. Time windows divide the timeline into discrete buckets; aggregating all stream records within each bucket produces a new stream table that contains one row for each aggregate.
Users.spending := SELECT endOfWeek(p.time) AS week,
sum(t.price) AS spend, sum(t.saving) AS saved
FROM @.purchases p JOIN p.totals t
GROUP BY week ORDER BY week DESC;
This statement defines the nested spending table, which aggregates the money spent and saved across all orders a user placed in each week. endOfWeek is a time window function that groups timestamps by week and returns the end of the week. The time window functions are contained in the time package.
Users.spending is a stream table that contains one record per user per week with that week's spending totals.
Time window aggregations can be used to compute arbitrary roll-ups of stream data.
Learn more about how SQRL executes time windows on the timeline.
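A minimal Python model of the weekly roll-up in Users.spending. It assumes weeks run Monday to Sunday and that the hypothetical end_of_week helper returns the Sunday that closes the week; SQRL's endOfWeek may use a different week boundary or return a timestamp rather than a date. The nested purchases and totals are flattened into one list for brevity.

```python
from collections import defaultdict
from datetime import date, datetime, timedelta


def end_of_week(ts: datetime) -> date:
    """Hypothetical stand-in for endOfWeek: the Sunday that closes ts's week
    (weeks assumed to run Monday through Sunday)."""
    return ts.date() + timedelta(days=6 - ts.weekday())  # weekday(): Monday=0 .. Sunday=6


# Flattened, hypothetical purchase records (user, time, price, saving).
purchases = [
    {"user": 1, "time": datetime(2024, 1, 2, 9, 30), "price": 20.0, "saving": 2.0},
    {"user": 1, "time": datetime(2024, 1, 5, 18, 0), "price": 30.0, "saving": 1.0},
    {"user": 1, "time": datetime(2024, 1, 9, 12, 0), "price": 10.0, "saving": 0.0},
    {"user": 2, "time": datetime(2024, 1, 3, 8, 15), "price": 5.0, "saving": 0.5},
]

# GROUP BY user, week: one row per user per week.
spending: dict[tuple[int, date], dict[str, float]] = defaultdict(
    lambda: {"spend": 0.0, "saved": 0.0}
)
for p in purchases:
    row = spending[(p["user"], end_of_week(p["time"]))]
    row["spend"] += p["price"]
    row["saved"] += p["saving"]

# ORDER BY week DESC, shown for user 1.
for (user, week), totals in sorted(spending.items(), key=lambda kv: kv[0][1], reverse=True):
    if user == 1:
        print(week, totals)
```

Each row is keyed by a week, so in this model a row stops changing once its week has ended, which is why the result behaves like a stream of per-week records.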
Recency Filters
Stream tables can be filtered on recency, which is useful when aggregating over a recent period of time.
RecentTotal := SELECT sum(i.total) AS total, sum(i.quantity) AS quantity
FROM Orders o JOIN o.items i
WHERE o.time > now() - INTERVAL 7 DAY;
This statement defines the RecentTotal state table as an aggregation over all orders placed in the last 7 days. Recency filters use the special function now() to restrict the timestamps of stream records to a period specified by an interval.
Learn more about now() and how SQRL processes time.
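The sketch below models the RecentTotal recency filter in Python. As an assumption, it passes now in explicitly, which shows that the same stream yields a different aggregate as time advances; the recent_total helper and the sample orders are hypothetical.

```python
from datetime import datetime, timedelta

# Hypothetical Orders stream with nested items.
orders = [
    {"time": datetime(2024, 3, 1), "items": [{"total": 40.0, "quantity": 2}]},
    {"time": datetime(2024, 3, 9), "items": [{"total": 15.0, "quantity": 1},
                                             {"total": 5.0, "quantity": 3}]},
    {"time": datetime(2024, 3, 12), "items": [{"total": 20.0, "quantity": 4}]},
]


def recent_total(orders: list[dict], now: datetime,
                 window: timedelta = timedelta(days=7)) -> dict[str, float]:
    """Aggregate the items of orders whose time is strictly after now - window."""
    cutoff = now - window
    items = [item for order in orders if order["time"] > cutoff for item in order["items"]]
    return {
        "total": sum(item["total"] for item in items),
        "quantity": sum(item["quantity"] for item in items),
    }


# The same stream yields a different aggregate as `now` advances.
print(recent_total(orders, now=datetime(2024, 3, 14)))  # {'total': 40.0, 'quantity': 8}
print(recent_total(orders, now=datetime(2024, 3, 17)))  # {'total': 20.0, 'quantity': 4}
```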
Temporal Join
SQRL supports temporal joins between a stream table and a state table when joining on the state table's key. A temporal join matches each row of the stream table with the version of the state table row that was current at the stream row's timestamp. This lets you join stream data with state data as it was when the stream record "happened". A temporal join produces a stream table.
OrderCountry := SELECT o.time, u.country
FROM Orders o TEMPORAL JOIN Users u ON o.customerid = u.id;
The OrderCountry stream is defined as a temporal join between the Orders stream and the Users state table. It joins each order with the version of the user who placed it as of the order's time, retrieving the country the user was in when the order was placed.
This differs from an inner join, which joins each order record with the most current user record.
OrderCountryInner := SELECT o.time, u.country
FROM Orders o INNER JOIN Users u ON o.customerid = u.id;
This statement produces a state table that changes whenever a user changes country, because each order is joined with the current version of its user row.
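To contrast the two joins, here is a small Python model with a hypothetical versioned Users table (a sorted list of (valid_from, row) pairs per id). The temporal join looks up the version valid at each order's timestamp; the inner join always uses the latest version, so its output changes when a user moves. The helper names and data are illustrative assumptions, not SQRL APIs, and the model assumes an order with no matching user version is dropped.

```python
from bisect import bisect_right
from typing import Any, Optional

Row = dict[str, Any]

# Hypothetical versioned Users table: (valid_from, row) pairs per id, sorted by time.
user_versions: dict[int, list[tuple[int, Row]]] = {
    7: [(0, {"id": 7, "country": "DE"}), (50, {"id": 7, "country": "FR"})],
}
# Hypothetical Orders stream (time as plain integers).
orders = [{"customerid": 7, "time": 10}, {"customerid": 7, "time": 60}]


def version_at(versions: list[tuple[int, Row]], ts: int) -> Optional[Row]:
    """Latest version whose valid_from <= ts, or None if the row did not exist yet."""
    idx = bisect_right([valid_from for valid_from, _ in versions], ts)
    return versions[idx - 1][1] if idx > 0 else None


def temporal_join(orders: list[Row], users: dict[int, list[tuple[int, Row]]]) -> list[Row]:
    """Join each order with the user version valid at the order's timestamp."""
    result = []
    for order in orders:
        user = version_at(users.get(order["customerid"], []), order["time"])
        if user is not None:  # assumption: orders without a matching version are dropped
            result.append({"time": order["time"], "country": user["country"]})
    return result


def inner_join_current(orders: list[Row], users: dict[int, list[tuple[int, Row]]]) -> list[Row]:
    """Join each order with the current (latest) user version."""
    result = []
    for order in orders:
        versions = users.get(order["customerid"])
        if versions:
            result.append({"time": order["time"], "country": versions[-1][1]["country"]})
    return result


temporal_before = temporal_join(orders, user_versions)
inner_before = inner_join_current(orders, user_versions)

# The user moves again at time 90.
user_versions[7].append((90, {"id": 7, "country": "ES"}))
temporal_after = temporal_join(orders, user_versions)
inner_after = inner_join_current(orders, user_versions)

print(temporal_after)  # [{'time': 10, 'country': 'DE'}, {'time': 60, 'country': 'FR'}]
print(inner_after)     # [{'time': 10, 'country': 'ES'}, {'time': 60, 'country': 'ES'}]
```

The temporal join output is unaffected by the later move, so it can be emitted once per order as a stream; the inner join output has to be revised, which is why it is a state table.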
In most cases, you want to use a temporal join when joining stream and state tables. SQRL uses temporal joins for all default joins between a stream and a state table on the state table's key.
Interval Join
SQRL supports interval joins when joining two stream tables and restricting their timestamps relative to each other with upper and lower bounds. An interval join only matches rows whose timestamps fall within that interval of each other.
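A minimal Python model of an interval join, assuming a hypothetical Shipments stream alongside Orders and bounds of 0 to 2 days after the order time. Only pairs whose timestamps fall within that interval are joined. In streaming engines generally, such bounds also mean each side only has to be buffered for the width of the interval, which is what makes interval joins practical over unbounded streams; this nested-loop sketch does not model that buffering.

```python
from datetime import datetime, timedelta

# Hypothetical Orders stream and a hypothetical Shipments stream.
orders = [
    {"id": 1, "time": datetime(2024, 5, 1, 10, 0)},
    {"id": 2, "time": datetime(2024, 5, 1, 12, 0)},
]
shipments = [
    {"orderid": 1, "time": datetime(2024, 5, 2, 9, 0)},   # 23 hours after order 1
    {"orderid": 2, "time": datetime(2024, 5, 4, 12, 0)},  # 3 days after order 2
]


def interval_join(left: list[dict], right: list[dict],
                  lower: timedelta, upper: timedelta) -> list[dict]:
    """Join rows with matching ids where
    left.time + lower <= right.time <= left.time + upper."""
    return [
        {"id": left_row["id"], "delay": right_row["time"] - left_row["time"]}
        for left_row in left
        for right_row in right
        if right_row["orderid"] == left_row["id"]
        and left_row["time"] + lower <= right_row["time"] <= left_row["time"] + upper
    ]


# Only shipments within 0..2 days of the order are joined; order 2 is excluded.
matches = interval_join(orders, shipments, lower=timedelta(0), upper=timedelta(days=2))
print(matches)
```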