Skip to content

Commit

Permalink
Update demo to ksqlDB 0.10
Browse files Browse the repository at this point in the history
  • Loading branch information
rmoff committed Jun 29, 2020
1 parent ef65161 commit 6d67918
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 801 deletions.
29 changes: 16 additions & 13 deletions introduction-to-ksqldb/demo_introduction_to_ksqldb.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ docker exec -it ksqldb ksql http://localhost:8088

[source,sql]
----
CREATE STREAM MOVEMENTS (LOCATION VARCHAR)
CREATE STREAM MOVEMENTS (PERSON VARCHAR KEY, LOCATION VARCHAR)
WITH (VALUE_FORMAT='JSON', PARTITIONS=1, KAFKA_TOPIC='movements');
----

Expand All @@ -37,7 +37,7 @@ CREATE STREAM MOVEMENTS (LOCATION VARCHAR)
[source,sql]
----
SELECT TIMESTAMPTOSTRING(ROWTIME,'yyyy-MM-dd HH:mm:ss','Europe/London') AS EVENT_TS,
ROWKEY AS PERSON,
PERSON,
LOCATION
FROM MOVEMENTS
EMIT CHANGES;
Expand Down Expand Up @@ -72,7 +72,7 @@ PRINT movements FROM BEGINNING;
----
SET 'auto.offset.reset' = 'earliest';
SELECT TIMESTAMPTOSTRING(ROWTIME,'yyyy-MM-dd HH:mm:ss','Europe/London') AS EVENT_TS,
ROWKEY AS PERSON,
PERSON,
LOCATION
FROM MOVEMENTS
WHERE LCASE(LOCATION)='leeds'
Expand All @@ -94,6 +94,8 @@ Create a new topic that could be used to trigger an event-driven app when the us

[source,sql]
----
SET 'auto.offset.reset' = 'earliest';
CREATE STREAM LEEDS_USERS WITH (KAFKA_TOPIC='leeds-users')
AS SELECT * FROM MOVEMENTS WHERE LCASE(LOCATION)='leeds' EMIT CHANGES;
----
Expand Down Expand Up @@ -126,13 +128,13 @@ Show the difference between stream and table - but note that it's the *same* Kaf

[source,sql]
----
CREATE TABLE MOVEMENTS_T (LOCATION VARCHAR)
CREATE TABLE MOVEMENTS_T (PERSON VARCHAR PRIMARY KEY, LOCATION VARCHAR)
WITH (VALUE_FORMAT='JSON', KAFKA_TOPIC='movements');
SELECT TIMESTAMPTOSTRING(ROWTIME,'yyyy-MM-dd HH:mm:ss','Europe/London') AS EVENT_TS, ROWKEY AS PERSON, LOCATION
SELECT TIMESTAMPTOSTRING(ROWTIME,'yyyy-MM-dd HH:mm:ss','Europe/London') AS EVENT_TS, PERSON, LOCATION
FROM MOVEMENTS_T EMIT CHANGES;
SELECT TIMESTAMPTOSTRING(ROWTIME,'yyyy-MM-dd HH:mm:ss','Europe/London') AS EVENT_TS, ROWKEY AS PERSON, LOCATION
SELECT TIMESTAMPTOSTRING(ROWTIME,'yyyy-MM-dd HH:mm:ss','Europe/London') AS EVENT_TS, PERSON, LOCATION
FROM MOVEMENTS EMIT CHANGES;
----

Expand All @@ -143,14 +145,15 @@ SELECT TIMESTAMPTOSTRING(ROWTIME,'yyyy-MM-dd HH:mm:ss','Europe/London') AS EVENT
SET 'auto.offset.reset' = 'earliest';
CREATE TABLE PERSON_STATS WITH (VALUE_FORMAT='AVRO') AS
SELECT ROWKEY AS PERSON,
SELECT PERSON,
LATEST_BY_OFFSET(LOCATION) AS LATEST_LOCATION,
COUNT(*) AS LOCATION_CHANGES,
COUNT_DISTINCT(LOCATION) AS UNIQUE_LOCATIONS
FROM MOVEMENTS
GROUP BY ROWKEY
GROUP BY PERSON
EMIT CHANGES;
SELECT PERSON, LOCATION_CHANGES, UNIQUE_LOCATIONS
SELECT PERSON, LATEST_LOCATION, LOCATION_CHANGES, UNIQUE_LOCATIONS
FROM PERSON_STATS
EMIT CHANGES;
----
Expand All @@ -166,9 +169,9 @@ Show a pull query in action

[source,sql]
----
SELECT PERSON, LOCATION_CHANGES, UNIQUE_LOCATIONS
SELECT PERSON, LATEST_LOCATION, LOCATION_CHANGES, UNIQUE_LOCATIONS
FROM PERSON_STATS
WHERE ROWKEY='robin';
WHERE PERSON='robin';
----

Run a pull query using the REST API
Expand All @@ -177,7 +180,7 @@ Run a pull query using the REST API
----
docker exec -t ksqldb curl -s -X "POST" "http://localhost:8088/query" \
-H "Content-Type: application/vnd.ksql.v1+json; charset=utf-8" \
-d '{"ksql":"SELECT PERSON, LOCATION_CHANGES, UNIQUE_LOCATIONS FROM PERSON_STATS WHERE ROWKEY='\''robin'\'';"}'|jq '.[].row'
-d '{"ksql":"SELECT PERSON, LATEST_LOCATION, LOCATION_CHANGES, UNIQUE_LOCATIONS FROM PERSON_STATS WHERE PERSON='\''robin'\'';"}'|jq '.[].row'
----

== Connecting to other systems
Expand Down Expand Up @@ -239,7 +242,7 @@ INSERT INTO MOVEMENTS (ROWTIME, ROWKEY, LOCATION) VALUES (STRINGTOTIMESTAMP('202
----
docker exec -t ksqldb curl -s -X "POST" "http://localhost:8088/ksql" \
-H "Content-Type: application/vnd.ksql.v1+json; charset=utf-8" \
-d '{"ksql":"CREATE STREAM MOVEMENTS (LOCATION VARCHAR) WITH (VALUE_FORMAT='\''JSON'\'', PARTITIONS=1, KAFKA_TOPIC='\''movements'\'');"}'
-d '{"ksql":"CREATE STREAM MOVEMENTS (PERSON VARCHAR KEY, LOCATION VARCHAR) WITH (VALUE_FORMAT='\''JSON'\'', PARTITIONS=1, KAFKA_TOPIC='\''movements'\'');"}'
docker exec -t ksqldb curl -s -X "POST" "http://localhost:8088/ksql" \
Expand Down
Loading

0 comments on commit 6d67918

Please sign in to comment.