Skip to content

Commit

Permalink
Fix ordertime/rowtime, and add partitioning
Browse files Browse the repository at this point in the history
  • Loading branch information
rmoff committed Feb 12, 2021
1 parent 81117dd commit 9304b89
Showing 1 changed file with 59 additions and 27 deletions.
86 changes: 59 additions & 27 deletions introduction-to-ksqldb/demo_introduction_to_ksqldb_02.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ SELECT ORDERTIME, ORDERID, ITEMID, ORDERUNITS
EMIT CHANGES;
CREATE STREAM ORDERS_NO_ADDRESS_DATA AS
SELECT TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss') AS ORDER_TIMESTAMP,
SELECT TIMESTAMPTOSTRING(ORDERTIME, 'yyyy-MM-dd HH:mm:ss') AS ORDER_TIMESTAMP,
ORDERID,
ITEMID,
ORDERUNITS
Expand Down Expand Up @@ -157,7 +157,7 @@ CREATE TABLE ITEM_REFERENCE_01 (ITEM_ID VARCHAR PRIMARY KEY)
DESCRIBE ITEM_REFERENCE_01;
SELECT TIMESTAMPTOSTRING(O.ROWTIME, 'yyyy-MM-dd HH:mm:ss') AS ORDER_TIMESTAMP,
SELECT TIMESTAMPTOSTRING(O.ORDERTIME, 'yyyy-MM-dd HH:mm:ss') AS ORDER_TIMESTAMP,
O.ORDERID,
O.ITEMID,
I.MAKE,
Expand All @@ -174,7 +174,7 @@ LIMIT 5;
SET 'auto.offset.reset' = 'earliest';
CREATE STREAM ORDERS_ENRICHED AS
SELECT O.ROWTIME AS ORDER_TIMESTAMP,
SELECT O.ORDERTIME AS ORDER_TIMESTAMP,
O.ORDERID,
O.ITEMID,
I.MAKE,
Expand All @@ -185,7 +185,8 @@ SELECT O.ROWTIME AS ORDER_TIMESTAMP,
O.ADDRESS
FROM ORDERS O
INNER JOIN ITEM_REFERENCE_01 I
ON O.ITEMID = I.ITEM_ID ;
ON O.ITEMID = I.ITEM_ID
PARTITION BY ORDERID;
----

Land to Elasticsearch
Expand All @@ -202,6 +203,7 @@ CREATE SINK CONNECTOR SINK_ELASTIC_ORDERS_01 WITH (
'value.converter.schema.registry.url' = 'http://schema-registry:8081',
'connection.url' = 'http://elasticsearch:9200',
'type.name' = '_doc',
'key.ignore' = 'true',
'schema.ignore' = 'true'
);
----
Expand Down Expand Up @@ -243,7 +245,7 @@ image::images/schema02.png[]
[source,sql]
----
CREATE STREAM ORDERS_FLAT AS
SELECT TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss') AS ORDER_TIMESTAMP,
SELECT TIMESTAMPTOSTRING(ORDERTIME, 'yyyy-MM-dd HH:mm:ss') AS ORDER_TIMESTAMP,
ORDERTIME AS ORDERTIME_EPOCH,
ORDERID,
ITEMID,
Expand Down Expand Up @@ -282,22 +284,44 @@ image::images/agg01.png[]

[source,sql]
----
SELECT MAKE, COUNT(*) AS ORDER_COUNT
SELECT MAKE,
COUNT(*) AS ORDER_COUNT,
SUM(TOTAL_ORDER_VALUE) AS TOTAL_ORDER_VALUE
FROM ORDERS_ENRICHED
GROUP BY MAKE
EMIT CHANGES
LIMIT 5;
----

[source,sql]
----
CREATE TABLE MAKE_AGG AS
SELECT MAKE,
COUNT(*) AS ORDER_COUNT,
SUM(TOTAL_ORDER_VALUE) AS TOTAL_ORDER_VALUE
FROM ORDERS_ENRICHED
GROUP BY MAKE;
----

Pull query

[source,sql]
----
SELECT ORDER_COUNT,
TOTAL_ORDER_VALUE
FROM MAKE_AGG
WHERE MAKE IN ('Satterfield and Sons',
'Beer, Feil and Ratke');
----

### Total order value per hour, by make

[source,sql]
----
CREATE TABLE ORDERS_PER_HOUR_BY_MAKE2 AS
SELECT TIMESTAMPTOSTRING(WINDOWSTART,'yyyy-MM-dd HH:mm:ss') AS WINDOW_START_TS,
MAKE,
COUNT(*) AS ORDER_COUNT,
CAST(SUM(TOTAL_ORDER_VALUE) AS DECIMAL(9,2)) AS TOTAL_ORDER_VALUE
CREATE TABLE ORDERS_PER_HOUR_BY_MAKE AS
SELECT MAKE,
COUNT(*) AS ORDER_COUNT,
CAST(SUM(TOTAL_ORDER_VALUE) AS DECIMAL(9,2)) AS TOTAL_ORDER_VALUE
FROM ORDERS_ENRICHED
WINDOW TUMBLING (SIZE 1 HOUR)
GROUP BY MAKE
Expand All @@ -308,13 +332,17 @@ Pull query

[source,sql]
----
ksql> SELECT TIMESTAMPTOSTRING(WINDOWSTART,'yyyy-MM-dd HH:mm:ss','Europe/London') AS WINDOW_START,
TIMESTAMPTOSTRING(WINDOWEND,'HH:mm:ss','Europe/London') AS WINDOW_END,
MAKE,
ORDER_COUNT,
TOTAL_ORDER_VALUE
FROM ORDERS_PER_HOUR_BY_MAKE
WHERE MAKE = 'Funk Inc';
SELECT TIMESTAMPTOSTRING(WINDOWSTART,'yyyy-MM-dd HH:mm:ss','Europe/London') AS WINDOW_START,
TIMESTAMPTOSTRING(WINDOWEND,'HH:mm:ss','Europe/London') AS WINDOW_END,
MAKE,
ORDER_COUNT,
TOTAL_ORDER_VALUE
FROM ORDERS_PER_HOUR_BY_MAKE
WHERE MAKE = 'Funk Inc';
----

[source,sql]
----
+---------------------+------------+----------+-------------+------------------+
|WINDOW_START |WINDOW_END |MAKE |ORDER_COUNT |TOTAL_ORDER_VALUE |
+---------------------+------------+----------+-------------+------------------+
Expand All @@ -331,13 +359,17 @@ Push query

[source,sql]
----
ksql> SELECT TIMESTAMPTOSTRING(WINDOWSTART,'yyyy-MM-dd HH:mm:ss','Europe/London') AS WINDOW_START,
TIMESTAMPTOSTRING(WINDOWEND,'HH:mm:ss','Europe/London') AS WINDOW_END,
MAKE,
ORDER_COUNT,
TOTAL_ORDER_VALUE
FROM ORDERS_PER_HOUR_BY_MAKE
WHERE MAKE = 'Funk Inc' emit changes;
SELECT TIMESTAMPTOSTRING(WINDOWSTART,'yyyy-MM-dd HH:mm:ss','Europe/London') AS WINDOW_START,
TIMESTAMPTOSTRING(WINDOWEND,'HH:mm:ss','Europe/London') AS WINDOW_END,
MAKE,
ORDER_COUNT,
TOTAL_ORDER_VALUE
FROM ORDERS_PER_HOUR_BY_MAKE
EMIT CHANGES;
----

[source,sql]
----
+----------------------+-----------+----------+-------------+------------------+
|WINDOW_START |WINDOW_END |MAKE |ORDER_COUNT |TOTAL_ORDER_VALUE |
+----------------------+-----------+----------+-------------+------------------+
Expand Down Expand Up @@ -513,7 +545,7 @@ EMIT CHANGES;

[source,sql]
----
SELECT TIMESTAMPTOSTRING(ROWTIME,'yyyy-MM-dd HH:mm:ss') AS ROWTIME_,
SELECT TIMESTAMPTOSTRING(ROWTIME,'yyyy-MM-dd HH:mm:ss') AS ROWTIME_STR,
TIMESTAMPTOSTRING(ORDERTIME,'yyyy-MM-dd HH:mm:ss') AS ORDERTIME,
'Order ID : ' + CAST(ORDERID AS VARCHAR) AS ORDERID
FROM ORDERS
Expand Down Expand Up @@ -544,7 +576,7 @@ CREATE STREAM ORDERS_BY_EVENTTIME WITH (VALUE_FORMAT='AVRO',

[source,sql]
----
SELECT TIMESTAMPTOSTRING(ROWTIME,'yyyy-MM-dd HH:mm:ss') AS ROWTIME_,
SELECT TIMESTAMPTOSTRING(ROWTIME,'yyyy-MM-dd HH:mm:ss') AS ROWTIME_STR,
TIMESTAMPTOSTRING(ORDERTIME,'yyyy-MM-dd HH:mm:ss') AS ORDERTIME,
'Order ID : ' + CAST(ORDERID AS VARCHAR) AS ORDERID
FROM ORDERS_BY_EVENTTIME
Expand Down

0 comments on commit 9304b89

Please sign in to comment.