An introduction to KSQL

Setup

  1. Clone the repository

    git clone https://github.com/confluentinc/demo-scene.git
    cd demo-scene
  2. Start up the environment

    The first time that you do this, the Docker images will be pulled down from the remote server. This may take a while!

    cd ksql-intro
    docker-compose up -d
  3. Launch the KSQL CLI in TWO terminal windows (you’ll need both)

    docker exec -it ksql-cli bash -c 'echo -e "\n\n⏳ Waiting for KSQL to be available before launching CLI\n"; while : ; do curl_status=$(curl -s -o /dev/null -w %{http_code} http://ksql-server:8088/info) ; echo -e $(date) " KSQL server listener HTTP state: " $curl_status " (waiting for 200)" ; if [ $curl_status -eq 200 ] ; then  break ; fi ; sleep 5 ; done ; ksql http://ksql-server:8088'

Getting started

Declare stream

SHOW TOPICS;

PRINT 'orders';

CREATE STREAM ORDERS WITH
  (VALUE_FORMAT='AVRO',
   KAFKA_TOPIC='orders');

SHOW STREAMS;

DESCRIBE ORDERS;

SELECT * FROM ORDERS LIMIT 10;

SELECT ORDERID, ADDRESS FROM ORDERS LIMIT 10;

Filter for given state

SELECT *
  FROM ORDERS
 WHERE ADDRESS->STATE='New York';

Persist to a new Kafka topic

CREATE STREAM ORDERS_NY AS
  SELECT *
    FROM ORDERS
   WHERE ADDRESS->STATE='New York';

Open a second terminal and run KSQL CLI

docker exec -it ksql-cli ksql http://ksql-server:8088

In the original window, show the source messages:

PRINT 'orders';

Print the new topic in the second window:

PRINT 'ORDERS_NY';

Optionally, use the terminal software’s Find function to highlight all New York text in the source stream, and show that only these messages are written to the new topic.
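Alternatively, query the new stream for anything that is not a New York order; nothing should come back (press Ctrl-C to cancel the continuous query):

SELECT ORDERID, ADDRESS->STATE
  FROM ORDERS_NY
 WHERE ADDRESS->STATE != 'New York';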

Schema manipulation - Drop field

SELECT ORDERTIME, ORDERID, ITEMID, ORDERUNITS
  FROM ORDERS;

CREATE STREAM ORDERS_NO_ADDRESS_DATA AS
  SELECT TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss') AS ORDER_TIMESTAMP,
         ORDERID,
         ITEMID,
         ORDERUNITS
    FROM ORDERS;

Observe that the new schema has no address fields:

DESCRIBE EXTENDED ORDERS_NO_ADDRESS_DATA;

In the second window, print the new topic:

PRINT 'ORDERS_NO_ADDRESS_DATA';

In the original window, show the source messages:

PRINT 'orders';

Joins

Join to items reference

PRINT 'item_details_01' LIMIT 10;

CREATE TABLE ITEM_REFERENCE_01 WITH (VALUE_FORMAT='AVRO',
                                     KAFKA_TOPIC='item_details_01',
                                     KEY='ID');

DESCRIBE ITEM_REFERENCE_01;

SELECT TIMESTAMPTOSTRING(O.ROWTIME, 'yyyy-MM-dd HH:mm:ss') AS ORDER_TIMESTAMP,
       O.ORDERID,
       O.ITEMID,
       I.MAKE,
       I.COLOUR,
       I.UNIT_COST,
       O.ORDERUNITS,
       O.ORDERUNITS * I.UNIT_COST AS TOTAL_ORDER_VALUE,
       O.ADDRESS
  FROM ORDERS O
       INNER JOIN ITEM_REFERENCE_01 I
       ON O.ITEMID = I.ID
 LIMIT 5;

CREATE STREAM ORDERS_ENRICHED AS
SELECT O.ROWTIME AS ORDER_TIMESTAMP,
       O.ORDERID,
       O.ITEMID,
       I.MAKE,
       I.COLOUR,
       I.UNIT_COST,
       O.ORDERUNITS,
       O.ORDERUNITS * I.UNIT_COST AS TOTAL_ORDER_VALUE,
       O.ADDRESS
  FROM ORDERS O
       INNER JOIN ITEM_REFERENCE_01 I
       ON O.ITEMID = I.ID;

Land to Elasticsearch

curl -i -X PUT -H "Accept:application/json" \
    -H  "Content-Type:application/json" http://localhost:8083/connectors/sink-elastic-orders-01/config \
    -d '{
        "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
        "type.name": "type.name=kafkaconnect",
        "key.converter":"org.apache.kafka.connect.storage.StringConverter",
        "topics": "ORDERS_ENRICHED",
        "schema.ignore": "true",
        "connection.url": "http://elasticsearch:9200"
    }'

Check that the connector is RUNNING

curl -s "http://localhost:8083/connectors?expand=info&expand=status" | \
         jq '. | to_entries[] | [ .value.info.type, .key, .value.status.connector.state,.value.status.tasks[].state,.value.info.config."connector.class"]|join(":|:")' | \
         column -s : -t| sed 's/\"//g'| sort
sink    |  sink-elastic-orders-01          |  RUNNING  |  RUNNING  |  io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
source  |  source-datagen-item_details_01  |  RUNNING  |  FAILED   |  io.confluent.kafka.connect.datagen.DatagenConnector
source  |  source-datagen-orders-uk        |  RUNNING  |  RUNNING  |  io.confluent.kafka.connect.datagen.DatagenConnector
source  |  source-datagen-orders-us        |  RUNNING  |  RUNNING  |  io.confluent.kafka.connect.datagen.DatagenConnector
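If a task shows FAILED (as the item_details_01 datagen source does in the sample output above), it can be restarted through the Kafka Connect REST API, e.g. for task 0 of that connector:

curl -i -X POST "http://localhost:8083/connectors/source-datagen-item_details_01/tasks/0/restart"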

View in Kibana
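Before exploring the data in Kibana, you can optionally confirm that the sink connector has created an index and is writing documents to Elasticsearch (this assumes port 9200 is exposed on the host by the Docker Compose environment):

curl -s "http://localhost:9200/_cat/indices?v"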


Note
The 40-minute talk ends here.

Schema manipulation - Flatten schema & derive new columns

CREATE STREAM ORDERS_FLAT AS
  SELECT TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss') AS ORDER_TIMESTAMP,
         ORDERTIME AS ORDERTIME_EPOCH,
         ORDERID,
         ITEMID,
         ORDERUNITS,
         ADDRESS->STREET AS ADDRESS_STREET,
         ADDRESS->CITY AS ADDRESS_CITY,
         ADDRESS->STATE AS ADDRESS_STATE
    FROM ORDERS;
PRINT 'ORDERS_FLAT';

Reserialise to CSV

CREATE STREAM ORDERS_FLAT_CSV WITH (VALUE_FORMAT='DELIMITED',
                                    KAFKA_TOPIC='orders_csv') AS
  SELECT * FROM ORDERS_FLAT;

PRINT 'orders_csv';

Aggregates

Orders count by manufacturer

SELECT MAKE, COUNT(*) AS ORDER_COUNT
  FROM ORDERS_ENRICHED
  GROUP BY MAKE
  LIMIT 5;

Total order value per hour, by manufacturer

SELECT TIMESTAMPTOSTRING(WINDOWSTART(),'yyyy-MM-dd HH:mm:ss') AS WINDOW_START_TS,
       MAKE,
       COUNT(*) AS ORDER_COUNT,
       SUM(TOTAL_ORDER_VALUE) AS TOTAL_ORDER_VALUE
  FROM ORDERS_ENRICHED
         WINDOW TUMBLING (SIZE 1 HOUR)
GROUP BY MAKE;

Manufacturers for which there have been more than $10,000 of orders in an hour

SELECT TIMESTAMPTOSTRING(WINDOWSTART(),'yyyy-MM-dd HH:mm:ss') AS WINDOW_START_TS,
       MAKE,
       COUNT(*) AS ORDER_COUNT,
       SUM(TOTAL_ORDER_VALUE) AS TOTAL_ORDER_VALUE
  FROM ORDERS_ENRICHED
         WINDOW TUMBLING (SIZE 1 HOUR)
GROUP BY MAKE
HAVING SUM(TOTAL_ORDER_VALUE) > 10000;

Merging and Splitting Streams

Merging streams with INSERT INTO

Imagine you have two inbound streams of orders, from separate geographies (e.g. UK and US). You want to combine these into a single stream for use by consumers.

Add the second stream, containing UK orders:

SHOW TOPICS;

CREATE STREAM ORDERS_UK WITH (VALUE_FORMAT='AVRO', KAFKA_TOPIC='orders_uk');

SELECT * FROM ORDERS_UK LIMIT 5;

Create the new combined stream, populated first by all US orders (the original ORDERS stream):

CREATE STREAM ORDERS_COMBINED AS
  SELECT 'US' AS SOURCE,
         'US-'+CAST(ORDERID AS VARCHAR) AS ORDERID,
         ORDERTIME,
         ITEMID,
         ORDERUNITS,
         ADDRESS
    FROM ORDERS
    PARTITION BY ORDERID;

Add the source of UK order data:

INSERT INTO ORDERS_COMBINED
  SELECT 'UK' AS SOURCE,
         'UK-'+CAST(ORDERID AS VARCHAR) AS ORDERID,
         ORDERTIME,
         ITEMID,
         ORDERUNITS,
         ADDRESS
    FROM ORDERS_UK
    PARTITION BY ORDERID;
SET 'auto.offset.reset' = 'latest';

SELECT * FROM ORDERS_COMBINED LIMIT 20;

N.B. In KSQL 5.3 the above should be changed to an explicit CREATE STREAM followed by two INSERT INTO statements, instead of a CSAS plus an INSERT INTO (see the sketch below).
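A sketch of that form, assuming the column types shown here match what DESCRIBE ORDERS reports (ORDERTIME BIGINT, ORDERUNITS DOUBLE and the ADDRESS struct fields used earlier are assumptions, as is PARTITIONS=1 for the demo environment):

-- Declare the target stream explicitly (adjust column types to match DESCRIBE ORDERS)
CREATE STREAM ORDERS_COMBINED (SOURCE VARCHAR,
                               ORDERID VARCHAR,
                               ORDERTIME BIGINT,
                               ITEMID VARCHAR,
                               ORDERUNITS DOUBLE,
                               ADDRESS STRUCT<STREET VARCHAR, CITY VARCHAR, STATE VARCHAR>)
  WITH (VALUE_FORMAT='AVRO', KAFKA_TOPIC='ORDERS_COMBINED', PARTITIONS=1);

-- Populate it from both sources with INSERT INTO
INSERT INTO ORDERS_COMBINED
  SELECT 'US' AS SOURCE,
         'US-'+CAST(ORDERID AS VARCHAR) AS ORDERID,
         ORDERTIME,
         ITEMID,
         ORDERUNITS,
         ADDRESS
    FROM ORDERS
    PARTITION BY ORDERID;

INSERT INTO ORDERS_COMBINED
  SELECT 'UK' AS SOURCE,
         'UK-'+CAST(ORDERID AS VARCHAR) AS ORDERID,
         ORDERTIME,
         ITEMID,
         ORDERUNITS,
         ADDRESS
    FROM ORDERS_UK
    PARTITION BY ORDERID;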

Splitting streams

Imagine you have only the single ORDERS_COMBINED stream, and you want two separate streams of US and UK order data:

CREATE STREAM ORDER_SPLIT_US AS
  SELECT *
    FROM ORDERS_COMBINED
   WHERE SOURCE ='US';

CREATE STREAM ORDER_SPLIT_UK AS
  SELECT *
    FROM ORDERS_COMBINED
   WHERE SOURCE ='UK';

CREATE STREAM ORDER_SPLIT_OTHER AS
  SELECT *
    FROM ORDERS_COMBINED
   WHERE SOURCE !='US'
     AND SOURCE !='UK';

SELECT SOURCE, COUNT(*) AS ORDER_COUNT
  FROM ORDER_SPLIT_US
GROUP BY SOURCE;

SELECT SOURCE, COUNT(*) AS ORDER_COUNT
  FROM ORDER_SPLIT_UK
GROUP BY SOURCE;

Time handling

Event time vs ingest time (ORDERTIME vs ROWTIME)

SELECT TIMESTAMPTOSTRING(ORDERTIME,'yyyy-MM-dd HH:mm:ss'),
       'Order ID : ' + CAST(ORDERID AS VARCHAR) AS ORDERID
  FROM ORDERS
 WHERE ITEMID='Item_42'
 LIMIT 5;
2019-06-09 15:18:07 | Order ID : 25
2019-06-09 11:15:30 | Order ID : 224
2019-06-09 22:03:59 | Order ID : 246
2019-06-09 02:42:02 | Order ID : 257
2019-06-09 23:01:00 | Order ID : 362
SELECT TIMESTAMPTOSTRING(WINDOWSTART(),'yyyy-MM-dd HH:mm:ss') AS WINDOW_START_TS,
       ITEMID,
       COUNT(*) AS ORDER_COUNT
  FROM ORDERS
         WINDOW TUMBLING (SIZE 1 HOUR)
 WHERE ITEMID='Item_42'
GROUP BY ITEMID;
2019-06-11 10:00:00 | Item_42 | 1
2019-06-11 10:00:00 | Item_42 | 18
2019-06-11 10:00:00 | Item_42 | 19
SELECT TIMESTAMPTOSTRING(ROWTIME,'yyyy-MM-dd HH:mm:ss'),
       TIMESTAMPTOSTRING(ORDERTIME,'yyyy-MM-dd HH:mm:ss'),
       'Order ID : ' + CAST(ORDERID AS VARCHAR) AS ORDERID
  FROM ORDERS
 WHERE ITEMID='Item_42'
 LIMIT 5;
2019-07-01 11:16:29 | 2019-06-09 15:18:07 | Order ID : 25
2019-07-01 11:17:20 | 2019-06-09 11:15:30 | Order ID : 224
2019-07-01 11:17:25 | 2019-06-09 22:03:59 | Order ID : 246
2019-07-01 11:17:28 | 2019-06-09 02:42:02 | Order ID : 257
2019-07-01 11:17:56 | 2019-06-09 23:01:00 | Order ID : 362
CREATE STREAM ORDERS_BY_EVENTTIME WITH (VALUE_FORMAT='AVRO',
                                        KAFKA_TOPIC='orders',
                                        TIMESTAMP='ORDERTIME');
SELECT TIMESTAMPTOSTRING(ROWTIME,'yyyy-MM-dd HH:mm:ss'),
       TIMESTAMPTOSTRING(ORDERTIME,'yyyy-MM-dd HH:mm:ss'),
       'Order ID : ' + CAST(ORDERID AS VARCHAR) AS ORDERID
  FROM ORDERS_BY_EVENTTIME
 WHERE ITEMID='Item_42'
 LIMIT 5;
2019-06-09 11:40:16 | 2019-06-09 11:40:16 | Order ID : 15
2019-06-09 12:49:45 | 2019-06-09 12:49:45 | Order ID : 129
2019-06-09 19:50:33 | 2019-06-09 19:50:33 | Order ID : 246
2019-06-09 23:02:23 | 2019-06-09 23:02:23 | Order ID : 657
2019-06-09 05:22:04 | 2019-06-09 05:22:04 | Order ID : 763
SELECT TIMESTAMPTOSTRING(WINDOWSTART(),'yyyy-MM-dd HH:mm:ss') AS WINDOW_START_TS,
       ITEMID,
       COUNT(*) AS ORDER_COUNT
  FROM ORDERS_BY_EVENTTIME
         WINDOW TUMBLING (SIZE 1 HOUR)
 WHERE ITEMID='Item_42'
GROUP BY ITEMID;
2019-06-09 08:00:00 | Item_42 | 1
2019-06-09 19:00:00 | Item_42 | 1
2019-06-09 23:00:00 | Item_42 | 2

Analytics - pivoting & bucketing metrics

  • Using CASE to bucket metrics:

    SELECT ORDERID,
           ORDERUNITS,
           CASE WHEN ORDERUNITS > 15 THEN 'Really big order'
                WHEN ORDERUNITS > 10 THEN 'Big order'
                                     ELSE 'Normal order'
             END AS ORDER_TYPE
      FROM ORDERS_ENRICHED
      LIMIT 10;
    0 | 18 | Really big order
    1 | 13 | Big order
    3 | 5 | Normal order
    0 | 8 | Normal order
    1 | 5 | Normal order
    2 | 5 | Normal order
    3 | 15 | Big order
    4 | 19 | Really big order
    5 | 2 | Normal order
    6 | 4 | Normal order
  • Using CASE to create bucket metric aggregates

    SELECT CASE WHEN ORDERUNITS > 15 THEN 'Really big order'
                WHEN ORDERUNITS > 10 THEN 'Big order'
                                     ELSE 'Normal order'
             END AS ORDER_TYPE,
           COUNT(*)
      FROM ORDERS_ENRICHED
    GROUP BY CASE WHEN ORDERUNITS > 15 THEN 'Really big order'
                  WHEN ORDERUNITS > 10 THEN 'Big order'
                                       ELSE 'Normal order'
               END;
  • Using CASE to pivot bucket aggregates

    SELECT TIMESTAMPTOSTRING(WINDOWSTART(),'yyyy-MM-dd HH:mm:ss') AS WINDOW_START_TS,
           MAKE,
           COUNT(*),
           SUM(CASE WHEN ORDERUNITS > 15 THEN 1 ELSE 0 END) AS CT_REALLY_BIG_ORDER,
           SUM(CASE WHEN ORDERUNITS > 10 AND ORDERUNITS <= 15 THEN 1 ELSE 0 END) AS CT_BIG_ORDER,
           SUM(CASE WHEN ORDERUNITS <= 10 THEN 1 ELSE 0 END) AS CT_NORMAL_ORDER
      FROM ORDERS_ENRICHED
            WINDOW TUMBLING (SIZE 1 HOUR)
    GROUP BY MAKE
    LIMIT 5;
    2019-07-01 11:00:00 | Hilpert and Sons | 1 | 1 | 0 | 0
    2019-07-01 11:00:00 | Considine and Sons | 71 | 12 | 11 | 43
    2019-07-01 11:00:00 | MacGyver Group | 63 | 14 | 11 | 33
    2019-07-01 11:00:00 | Bauch-Hudson | 64 | 14 | 16 | 30
    2019-07-01 11:00:00 | Corkery-Rath | 59 | 14 | 12 | 31