Welcome to the "Building a Streaming Data Pipeline with Kafka Streams and KSQL" workshop.
To be successful in executing all exercises, you need to have the following installed:

- Docker CE Version 18.09.1
  Note: It is OK to use Windows, but the trainers are not Windows users and may not be able to help with Windows-related issues.
- Git
- JDK (min JDK 8)
  Recommended way:
  - Install sdkman
  - sdk list java
  - sdk install java <version-provider>, e.g. sdk install java 8.0.201-zulu
- Docker Compose Version 1.20.0, with Docker Compose file format 2.1
Lesson 0: Introduction to Kafka, topics and events
- Event streaming platforms and why we need them
- Key concepts of Apache Kafka
- Overview of our architecture

Lab 0:
- Create a cluster in Confluent Cloud
- Install the Confluent Cloud CLI
- Create a topic, produce and consume events from the Cloud CLI

Lesson 1: Introduction to Kafka Streams
- Producer and Consumer APIs
- Introduction to Kafka Streams
- Stream-table duality

Lab 1:
- Code review: Streaming Movie Ratings
- Run the app
- Run the data generator

Lesson 2: More Kafka Streams
- Introduction to Kafka Streams
- Introducing local materialized views
- Stream-table duality
- Code review of the web service

Lesson 3: KSQL
- Concepts

Lab 3: Movie Ratings with KSQL
- Hands-on with the KSQL CLI
It’s a wrap!
Note: If you follow the steps below, you will check out only the directory that has the source code relevant to this workshop.
-
Get the example from GitHub:
mkdir -p ~/temp/demo-scene
cd ~/temp/demo-scene
git init .
git remote add origin -f https://github.com/confluentinc/demo-scene/
git config core.sparsecheckout true
echo "streams-movie-demo/*" >> .git/info/sparse-checkout
git pull --depth=2 origin master
cd streams-movie-demo
ls -lh
and you should see something like this:
In this section, you are going to create your own Apache Kafka cluster on Confluent Cloud. This cluster will be used throughout the entire workshop, so make sure to follow the steps shown here.
Open the following URL in your browser: https://confluent.cloud/login
Log in with the following credentials:
Username: [email protected]
Password: *************
The instructors will provide the password for this account.
From the main UI, click on the Add Cluster button.
Enter a value for the Cluster name field.
To be able to identify your own cluster among the others, use the following notation for the name: firstName + '-' + lastName.
Make sure to select Google Cloud Platform as your Cloud provider.
Though any cloud provider would work, we are using GCP in this workshop because it is cheaper.
Finally, make sure to select us-central-1 [EU West (London)] as the region, since it is the closest location to Stockholm, Sweden.
The image below shows an example of how your cluster should look.
When you have finished making the changes, click on the Continue button.
The UI will ask for credit card details. Ask one of the instructors to enter a valid credit card.
Finally, click on the Save and launch cluster button.
From the main UI, click on your cluster.
Then, click on the Data In/Out menu and subsequently on the CLI sub-menu.
You will be presented with the following instructions:
Follow the presented instructions carefully. This is crucial for the next sections of the workshop to work. If you feel that you need help, please don't hesitate to call any of the instructors.
ccloud topic create raw-movies --partitions 16 --replication-factor 3
ccloud topic create raw-ratings --partitions 16 --replication-factor 3
Check if the topics were created properly by using the following command:
ccloud topic list
You should see the two topics being listed in the output.
Open a terminal to consume records from the raw-movies topic using the following command:
ccloud consume -t raw-movies
Open another terminal to produce records to the raw-movies topic using the following command:
cat ../data/movies.dat | ccloud produce -t raw-movies
Open another terminal to consume records from the raw-ratings topic using the following command:
ccloud consume -t raw-ratings
Open another terminal to produce records to the raw-ratings topic using the following command:
cat ../data/ratings.dat | ccloud produce -t raw-ratings
Press Ctrl+C to interrupt the consume commands issued in this section.
Note: Use this in a non-production Confluent Cloud instance, for development purposes only.
On the host from which you are running Docker, ensure that you have correctly initialized the Confluent Cloud CLI and have a valid configuration file at $HOME/.ccloud/config. More information here.
Make sure you have completed the steps in the Setup section above before proceeding. You can bring up all services in the Docker Compose file at once:
docker-compose up -d control-center
Depending on your hardware configuration, Control Center may take from one to five minutes to finish starting up.
To check if things are working properly, open the following URL in a browser: http://localhost:9021.
When the Control Center UI pops up, click on the Topics menu.
You should be able to see the topics created previously.
If you need to troubleshoot what is going on with a particular service, you can use the command docker-compose logs -f <SERVICE>. For example:
docker-compose logs -f control-center
The Streams API of Apache Kafka, available through a Java library, can be used to build highly scalable, elastic, fault-tolerant, distributed applications and microservices. First and foremost, the Kafka Streams API allows you to create real-time applications that power your core business. It is the easiest yet the most powerful technology to process data stored in Kafka. It builds upon important concepts for stream processing such as efficient management of application state, fast and efficient aggregations and joins, properly distinguishing between event-time and processing-time, and seamless handling of late-arriving and out-of-order data.
Start the Kafka Streams application:
./gradlew streams:runApp -PconfigPath=$HOME/.ccloud/config
Confluent KSQL is the streaming SQL engine that enables real-time data processing against Apache Kafka®. It provides an easy-to-use, yet powerful interactive SQL interface for stream processing on Kafka, without the need to write code in a programming language such as Java or Python. KSQL is scalable, elastic, fault-tolerant, and it supports a wide range of streaming operations, including data filtering, transformations, aggregations, joins, windowing, and sessionization.
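To give you a feel for the syntax before we dive in, here is a hypothetical statement (not part of this lab) that combines filtering, a windowed aggregation and grouping; it assumes a RATINGS stream with movie_id and rating fields has already been registered:
CREATE TABLE HIGH_RATING_COUNTS AS \
SELECT movie_id, COUNT(*) AS rating_count \
FROM RATINGS \
WINDOW TUMBLING (SIZE 1 HOUR) \
WHERE rating > 7.0 \
GROUP BY movie_id;
This single statement stands up a continuously updated, fault-tolerant aggregation on the KSQL Server, with no Java code involved.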
In this part of the workshop, you are going to enrich the implementation you have built so far with KSQL, which provides stream processing capabilities similar to the ones you used previously with Kafka Streams.
In this step, you are going to seed the topics movies and ratings with at least one record each. This is necessary so that, once we start creating streams and tables around those topics, KSQL knows how to parse the data.
head -n1 ../data/ratings-json.js | ccloud produce -t ratings
head -n1 ../data/movies-json.js | ccloud produce -t movies
In this step, you are going to create a session using the KSQL CLI. The KSQL CLI is a command-line interface that allows you to send KSQL statements to the KSQL Server, whether to create things on the server or simply to execute ad-hoc queries interactively.
docker run --network workshop_default --rm --interactive --tty confluentinc/cp-ksql-cli:5.1.0 http://ksql-server:8088
You should be presented with a prompt as shown below:
===========================================
= _ __ _____ ____ _ =
= | |/ // ____|/ __ \| | =
= | ' /| (___ | | | | | =
= | < \___ \| | | | | =
= | . \ ____) | |__| | |____ =
= |_|\_\_____/ \___\_\______| =
= =
= Streaming SQL Engine for Apache Kafka® =
===========================================
Copyright 2017-2018 Confluent Inc.
CLI v5.1.0, Server v5.1.0 located at http://ksql-server:8088
Having trouble? Type 'help' (case-insensitive) for a rundown of how things work!
ksql>
To test things out, execute the following command:
PRINT 'movies' FROM BEGINNING;
You should see all the records that you previously loaded into the movies topic.
Press Ctrl+C to interrupt the print command and go back to the KSQL CLI prompt.
Using the KSQL CLI you can also read and write properties. Properties are server and/or client attributes that define the basic behavior of queries. For instance, we need to always read topics from the beginning, so that we process old records and not only the newest ones. To do this, execute the following command:
SET 'auto.offset.reset' = 'earliest';
To show the value of the properties, you can use the SHOW PROPERTIES command:
SHOW PROPERTIES;
In this step, you are going to create your first table. Always keep in mind that a table is nothing but a snapshot of a stream, holding the last state for each given key. That means the building block that actually stores data is the stream, but we can capture a subset of that stream at a given time and materialize it as a table. Sounds complicated, right? It will become less complicated once you start executing the instructions below.
First, you need to create a stream around the existing topic. Create the stream using the following command:
CREATE STREAM MOVIES_SRC \
(movie_id BIGINT, title VARCHAR, release_year INT, country VARCHAR, rating DOUBLE, \
cinematographer VARCHAR, genres ARRAY<VARCHAR>, directors ARRAY<VARCHAR>, composers ARRAY<VARCHAR>, \
screenwriters ARRAY<VARCHAR>, production_companies ARRAY<VARCHAR>)\
WITH (VALUE_FORMAT='JSON', KAFKA_TOPIC='movies');
There you have it. You just created your first stream. Streams are entities kept by KSQL that contain metadata defining their characteristics. To check the stream's metadata, use the following command:
DESCRIBE MOVIES_SRC;
You should see the following output:
ksql> DESCRIBE MOVIES_SRC;
Name : MOVIES_SRC
Field | Type
--------------------------------------------------
ROWTIME | BIGINT (system)
ROWKEY | VARCHAR(STRING) (system)
MOVIE_ID | BIGINT
TITLE | VARCHAR(STRING)
RELEASE_YEAR | INTEGER
COUNTRY | VARCHAR(STRING)
RATING | DOUBLE
CINEMATOGRAPHER | VARCHAR(STRING)
GENRES | ARRAY<VARCHAR(STRING)>
DIRECTORS | ARRAY<VARCHAR(STRING)>
COMPOSERS | ARRAY<VARCHAR(STRING)>
SCREENWRITERS | ARRAY<VARCHAR(STRING)>
PRODUCTION_COMPANIES | ARRAY<VARCHAR(STRING)>
--------------------------------------------------
For runtime statistics and query details run: DESCRIBE EXTENDED <Stream,Table>;
Next, we need to make sure that each record in this stream is identifiable (or rather, partitioned) by a field that is unique. Think of this as the primary key of a SQL database table, if that makes you feel more comfortable. To do this, we need to rekey the stream using the following command:
CREATE STREAM MOVIES_REKEYED \
WITH (PARTITIONS=1) \
AS SELECT * FROM movies_src PARTITION BY movie_id;
The result of this command is the creation of a new stream. This new stream will contain exactly the same records that the previous stream contained, as well as any new records that arrive on it, since KSQL ensures that derived streams are kept in sync at all times. And best of all, you don't need to write any additional code for that to happen. If you want to play with this, execute the following query:
SELECT * FROM MOVIES_REKEYED;
You should see the record that you previously loaded into the movies topic, which demonstrates the magic of KSQL: it keeps your ETL pipeline live, with all the data copying between the pipes managed automatically.
Now, you may notice that the query you just executed never finishes. For now, you can safely press Ctrl+C to interrupt it and go back to the KSQL CLI prompt. But it is important to understand that queries in KSQL run forever: streams are sets of records that are continuously written, and therefore the result of a query over a stream is continuously updated. That is why a query in KSQL never finishes.
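As an aside, if you want an interactive query that terminates on its own, KSQL lets you bound it with a LIMIT clause; for example, the query below exits after emitting five rows:
SELECT * FROM MOVIES_REKEYED LIMIT 5;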
Now that you have a stream with each of its records partitioned by the movie_id field, we can finally create our table. Execute the following command:
CREATE TABLE MOVIES_REF \
(movie_id BIGINT, title VARCHAR, release_year INT, country VARCHAR, rating DOUBLE, \
cinematographer VARCHAR, genres ARRAY<VARCHAR>, directors ARRAY<VARCHAR>, composers ARRAY<VARCHAR>, \
screenwriters ARRAY<VARCHAR>, production_companies ARRAY<VARCHAR>) \
WITH (VALUE_FORMAT='JSON', KAFKA_TOPIC='MOVIES_REKEYED', KEY='movie_id');
Once the table is created, it can be queried as many times as you want. Execute the following query over the newly created table:
SELECT * FROM MOVIES_REF;
Let this query execute for now and don’t interrupt it by pressing Ctrl+C. Open a new terminal and execute the following:
cat ../data/movies-json.js | ccloud produce -t movies
If you look at the KSQL query being executed, you should see tons of records being shown.
1549232500202 | 1 | 1 | Once Upon a Time in the West | 1968 | Italy | 8.199999809265137 | Tonino Delli Colli | [Western] | [Sergio Leone] | [Ennio Morricone] | [Sergio Leone, Sergio Donati, Dario Argento, Bernardo Bertolucci] | [Paramount Pictures]
1549236467679 | 11 | 11 | The Ugly Truth | 2009 | United States | 5.699999809265137 | Russell Carpenter | [Romance, Comedy] | [Robert Luketic] | [Aaron Zigman] | [Nicole Eastman, Karen McCullah Lutz, Kirsten Smith] | [Sony Pictures, Lakeshore Entertainment]
1549236467679 | 12 | 12 | Warlock | 1959 | United States | 7.0 | Joseph MacDonald | [Western] | [Edward Dmytryk] | [Leigh Harline] | [Robert Alan Aurthur] | [20th Century Fox]
1549236467680 | 27 | 27 | Spanish Movie | 2009 | Spain | 3.799999952316284 | Óscar Faura | [Comedy] | [Javier Ruiz Caldera] | [Fernando Velázquez] | [Paco Cabezas] | [Telecinco Cinema, Think Studio]
1549236467680 | 28 | 28 | The Little Mermaid | 1989 | United States | 6.699999809265137 | Animation | [Animation, Fantasy, Musical, Kids] | [John Musker, Ron Clements] | [Alan Menken] | [John Musker, Ron Clements] | [Walt Disney Pictures]
1549236467681 | 43 | 43 | Jurassic Park | 1993 | United States | 7.0 | Dean Cundey | [Sci-Fi, Adventure] | [Steven Spielberg] | [John Williams] | [Michael Crichton, David Koepp] | [Universal Pictures, Amblin Entertainment]
1549236467682 | 59 | 59 | Casablanca | 1942 | United States | 8.399999618530273 | Arthur Edeson | [Drama, Romance] | [Michael Curtiz] | [Max Steiner] | [Julius J. Epstein, Philip G. Epstein, Howard Koch] | [Warner Bros. Pictures; Producer: Hal B. Wallis]
1549236467683 | 75 | 75 | The Italian Job | 2003 | United States | 6.5 | Wally Pfister | [Action] | [F. Gary Gray] | [John Powell] | [Wayne Powers, Donna Powers] | [Paramount Pictures]
1549236467681 | 44 | 44 | 310 to Yuma (Three Ten to Yuma) | 1957 | United States | 7.5 | Charles Lawton Jr. | [Western] | [Delmer Daves] | [George Duning] | [Halsted Welles] | [Columbia Pictures]
1549236467684 | 91 | 91 | Iron Man | 2008 | United States | 6.5 | Matthew Libatique | [Fantasy, Action, Adventure, Sci-Fi] | [Jon Favreau] | [Ramin Djawadi] | [Arthur Marcum, Matt Holloway, Mark Fergus, Hawk Ostby] | [Paramount Pictures, Marvel Enterprises, Goldman & Associates]
Not only does this prove that you are on the right track with this workshop, it also shows how awesome KSQL is. Think for a second about what just happened:
In a matter of minutes, you built a fairly complicated ETL pipeline in which data is transferred from an input topic through a series of pipes that change the nature of the data (rekeying, in this case), finally landing in a table whose data is always up to date with whatever is written to the input topic.