= Introduction to ksqlDB - demo script
Robin Moffatt <robin@confluent.io>
v1.00, 25 February 2020
:toc:

== Running the test rig

1. Bring up the stack
+
[source,bash]
----
git clone https://github.com/confluentinc/demo-scene.git
cd demo-scene/introduction-to-ksqldb
docker-compose up -d
----
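+
Optionally, check that all the containers are up before continuing. `docker-compose ps` is the standard way to list the services (the exact service list depends on the compose file):
+
[source,bash]
----
docker-compose ps
----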

2. Run two terminal windows, side by side, both running ksqlDB CLI
+
[source,bash]
----
docker exec -it ksqldb ksql http://localhost:8088
----

== ksqlDB basics

[Window 1] Create a stream

[source,sql]
----
CREATE STREAM MOVEMENTS (LOCATION VARCHAR)
    WITH (VALUE_FORMAT='JSON', PARTITIONS=1, KAFKA_TOPIC='movements');
----

[Window 2] Query the stream

[source,sql]
----
SELECT TIMESTAMPTOSTRING(ROWTIME,'yyyy-MM-dd HH:mm:ss','Europe/Amsterdam') AS EVENT_TS,
ROWKEY AS PERSON,
LOCATION
FROM MOVEMENTS
EMIT CHANGES;
----

[Window 1] Insert some data

[source,sql]
----
INSERT INTO MOVEMENTS VALUES ('robin', 'Leeds');
INSERT INTO MOVEMENTS VALUES ('robin', 'Ilkley');
----

[Window 2] Cancel the query, show the topics

[source,sql]
----
SHOW TOPICS;
----

Dump the topic contents:

[source,sql]
----
PRINT movements FROM BEGINNING;
----

[Window 2] Cancel the PRINT, query the stream with a predicate

[source,sql]
----
SET 'auto.offset.reset' = 'earliest';
SELECT TIMESTAMPTOSTRING(ROWTIME,'yyyy-MM-dd HH:mm:ss','Europe/Amsterdam') AS EVENT_TS,
ROWKEY AS PERSON,
LOCATION
FROM MOVEMENTS
WHERE LOCATION='Leeds'
EMIT CHANGES;
----

[Window 1] Insert some data

[source,sql]
----
INSERT INTO MOVEMENTS VALUES ('robin', 'Wakefield');
INSERT INTO MOVEMENTS VALUES ('robin', 'Leeds');
----


Create a new topic that could be used to trigger an event-driven app when the user is in a certain location
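
A minimal sketch of the kind of persistent query this uses - the stream name and predicate here are illustrative, not the demo's actual code:

[source,sql]
----
-- Hypothetical example: write matching events to a new topic
CREATE STREAM MOVEMENTS_LEEDS AS
    SELECT ROWKEY AS PERSON, LOCATION
      FROM MOVEMENTS
     WHERE LOCATION='Leeds';
----

An event-driven application could then subscribe to the resulting topic and react whenever a matching event arrives.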

[Window 1] Insert some data

[source,sql]
----
INSERT INTO MOVEMENTS VALUES ('robin', 'Sheffield');
INSERT INTO MOVEMENTS VALUES ('robin', 'Leeds');
----

== Stream/Table duality

Show the difference between a stream and a table - but note that it's the *same* Kafka topic underneath

[source,sql]
----
CREATE TABLE MOVEMENTS_T (LOCATION VARCHAR)
WITH (VALUE_FORMAT='JSON', KAFKA_TOPIC='movements');
SELECT TIMESTAMPTOSTRING(ROWTIME,'yyyy-MM-dd HH:mm:ss','Europe/Amsterdam') AS EVENT_TS, ROWKEY AS PERSON, LOCATION
FROM MOVEMENTS_T EMIT CHANGES;
SELECT TIMESTAMPTOSTRING(ROWTIME,'yyyy-MM-dd HH:mm:ss','Europe/Amsterdam') AS EVENT_TS, ROWKEY AS PERSON, LOCATION
FROM MOVEMENTS EMIT CHANGES;
----

== Aggregates

[Window 2] Run an aggregate query

[source,sql]
----
SET 'auto.offset.reset' = 'earliest';
CREATE TABLE PERSON_STATS WITH (VALUE_FORMAT='AVRO') AS
SELECT ROWKEY AS PERSON,
COUNT(*) AS LOCATION_CHANGES,
COUNT_DISTINCT(LOCATION) AS UNIQUE_LOCATIONS
FROM MOVEMENTS
GROUP BY ROWKEY
EMIT CHANGES;
SELECT PERSON, LOCATION_CHANGES, UNIQUE_LOCATIONS
FROM PERSON_STATS
EMIT CHANGES;
----

[Window 1] Insert some data
[source,sql]
----
INSERT INTO MOVEMENTS VALUES ('robin', 'Leeds');
INSERT INTO MOVEMENTS VALUES ('robin', 'London');
----

Show a pull query in action

[source,sql]
----
SELECT PERSON, LOCATION_CHANGES, UNIQUE_LOCATIONS
FROM PERSON_STATS
WHERE ROWKEY='robin';
----

Run a pull query using the REST API

[source,bash]
----
docker exec -t ksqldb curl -s -X "POST" "http://localhost:8088/query" \
-H "Content-Type: application/vnd.ksql.v1+json; charset=utf-8" \
-d '{"ksql":"SELECT PERSON, LOCATION_CHANGES, UNIQUE_LOCATIONS FROM PERSON_STATS WHERE ROWKEY='\''robin'\'';"}'|jq '.[].row'
----

== Connecting to other systems

First, check that the connector plugin has been installed.

[source,bash]
----
docker exec -it ksqldb curl -s localhost:8083/connector-plugins|jq '.[].class'
----

The output should include `io.confluent.connect.jdbc.JdbcSinkConnector`.

Create a sink connector to stream the `PERSON_STATS` table to Postgres:
[source,sql]
----
CREATE SINK CONNECTOR SINK_POSTGRES WITH (
'connector.class' = 'io.confluent.connect.jdbc.JdbcSinkConnector',
'connection.url' = 'jdbc:postgresql://postgres:5432/',
'connection.user' = 'postgres',
'connection.password' = 'postgres',
'topics' = 'PERSON_STATS',
'key.converter' = 'org.apache.kafka.connect.storage.StringConverter',
'auto.create' = 'true',
'insert.mode' = 'upsert',
'pk.mode' = 'record_key',
'pk.fields' = 'PERSON'
);
----
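
If needed, verify the connector from the ksqlDB CLI (standard ksqlDB connector-management statements):

[source,sql]
----
SHOW CONNECTORS;
DESCRIBE CONNECTOR SINK_POSTGRES;
----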

Show the data in Postgres
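
One way to do this is with `psql` inside the Postgres container - this assumes the Compose service is named `postgres`, with the user and password from the connector config above:

[source,bash]
----
# Hypothetical service name; adjust to match your docker-compose.yml
docker exec -it postgres psql -U postgres
----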

[source,sql]
----
SELECT * FROM "PERSON_STATS";
----

Add some more data to the Kafka topic, and show Postgres updating in place.
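
For example (any new rows in `MOVEMENTS` will do; these values are illustrative):

[source,sql]
----
INSERT INTO MOVEMENTS VALUES ('robin', 'Sheffield');
INSERT INTO MOVEMENTS VALUES ('robin', 'Leeds');
----

Re-running the `SELECT` in Postgres should show the counts for `robin` updated in place, thanks to the connector's `upsert` mode.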

'''

== Appendix

=== Setting the ROWTIME of inserted data

[source,sql]
----
INSERT INTO MOVEMENTS (ROWTIME, ROWKEY, LOCATION) VALUES (STRINGTOTIMESTAMP('2020-02-17T15:22:00Z','yyyy-MM-dd''T''HH:mm:ssX'), 'robin', 'Leeds');
INSERT INTO MOVEMENTS (ROWTIME, ROWKEY, LOCATION) VALUES (STRINGTOTIMESTAMP('2020-02-17T16:22:00Z','yyyy-MM-dd''T''HH:mm:ssX'), 'robin', 'Retford');
----

=== Deploying code via REST API

[source,bash]
----
docker exec -t ksqldb curl -s -X "POST" "http://localhost:8088/ksql" \
-H "Content-Type: application/vnd.ksql.v1+json; charset=utf-8" \
-d '{"ksql":"CREATE STREAM MOVEMENTS (LOCATION VARCHAR) WITH (VALUE_FORMAT='\''JSON'\'', PARTITIONS=1, KAFKA_TOPIC='\''movements'\'');"}'
----

To set properties such as `auto.offset.reset`, include a `streamsProperties` object in the request:

[source,bash]
----
docker exec -t ksqldb curl -s -X "POST" "http://localhost:8088/ksql" \
     -H "Content-Type: application/vnd.ksql.v1+json; charset=utf-8" \
     -d '{
  "ksql": "…",
  "streamsProperties": {
    "ksql.streams.auto.offset.reset": "earliest"
  }
}'
----

=== TODO

* TODO add in datagen
* TODO INSERT INTO to merge streams
