Skip to content

Commit

Permalink
Merge branch 'bump-ksqldb-0.10'
Browse files Browse the repository at this point in the history
  • Loading branch information
rmoff committed Jun 29, 2020
2 parents 1c3caca + 6d67918 commit 3e600b0
Show file tree
Hide file tree
Showing 6 changed files with 66 additions and 841 deletions.
79 changes: 41 additions & 38 deletions build-a-streaming-pipeline/demo_build-a-streaming-pipeline.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -311,15 +311,16 @@ Create ksqlDB stream and table

[source,sql]
----
CREATE TABLE CUSTOMERS WITH (KAFKA_TOPIC='asgard.demo.CUSTOMERS', VALUE_FORMAT='AVRO');
CREATE TABLE CUSTOMERS (CUSTOMER_ID VARCHAR PRIMARY KEY)
WITH (KAFKA_TOPIC='asgard.demo.CUSTOMERS', VALUE_FORMAT='AVRO');
----

Query the ksqlDB table:

[source,sql]
----
SET 'auto.offset.reset' = 'earliest';
SELECT ID, FIRST_NAME, LAST_NAME, EMAIL, CLUB_STATUS FROM CUSTOMERS EMIT CHANGES;
SELECT CUSTOMER_ID, FIRST_NAME, LAST_NAME, EMAIL, CLUB_STATUS FROM CUSTOMERS EMIT CHANGES;
----


Expand Down Expand Up @@ -355,7 +356,7 @@ Here's the table - the latest value for a given key
[source,sql]
----
SELECT TIMESTAMPTOSTRING(ROWTIME, 'HH:mm:ss') AS EVENT_TS,
ID,
CUSTOMER_ID,
FIRST_NAME,
LAST_NAME,
EMAIL,
Expand All @@ -366,23 +367,23 @@ SELECT TIMESTAMPTOSTRING(ROWTIME, 'HH:mm:ss') AS EVENT_TS,

[source,sql]
----
+----------+----+-----------+----------+-----------------+------------+
|EVENT_TS |ID |FIRST_NAME |LAST_NAME |EMAIL |CLUB_STATUS |
+----------+----+-----------+----------+-----------------+------------+
|15:43:58 |42 |Rick |Astley |[email protected] |platinum |
+-----------+-------------+-----------+----------+-----------------+------------+
|EVENT_TS |CUSTOMER_ID |FIRST_NAME |LAST_NAME |EMAIL |CLUB_STATUS |
+-----------+-------------+-----------+----------+-----------------+------------+
|09:20:15 |42 |Rick |Astley |[email protected] |platinum |
^CQuery terminated
----

Here's the stream - every event, which in this context is every change event on the source database:

[source,sql]
----
CREATE STREAM CUSTOMERS_STREAM WITH (KAFKA_TOPIC='asgard.demo.CUSTOMERS', VALUE_FORMAT='AVRO');
CREATE STREAM CUSTOMERS_STREAM (CUSTOMER_ID VARCHAR KEY) WITH (KAFKA_TOPIC='asgard.demo.CUSTOMERS', VALUE_FORMAT='AVRO');
SET 'auto.offset.reset' = 'earliest';
SELECT TIMESTAMPTOSTRING(ROWTIME, 'HH:mm:ss') AS EVENT_TS,
ID,
CUSTOMER_ID,
FIRST_NAME,
LAST_NAME,
EMAIL,
Expand All @@ -394,13 +395,13 @@ SELECT TIMESTAMPTOSTRING(ROWTIME, 'HH:mm:ss') AS EVENT_TS,

[source,sql]
----
+---------+----+-----------+----------+------------------+------------+
|TS |ID |FIRST_NAME |LAST_NAME |EMAIL |CLUB_STATUS |
+---------+----+-----------+----------+------------------+------------+
|16:08:49 |42 |Rick |Astley |null |null |
|16:09:30 |42 |Rick |Astley |[email protected] |null |
|16:09:32 |42 |Rick |Astley |[email protected] |bronze |
|16:09:35 |42 |Rick |Astley |[email protected] |platinum |
+----------+------------+-----------+----------+-----------------+------------+
|EVENT_TS |CUSTOMER_ID |FIRST_NAME |LAST_NAME |EMAIL |CLUB_STATUS |
+----------+------------+-----------+----------+-----------------+------------+
|09:20:07 |42 |Rick |Astley |null |null |
|09:20:10 |42 |Rick |Astley |[email protected] |null |
|09:20:13 |42 |Rick |Astley |[email protected] |bronze |
|09:20:15 |42 |Rick |Astley |[email protected] |platinum |
^CQuery terminated
ksql>
----
Expand All @@ -413,28 +414,28 @@ ksql>
[source,sql]
----
SELECT R.RATING_ID, R.MESSAGE, R.CHANNEL,
C.ID, C.FIRST_NAME + ' ' + C.LAST_NAME AS FULL_NAME,
C.CUSTOMER_ID, C.FIRST_NAME + ' ' + C.LAST_NAME AS FULL_NAME,
C.CLUB_STATUS
FROM RATINGS R
LEFT JOIN CUSTOMERS C
ON CAST(R.USER_ID AS STRING) = C.ROWKEY
ON CAST(R.USER_ID AS STRING) = C.CUSTOMER_ID
WHERE C.FIRST_NAME IS NOT NULL
EMIT CHANGES;
----

[source,sql]
----
+------------+-----------------------------------+-------+--------------------+-------------+
|RATING_ID |MESSAGE |ID |FULL_NAME |CLUB_STATUS |
+------------+-----------------------------------+-------+--------------------+-------------+
|1 |more peanuts please |9 |Even Tinham |silver |
|2 |Exceeded all my expectations. Thank|8 |Patti Rosten |silver |
| | you ! | | | |
|3 |meh |17 |Brianna Paradise |bronze |
|4 |is this as good as it gets? really |14 |Isabelita Talboy |gold |
| |? | | | |
|5 |why is it so difficult to keep the |19 |Josiah Brockett |gold |
| |bathrooms clean ? | | | |
+------------+-----------------------------------+------------+--------------------+-------------+
|RATING_ID |MESSAGE |CUSTOMER_ID |FULL_NAME |CLUB_STATUS |
+------------+-----------------------------------+------------+--------------------+-------------+
|1 |more peanuts please |9 |Even Tinham |silver |
|2 |Exceeded all my expectations. Thank|8 |Patti Rosten |silver |
| | you ! | | | |
|3 |meh |17 |Brianna Paradise |bronze |
|4 |is this as good as it gets? really |14 |Isabelita Talboy |gold |
| |? | | | |
|5 |why is it so difficult to keep the |19 |Josiah Brockett |gold |
| |bathrooms clean ? | | | |
----

Expand All @@ -447,11 +448,11 @@ CREATE STREAM RATINGS_WITH_CUSTOMER_DATA
WITH (KAFKA_TOPIC='ratings-enriched')
AS
SELECT R.RATING_ID, R.MESSAGE, R.STARS, R.CHANNEL,
C.ID, C.FIRST_NAME + ' ' + C.LAST_NAME AS FULL_NAME,
C.CUSTOMER_ID, C.FIRST_NAME + ' ' + C.LAST_NAME AS FULL_NAME,
C.CLUB_STATUS, C.EMAIL
FROM RATINGS R
LEFT JOIN CUSTOMERS C
ON CAST(R.USER_ID AS STRING) = C.ROWKEY
ON CAST(R.USER_ID AS STRING) = C.CUSTOMER_ID
WHERE C.FIRST_NAME IS NOT NULL
EMIT CHANGES;
----
Expand All @@ -466,11 +467,12 @@ Check out the ratings for customer id 2 only:
SELECT TIMESTAMPTOSTRING(ROWTIME, 'HH:mm:ss') AS EVENT_TS,
FULL_NAME, CLUB_STATUS, STARS, MESSAGE, CHANNEL
FROM RATINGS_WITH_CUSTOMER_DATA
WHERE ID=2
WHERE CAST(CUSTOMER_ID AS INT)=2
EMIT CHANGES;
----

In MySQL, make a change to the customer with ID 2:

[source,sql]
----
UPDATE CUSTOMERS SET CLUB_STATUS = 'bronze' WHERE ID=2;
Expand All @@ -486,7 +488,8 @@ CREATE STREAM UNHAPPY_PLATINUM_CUSTOMERS AS
SELECT FULL_NAME, CLUB_STATUS, EMAIL, STARS, MESSAGE
FROM RATINGS_WITH_CUSTOMER_DATA
WHERE STARS < 3
AND CLUB_STATUS = 'platinum';
AND CLUB_STATUS = 'platinum'
PARTITION BY FULL_NAME;
----

== Stream to Elasticsearch
Expand Down Expand Up @@ -555,14 +558,14 @@ image:images/es03.png[Kibana]

=== Aggregations

Simple aggregation - count of ratings per person, per minute:
Simple aggregation - count of ratings per person, per 15 minutes:

[source,sql]
----
SELECT TIMESTAMPTOSTRING(WINDOWSTART, 'yyyy-MM-dd HH:mm:ss') AS WINDOW_START_TS,
FULL_NAME,COUNT(*) AS RATINGS_COUNT
FROM RATINGS_WITH_CUSTOMER_DATA
WINDOW TUMBLING (SIZE 1 MINUTE)
WINDOW TUMBLING (SIZE 15 MINUTE)
GROUP BY FULL_NAME
EMIT CHANGES;
----
Expand All @@ -587,7 +590,7 @@ SELECT TIMESTAMPTOSTRING(WINDOWSTART, 'yyyy-MM-dd HH:mm:ss') AS WINDOW_START_TS,
FULL_NAME,
RATINGS_COUNT
FROM RATINGS_PER_CUSTOMER_PER_15MINUTE
WHERE ROWKEY='Rica Blaisdell'
WHERE FULL_NAME='Rica Blaisdell'
EMIT CHANGES;
----

Expand All @@ -600,7 +603,7 @@ SELECT TIMESTAMPTOSTRING(WINDOWSTART, 'yyyy-MM-dd HH:mm:ss') AS WINDOW_START_TS,
FULL_NAME,
RATINGS_COUNT
FROM RATINGS_PER_CUSTOMER_PER_15MINUTE
WHERE ROWKEY='Rica Blaisdell'
WHERE FULL_NAME='Rica Blaisdell'
AND WINDOWSTART > '2020-06-23T11:30:00.000';
----

Expand All @@ -621,7 +624,7 @@ PREDICATE=$(date --date '-5 min' +%s)000
# Pull from ksqlDB the 15-minute aggregate windows that started within the last five minutes for a given user:
curl -X "POST" "http://ksqldb:8088/query" \
-H "Content-Type: application/vnd.ksql.v1+json; charset=utf-8" \
-d '{"ksql":"SELECT TIMESTAMPTOSTRING(WINDOWSTART, '\''yyyy-MM-dd HH:mm:ss'\'') AS WINDOW_START_TS, FULL_NAME, RATINGS_COUNT FROM RATINGS_PER_CUSTOMER_PER_15MINUTE WHERE ROWKEY='\''Rica Blaisdell'\'' AND WINDOWSTART > '$PREDICATE';"}'
-d '{"ksql":"SELECT TIMESTAMPTOSTRING(WINDOWSTART, '\''yyyy-MM-dd HH:mm:ss'\'') AS WINDOW_START_TS, FULL_NAME, RATINGS_COUNT FROM RATINGS_PER_CUSTOMER_PER_15MINUTE WHERE FULL_NAME='\''Rica Blaisdell'\'' AND WINDOWSTART > '$PREDICATE';"}'
----

Press Ctrl-D to exit the Docker container
Expand Down
2 changes: 1 addition & 1 deletion build-a-streaming-pipeline/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ services:
sleep infinity
ksqldb:
image: confluentinc/ksqldb-server:0.9.0
image: confluentinc/ksqldb-server:0.10.0
hostname: ksqldb
container_name: ksqldb
links:
Expand Down
2 changes: 1 addition & 1 deletion build-a-streaming-pipeline/ksqlDB.postman_collection.json
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
],
"body": {
"mode": "raw",
"raw": "{\"ksql\":\"SELECT TIMESTAMPTOSTRING(WINDOWSTART, 'yyyy-MM-dd HH:mm:ss') AS WINDOW_START_TS, FULL_NAME, RATINGS_COUNT FROM RATINGS_PER_CUSTOMER_PER_15MINUTE WHERE ROWKEY='Rica Blaisdell' AND WINDOWSTART > '2020-06-23T11:42:00.000';\",\"streamsProperties\": {\"ksql.streams.auto.offset.reset\": \"earliest\"\n }}",
"raw": "{\"ksql\":\"SELECT TIMESTAMPTOSTRING(WINDOWSTART, 'yyyy-MM-dd HH:mm:ss') AS WINDOW_START_TS, FULL_NAME, RATINGS_COUNT FROM RATINGS_PER_CUSTOMER_PER_15MINUTE WHERE FULL_NAME='Rica Blaisdell' AND WINDOWSTART > '2020-06-23T11:42:00.000';\",\"streamsProperties\": {\"ksql.streams.auto.offset.reset\": \"earliest\"\n }}",
"options": {
"raw": {
"language": "json"
Expand Down
29 changes: 16 additions & 13 deletions introduction-to-ksqldb/demo_introduction_to_ksqldb.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ docker exec -it ksqldb ksql http://localhost:8088

[source,sql]
----
CREATE STREAM MOVEMENTS (LOCATION VARCHAR)
CREATE STREAM MOVEMENTS (PERSON VARCHAR KEY, LOCATION VARCHAR)
WITH (VALUE_FORMAT='JSON', PARTITIONS=1, KAFKA_TOPIC='movements');
----

Expand All @@ -37,7 +37,7 @@ CREATE STREAM MOVEMENTS (LOCATION VARCHAR)
[source,sql]
----
SELECT TIMESTAMPTOSTRING(ROWTIME,'yyyy-MM-dd HH:mm:ss','Europe/London') AS EVENT_TS,
ROWKEY AS PERSON,
PERSON,
LOCATION
FROM MOVEMENTS
EMIT CHANGES;
Expand Down Expand Up @@ -72,7 +72,7 @@ PRINT movements FROM BEGINNING;
----
SET 'auto.offset.reset' = 'earliest';
SELECT TIMESTAMPTOSTRING(ROWTIME,'yyyy-MM-dd HH:mm:ss','Europe/London') AS EVENT_TS,
ROWKEY AS PERSON,
PERSON,
LOCATION
FROM MOVEMENTS
WHERE LCASE(LOCATION)='leeds'
Expand All @@ -94,6 +94,8 @@ Create a new topic that could be used to trigger an event-driven app when the us

[source,sql]
----
SET 'auto.offset.reset' = 'earliest';
CREATE STREAM LEEDS_USERS WITH (KAFKA_TOPIC='leeds-users')
AS SELECT * FROM MOVEMENTS WHERE LCASE(LOCATION)='leeds' EMIT CHANGES;
----
Expand Down Expand Up @@ -126,13 +128,13 @@ Show the difference between stream and table - but note that it's the *same* Kaf

[source,sql]
----
CREATE TABLE MOVEMENTS_T (LOCATION VARCHAR)
CREATE TABLE MOVEMENTS_T (PERSON VARCHAR PRIMARY KEY, LOCATION VARCHAR)
WITH (VALUE_FORMAT='JSON', KAFKA_TOPIC='movements');
SELECT TIMESTAMPTOSTRING(ROWTIME,'yyyy-MM-dd HH:mm:ss','Europe/London') AS EVENT_TS, ROWKEY AS PERSON, LOCATION
SELECT TIMESTAMPTOSTRING(ROWTIME,'yyyy-MM-dd HH:mm:ss','Europe/London') AS EVENT_TS, PERSON, LOCATION
FROM MOVEMENTS_T EMIT CHANGES;
SELECT TIMESTAMPTOSTRING(ROWTIME,'yyyy-MM-dd HH:mm:ss','Europe/London') AS EVENT_TS, ROWKEY AS PERSON, LOCATION
SELECT TIMESTAMPTOSTRING(ROWTIME,'yyyy-MM-dd HH:mm:ss','Europe/London') AS EVENT_TS, PERSON, LOCATION
FROM MOVEMENTS EMIT CHANGES;
----

Expand All @@ -143,14 +145,15 @@ SELECT TIMESTAMPTOSTRING(ROWTIME,'yyyy-MM-dd HH:mm:ss','Europe/London') AS EVENT
SET 'auto.offset.reset' = 'earliest';
CREATE TABLE PERSON_STATS WITH (VALUE_FORMAT='AVRO') AS
SELECT ROWKEY AS PERSON,
SELECT PERSON,
LATEST_BY_OFFSET(LOCATION) AS LATEST_LOCATION,
COUNT(*) AS LOCATION_CHANGES,
COUNT_DISTINCT(LOCATION) AS UNIQUE_LOCATIONS
FROM MOVEMENTS
GROUP BY ROWKEY
GROUP BY PERSON
EMIT CHANGES;
SELECT PERSON, LOCATION_CHANGES, UNIQUE_LOCATIONS
SELECT PERSON, LATEST_LOCATION, LOCATION_CHANGES, UNIQUE_LOCATIONS
FROM PERSON_STATS
EMIT CHANGES;
----
Expand All @@ -166,9 +169,9 @@ Show a pull query in action

[source,sql]
----
SELECT PERSON, LOCATION_CHANGES, UNIQUE_LOCATIONS
SELECT PERSON, LATEST_LOCATION, LOCATION_CHANGES, UNIQUE_LOCATIONS
FROM PERSON_STATS
WHERE ROWKEY='robin';
WHERE PERSON='robin';
----

Run a pull query using the REST API
Expand All @@ -177,7 +180,7 @@ Run a pull query using the REST API
----
docker exec -t ksqldb curl -s -X "POST" "http://localhost:8088/query" \
-H "Content-Type: application/vnd.ksql.v1+json; charset=utf-8" \
-d '{"ksql":"SELECT PERSON, LOCATION_CHANGES, UNIQUE_LOCATIONS FROM PERSON_STATS WHERE ROWKEY='\''robin'\'';"}'|jq '.[].row'
-d '{"ksql":"SELECT PERSON, LATEST_LOCATION, LOCATION_CHANGES, UNIQUE_LOCATIONS FROM PERSON_STATS WHERE PERSON='\''robin'\'';"}'|jq '.[].row'
----

== Connecting to other systems
Expand Down Expand Up @@ -239,7 +242,7 @@ INSERT INTO MOVEMENTS (ROWTIME, ROWKEY, LOCATION) VALUES (STRINGTOTIMESTAMP('202
----
docker exec -t ksqldb curl -s -X "POST" "http://localhost:8088/ksql" \
-H "Content-Type: application/vnd.ksql.v1+json; charset=utf-8" \
-d '{"ksql":"CREATE STREAM MOVEMENTS (LOCATION VARCHAR) WITH (VALUE_FORMAT='\''JSON'\'', PARTITIONS=1, KAFKA_TOPIC='\''movements'\'');"}'
-d '{"ksql":"CREATE STREAM MOVEMENTS (PERSON VARCHAR KEY, LOCATION VARCHAR) WITH (VALUE_FORMAT='\''JSON'\'', PARTITIONS=1, KAFKA_TOPIC='\''movements'\'');"}'
docker exec -t ksqldb curl -s -X "POST" "http://localhost:8088/ksql" \
Expand Down
Loading

0 comments on commit 3e600b0

Please sign in to comment.