Workshop: Real-time SQL Stream Processing at Scale with Apache Kafka and KSQL (Mac/Linux)

1. Introduction

KSQL is the streaming SQL engine for Apache Kafka. Using just SQL it is possible for developers to build powerful stream processing applications. This workshop will show practical examples of KSQL:

  • Filtering streams of data

  • Joining live events with reference data (e.g. from a database)

  • Stateful aggregations

  • Converting streams from JSON to Avro

Diagram of what’s being built with the KSQL demo

You can find all of the KSQL commands used during this workshop in the ksql-workshop.sql file.

The slides that go with it can be found here.

Note
For Windows instructions, please see ksql-workshop-windows.adoc

Don’t forget to check out the #ksql channel on our Community Slack group

——Robin Moffatt @rmoff

2. Prerequisites

Important
These MUST be completed before the workshop!

Please see pre-requisites.adoc

3. Start Confluent Platform

NOTE: Make sure that Docker has at least 8GB of memory available (check with docker system info | grep Memory)

Note
For Windows instructions, please see ksql-workshop-windows.adoc

Change the working directory:

cd demo-scene/ksql-workshop

Start Confluent Platform:

docker-compose up -d
docker-compose logs -f kafka|grep "INFO Kafka version"

Once you see output like the following, Kafka is running and you can proceed:

$ docker-compose logs -f kafka|grep "INFO Kafka version"
kafka_1             | [2019-03-15 16:04:53,441] INFO Kafka version : 2.1.1-cp1 (org.apache.kafka.common.utils.AppInfoParser)

Press Ctrl-C twice to exit the docker-compose logs command

Run docker-compose ps to confirm that all components are running:

$ docker-compose ps
      Name                    Command               State                          Ports
----------------------------------------------------------------------------------------------------------------
connect-debezium   /docker-entrypoint.sh start      Up      0.0.0.0:8083->8083/tcp, 8778/tcp, 9092/tcp, 9779/tcp
control-center     bash -c echo "Waiting two  ...   Up      0.0.0.0:9021->9021/tcp
datagen-ratings    bash -c echo Waiting for K ...   Up
elasticsearch      /usr/local/bin/docker-entr ...   Up      0.0.0.0:9200->9200/tcp, 9300/tcp
kafka              /etc/confluent/docker/run        Up      0.0.0.0:9092->9092/tcp
kafka-connect      /etc/confluent/docker/run        Up      0.0.0.0:18083->18083/tcp, 8083/tcp, 9092/tcp
kafkacat           sleep infinity                   Up
kibana             /usr/local/bin/kibana-docker     Up      0.0.0.0:5601->5601/tcp
ksql-cli           /bin/sh                          Up
ksql-server        /etc/confluent/docker/run        Up      0.0.0.0:8088->8088/tcp
mysql              docker-entrypoint.sh mysqld      Up      0.0.0.0:3306->3306/tcp, 33060/tcp
schema-registry    /etc/confluent/docker/run        Up      8081/tcp
zookeeper          /etc/confluent/docker/run        Up      2181/tcp, 2888/tcp, 3888/tcp
Important
If any components do not show "Up" under the State column (e.g. they say "Exit") then you must rectify this before continuing. As a first solution, try re-issuing the docker-compose up -d command.

4. KSQL

KSQL can be used via the command line interface (CLI), a graphical UI built into Confluent Control Center, or the documented REST API.

In this workshop we will use the CLI, which will feel very familiar if you have used Oracle’s sql*plus, the MySQL CLI, and so on.

Launch the CLI:

docker-compose exec ksql-cli ksql http://ksql-server:8088

Make sure that you get a successful start up screen:

                  ===========================================
                  =        _  __ _____  ____  _             =
                  =       | |/ // ____|/ __ \| |            =
                  =       | ' /| (___ | |  | | |            =
                  =       |  <  \___ \| |  | | |            =
                  =       | . \ ____) | |__| | |____        =
                  =       |_|\_\_____/ \___\_\______|       =
                  =                                         =
                  =  Streaming SQL Engine for Apache Kafka® =
                  ===========================================

Copyright 2017-2018 Confluent Inc.

CLI v5.2.1, Server v5.2.1 located at http://ksql-server:8088

Having trouble? Type 'help' (case-insensitive) for a rundown of how things work!

ksql>

4.1. See available Kafka topics

KSQL can be used to view the topic metadata on a Kafka cluster (SHOW TOPICS;), as well as inspect the messages in a topic (PRINT <topic>;).

ksql> SHOW TOPICS;

 Kafka Topic                     | Registered | Partitions | Partition Replicas | Consumers | ConsumerGroups
-------------------------------------------------------------------------------------------------------------
 _confluent-metrics              | false      | 12         | 1                  | 0         | 0
 _schemas                        | false      | 1          | 1                  | 0         | 0
 ratings                         | false      | 1          | 1                  | 0         | 0
[...]
-------------------------------------------------------------------------------------------------------------
ksql>

4.2. Inspect a topic’s contents - Ratings

The event stream driving this example is a simulated stream of events purporting to show the ratings left by users on a website, with data elements including the device type that they used, the star rating, and a message associated with the rating.

Using the PRINT command we can easily see column names and values within a topic’s messages. Kafka messages consist of a timestamp, key, and message (payload), which are all shown in the PRINT output.

Tip

Note that we don’t need to know the format of the data; KSQL introspects the data and understands how to deserialise it.

ksql> PRINT 'ratings';
Format:AVRO
22/02/18 12:55:04 GMT, 5312, {"rating_id": 5312, "user_id": 4, "stars": 4, "route_id": 2440, "rating_time": 1519304104965, "channel": "web", "message": "Surprisingly good, maybe you are getting your mojo back at long last!"}
22/02/18 12:55:05 GMT, 5313, {"rating_id": 5313, "user_id": 3, "stars": 4, "route_id": 6975, "rating_time": 1519304105213, "channel": "web", "message": "why is it so difficult to keep the bathrooms clean ?"}

Press Ctrl-C to cancel and return to the KSQL prompt.

Note

Optional: You can also inspect the Kafka topic with a tool such as kafkacat. Just like KSQL this shows the Kafka messages, but it also includes detailed metadata.

docker-compose exec kafkacat \
          kafkacat -b kafka:29092 -C -K: \
          -f '\nKey (%K bytes): %k\t\nValue (%S bytes): %s\nPartition: %p\tOffset: %o\n--\n' \
          -t ratings -o end

Note that the data is in Avro, so you will see lots of special characters in the output.

Press Ctrl-C to cancel and return to the command prompt.

5. Querying and filtering streams of data

Having inspected the topics and their contents, let’s get into some SQL now. The first step in KSQL is to register the source topic with KSQL.

5.1. Register the ratings topic

By registering a topic with KSQL, we declare its schema and properties.

The inbound event stream of ratings data is a STREAM—later we will talk about TABLE, but for now, we just need a simple CREATE STREAM with the appropriate values in the WITH clause:

ksql> CREATE STREAM ratings WITH (KAFKA_TOPIC='ratings', VALUE_FORMAT='AVRO');

 Message
---------------
 Stream created
---------------
Note
Here we’re using data in Avro format, but you can also read and write JSON or CSV data with KSQL. If you use JSON or CSV you have to specify the schema when you create the stream.

5.2. Describe ratings stream

You’ll notice that in the above CREATE STREAM statement we didn’t specify any of the column names. That’s because the data is in Avro format, and the Confluent Schema Registry supplies the actual schema details. You can use DESCRIBE to examine an object’s columns:

ksql> DESCRIBE ratings;
Name                 : RATINGS
 Field       | Type
-----------------------------------------
 ROWTIME     | BIGINT           (system)
 ROWKEY      | VARCHAR(STRING)  (system)
 RATING_ID   | BIGINT
 USER_ID     | INTEGER
 STARS       | INTEGER
 ROUTE_ID    | INTEGER
 RATING_TIME | BIGINT
 CHANNEL     | VARCHAR(STRING)
 MESSAGE     | VARCHAR(STRING)
-----------------------------------------
For runtime statistics and query details run: DESCRIBE EXTENDED <Stream,Table>;
ksql>

Note the presence of a couple of (system) columns here. ROWTIME is the timestamp of the Kafka message—important for when we do time-based aggregations later— and ROWKEY is the key of the Kafka message.

5.3. Querying data in KSQL

Let’s run our first SQL. As anyone familiar with SQL knows, SELECT * will return all columns from a given object. So let’s try it!

ksql> SELECT * FROM ratings;
1529501380124 | 6229 | 6229 | 17 | 2 | 3957 | 1529501380124 | iOS-test | why is it so difficult to keep the bathrooms clean ?
1529501380197 | 6230 | 6230 | 14 | 2 | 2638 | 1529501380197 | iOS | your team here rocks!
1529501380641 | 6231 | 6231 | 12 | 1 | 9870 | 1529501380641 | iOS-test | (expletive deleted)
[…]

You’ll notice that the data keeps on coming. That is because KSQL is fundamentally a streaming engine, and the queries that you run are continuous queries. If the offset has been set to earliest (see section 5.5), KSQL shows the past (data from the beginning of the topic), the present (data now arriving in the topic), and the future (all new data that arrives in the topic from now on); otherwise you see only data that arrives from now on.

Press Ctrl-C to cancel the query and return to the KSQL command prompt.

To inspect a finite set of data, you can use the LIMIT clause. Try it out now:

ksql> SELECT * FROM ratings LIMIT 5;
1529499830648 | 1 | 1 | 8 | 1 | 7562 | 1529499829398 | ios | more peanuts please
1529499830972 | 2 | 2 | 5 | 4 | 54 | 1529499830972 | iOS | your team here rocks!
1529499831203 | 3 | 3 | 16 | 1 | 9809 | 1529499831203 | web | airport refurb looks great, will fly outta here more!
1529499831521 | 4 | 4 | 5 | 1 | 7691 | 1529499831521 | web | thank you for the most friendly, helpful experience today at your new lounge
1529499831814 | 5 | 5 | 19 | 3 | 389 | 1529499831814 | ios | thank you for the most friendly, helpful experience today at your new lounge
Limit Reached
Query terminated
ksql>

5.4. Filtering streams of data in KSQL

Since KSQL is heavily based on SQL, you can do many of the standard SQL things you’d expect to be able to do, including predicates and selection of specific columns:

ksql> SELECT USER_ID, STARS, CHANNEL, MESSAGE FROM ratings WHERE STARS <3 AND CHANNEL='iOS' LIMIT 3;
3 | 2 | iOS | your team here rocks!
2 | 1 | iOS | worst. flight. ever. #neveragain
15 | 2 | iOS | worst. flight. ever. #neveragain
Limit Reached
Query terminated
ksql>

5.5. KSQL offsets

Since Apache Kafka persists data, it is possible to use KSQL to query and process data from the past, as well as new events that arrive on the topic.

To tell KSQL to process from the beginning of a topic, run SET 'auto.offset.reset' = 'earliest';

Run this now, so that future processing includes all existing data.

ksql> SET 'auto.offset.reset' = 'earliest';
Successfully changed local property 'auto.offset.reset' from 'null' to 'earliest'
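
The effect of this setting can be pictured with a small Python sketch. This is only a toy model under simplifying assumptions (a topic partition as a plain list, and a hypothetical start_offset helper); it is not how KSQL or the Kafka consumer is implemented.

```python
# Toy model of a Kafka topic partition: an append-only list of messages.
log = ["rating-1", "rating-2", "rating-3"]

def start_offset(log, auto_offset_reset):
    """Return the position at which a brand-new reader starts (hypothetical helper)."""
    if auto_offset_reset == "earliest":
        return 0            # replay everything already stored in the topic
    if auto_offset_reset == "latest":
        return len(log)     # skip history and wait for new messages only
    raise ValueError(f"unsupported value: {auto_offset_reset}")

earliest_pos = start_offset(log, "earliest")
latest_pos = start_offset(log, "latest")

# A new message arrives after both readers have chosen their start position.
log.append("rating-4")

seen_from_earliest = log[earliest_pos:]
seen_from_latest = log[latest_pos:]
print(seen_from_earliest)  # all four messages
print(seen_from_latest)    # only the message that arrived afterwards
```

With earliest, the reader sees the stored history plus anything new; with latest, it sees only what arrives after it starts.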

6. Creating a Kafka topic populated by a filtered stream

Filtering data with KSQL

Let’s take the poor ratings from people with iOS devices, and create a new stream from them!

ksql> CREATE STREAM POOR_RATINGS AS SELECT * FROM ratings WHERE STARS <3 AND CHANNEL='iOS';

 Message
----------------------------
 Stream created and running
----------------------------

What this does is start a KSQL continuous query that processes messages on the source ratings topic to:

  • apply the predicates (STARS<3 AND CHANNEL='iOS')

  • select the columns to write to the new stream

    • Here we take all columns from the source stream with SELECT *; to keep only some of them, list them by name instead

Each processed message is written to a new Kafka topic. Remember, this is a continuous query, so every single source message (past, present, and future) will be processed with low latency in this way. If we only want to process new messages and not existing ones, we would configure SET 'auto.offset.reset' = 'latest';.

This method of creating derived topics is frequently referred to by the acronym of the statement—CSAS (→ CREATE STREAM … AS SELECT).
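
As a rough mental model, the CSAS statement above behaves like a filter that is applied to each message as it passes through. The Python sketch below illustrates that idea only; the sample events are made up, and the generator is a stand-in for the continuous query, not KSQL's implementation.

```python
def poor_ios_ratings(source):
    """Yield only the events matching STARS < 3 AND CHANNEL = 'iOS'."""
    for event in source:
        if event["STARS"] < 3 and event["CHANNEL"] == "iOS":
            yield event

# Made-up sample events standing in for the ratings topic.
ratings = [
    {"RATING_ID": 1, "STARS": 1, "CHANNEL": "iOS"},
    {"RATING_ID": 2, "STARS": 4, "CHANNEL": "iOS"},
    {"RATING_ID": 3, "STARS": 2, "CHANNEL": "web"},
    {"RATING_ID": 4, "STARS": 2, "CHANNEL": "iOS"},
    {"RATING_ID": 5, "STARS": 1, "CHANNEL": "ios"},  # different case, so no match
]

# The derived "topic" is whatever the filter lets through, in arrival order.
poor_ratings_topic = list(poor_ios_ratings(ratings))
matched_ids = [event["RATING_ID"] for event in poor_ratings_topic]
print(matched_ids)
```

Note that the string comparison is case-sensitive in this sketch, so a channel value of ios is not the same as iOS.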

6.1. Inspect the derived stream

Using DESCRIBE we can see that the new stream has the same columns as the source one.

ksql> DESCRIBE POOR_RATINGS;
Name                 : POOR_RATINGS
 Field       | Type
-----------------------------------------
 ROWTIME     | BIGINT           (system)
 ROWKEY      | VARCHAR(STRING)  (system)
 RATING_ID   | BIGINT
 USER_ID     | INTEGER
 STARS       | INTEGER
 ROUTE_ID    | INTEGER
 RATING_TIME | BIGINT
 CHANNEL     | VARCHAR(STRING)
 MESSAGE     | VARCHAR(STRING)
-----------------------------------------
For runtime statistics and query details run: DESCRIBE EXTENDED <Stream,Table>;
ksql>

Additional information about the derived stream is available with the DESCRIBE EXTENDED command:

ksql> DESCRIBE EXTENDED POOR_RATINGS;
Name                 : POOR_RATINGS
Type                 : STREAM
Key field            :
Key format           : STRING
Timestamp field      : Not set - using <ROWTIME>
Value format         : AVRO
Kafka topic          : POOR_RATINGS (partitions: 4, replication: 1)

 Field       | Type
-----------------------------------------
 ROWTIME     | BIGINT           (system)
 ROWKEY      | VARCHAR(STRING)  (system)
 RATING_ID   | BIGINT
 USER_ID     | INTEGER
 STARS       | INTEGER
 ROUTE_ID    | INTEGER
 RATING_TIME | BIGINT
 CHANNEL     | VARCHAR(STRING)
 MESSAGE     | VARCHAR(STRING)
-----------------------------------------

Queries that write into this STREAM
-----------------------------------
CSAS_POOR_RATINGS_0 : CREATE STREAM POOR_RATINGS AS SELECT * FROM ratings WHERE STARS <3 AND CHANNEL='iOS';

For query topology and execution plan please run: EXPLAIN <QueryId>

Local runtime statistics
------------------------
messages-per-sec:     10.04   total-messages:       998     last-message: 6/20/18 1:46:09 PM UTC
 failed-messages:         0 failed-messages-per-sec:         0      last-failed:       n/a
(Statistics of the local KSQL server interaction with the Kafka topic POOR_RATINGS)
ksql>

Note the runtime statistics above. If you re-run the DESCRIBE EXTENDED command you’ll see these values increasing.

Local runtime statistics
------------------------
messages-per-sec:      0.33   total-messages:      1857     last-message: 6/20/18 2:33:26 PM UTC
 failed-messages:         0 failed-messages-per-sec:         0      last-failed:       n/a
(Statistics of the local KSQL server interaction with the Kafka topic POOR_RATINGS)

N.B. you can use the up arrow on your keyboard to cycle through KSQL command history for easy access and replay of previous commands. Ctrl-R also works for searching command history.

6.2. Query the stream

The derived stream that we’ve created is just another stream that we can interact with in KSQL as any other. If you run a SELECT against the stream you’ll see new messages arriving based on those coming from the source ratings topic:

ksql> SELECT STARS, CHANNEL, MESSAGE FROM POOR_RATINGS;
1 | iOS | worst. flight. ever. #neveragain
2 | iOS | Surprisingly good, maybe you are getting your mojo back at long last!
2 | iOS | thank you for the most friendly, helpful experience today at your new lounge

Press Ctrl-C to cancel and return to the KSQL prompt.

6.3. It’s just a Kafka topic…

The query that we created above (CREATE STREAM POOR_RATINGS AS…) populates a Kafka topic, which we can also access as a KSQL stream (as in the previous step). Let’s inspect this topic now, using KSQL.

Observe that the topic exists:

ksql> SHOW TOPICS;

 Kafka Topic        | Registered | Partitions | Partition Replicas | Consumers | ConsumerGroups
------------------------------------------------------------------------------------------------
[…]
 POOR_RATINGS       | true       | 4          | 1                  | 0         | 0
 ratings            | true       | 1          | 1                  | 1         | 1
------------------------------------------------------------------------------------------------
ksql>

Inspect the Kafka topic’s data

ksql> PRINT 'POOR_RATINGS';
Format:AVRO
6/20/18 11:01:03 AM UTC, 37, {"RATING_ID": 37, "USER_ID": 12, "STARS": 2, "ROUTE_ID": 8916, "RATING_TIME": 1529492463400, "CHANNEL": "iOS", "MESSAGE": "more peanuts please"}
6/20/18 11:01:07 AM UTC, 55, {"RATING_ID": 55, "USER_ID": 10, "STARS": 2, "ROUTE_ID": 5232, "RATING_TIME": 1529492467552, "CHANNEL": "iOS", "MESSAGE": "why is it so difficult to keep the bathrooms clean ?"}

6.4. Optional step: Alternative serialisation methods

By default KSQL will write to the target stream using the same serialisation as the source. So if it’s reading Avro data, it’ll write Avro data. You can override this behaviour using the WITH (VALUE_FORMAT='xx') syntax. For example, if for some terrible reason you wanted to write data to CSV in a topic, you would run:

CREATE STREAM POOR_RATINGS_CSV WITH (VALUE_FORMAT='DELIMITED') AS
  SELECT * FROM ratings WHERE STARS <3 AND CHANNEL='iOS';

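You can use similar syntax to change properties of the target Kafka topic, such as its name (KAFKA_TOPIC) and number of partitions (PARTITIONS). This example sets the format and the partition count: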

CREATE STREAM POOR_RATINGS_JSON WITH (VALUE_FORMAT='JSON', PARTITIONS=2) AS
  SELECT * FROM ratings WHERE STARS <3 AND CHANNEL='iOS';
Note

Re-serialising data this way can be a powerful use of KSQL in itself. Consider a source topic being written to in CSV format, and this topic is to be used by multiple consumers. One option is each consumer has to know the schema of the CSV. Another option is to declare the schema in KSQL once, and then reserialise the data into a better format such as Avro. Now each consumer can use the data without being tightly-coupled to the original system.

The SQL code would look like this - the source topic doesn’t exist in your workshop environment so you won’t be able to actually run it.

CREATE STREAM USER_LOGON_CSV (first_name VARCHAR,
                              last_name  VARCHAR,
                              email      VARCHAR,
                              ip_address VARCHAR,
                              logon_date VARCHAR)
        WITH (KAFKA_TOPIC ='user_logons',
              VALUE_FORMAT='DELIMITED');

CREATE STREAM USER_LOGON WITH (VALUE_FORMAT='AVRO') AS
  SELECT * FROM USER_LOGON_CSV;
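
The idea of declaring the schema once and re-serialising can be sketched in Python as follows. This is an illustration under assumptions: it writes JSON rather than Avro (to stay within the standard library), the reserialise helper is hypothetical, and the sample message is made up.

```python
import csv
import io
import json

# Column names mirror the USER_LOGON_CSV declaration above.
SCHEMA = ["first_name", "last_name", "email", "ip_address", "logon_date"]

def reserialise(delimited_line):
    """Parse one comma-delimited message and return it as a JSON string."""
    values = next(csv.reader(io.StringIO(delimited_line)))
    if len(values) != len(SCHEMA):
        raise ValueError("message does not match the declared schema")
    return json.dumps(dict(zip(SCHEMA, values)))

# Made-up sample message, for illustration only.
line = "Ada,Lovelace,ada@example.com,192.0.2.1,2018-06-20"
record = json.loads(reserialise(line))
print(record["email"])
```

A consumer of the re-serialised topic reads named fields and no longer needs to know the column order of the original CSV.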

7. Kafka Connect / Integrating Kafka with a database

This shows how to ingest the Customers data from a database using Kafka Connect and CDC.

Check that Kafka Connect with Debezium’s connector has started:

docker-compose logs -f connect-debezium|grep "Kafka Connect started"

Wait for the output:

connect-debezium_1  | 2018-09-04 11:33:04,639 INFO   ||  Kafka Connect started   [org.apache.kafka.connect.runtime.Connect]

Press Ctrl-C to return to the command prompt.

We’ll now create two Kafka Connect connectors. Both stream events from MySQL into Kafka using Debezium, but differ in how they handle the message structure.

docker-compose exec connect-debezium bash -c '/scripts/create-mysql-source.sh'

You should see HTTP/1.1 201 Created, twice.

Note

Optional: If you are interested you can inspect the script file (scripts/create-mysql-source.sh in the workshop folder) that includes the configuration payload in JSON.

  1. The connector called mysql-source-demo-customers flattens the before/after record state data, along with the nested metadata, into a single flat structure. This is what we use during the rest of the workshop.

    The flattening is done using a Single Message Transform from Debezium, called io.debezium.transforms.UnwrapFromEnvelope.

    The connector also uses two Single Message Transforms to illustrate how metadata can be added to ingested data. The InsertField transformation adds the topic name into a field called messagetopic, and some fixed text into the messagesource field.

  2. The connector mysql-source-demo-customers-raw retains the nested structure of the before/after record data.

    A Single Message Transform is used to route the messages to a different topic. By default Debezium will use the format server.schema.table when streaming a table’s data to a Kafka topic. We use the RegexRouter to redirect the messages to a topic with a -raw suffix.
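
To make the three kinds of transform more concrete, here is a simplified Python sketch of what each one does to a message. The functions unwrap, insert_field and route are hypothetical stand-ins for the UnwrapFromEnvelope, InsertField and RegexRouter transforms, and the change event is a made-up, trimmed-down example of the before/after/source structure; the real transforms are configured in the connector JSON and handle more cases.

```python
import re

def unwrap(envelope):
    """Keep only the 'after' state of a change event (the flattening step)."""
    return dict(envelope["after"])

def insert_field(record, name, value):
    """Return a copy of the record with one extra field added."""
    enriched = dict(record)
    enriched[name] = value
    return enriched

def route(topic):
    """Rename the target topic by appending a -raw suffix using a regex."""
    return re.sub(r"^(.*)$", r"\1-raw", topic, count=1)

# Made-up, simplified change event for an UPDATE.
change_event = {
    "before": {"id": 2, "first_name": "Ruthie", "last_name": "Brockherst"},
    "after": {"id": 2, "first_name": "Thomas", "last_name": "Smith"},
    "source": {"name": "asgard", "table": "CUSTOMERS"},
    "op": "u",
}
topic = "asgard.demo.CUSTOMERS"

# Connector 1: flatten, then add the two metadata fields.
flat = insert_field(unwrap(change_event), "messagetopic", topic)
flat = insert_field(flat, "messagesource", "Debezium CDC from MySQL on asgard")

# Connector 2: keep the nested structure, but write to a renamed topic.
raw_topic = route(topic)
print(flat)
print(raw_topic)
```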

7.1. Exploring CDC change records

Start a MySQL command prompt:

docker-compose exec mysql bash -c 'mysql -u $MYSQL_USER -p$MYSQL_PASSWORD demo'

Now in a separate terminal window run the following, to stream the contents of the customers topic and any changes to stdout:

# Make sure you run this from the `demo-scene/ksql-workshop` folder
docker-compose exec -T kafka-connect \
        kafka-avro-console-consumer \
        --bootstrap-server kafka:29092 \
        --property schema.registry.url=http://schema-registry:8081 \
        --topic asgard.demo.CUSTOMERS-raw --from-beginning|jq '.'

(jq is useful here—if you don’t have it installed, remove |jq '.' from the above command).

Note the customer data shown, and the structure of it, with before, after, and source data.

From the MySQL command prompt, make some changes to the data:

INSERT INTO CUSTOMERS (ID,FIRST_NAME,LAST_NAME) VALUES (42,'Rick','Astley');
UPDATE CUSTOMERS SET FIRST_NAME = 'Thomas', LAST_NAME ='Smith' WHERE ID=2;

You should see each DML cause an almost-instantaneous update on the Kafka topic. For each change, inspect the output of the Kafka topic. Observe the difference between an INSERT and UPDATE.

7.2. Inspect a topic’s contents with KSQL - Customers

Let’s look at the customer data from the KSQL prompt. This is pretty much the same as using kafka-avro-console-consumer as we did above.

Here we use the FROM BEGINNING argument, which tells KSQL to go back to the beginning of the topic and show all data from there:

ksql> PRINT 'asgard.demo.CUSTOMERS' FROM BEGINNING;
Format:AVRO
3/4/19 5:50:42 PM UTC, Struct{id=1}, {"id": 1, "first_name": "Rica", "last_name": "Blaisdell", "email": "rblaisdell0@rambler.ru", "gender": "Female", "club_status": "bronze", "comments": "Universal optimal hierarchy", "create_ts": "2019-03-04T17:48:20Z", "update_ts": "2019-03-04T17:48:20Z", "messagetopic": "asgard.demo.CUSTOMERS", "messagesource": "Debezium CDC from MySQL on asgard"}
3/4/19 5:50:42 PM UTC, Struct{id=2}, {"id": 2, "first_name": "Ruthie", "last_name": "Brockherst", "email": "rbrockherst1@ow.ly", "gender": "Female", "club_status": "platinum", "comments": "Reverse-engineered tangible interface", "create_ts": "2019-03-04T17:48:20Z", "update_ts": "2019-03-04T17:48:20Z", "messagetopic": "asgard.demo.CUSTOMERS", "messagesource": "Debezium CDC from MySQL on asgard"}

Press Ctrl-C to cancel and return to the KSQL command prompt.

7.3. Re-Key Topics with KSQL

Since we’re going to eventually join the customer data to the ratings, the customer Kafka messages must be keyed on the field on which we are performing the join. If this is not the case the join will fail and we’ll get NULL values in the result.

Our source customer messages are currently keyed using the Primary Key of the source table, but using a key serialisation that KSQL does not yet support—and thus in effect is not useful as a key in KSQL at all.

To re-key a topic in Kafka we can use KSQL!

First we will register the customer topic.

ksql> CREATE STREAM CUSTOMERS_SRC WITH (KAFKA_TOPIC='asgard.demo.CUSTOMERS', VALUE_FORMAT='AVRO');

 Message
----------------
 Stream created
----------------
ksql>

With the stream registered, we can now re-key the topic, using a KSQL CSAS and the PARTITION BY clause. Note that we’re also changing the number of partitions from that of the source (4) to match that of the ratings topic (1):

Important

By changing the partition key, data may move between partitions, and thus its ordering may change. Kafka’s strict ordering guarantee only applies within a partition.

In our example this doesn’t matter, but be aware of this if you rely on this re-keying technique in other KSQL queries.

SET 'auto.offset.reset' = 'earliest';

CREATE STREAM CUSTOMERS_SRC_REKEY
        WITH (PARTITIONS=1) AS
        SELECT * FROM CUSTOMERS_SRC PARTITION BY ID;
 Message
----------------------------
 Stream created and running
----------------------------
ksql>
Note

Optional

To inspect the key for a given stream/table, you can use the ROWKEY system column.

Here we compare it to the join column (ID); for the join to succeed they must be equal.

In the source stream, the ROWKEY reflects the key of the underlying Kafka messages as set by the producer (read by KSQL as a String):

ksql> SELECT C.ROWKEY, C.ID FROM CUSTOMERS_SRC C LIMIT 3;
 | 1
 | 2
 | 3
Limit Reached
Query terminated

In the re-keyed stream the ROWKEY and ID are equal, which is essential for a successful JOIN operation in KSQL.

ksql> SELECT C.ROWKEY, C.ID FROM CUSTOMERS_SRC_REKEY C LIMIT 3;
1 | 1
2 | 2
3 | 3
Limit Reached
Query terminated
ksql>
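
Why re-keying can move data between partitions is easier to see with a small Python sketch. It rests on stated assumptions: zlib.crc32 is only a stand-in for Kafka's real partitioner hash, the rekey and partition_for helpers are hypothetical, and the original keys k1 to k3 are made up.

```python
import zlib

def partition_for(key, num_partitions):
    """Choose a partition from the key. crc32 is a stand-in hash, not Kafka's."""
    return zlib.crc32(str(key).encode("utf-8")) % num_partitions

def rekey(messages, field, num_partitions):
    """Assign each message to a partition using the given field as its new key."""
    partitions = {p: [] for p in range(num_partitions)}
    for msg in messages:
        new_key = str(msg[field])
        target = partition_for(new_key, num_partitions)
        partitions[target].append({**msg, "key": new_key})
    return partitions

# Made-up messages whose original keys are unrelated to the ID column.
messages = [
    {"key": "k1", "ID": 2, "FIRST_NAME": "Ruthie"},
    {"key": "k2", "ID": 2, "FIRST_NAME": "Thomas"},
    {"key": "k3", "ID": 3, "FIRST_NAME": "Mariejeanne"},
]

rekeyed = rekey(messages, "ID", 4)

# After re-keying, every message for ID 2 sits in one partition, in arrival order.
partitions_with_id_2 = [p for p, msgs in rekeyed.items()
                        if any(m["ID"] == 2 for m in msgs)]
print(partitions_with_id_2)
```

Because the partition is derived from the key, all messages sharing the new key land in the same partition, which is what a join on that key relies on. With a single partition, as used in this workshop, every message lands in partition 0.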

8. The Stream/Table duality

We’re now going to model the Customers topic as a KSQL Table. This is a semantic construct that enables us to work with the data in the topic as key/value pairs, with a single value for each key. You can read more about this here.

Register a TABLE over the new re-keyed Kafka topic. Why’s it a table? Because for each key (user id), we want to know its current value (name, status, etc).

ksql> CREATE TABLE CUSTOMERS WITH (KAFKA_TOPIC='CUSTOMERS_SRC_REKEY', VALUE_FORMAT ='AVRO', KEY='ID');

 Message
---------------
 Table created
---------------
ksql>
Note

N.B. If you get the error "Avro schema for message values on topic CUSTOMERS_SRC_REKEY does not exist in the Schema Registry." then:

  • Check that the topic’s source stream is created:

    ksql> SHOW STREAMS;
     Stream Name         | Kafka Topic         | Format
    ----------------------------------------------------
     CUSTOMERS_SRC_REKEY | CUSTOMERS_SRC_REKEY | AVRO
     [...]
  • Verify that the source stream is processing messages by running DESCRIBE EXTENDED CUSTOMERS_SRC_REKEY;. Under the heading Local runtime statistics you should see:

    Local runtime statistics
    ------------------------
    messages-per-sec:      0.10   total-messages:        10     last-message: 6/28/18 6:23:54 PM UTC
     failed-messages:         0 failed-messages-per-sec:         0      last-failed:       n/a
    • If no 'messages-per-sec' is shown, the next step is to verify that you ran SET 'auto.offset.reset' = 'earliest'; earlier. You can run it again to be certain. If it says Successfully changed local property 'auto.offset.reset' from 'null' to 'earliest' then the null shows that it wasn’t previously set.

    • If this was the case, then you need to drop and recreate the stream in order to process the customer data:

      TERMINATE QUERY CSAS_CUSTOMERS_SRC_REKEY_0;
      DROP STREAM CUSTOMERS_SRC_REKEY;

      Then re-run the CREATE STREAM CUSTOMERS_SRC_REKEY[…] from above. Use SHOW QUERIES; to list the queries running if the name differs from that shown in the TERMINATE statement.

Query the table:

ksql> SELECT ID, FIRST_NAME, LAST_NAME, EMAIL, CLUB_STATUS FROM CUSTOMERS LIMIT 3;
1 | Rica | Blaisdell | rblaisdell0@rambler.ru | bronze
3 | Mariejeanne | Cocci | mcocci2@techcrunch.com | bronze
4 | Hashim | Rumke | hrumke3@sohu.com | platinum
Limit Reached
Query terminated

Both CUSTOMERS (table) and CUSTOMERS_SRC_REKEY (stream) are driven from the same Kafka topic. You can examine the stream/table duality by running two KSQL sessions side-by-side, each querying one of them for a given key:

Session 1, querying the table:

ksql> SET 'auto.offset.reset' = 'earliest';
Successfully changed local property 'auto.offset.reset' to 'earliest'. Use the UNSET command to revert your change.
ksql> SELECT ID, FIRST_NAME, LAST_NAME, EMAIL, CLUB_STATUS FROM CUSTOMERS WHERE ID=2;
2 | Thomas | Smith | rbrockherst1@ow.ly | platinum

Session 2, querying the stream:

ksql> SET 'auto.offset.reset' = 'earliest';
Successfully changed local property 'auto.offset.reset' to 'earliest'. Use the UNSET command to revert your change.
ksql> SELECT ID, FIRST_NAME, LAST_NAME, EMAIL, CLUB_STATUS FROM CUSTOMERS_SRC_REKEY WHERE ID=2;
2 | Ruthie | Brockherst | rbrockherst1@ow.ly | platinum
2 | Thomas | Smith | rbrockherst1@ow.ly | platinum

Leave each query running, and in a third window bring up a MySQL session:

docker-compose exec mysql bash -c 'mysql -u $MYSQL_USER -p$MYSQL_PASSWORD demo'

In MySQL make changes (one at a time) to the record for the ID being queried:

UPDATE CUSTOMERS SET EMAIL='[email protected]' WHERE ID=2;
UPDATE CUSTOMERS SET EMAIL='[email protected]' WHERE ID=2;

When you make the change in MySQL you should see the two KSQL queries emit the new values almost immediately. Then, cancel and re-run the two KSQL queries (against table and stream respectively). Each time you should note that the stream contains every event related to the source table record, whilst the table holds the current state.
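
The difference between the two views can be sketched in a few lines of Python. This is a conceptual illustration only: the events list stands in for the re-keyed topic, and as_stream and as_table are hypothetical helpers, not KSQL functions.

```python
# Keyed events in arrival order, standing in for the re-keyed customer topic.
events = [
    (2, {"FIRST_NAME": "Ruthie", "LAST_NAME": "Brockherst"}),
    (3, {"FIRST_NAME": "Mariejeanne", "LAST_NAME": "Cocci"}),
    (2, {"FIRST_NAME": "Thomas", "LAST_NAME": "Smith"}),
]

def as_stream(events, key):
    """Stream view: every event ever recorded for the key, in order."""
    return [value for k, value in events if k == key]

def as_table(events):
    """Table view: only the latest value per key."""
    table = {}
    for k, value in events:
        table[k] = value  # a later event overwrites the earlier one
    return table

stream_view = as_stream(events, 2)
table_view = as_table(events)
print(stream_view)    # both versions of customer 2
print(table_view[2])  # only the current version of customer 2
```

The same sequence of events yields the full history when read as a stream and the current state when read as a table.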

9. Joining Data in KSQL

Joining data with KSQL

Let’s use the customer data (CUSTOMERS) to enrich the inbound stream of ratings data (RATINGS), showing against each rating who the customer is and their club status ('platinum', 'gold', etc).

Run the following SQL:

SELECT R.MESSAGE, C.FIRST_NAME, C.LAST_NAME
FROM RATINGS R INNER JOIN CUSTOMERS C
ON R.USER_ID = C.ID
LIMIT 5;

There are a couple of things to note about this query:

  • We’re aliasing the table and stream names to make column names unambiguous

  • The backslash character (\) can be used to denote line continuation, although a statement can also simply span several lines, as it does here

In the output you should see a rating message, and the name of the customer who left it:

more peanuts please | Gianina | Mixhel
your team here rocks! | Munmro | Igounet
airport refurb looks great, will fly outta here more! | null | null
thank you for the most friendly, helpful experience today at your new lounge | Munmro | Igounet
thank you for the most friendly, helpful experience today at your new lounge | null | null
Limit Reached
Query terminated
ksql>

Now let’s pull the full set of data, including a reformat of the timestamp into something human readable.

SELECT TIMESTAMPTOSTRING(R.RATING_TIME, 'yyyy-MM-dd HH:mm:ss'), R.RATING_ID, R.STARS, R.ROUTE_ID,  R.CHANNEL,
R.MESSAGE, C.FIRST_NAME, C.LAST_NAME, C.CLUB_STATUS
FROM RATINGS R INNER JOIN CUSTOMERS C
ON R.USER_ID = C.ID;
2018-06-20 13:03:49 | 1 | 1 | 7562 | ios | more peanuts please | Gianina | Mixhel | gold
2018-06-20 13:03:50 | 2 | 4 | 54 | iOS | your team here rocks! | Munmro | Igounet | gold
2018-06-20 13:03:51 | 4 | 1 | 7691 | web | thank you for the most friendly, helpful experience today at your new lounge | Munmro | Igounet | gold
2018-06-20 13:03:51 | 6 | 2 | 6902 | web | Surprisingly good, maybe you are getting your mojo back at long last! | Gianina | Mixhel | gold

Press Ctrl-C to cancel the output.
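
Conceptually, a stream-table join looks up each arriving event in the table's current state. The Python sketch below illustrates this under assumptions: the join helper is hypothetical, the table is a plain dictionary, and the user IDs are paired with names based on the sample output shown earlier.

```python
# Table: current state per customer ID (illustrative values).
customers = {
    8: {"FIRST_NAME": "Gianina", "LAST_NAME": "Mixhel"},
    5: {"FIRST_NAME": "Munmro", "LAST_NAME": "Igounet"},
}

# Stream: rating events in arrival order.
ratings = [
    {"USER_ID": 8, "MESSAGE": "more peanuts please"},
    {"USER_ID": 5, "MESSAGE": "your team here rocks!"},
    {"USER_ID": 16, "MESSAGE": "airport refurb looks great, will fly outta here more!"},
]

def join(ratings, customers, inner=True):
    """Enrich each rating with the customer's name by looking up USER_ID."""
    for rating in ratings:
        customer = customers.get(rating["USER_ID"])
        if customer is None:
            if inner:
                continue  # inner join: drop ratings with no matching customer
            customer = {"FIRST_NAME": None, "LAST_NAME": None}  # left join
        yield (rating["MESSAGE"], customer["FIRST_NAME"], customer["LAST_NAME"])

inner_rows = list(join(ratings, customers))
left_rows = list(join(ratings, customers, inner=False))
print(inner_rows)
print(left_rows)
```

In this sketch an inner join drops the rating from user 16, for whom there is no customer row, while a left join keeps it with empty name fields.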

9.1. Populating a Kafka topic with the results of a Stream-Table join

Let’s persist this as an enriched stream, by simply prefixing the query with CREATE STREAM … AS:

CREATE STREAM RATINGS_WITH_CUSTOMER_DATA WITH (PARTITIONS=1) AS
SELECT R.RATING_ID, R.CHANNEL, R.STARS, R.MESSAGE,
       C.ID, C.CLUB_STATUS, C.EMAIL,
       C.FIRST_NAME, C.LAST_NAME
FROM RATINGS R
     INNER JOIN CUSTOMERS C
       ON R.USER_ID = C.ID ;
 Message
----------------------------
 Stream created and running
----------------------------

9.2. Filtering an enriched stream

Now that we have customer information added to every rating event, we can easily answer questions such as "Which of our Premier customers are not happy?":

SELECT EMAIL, STARS, MESSAGE
FROM RATINGS_WITH_CUSTOMER_DATA
WHERE CLUB_STATUS='platinum'
  AND STARS <3;
aarent0@cpanel.net | 2 | thank you for the most friendly, helpful experience today at your new lounge
mdoughartie1@dedecms.com | 1 | worst. flight. ever. #neveragain

9.3. Daisy-chaining derived streams

Filtering enriched data with KSQL

Having enriched the initial stream of ratings events with customer data, we can now persist a filtered version of that stream that includes a predicate to identify just those VIP customers who have left bad reviews:

CREATE STREAM UNHAPPY_PLATINUM_CUSTOMERS AS
SELECT CLUB_STATUS, EMAIL, STARS, MESSAGE
FROM   RATINGS_WITH_CUSTOMER_DATA
WHERE  STARS < 3
  AND  CLUB_STATUS = 'platinum';
 Message
----------------------------
 Stream created and running
----------------------------
ksql>

9.3.1. Query the new stream

Now we can query the derived stream to easily identify important customers who are not happy. Since this is backed by a Kafka topic being continually populated by KSQL, we can also drive other applications with this data, as well as land it in downstream datastores for visualisation.

ksql> SELECT STARS, MESSAGE, EMAIL FROM UNHAPPY_PLATINUM_CUSTOMERS;
1 | is this as good as it gets? really ? | aarent0@cpanel.net
2 | airport refurb looks great, will fly outta here more! | aarent0@cpanel.net
2 | meh | aarent0@cpanel.net

10. Streaming Aggregates

KSQL can create aggregations of event data, either over all events to date (and continuing to update with new data), or based on a time window. The time window types supported are:

  • Tumbling (e.g. every 5 minutes : 00:00, 00:05, 00:10)

  • Hopping (e.g. every 5 minutes, advancing 1 minute: 00:00-00:05, 00:01-00:06)

  • Session (Sets a timeout for the given key, after which any new data is treated as a new session)

To understand more about these time windows, you can read the related Kafka Streams documentation. Since KSQL is built on Kafka Streams, the concepts are the same. The KSQL-specific documentation is also useful.
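The three window types can be illustrated with a short Python sketch. This is an assumption-laden illustration of the concepts, not the Kafka Streams implementation: windows are aligned to the Unix epoch, window ends are treated as exclusive, and the function names are hypothetical.

```python
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)


def tumbling_window(ts, size):
    """Return the (start, end) of the one tumbling window containing ts."""
    start = ts - ((ts - EPOCH) % size)
    return start, start + size


def hopping_windows(ts, size, advance):
    """Return every (start, end) hopping window containing ts, oldest first."""
    start = ts - ((ts - EPOCH) % advance)  # latest window that has started
    windows = []
    while start + size > ts:  # walk back while the window still covers ts
        windows.append((start, start + size))
        start -= advance
    return sorted(windows)


def session_windows(timestamps, gap):
    """Group the timestamps of one key into sessions split by inactivity."""
    sessions = []
    for ts in sorted(timestamps):
        if sessions and ts - sessions[-1][-1] <= gap:
            sessions[-1].append(ts)  # still within the inactivity gap
        else:
            sessions.append([ts])  # gap exceeded: a new session starts
    return sessions


ts = datetime(2019, 3, 4, 17, 42, 30)
one_window = tumbling_window(ts, timedelta(minutes=5))
five_windows = hopping_windows(ts, timedelta(minutes=5), timedelta(minutes=1))
```

An event at 17:42:30 falls into exactly one 5-minute tumbling window (17:40 to 17:45), but into five overlapping hopping windows when the window advances by 1 minute.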

Note
KSQL will re-emit aggregates as they are updated by incoming events. This means that you will see multiple rows of output for a given window/key.
Aggregating data with KSQL

10.1. Running Count per Minute

This shows the number of ratings per customer status, per minute:

SELECT TIMESTAMPTOSTRING(WindowStart(), 'yyyy-MM-dd HH:mm:ss'),
       CLUB_STATUS, COUNT(*) AS RATING_COUNT
FROM RATINGS_WITH_CUSTOMER_DATA
     WINDOW TUMBLING (SIZE 1 MINUTES)
GROUP BY CLUB_STATUS;
platinum | 1
bronze | 2
gold | 12
bronze | 13

The time window itself is exposed in the results using the function WindowStart(), whose epoch value is then converted to human-readable form with the TIMESTAMPTOSTRING function.

Aggregates can be persisted too. Instead of CREATE STREAM as we did above, we’re going to persist with CREATE TABLE, since an aggregate is always a table (key + value). Just as before though, a Kafka topic is continually populated with the results of the query:

CREATE TABLE RATINGS_BY_CLUB_STATUS AS
SELECT WindowStart() AS WINDOW_START_TS, CLUB_STATUS, COUNT(*) AS RATING_COUNT
FROM RATINGS_WITH_CUSTOMER_DATA
     WINDOW TUMBLING (SIZE 1 MINUTES)
GROUP BY CLUB_STATUS;
 Message
---------------------------
 Table created and running
---------------------------
ksql>

The table that we’ve created is a first-class object in KSQL, updated in real time with the results of the aggregate query. Because it’s just another object in KSQL, we can query and filter it like any other:

SELECT TIMESTAMPTOSTRING(WINDOW_START_TS, 'yyyy-MM-dd HH:mm:ss'),
        CLUB_STATUS, RATING_COUNT
FROM RATINGS_BY_CLUB_STATUS
WHERE CLUB_STATUS='bronze';
2019-03-04 17:42:00 | bronze | 2
2019-03-04 17:42:00 | bronze | 3
2019-03-04 17:42:00 | bronze | 5
2019-03-04 17:42:00 | bronze | 9
2019-03-04 17:42:00 | bronze | 10

If you let the SELECT output continue to run, you’ll see all of the past time window aggregate values—but also the current one. Note that the current time window’s aggregate value will continue to update, because new events are being continually processed and reflected in the value. If you were to send an event to the source ratings topic with a timestamp in the past, the corresponding time window’s aggregate would be re-emitted.
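The re-emission behaviour described above can be sketched in plain Python. This is a simplified model, not KSQL itself: it emits a row after every single event, whereas a real KSQL server may batch updates, and it accepts late events without any retention limit. The event timestamps are hypothetical, in milliseconds.

```python
from collections import defaultdict

WINDOW_MS = 60_000  # one-minute tumbling window, as in the query above


def running_counts(events):
    """Return the (window_start_ms, club_status, count) row emitted per event."""
    counts = defaultdict(int)
    emitted = []
    for ts_ms, club_status in events:
        window_start = ts_ms - (ts_ms % WINDOW_MS)
        key = (window_start, club_status)
        counts[key] += 1
        emitted.append((window_start, club_status, counts[key]))
    return emitted


events = [
    (0, "bronze"),
    (10_000, "bronze"),
    (70_000, "bronze"),  # falls into the next window
    (20_000, "bronze"),  # late event: belongs to the first window
]
emitted = running_counts(events)
```

The first window is emitted with counts 1 and 2, the second window with count 1, and then the late event causes the first window to be emitted again with count 3.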

11. Optional: Stream data to Elasticsearch

Note
This section assumes that you are familiar with the use of Kibana.

Using Kafka Connect you can stream data from Kafka to one (or many) targets, including Elasticsearch, HDFS, S3, and so on.

Here we’ll see how to stream it to Elasticsearch for rapid visualisation and analysis.

From a bash prompt, make sure that Elasticsearch and Kibana are running:

$ docker-compose ps|egrep "elasticsearch|kibana"
elasticsearch                      /usr/local/bin/docker-entr ...   Up      0.0.0.0:9200->9200/tcp, 0.0.0.0:9300->9300/tcp
kibana                             /usr/local/bin/kibana-docker     Up      0.0.0.0:5601->5601/tcp

Create a dynamic mapping in Elasticsearch so that the timestamp of source data is correctly detected:

curl -XPUT "http://localhost:9200/_template/kafkaconnect/" \
     -H 'Content-Type: application/json' \
     -d '{
  "index_patterns": "*",
  "settings": { "number_of_shards": 1, "number_of_replicas": 0 },
  "mappings": {
    "_default_": {
      "dynamic_templates": [
        { "dates": { "match": "*TS", "mapping": { "type": "date" } } },
        { "non_analysed_string_template": { "match": "*", "match_mapping_type": "string", "mapping": { "type": "keyword" } } }
      ]
    }
  }
}'

Create a connector to stream RATINGS_WITH_CUSTOMER_DATA to Elasticsearch:

curl -X "POST" "http://localhost:18083/connectors/" \
-H "Content-Type: application/json" \
-d '{
  "name": "es_sink_unhappy_platinum_customers",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "topics": "RATINGS_WITH_CUSTOMER_DATA",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "key.ignore": "true",
    "schema.ignore": "true",
    "type.name": "kafkaconnect",
    "topic.index.map": "RATINGS_WITH_CUSTOMER_DATA:ratings_with_customer_data",
    "connection.url": "http://elasticsearch:9200",
    "transforms": "ExtractTimestamp",
    "transforms.ExtractTimestamp.type": "org.apache.kafka.connect.transforms.InsertField$Value",
    "transforms.ExtractTimestamp.timestamp.field" : "TS"
  }
}'

Create a connector to stream RATINGS_BY_CLUB_STATUS to Elasticsearch:

curl -X "POST" "http://localhost:18083/connectors/" \
-H "Content-Type: application/json" \
-d '{
  "name": "es_sink_ratings_agg_by_status_1min",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "topics": "RATINGS_BY_CLUB_STATUS",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "key.ignore": "false",
    "schema.ignore": "true",
    "type.name": "kafkaconnect",
    "topic.index.map": "RATINGS_BY_CLUB_STATUS:ratings_agg_by_status_1min",
    "connection.url": "http://elasticsearch:9200"
  }
}'

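Note that the above sets "key.ignore": "false", so the Elasticsearch document ID is taken from the Kafka message key and each aggregate is updated in place rather than written as a new document.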
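As a rough illustration of why this setting matters, the Python sketch below models an Elasticsearch index as a dictionary keyed by document ID. It assumes the connector's documented behaviour that, when the key is ignored, the ID is built from topic, partition, and offset. The record key shown is a hypothetical placeholder, not the exact key format KSQL writes for windowed tables.

```python
def document_id(record, key_ignore):
    """Choose the document ID the way the sink is assumed to."""
    if key_ignore:
        # Every message gets a unique ID, so every update is a new document.
        return f"{record['topic']}+{record['partition']}+{record['offset']}"
    # The message key is the ID, so updates overwrite the same document.
    return record["key"]


def index_all(records, key_ignore):
    """Model the index as {document_id: value}; later writes overwrite."""
    index = {}
    for record in records:
        index[document_id(record, key_ignore)] = record["value"]
    return index


# Three successive updates to the same aggregate (hypothetical key and offsets).
updates = [
    {"topic": "RATINGS_BY_CLUB_STATUS", "partition": 0, "offset": n,
     "key": "bronze@17:42", "value": {"RATING_COUNT": count}}
    for n, count in enumerate([2, 3, 5])
]

appended = index_all(updates, key_ignore=True)
upserted = index_all(updates, key_ignore=False)
```

With the key ignored, the index ends up with three documents; with the key used as the ID, it holds a single document containing the latest count.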

If you have jq on your machine you can run this to check that both connectors are RUNNING:

$ curl -s "http://localhost:18083/connectors"| jq '.[]'| xargs -I{connector_name} curl -s "http://localhost:18083/connectors/"{connector_name}"/status"| jq -c -M '[.name,.connector.state,.tasks[].state]|join(":|:")'| column -s : -t| sed 's/\"//g'| sort

es_sink_ratings_agg_by_status_1min  |  RUNNING  |  RUNNING
es_sink_unhappy_platinum_customers  |  RUNNING  |  RUNNING
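If jq is not available, a similar check can be sketched with the Python standard library. This is a minimal sketch that assumes the Kafka Connect REST API is reachable on the same host and port as the curl commands above. The function names are hypothetical, and fetch_statuses() is defined but not called here because it needs the running workshop environment.

```python
import json
from urllib.request import urlopen

CONNECT_URL = "http://localhost:18083"  # same endpoint as the curl commands


def summarise(status):
    """Format one connector status document as 'name | connector | tasks...'."""
    states = [status["connector"]["state"]]
    states += [task["state"] for task in status["tasks"]]
    return " | ".join([status["name"]] + states)


def fetch_statuses(base_url=CONNECT_URL):
    """Return a sorted summary line for every connector on the worker."""
    with urlopen(f"{base_url}/connectors") as response:
        names = json.load(response)
    lines = []
    for name in names:
        with urlopen(f"{base_url}/connectors/{name}/status") as response:
            lines.append(summarise(json.load(response)))
    return sorted(lines)


# Example status document in the shape returned by the /status endpoint.
sample = {
    "name": "es_sink_ratings_agg_by_status_1min",
    "connector": {"state": "RUNNING", "worker_id": "kafka-connect:18083"},
    "tasks": [{"id": 0, "state": "RUNNING", "worker_id": "kafka-connect:18083"}],
}
line = summarise(sample)
```

Against the running environment, print("\n".join(fetch_statuses())) would list one line per connector.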

Use the Kibana interface to check that docs are arriving in Elasticsearch:

Note
If you get an error screen from Kibana then try restarting the container (docker-compose restart kibana).

[Screenshot: Kibana 01]

Add the index pattern to Kibana, and then use the Discover and Visualise options to explore and create analyses on the data:

[Screenshot: Kibana 02]

[Screenshot: Kibana 03]

12. Shutting down the environment

To terminate the workshop environment, run docker-compose down:

$ docker-compose down
Stopping ksql-workshop_ksql-server_1     ... done
Stopping ksql-workshop_datagen-ratings_1 ... done
Stopping ksql-workshop_schema-registry_1 ... done
Stopping ksql-workshop_kafka_1           ... done
Stopping ksql-workshop_zookeeper_1       ... done
Removing ksql-workshop_ksql-server_1     ... done
Removing ksql-workshop_datagen-ratings_1 ... done
Removing ksql-workshop_schema-registry_1 ... done
Removing ksql-workshop_kafka_1           ... done
Removing ksql-workshop_zookeeper_1       ... done
Removing network ksql-workshop_default

If you want to preserve the state of all containers, run docker-compose stop instead.

13. Next steps

With the enriched and filtered data being populated into Kafka topics from KSQL, you can use it to:

  • Feed event-driven applications. For example, notify the ops team if a VIP user leaves a poor review.

  • Stream to analytics platforms. For example, use Kafka Connect to stream the enriched data stream to Elasticsearch and visualise it in real time with Kibana.