- Clone the repository

git clone https://github.com/confluentinc/demo-scene.git
cd demo-scene
- Start up the environment

The first time that you do this, the Docker images will be pulled down from the remote server. This may take a while!

cd ksql-intro
docker-compose up -d
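If you want to fetch the images ahead of time (for example, before giving the talk), you can optionally pull them first. This is a standard docker-compose command and assumes you are already in the ksql-intro directory:

docker-compose pull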
- Launch the KSQL CLI in TWO terminal windows (you’ll need both)
docker exec -it ksql-cli bash -c 'echo -e "\n\n⏳ Waiting for KSQL to be available before launching CLI\n"; while : ; do curl_status=$(curl -s -o /dev/null -w %{http_code} http://ksql-server:8088/info) ; echo -e $(date) " KSQL server listener HTTP state: " $curl_status " (waiting for 200)" ; if [ $curl_status -eq 200 ] ; then break ; fi ; sleep 5 ; done ; ksql http://ksql-server:8088'
Filter the ORDERS stream for orders shipping to New York (the -> operator accesses fields within the ADDRESS struct):

SELECT *
FROM ORDERS
WHERE ADDRESS->STATE='New York';
Persist the filtered data as a new stream, backed by a new Kafka topic:

CREATE STREAM ORDERS_NY AS
SELECT *
FROM ORDERS
WHERE ADDRESS->STATE='New York';
Open a second terminal window and run the KSQL CLI:
docker exec -it ksql-cli ksql http://ksql-server:8088
In the original window, show the source messages:
PRINT 'orders';
Print the new topic in the second window:
PRINT 'ORDERS_NY';
Optionally, use the terminal software’s Find function to highlight all 'New York' text in the source stream, and show that only these messages are written to the new topic.
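Alternatively, you can check from within KSQL that the new stream contains only New York orders (a quick spot-check; the LIMIT is arbitrary):

SELECT ADDRESS->STATE FROM ORDERS_NY LIMIT 10;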
Next, select just a subset of the columns (a projection):

SELECT ORDERTIME, ORDERID, ITEMID, ORDERUNITS
FROM ORDERS;
Persist the projection as a new stream, converting the epoch timestamp to a human-readable string along the way:

CREATE STREAM ORDERS_NO_ADDRESS_DATA AS
SELECT TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss') AS ORDER_TIMESTAMP,
ORDERID,
ITEMID,
ORDERUNITS
FROM ORDERS;
Observe that the new schema has no address fields:
DESCRIBE EXTENDED ORDERS_NO_ADDRESS_DATA;
In the second window, print the new topic:
PRINT 'ORDERS_NO_ADDRESS_DATA';
In the original window, show the source messages:
PRINT 'orders';
Now bring in some reference data about the items. First, inspect the source topic:

PRINT 'item_details_01' LIMIT 10;
Register the topic as a KSQL table, keyed on the item ID:

CREATE TABLE ITEM_REFERENCE_01 WITH (VALUE_FORMAT='AVRO',
KAFKA_TOPIC='item_details_01',
KEY='ID');
DESCRIBE ITEM_REFERENCE_01;
Join each order to its item reference data to enrich it:

SELECT TIMESTAMPTOSTRING(O.ROWTIME, 'yyyy-MM-dd HH:mm:ss') AS ORDER_TIMESTAMP,
O.ORDERID,
O.ITEMID,
I.MAKE,
I.COLOUR,
I.UNIT_COST,
O.ORDERUNITS,
O.ORDERUNITS * I.UNIT_COST AS TOTAL_ORDER_VALUE,
O.ADDRESS
FROM ORDERS O
INNER JOIN ITEM_REFERENCE_01 I
ON O.ITEMID = I.ID
LIMIT 5;
Persist the enriched data as a new stream:

CREATE STREAM ORDERS_ENRICHED AS
SELECT O.ROWTIME AS ORDER_TIMESTAMP,
O.ORDERID,
O.ITEMID,
I.MAKE,
I.COLOUR,
I.UNIT_COST,
O.ORDERUNITS,
O.ORDERUNITS * I.UNIT_COST AS TOTAL_ORDER_VALUE,
O.ADDRESS
FROM ORDERS O
INNER JOIN ITEM_REFERENCE_01 I
ON O.ITEMID = I.ID;
Now land the enriched data to Elasticsearch, using Kafka Connect:
curl -i -X PUT -H "Accept:application/json" \
-H "Content-Type:application/json" http://localhost:8083/connectors/sink-elastic-orders-01/config \
-d '{
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"type.name": "kafkaconnect",
"key.converter":"org.apache.kafka.connect.storage.StringConverter",
"topics": "ORDERS_ENRICHED",
"schema.ignore": "true",
"connection.url": "http://elasticsearch:9200"
}'
Check that the connector is RUNNING
curl -s "http://localhost:8083/connectors?expand=info&expand=status" | \
jq '. | to_entries[] | [ .value.info.type, .key, .value.status.connector.state,.value.status.tasks[].state,.value.info.config."connector.class"]|join(":|:")' | \
column -s : -t| sed 's/\"//g'| sort
sink | sink-elastic-orders-01 | RUNNING | RUNNING | io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
source | source-datagen-item_details_01 | RUNNING | FAILED | io.confluent.kafka.connect.datagen.DatagenConnector
source | source-datagen-orders-uk | RUNNING | RUNNING | io.confluent.kafka.connect.datagen.DatagenConnector
source | source-datagen-orders-us | RUNNING | RUNNING | io.confluent.kafka.connect.datagen.DatagenConnector
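If a task shows FAILED, as the item_details_01 datagen task does above, you can pull its stack trace from Kafka Connect's standard status endpoint to see why (substitute the connector name as needed):

curl -s "http://localhost:8083/connectors/source-datagen-item_details_01/status" | jq '.tasks[].trace'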
View the data in Kibana, which the demo environment typically exposes at http://localhost:5601.
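You can also confirm from the command line that documents are arriving. This assumes the sink connector lowercases the topic name to derive the Elasticsearch index name, which recent versions of the connector do; adjust the index name if yours behaves differently:

curl -s "http://localhost:9200/orders_enriched/_count" | jq '.count'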
Note: the 40-minute talk ends here.
Flatten the nested ADDRESS struct into top-level columns, which is useful for downstream systems that can't handle nested data:

CREATE STREAM ORDERS_FLAT AS
SELECT TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss') AS ORDER_TIMESTAMP,
ORDERTIME AS ORDERTIME_EPOCH,
ORDERID,
ITEMID,
ORDERUNITS,
ADDRESS->STREET AS ADDRESS_STREET,
ADDRESS->CITY AS ADDRESS_CITY,
ADDRESS->STATE AS ADDRESS_STATE
FROM ORDERS;
PRINT 'ORDERS_FLAT';
Now build some aggregates. Count the orders for each manufacturer:

SELECT MAKE, COUNT(*) AS ORDER_COUNT
FROM ORDERS_ENRICHED
GROUP BY MAKE
LIMIT 5;
Add a time window to the aggregation, counting orders and summing order value per make per hour:

SELECT TIMESTAMPTOSTRING(WINDOWSTART(),'yyyy-MM-dd HH:mm:ss') AS WINDOW_START_TS,
MAKE,
COUNT(*) AS ORDER_COUNT,
SUM(TOTAL_ORDER_VALUE) AS TOTAL_ORDER_VALUE
FROM ORDERS_ENRICHED
WINDOW TUMBLING (SIZE 1 HOUR)
GROUP BY MAKE;
Use HAVING to filter the aggregate results:

SELECT TIMESTAMPTOSTRING(WINDOWSTART(),'yyyy-MM-dd HH:mm:ss') AS WINDOW_START_TS,
MAKE,
COUNT(*) AS ORDER_COUNT,
SUM(TOTAL_ORDER_VALUE) AS TOTAL_ORDER_VALUE
FROM ORDERS_ENRICHED
WINDOW TUMBLING (SIZE 1 HOUR)
GROUP BY MAKE
HAVING SUM(TOTAL_ORDER_VALUE) > 10000;
Imagine you have two inbound streams of orders, from separate geographies (e.g. UK and US). You want to combine these into a single stream for use by consumers.
Add the second stream, containing UK orders:
SHOW TOPICS;
CREATE STREAM ORDERS_UK WITH (VALUE_FORMAT='AVRO', KAFKA_TOPIC='orders_uk');
SELECT * FROM ORDERS_UK LIMIT 5;
Create the new combined stream, populated first by all US orders (the original ORDERS stream):
CREATE STREAM ORDERS_COMBINED AS
SELECT 'US' AS SOURCE,
'US-'+CAST(ORDERID AS VARCHAR) AS ORDERID,
ORDERTIME,
ITEMID,
ORDERUNITS,
ADDRESS
FROM ORDERS
PARTITION BY ORDERID;
Add the source of UK order data:
INSERT INTO ORDERS_COMBINED
SELECT 'UK' AS SOURCE,
'UK-'+CAST(ORDERID AS VARCHAR) AS ORDERID,
ORDERTIME,
ITEMID,
ORDERUNITS,
ADDRESS
FROM ORDERS_UK
PARTITION BY ORDERID;
Query the combined stream, reading only newly arriving messages:

SET 'auto.offset.reset' = 'latest';
SELECT * FROM ORDERS_COMBINED LIMIT 20;
N.B. In KSQL 5.3 the above should be written as a bare CREATE STREAM followed by two INSERT INTO statements, instead of a CSAS plus an INSERT INTO.
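A sketch of what that would look like. The column types here are assumptions inferred from how ORDERS is used earlier in this document (check DESCRIBE ORDERS for the actual schema), and it assumes your KSQL version can create the backing topic when PARTITIONS is specified in the WITH clause:

CREATE STREAM ORDERS_COMBINED (SOURCE VARCHAR,
                               ORDERID VARCHAR,
                               ORDERTIME BIGINT,
                               ITEMID VARCHAR,
                               ORDERUNITS DOUBLE,
                               ADDRESS STRUCT<STREET VARCHAR, CITY VARCHAR, STATE VARCHAR>)
  WITH (KAFKA_TOPIC='ORDERS_COMBINED', VALUE_FORMAT='AVRO', PARTITIONS=4);

INSERT INTO ORDERS_COMBINED
  SELECT 'US' AS SOURCE,
         'US-'+CAST(ORDERID AS VARCHAR) AS ORDERID,
         ORDERTIME, ITEMID, ORDERUNITS, ADDRESS
  FROM ORDERS
  PARTITION BY ORDERID;

INSERT INTO ORDERS_COMBINED
  SELECT 'UK' AS SOURCE,
         'UK-'+CAST(ORDERID AS VARCHAR) AS ORDERID,
         ORDERTIME, ITEMID, ORDERUNITS, ADDRESS
  FROM ORDERS_UK
  PARTITION BY ORDERID;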
Imagine instead that you have only the single ORDERS_COMBINED source, and you want two separate streams of US and UK order data:
CREATE STREAM ORDER_SPLIT_US AS
SELECT *
FROM ORDERS_COMBINED
WHERE SOURCE = 'US';
CREATE STREAM ORDER_SPLIT_UK AS
SELECT *
FROM ORDERS_COMBINED
WHERE SOURCE = 'UK';
CREATE STREAM ORDER_SPLIT_OTHER AS
SELECT *
FROM ORDERS_COMBINED
WHERE SOURCE != 'US'
AND SOURCE != 'UK';
Count the orders flowing through each of the split streams:

SELECT SOURCE, COUNT(*) AS ORDER_COUNT
FROM ORDER_SPLIT_US
GROUP BY SOURCE;
SELECT SOURCE, COUNT(*) AS ORDER_COUNT
FROM ORDER_SPLIT_UK
GROUP BY SOURCE;
Each order carries an embedded event timestamp, ORDERTIME. Inspect it for a given item:

SELECT TIMESTAMPTOSTRING(ORDERTIME,'yyyy-MM-dd HH:mm:ss'),
'Order ID : ' + CAST(ORDERID AS VARCHAR) AS ORDERID
FROM ORDERS
WHERE ITEMID='Item_42'
LIMIT 5;
2019-06-09 15:18:07 | Order ID : 25
2019-06-09 11:15:30 | Order ID : 224
2019-06-09 22:03:59 | Order ID : 246
2019-06-09 02:42:02 | Order ID : 257
2019-06-09 23:01:00 | Order ID : 362
Count these orders per one-hour window. Note that the window is driven by each message's ROWTIME (when it arrived in the topic), not by ORDERTIME:

SELECT TIMESTAMPTOSTRING(WINDOWSTART(),'yyyy-MM-dd HH:mm:ss') AS WINDOW_START_TS,
ITEMID,
COUNT(*) AS ORDER_COUNT
FROM ORDERS
WINDOW TUMBLING (SIZE 1 HOUR)
WHERE ITEMID='Item_42'
GROUP BY ITEMID;
2019-06-11 10:00:00 | Item_42 | 1
2019-06-11 10:00:00 | Item_42 | 18
2019-06-11 10:00:00 | Item_42 | 19
Compare each message's ROWTIME with its embedded ORDERTIME; they differ, because the data was generated with event times in the past:

SELECT TIMESTAMPTOSTRING(ROWTIME,'yyyy-MM-dd HH:mm:ss'),
TIMESTAMPTOSTRING(ORDERTIME,'yyyy-MM-dd HH:mm:ss'),
'Order ID : ' + CAST(ORDERID AS VARCHAR) AS ORDERID
FROM ORDERS
WHERE ITEMID='Item_42'
LIMIT 5;
2019-07-01 11:16:29 | 2019-06-09 15:18:07 | Order ID : 25
2019-07-01 11:17:20 | 2019-06-09 11:15:30 | Order ID : 224
2019-07-01 11:17:25 | 2019-06-09 22:03:59 | Order ID : 246
2019-07-01 11:17:28 | 2019-06-09 02:42:02 | Order ID : 257
2019-07-01 11:17:56 | 2019-06-09 23:01:00 | Order ID : 362
Re-register the same topic as a new stream, telling KSQL to use the embedded ORDERTIME as each message's timestamp:

CREATE STREAM ORDERS_BY_EVENTTIME WITH (VALUE_FORMAT='AVRO',
KAFKA_TOPIC='orders',
TIMESTAMP='ORDERTIME');
Now ROWTIME and ORDERTIME agree:

SELECT TIMESTAMPTOSTRING(ROWTIME,'yyyy-MM-dd HH:mm:ss'),
TIMESTAMPTOSTRING(ORDERTIME,'yyyy-MM-dd HH:mm:ss'),
'Order ID : ' + CAST(ORDERID AS VARCHAR) AS ORDERID
FROM ORDERS_BY_EVENTTIME
WHERE ITEMID='Item_42'
LIMIT 5;
2019-06-09 11:40:16 | 2019-06-09 11:40:16 | Order ID : 15
2019-06-09 12:49:45 | 2019-06-09 12:49:45 | Order ID : 129
2019-06-09 19:50:33 | 2019-06-09 19:50:33 | Order ID : 246
2019-06-09 23:02:23 | 2019-06-09 23:02:23 | Order ID : 657
2019-06-09 05:22:04 | 2019-06-09 05:22:04 | Order ID : 763
Repeat the hourly aggregate against the event-time stream; the windows now reflect when the orders actually happened:

SELECT TIMESTAMPTOSTRING(WINDOWSTART(),'yyyy-MM-dd HH:mm:ss') AS WINDOW_START_TS,
ITEMID,
COUNT(*) AS ORDER_COUNT
FROM ORDERS_BY_EVENTTIME
WINDOW TUMBLING (SIZE 1 HOUR)
WHERE ITEMID='Item_42'
GROUP BY ITEMID;
2019-06-09 08:00:00 | Item_42 | 1
2019-06-09 19:00:00 | Item_42 | 1
2019-06-09 23:00:00 | Item_42 | 2
- Using CASE to bucket metrics:

SELECT ORDERID, ORDERUNITS,
       CASE WHEN ORDERUNITS > 15 THEN 'Really big order'
            WHEN ORDERUNITS > 10 THEN 'Big order'
            ELSE 'Normal order'
       END AS ORDER_TYPE
FROM ORDERS_ENRICHED
LIMIT 10;

0 | 18 | Really big order
1 | 13 | Big order
3 | 5 | Normal order
0 | 8 | Normal order
1 | 5 | Normal order
2 | 5 | Normal order
3 | 15 | Big order
4 | 19 | Really big order
5 | 2 | Normal order
6 | 4 | Normal order
- Using CASE to create bucket metric aggregates:

SELECT CASE WHEN ORDERUNITS > 15 THEN 'Really big order'
            WHEN ORDERUNITS > 10 THEN 'Big order'
            ELSE 'Normal order'
       END AS ORDER_TYPE,
       COUNT(*)
FROM ORDERS_ENRICHED
GROUP BY CASE WHEN ORDERUNITS > 15 THEN 'Really big order'
              WHEN ORDERUNITS > 10 THEN 'Big order'
              ELSE 'Normal order'
         END;
- Using CASE to pivot bucket aggregates (note the middle bucket is ORDERUNITS > 10 AND ORDERUNITS <= 15, so the three buckets cover every order):

SELECT TIMESTAMPTOSTRING(WINDOWSTART(),'yyyy-MM-dd HH:mm:ss') AS WINDOW_START_TS,
       MAKE,
       COUNT(*),
       SUM(CASE WHEN ORDERUNITS > 15 THEN 1 ELSE 0 END) AS CT_REALLY_BIG_ORDER,
       SUM(CASE WHEN ORDERUNITS > 10 AND ORDERUNITS <= 15 THEN 1 ELSE 0 END) AS CT_BIG_ORDER,
       SUM(CASE WHEN ORDERUNITS <= 10 THEN 1 ELSE 0 END) AS CT_NORMAL_ORDER
FROM ORDERS_ENRICHED
WINDOW TUMBLING (SIZE 1 HOUR)
GROUP BY MAKE
LIMIT 5;

2019-07-01 11:00:00 | Hilpert and Sons | 1 | 1 | 0 | 0
2019-07-01 11:00:00 | Considine and Sons | 71 | 12 | 11 | 43
2019-07-01 11:00:00 | MacGyver Group | 63 | 14 | 11 | 33
2019-07-01 11:00:00 | Bauch-Hudson | 64 | 14 | 16 | 30
2019-07-01 11:00:00 | Corkery-Rath | 59 | 14 | 12 | 31