Follow along with this tutorial-style demo to learn how to set up Confluent Cloud and analyze data using ksqlDB. We'll use AIS, an automatic tracking system used by ships, and pull live public data on ships' speed, location, and other details. We'll then feed that data into an Apache Kafka® topic via a connection to Confluent Cloud. Afterwards, we'll build streams using ksqlDB and run analyses on that data.
Note: this tutorial is based on the Confluent blog post by Robin Moffatt. It provides guided directions for reproducing the author's results.
1 - Sign up for a Confluent Cloud account.
2 - Make sure you have jq (a utility that formats JSON results), gpsd (which provides gpsdecode for decoding AIS data), and kcat (which lets you see what's going on in a Kafka cluster) by running these commands in your terminal:
jq --version
gpsd --version
kcat -V
2a - If you need to install them:
View the instructions for installing jq for your operating system.
For gpsd, run brew install gpsd on macOS or apt-get install gpsd-clients on Linux. Unfortunately, installation isn't recommended on WSL2; a native Linux distribution is recommended instead.
Install kcat using the instructions from the repo README.
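For reference, on macOS with Homebrew all three can usually be installed in one go (the package names below are my assumption as of this writing, so check each project's docs if a formula has moved):
brew install jq gpsd kcat
On Debian/Ubuntu the equivalent would be along these lines (the kcat package is still named kafkacat on older releases):
sudo apt-get install jq gpsd-clients kafkacat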
Sign in to your Confluent Cloud account. Head over to the confluent.cloud/environments page and click 'Add Cloud Environment' on the top right of your screen.
Name your environment 'ais_data' and click 'Create'. Note: If you're prompted to select a Stream Governance package, just click the 'I'll do it later' link at the bottom of the page.
On your cluster page, click 'Create cluster on my own' or 'Create cluster'.
Select the basic configuration.
Then, select your provider and region. Next, name your cluster 'ais_data' and click 'Launch cluster'. You'll be redirected to your cluster dashboard.
Use the left-hand navbar to navigate to the API key page, create an API key (give it global scope) and download the values for later.
Now, navigate to 'Topics' in the left-hand navbar, and create a topic named 'ais' using the default values.
That's all for now. We'll revisit the Confluent Cloud environment in a minute so keep that tab open!
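If you'd rather script some of these steps than click through the UI, the Confluent CLI has rough equivalents. This is only a sketch, assuming you've installed the CLI and already have an environment and cluster to point it at; exact flags can vary between CLI versions:
confluent login
confluent environment list                          # note the ID of ais_data
confluent environment use YOUR_ENV_ID_HERE
confluent kafka cluster list                        # note the ID of the ais_data cluster
confluent kafka cluster use YOUR_CLUSTER_ID_HERE
confluent api-key create --resource YOUR_CLUSTER_ID_HERE
confluent kafka topic create ais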
To connect to the AIS data feed and view formatted JSON results, run this command in your terminal:
nc 153.44.253.27 5631 | gpsdecode | jq --unbuffered '.'
Note: you can press Ctrl+C to stop the flow of data (in some terminals, clicking to select text will also pause it).
You'll see results similar to:
{
  "class": "AIS",
  "device": "stdin",
  "type": 1,
  "repeat": 0,
  "mmsi": 257017920,
  "scaled": true,
  "status": 0,
  "status_text": "Under way using engine",
  "turn": 0,
  "speed": 0,
  "accuracy": false,
  "lon": 6.080573,
  "lat": 61.863708,
  "course": 300.2,
  "heading": 115,
  "second": 28,
  "maneuver": 0,
  "raim": false,
  "radio": 81923
}
These results might seem a little magical if you're not familiar with nc and jq, so let's break it down.
nc is the netcat command, which reads and writes data across network connections. Here, we're connecting to the AIS data feed at IP 153.44.253.27 and port 5631.
The gpsdecode command after the first pipe does what it sounds like: it decodes the raw AIS sentences into JSON.
Lastly, passing the --unbuffered flag to jq flushes the output after each JSON object is printed. As for the '.', jq filters can build objects and arrays of their own (an example follows below), and '.' is the identity filter: a way of saying "just pass each JSON object through and pretty-print it, if you please jq".
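For instance, if you only cared about each ship's identifier and position, an object-construction filter like this (field names taken from the sample message above; message types that lack a field will simply show null) narrows the output:
nc 153.44.253.27 5631 | gpsdecode | jq --unbuffered '{mmsi, lat, lon, speed}'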
Now, run this command to pipe the data from the source into your topic in Confluent Cloud using kcat. Where the command says YOUR_API_KEY_HERE and YOUR_API_SECRET_HERE, replace those values with the API key and secret you downloaded earlier.
To find the YOUR_BOOTSTRAP_SERVER_HERE value, click on the 'Cluster Settings' tab in the left-hand navbar and look under 'Endpoints'. You'll see your value there. It's also in the credentials file you downloaded earlier.
nc 153.44.253.27 5631 |
gpsdecode |
kcat -X security.protocol=SASL_SSL -X sasl.mechanism=PLAIN -b YOUR_BOOTSTRAP_SERVER_HERE -X sasl.username=YOUR_API_KEY_HERE -X sasl.password=YOUR_API_SECRET_HERE -t ais -P
Navigate to your ais topic in the Confluent Cloud interface using the left-hand navbar, click the 'Messages' tab, and view the messages coming in!
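If you'd rather confirm this from the terminal instead of the UI, the same connection flags work in kcat's consume mode; here -C consumes rather than produces and -c 5 exits after five messages:
kcat -X security.protocol=SASL_SSL -X sasl.mechanism=PLAIN -b YOUR_BOOTSTRAP_SERVER_HERE -X sasl.username=YOUR_API_KEY_HERE -X sasl.password=YOUR_API_SECRET_HERE -t ais -C -c 5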
In this part of the tutorial, we'll work with ksqlDB to transform the data coming into the ais topic.
First, you'll need to provision a ksqlDB cluster. Select 'ksqlDB' in the left-hand navbar, and click either 'Add cluster' or 'create cluster myself'. Select 'global access', and on the next page, accept the default cluster name and cluster size. Then, launch your cluster!
We'll create a stream based on the ais topic. Open the 'Editor' tab.
Note: double-check to make sure your auto.offset.reset setting is set to 'earliest'.
Now, copy/paste this query in:
create stream ais (
  class VARCHAR,
  device VARCHAR,
  type INT,
  repeat INT,
  mmsi INT,
  scaled BOOLEAN,
  status INT,
  status_text VARCHAR,
  turn INT,
  speed DOUBLE,
  accuracy BOOLEAN,
  lon DOUBLE,
  lat DOUBLE,
  course DOUBLE,
  heading INT,
  second INT,
  maneuver INT,
  raim BOOLEAN,
  radio INT
) with (value_format = 'JSON', kafka_topic = 'ais');
This statement defines the stream's columns based on the JSON fields arriving on the ais topic; note that decimal values like lat, lon, course, and speed are declared as DOUBLE so nothing is lost.
Once you've run the query, check your 'Streams' tab to make sure you can see the stream listed. Click on it. You'll be able to see your messages coming through!
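You can also sample the stream straight from the editor. A push query with a LIMIT clause (my addition, not part of the original post) returns a handful of rows and then stops on its own:
SELECT MMSI, LAT, LON, SPEED FROM AIS EMIT CHANGES LIMIT 10;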
Now, say you're building a frontend application and you don't need every piece of data listed in the stream. You could filter in the application client-side logic, but it'd be cleaner to create a new topic for your purposes. Let's do it!
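One way to spot a busy ship (again, this query is my addition rather than part of the original post) is a quick aggregation over the AIS stream; let it run for a minute in the editor, note an MMSI with a high count, and then cancel it:
SELECT MMSI, COUNT(*) AS MSG_COUNT FROM AIS GROUP BY MMSI EMIT CHANGES;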
First, let's create a stream based on just the columns we need, filtered to an MMSI that shows up frequently in your data (the count query above can help you pick one; substitute it for YOUR_SELECTED_MMSI below):
CREATE STREAM AIS_FILTERED AS
SELECT
MMSI,
LAT,
LON
FROM AIS
WHERE (MMSI = YOUR_SELECTED_MMSI)
EMIT CHANGES;
Now, if we then query that stream, we can see the filtered data coming in!
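For example, a push query against the new stream shows only that ship's position updates as they arrive:
SELECT * FROM AIS_FILTERED EMIT CHANGES;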