In this project, you will construct a streaming event pipeline around Apache Kafka and its ecosystem. Using public data from the Chicago Transit Authority, the pipeline will simulate and display the status of train lines in real time.
When the project is complete, you will be able to monitor a website to watch trains move from station to station.
The following are required to complete this project:
- Docker
- Python 3.7
- Access to a computer with at least 16 GB of RAM and a 4-core CPU to execute the simulation
The Chicago Transit Authority (CTA) has asked us to develop a dashboard displaying system status for its commuters. We have decided to use Kafka and ecosystem tools like REST Proxy and Kafka Connect to accomplish this task.
Our architecture will look like so:
The first step in our plan is to configure the train stations to emit some of the events that we need. The CTA has placed a sensor on each side of every train station that can be programmed to take an action whenever a train arrives at the station.
To accomplish this, you must complete the following tasks:
- Complete the code in `producers/models/producer.py`
- Define a `value` schema for the arrival event in `producers/models/schemas/arrival_value.json` with the following attributes:
  - `station_id`
  - `train_id`
  - `direction`
  - `line`
  - `train_status`
  - `prev_station_id`
  - `prev_direction`
- Complete the code in `producers/models/station.py` so that:
  - A topic is created for each station in Kafka to track the arrival events
  - The station emits an `arrival` event to Kafka whenever the `Station.run()` function is called.
  - Events emitted to Kafka are paired with the Avro `key` and `value` schemas
- Define a `value` schema for the turnstile event in `producers/models/schemas/turnstile_value.json` with the following attributes:
  - `station_id`
  - `station_name`
  - `line`
- Complete the code in `producers/models/turnstile.py` so that:
  - A topic is created for each turnstile for each station in Kafka to track the turnstile events
  - The station emits a `turnstile` event to Kafka whenever the `Turnstile.run()` function is called.
  - Events emitted to Kafka are paired with the Avro `key` and `value` schemas
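The following minimal sketch shows what the arrival `value` schema could look like. It is written as a Python dict so that `json.dumps` produces the contents of `arrival_value.json`, and it includes a small checker for sample records. The field names come from the task list above. The record name, the namespace, and the field types are assumptions and not the project's required definition. For example, this sketch makes `prev_station_id` and `prev_direction` nullable on the assumption that the first station on a line has no predecessor.

```python
import json

# Hypothetical Avro record schema for arrival events. The field names follow
# the task list; the name, namespace and types are illustrative assumptions.
ARRIVAL_VALUE_SCHEMA = {
    "type": "record",
    "name": "ArrivalValue",
    "namespace": "org.chicago.cta",
    "fields": [
        {"name": "station_id", "type": "int"},
        {"name": "train_id", "type": "string"},
        {"name": "direction", "type": "string"},
        {"name": "line", "type": "string"},
        {"name": "train_status", "type": "string"},
        # Nullable unions list "null" first so a null default is valid Avro.
        {"name": "prev_station_id", "type": ["null", "int"], "default": None},
        {"name": "prev_direction", "type": ["null", "string"], "default": None},
    ],
}

# Maps the Avro primitive types used above to Python types for a quick check.
AVRO_TO_PY = {"int": int, "string": str, "null": type(None)}


def validate(record: dict, schema: dict = ARRIVAL_VALUE_SCHEMA) -> list:
    """Return a list of problems; an empty list means the record fits the schema."""
    problems = []
    for field in schema["fields"]:
        name = field["name"]
        if name not in record:
            problems.append(f"missing field: {name}")
            continue
        allowed = field["type"] if isinstance(field["type"], list) else [field["type"]]
        if not any(isinstance(record[name], AVRO_TO_PY[t]) for t in allowed):
            problems.append(f"bad type for {name}: {type(record[name]).__name__}")
    return problems


if __name__ == "__main__":
    # Prints the JSON you would save as arrival_value.json.
    print(json.dumps(ARRIVAL_VALUE_SCHEMA, indent=2))
```

The turnstile schema follows the same pattern with `station_id`, `station_name` and `line`.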
Our partners at the CTA have asked that we also send weather readings into Kafka from their weather hardware. Unfortunately, this hardware is old and we cannot use the Python Client Library due to hardware restrictions. Instead, we are going to use HTTP REST to send the data to Kafka from the hardware using Kafka's REST Proxy.
To accomplish this, you must complete the following tasks:
- Define a `value` schema for the weather event in `producers/models/schemas/weather_value.json` with the following attributes:
  - `temperature`
  - `status`
- Complete the code in `producers/models/weather.py` so that:
  - A topic is created for weather events
  - The weather model emits a `weather` event to Kafka REST Proxy whenever the `Weather.run()` function is called.
    - NOTE: When sending HTTP requests to Kafka REST Proxy, be careful to include the correct `Content-Type`. Pay close attention to the examples in the documentation for more information.
  - Events emitted to REST Proxy are paired with the Avro `key` and `value` schemas
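The following sketch builds a REST Proxy v2 produce request using only the standard library. The request uses the Avro content type `application/vnd.kafka.avro.v2+json`, and the body carries both schemas as JSON strings alongside the records. The topic name and the key schema (a single `timestamp` field) are hypothetical placeholders. The value schema uses the `temperature` and `status` attributes from the task list, and its types are assumptions.

```python
import json
import urllib.request

REST_PROXY_URL = "http://localhost:8082"  # Host URL from the services table

# Hypothetical schemas; your real ones live in weather_key.json / weather_value.json.
WEATHER_KEY_SCHEMA = {
    "type": "record", "name": "WeatherKey", "namespace": "org.chicago.cta",
    "fields": [{"name": "timestamp", "type": "long"}],
}
WEATHER_VALUE_SCHEMA = {
    "type": "record", "name": "WeatherValue", "namespace": "org.chicago.cta",
    "fields": [
        {"name": "temperature", "type": "float"},
        {"name": "status", "type": "string"},
    ],
}


def build_weather_request(topic: str, key: dict, value: dict) -> urllib.request.Request:
    """Build (but do not send) a POST /topics/<topic> request for REST Proxy v2."""
    body = {
        # REST Proxy v2 expects the Avro schemas as JSON-encoded strings.
        "key_schema": json.dumps(WEATHER_KEY_SCHEMA),
        "value_schema": json.dumps(WEATHER_VALUE_SCHEMA),
        "records": [{"key": key, "value": value}],
    }
    return urllib.request.Request(
        url=f"{REST_PROXY_URL}/topics/{topic}",
        data=json.dumps(body).encode("utf-8"),
        # Avro payloads need this exact Content-Type, not plain application/json.
        headers={"Content-Type": "application/vnd.kafka.avro.v2+json"},
        method="POST",
    )

# To actually send it while docker-compose is running:
# urllib.request.urlopen(build_weather_request("org.chicago.cta.weather.v1", {...}, {...}))
```

If you use `requests` in `weather.py`, it takes the same URL, header and JSON body.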
Finally, we need to extract station information from our PostgreSQL database into Kafka. We've decided to use the Kafka JDBC Source Connector.
To accomplish this, you must complete the following tasks:
- Complete the code and configuration in `producers/connector.py`
  - Please refer to the Kafka Connect JDBC Source Connector Configuration Options for documentation on the options you must complete.
  - You can run this file directly to test your connector, rather than running the entire simulation.
  - Make sure to use the Landoop Kafka Connect UI and Landoop Kafka Topics UI to check the status and output of the connector.
  - To delete a misconfigured connector: `curl -X DELETE localhost:8083/connectors/stations`
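The following sketch shows the kind of payload a JDBC source connector registration might send to Kafka Connect's REST API. The script talks to Kafka Connect through its Host URL. The `connection.url` value uses the Docker URL because Kafka Connect itself runs inside Docker Compose. The configuration keys are standard JDBC source options. The table name `stations`, the `stop_id` incrementing column, the topic prefix and the poll interval are assumptions to adapt to your schema.

```python
import json
import urllib.request

KAFKA_CONNECT_URL = "http://localhost:8083/connectors"  # Host URL: this script runs on your machine
CONNECTOR_NAME = "stations"


def connector_config() -> dict:
    """Return a JDBC source connector definition (all values as strings)."""
    return {
        "name": CONNECTOR_NAME,
        "config": {
            "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
            "key.converter": "org.apache.kafka.connect.json.JsonConverter",
            "key.converter.schemas.enable": "false",
            "value.converter": "org.apache.kafka.connect.json.JsonConverter",
            "value.converter.schemas.enable": "false",
            "batch.max.rows": "500",
            # Docker URL: Kafka Connect reaches Postgres over the compose network.
            "connection.url": "jdbc:postgresql://postgres:5432/cta",
            "connection.user": "cta_admin",
            "connection.password": "chicago",
            "table.whitelist": "stations",           # assumption: source table name
            "mode": "incrementing",
            "incrementing.column.name": "stop_id",   # assumption: increasing id column
            "topic.prefix": "org.chicago.cta.",      # hypothetical; topic = prefix + table
            "poll.interval.ms": "3600000",           # station data changes rarely
        },
    }


def create_request() -> urllib.request.Request:
    """POST the connector definition to Kafka Connect (not sent here)."""
    return urllib.request.Request(
        KAFKA_CONNECT_URL,
        data=json.dumps(connector_config()).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def delete_request() -> urllib.request.Request:
    """Equivalent of: curl -X DELETE localhost:8083/connectors/stations"""
    return urllib.request.Request(f"{KAFKA_CONNECT_URL}/{CONNECTOR_NAME}", method="DELETE")
```

Pass either request to `urllib.request.urlopen` once Kafka Connect is up, then confirm the result in the Kafka Connect UI.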
We will leverage Faust Stream Processing to transform the raw Stations table that we ingested from Kafka Connect. The raw format from the database has more data than we need, and the line color information is not conveniently configured. To remediate this, we're going to ingest data from our Kafka Connect topic, and transform the data.
To accomplish this, you must complete the following tasks:
- Complete the code and configuration in `consumers/faust_stream.py`
You must run this Faust processing application with the following command:
faust -A faust_stream worker -l info
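At its core, the Faust job applies a per-record transformation. The following sketch shows only that mapping, in plain Python, so it runs without a Faust worker. It assumes that the raw station rows carry boolean `red`, `blue` and `green` columns. It also assumes that the downstream record needs `station_id`, `station_name`, `order` and a single `line` string. All of these field names are assumptions, as is the red, blue, green priority used when several flags are set.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Station:
    """Hypothetical shape of a raw row ingested by the JDBC connector."""
    stop_id: int
    station_id: int
    station_name: str
    order: int
    red: bool
    blue: bool
    green: bool


@dataclass
class TransformedStation:
    """Hypothetical slimmed-down record for downstream consumers."""
    station_id: int
    station_name: str
    order: int
    line: Optional[str]


def line_color(station: Station) -> Optional[str]:
    """Collapse the three boolean flags into one colour name (None if unset)."""
    if station.red:
        return "red"
    if station.blue:
        return "blue"
    if station.green:
        return "green"
    return None


def transform(station: Station) -> TransformedStation:
    return TransformedStation(
        station_id=station.station_id,
        station_name=station.station_name,
        order=station.order,
        line=line_color(station),
    )

# Inside faust_stream.py this would typically be called from an agent, e.g.:
# @app.agent(source_topic)
# async def transform_stations(stations):
#     async for station in stations:
#         table[station.station_id] = transform(station)
```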
Next, we will use KSQL to aggregate turnstile data for each of our stations. Recall that when we produced turnstile data, we simply emitted an event, not a count. To make this data more useful, we will summarize it by station so that downstream applications always have an up-to-date count.
To accomplish this, you must complete the following tasks:
- Complete the queries in `consumers/ksql.py`
  - The KSQL CLI is the best place to build your queries. Try `ksql` in your workspace to enter the CLI.
  - You can run this file on its own simply by running `python ksql.py`
  - Made a mistake in table creation? `DROP TABLE <your_table>`. If the CLI asks you to terminate a running query, you can `TERMINATE <query_name>`.
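The following sketch shows how the KSQL statements could be submitted through KSQL's REST endpoint (`POST /ksql`) instead of the CLI. The topic name is hypothetical. In this version the raw events are declared as a STREAM so that every turnstile event is counted, and the GROUP BY builds the per-station table. Adjust the object types and names to whatever the starter `ksql.py` expects.

```python
import json
import urllib.request

KSQL_URL = "http://localhost:8088"  # Host URL from the services table

# Hypothetical topic name; use the topic your turnstile producer writes to.
TURNSTILE_TOPIC = "org.chicago.cta.turnstiles"

KSQL_STATEMENT = f"""
CREATE STREAM turnstile (
    station_id INT,
    station_name VARCHAR,
    line VARCHAR
) WITH (
    KAFKA_TOPIC='{TURNSTILE_TOPIC}',
    VALUE_FORMAT='AVRO'
);

CREATE TABLE turnstile_summary
WITH (VALUE_FORMAT='JSON') AS
    SELECT station_id, COUNT(station_id) AS count
    FROM turnstile
    GROUP BY station_id;
"""


def build_ksql_request(statement: str = KSQL_STATEMENT) -> urllib.request.Request:
    """Build (but do not send) a request that runs the statements above."""
    body = {
        "ksql": statement,
        # Start from the beginning of the topic so existing events are counted.
        "streamsProperties": {"ksql.streams.auto.offset.reset": "earliest"},
    }
    return urllib.request.Request(
        f"{KSQL_URL}/ksql",
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/vnd.ksql.v1+json; charset=utf-8"},
        method="POST",
    )
```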
With all of the data in Kafka, our final task is to consume the data in the web server that is going to serve the transit status pages to our commuters.
To accomplish this, you must complete the following tasks:
- Complete the code in `consumers/consumer.py`
- Complete the code in `consumers/models/line.py`
- Complete the code in `consumers/models/weather.py`
- Complete the code in `consumers/models/station.py`
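The following sketch shows the consume-and-update pattern that these files implement: poll once, skip empty polls and errors, and hand the message to a model that the web page reads from. It is written against any object with a confluent-kafka-style `poll(timeout)` method whose messages expose `topic()`, `value()` and `error()`. It also assumes that `value()` is already a decoded dict, as with an Avro-deserializing consumer. The `Weather` class here is a hypothetical stand-in and not the starter model.

```python
from typing import Callable


def consume_once(consumer, handle: Callable[[str, dict], None], timeout: float = 1.0) -> int:
    """Poll one message and dispatch it; return 1 if a message was handled, else 0."""
    message = consumer.poll(timeout)
    if message is None:
        return 0  # nothing arrived within the timeout
    if message.error() is not None:
        print(f"consumer error: {message.error()}")
        return 0
    handle(message.topic(), message.value())
    return 1


class Weather:
    """Hypothetical in-memory model that the status page renders."""

    def __init__(self) -> None:
        self.temperature = 70.0
        self.status = "sunny"

    def process_message(self, value: dict) -> None:
        # Keep the previous reading for any field the message omits.
        self.temperature = value.get("temperature", self.temperature)
        self.status = value.get("status", self.status)
```

Returning 1 or 0 lets a caller loop until nothing is left to read before it yields to the web server's event loop.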
In addition to the course content you have already reviewed, you may find the following examples and documentation helpful in completing this assignment:
- Confluent Python Client Documentation
- Confluent Python Client Usage and Examples
- REST Proxy API Reference
- Kafka Connect JDBC Source Connector Configuration Options
The project consists of two main directories, `producers` and `consumers`.
The following directory layout marks the files that the student is responsible for modifying with a `*` indicator. Instructions for what is required are present as comments in each file.
* - Indicates that the student must complete the code in this file
├── consumers
│ ├── consumer.py *
│ ├── faust_stream.py *
│ ├── ksql.py *
│ ├── models
│ │ ├── lines.py
│ │ ├── line.py *
│ │ ├── station.py *
│ │ └── weather.py *
│ ├── requirements.txt
│ ├── server.py
│ ├── topic_check.py
│ └── templates
│ └── status.html
└── producers
├── connector.py *
├── models
│ ├── line.py
│ ├── producer.py *
│ ├── schemas
│ │ ├── arrival_key.json
│ │ ├── arrival_value.json *
│ │ ├── turnstile_key.json
│ │ ├── turnstile_value.json *
│ │ ├── weather_key.json
│ │ └── weather_value.json *
│ ├── station.py *
│ ├── train.py
│ ├── turnstile.py *
│ ├── turnstile_hardware.py
│ └── weather.py *
├── requirements.txt
└── simulation.py
To run the simulation, you must first start up the Kafka ecosystem on your machine using Docker Compose.
%> docker-compose up
Docker Compose will take 3-5 minutes to start, depending on your hardware. Please be patient and wait for the docker-compose logs to slow down or stop before beginning the simulation.
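If you prefer to check readiness rather than watching the logs, the following sketch polls each service's host port until it accepts a TCP connection or a deadline passes. The port list is copied from the services table. An open port is only a rough signal, so a service such as Kafka Connect may still be initializing when its port first answers.

```python
import socket
import time

# Host ports from the services table; adjust if your compose file differs.
SERVICE_PORTS = {
    "kafka": 9092,
    "schema-registry": 8081,
    "rest-proxy": 8082,
    "kafka-connect": 8083,
    "ksql": 8088,
    "postgres": 5432,
}


def port_open(host: str, port: int, timeout: float = 1.0) -> bool:
    """Return True if a TCP connection to host:port succeeds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


def wait_for_services(ports=SERVICE_PORTS, host="localhost", deadline_s=300.0, interval_s=5.0):
    """Poll until every port answers or the deadline passes.

    Returns the sorted names of services that are still unreachable;
    an empty list means everything accepted a connection.
    """
    pending = dict(ports)
    end = time.monotonic() + deadline_s
    while True:
        pending = {name: p for name, p in pending.items() if not port_open(host, p)}
        if not pending or time.monotonic() >= end:
            return sorted(pending)
        time.sleep(interval_s)


if __name__ == "__main__":
    missing = wait_for_services()
    print("all services reachable" if not missing else f"still waiting on: {missing}")
```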
Once docker-compose is ready, the following services will be available:
Service | Host URL | Docker URL | Username | Password |
---|---|---|---|---|
Public Transit Status | http://localhost:8888 | n/a | ||
Landoop Kafka Connect UI | http://localhost:8084 | http://connect-ui:8084 | ||
Landoop Kafka Topics UI | http://localhost:8085 | http://topics-ui:8085 | ||
Landoop Schema Registry UI | http://localhost:8086 | http://schema-registry-ui:8086 | ||
Kafka | PLAINTEXT://localhost:9092,PLAINTEXT://localhost:9093,PLAINTEXT://localhost:9094 | PLAINTEXT://kafka0:9092,PLAINTEXT://kafka1:9093,PLAINTEXT://kafka2:9094 | ||
REST Proxy | http://localhost:8082 | http://rest-proxy:8082/ | ||
Schema Registry | http://localhost:8081 | http://schema-registry:8081/ | ||
Kafka Connect | http://localhost:8083 | http://kafka-connect:8083 | ||
KSQL | http://localhost:8088 | http://ksql:8088 | ||
PostgreSQL | jdbc:postgresql://localhost:5432/cta | jdbc:postgresql://postgres:5432/cta | cta_admin | chicago |
Note that to access these services from your own machine, you will always use the `Host URL` column.
When configuring services that run within Docker Compose, like Kafka Connect, you must use the Docker URL. When you configure the JDBC Source Kafka Connector, for example, you will want to use the value from the `Docker URL` column.
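The following sketch captures the Host URL versus Docker URL rule in a small lookup. The URLs are copied from the services table above. The helper name and the `inside_docker` flag are hypothetical conveniences and not part of the project code.

```python
# (Host URL, Docker URL) pairs copied from the services table.
SERVICES = {
    "kafka": (
        "PLAINTEXT://localhost:9092,PLAINTEXT://localhost:9093,PLAINTEXT://localhost:9094",
        "PLAINTEXT://kafka0:9092,PLAINTEXT://kafka1:9093,PLAINTEXT://kafka2:9094",
    ),
    "schema-registry": ("http://localhost:8081", "http://schema-registry:8081/"),
    "rest-proxy": ("http://localhost:8082", "http://rest-proxy:8082/"),
    "kafka-connect": ("http://localhost:8083", "http://kafka-connect:8083"),
    "ksql": ("http://localhost:8088", "http://ksql:8088"),
    "postgres": ("jdbc:postgresql://localhost:5432/cta", "jdbc:postgresql://postgres:5432/cta"),
}


def service_url(name: str, inside_docker: bool = False) -> str:
    """Pick the right URL for a service.

    inside_docker=False: code you run on your machine (simulation.py, server.py).
    inside_docker=True: settings read by a container, such as the JDBC
    connection.url that Kafka Connect uses to reach Postgres.
    """
    host_url, docker_url = SERVICES[name]
    return docker_url if inside_docker else host_url
```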
There are two pieces to the simulation, the `producer` and the `consumer`. As you develop each piece of the code, it is recommended that you only run one piece of the project at a time.
However, when you are ready to verify the end-to-end system prior to submission, it is critical that you open a terminal window for each piece and run them at the same time. If you do not run both the producer and consumer at the same time you will not be able to successfully complete the project.
To run the producer:
cd producers
virtualenv venv
. venv/bin/activate
pip install -r requirements.txt
python simulation.py
Once the simulation is running, you may hit `Ctrl+C` at any time to exit.
To run the Faust Stream Processing Application:
cd consumers
virtualenv venv
. venv/bin/activate
pip install -r requirements.txt
faust -A faust_stream worker -l info
To run the KSQL creation script:
cd consumers
virtualenv venv
. venv/bin/activate
pip install -r requirements.txt
python ksql.py
**NOTE**: Do not run the consumer until you have reached Step 6!
To run the consumer:
cd consumers
virtualenv venv
. venv/bin/activate
pip install -r requirements.txt
python server.py
Once the server is running, you may hit `Ctrl+C` at any time to exit.
These instructions assume that you have already installed Docker and Docker Compose on your computer. If Docker and Docker Compose are not installed, please refer to the official Docker and Docker Compose installation guides.
Windows Users - You must use Windows Subsystem for Linux (WSL) 2. This guide is written against the Ubuntu 20.04 distribution. These instructions should work for any Linux installation on Windows. However, other Linux distributions have not been tested and are not supported. For best results, install and use Ubuntu 20.04.
Note - Windows Users must execute the commands inside of the Ubuntu 20.04 environment. Running the following commands from Powershell or the Windows command prompt will not work.
This project makes use of docker-compose for running your project’s dependencies:
- Kafka
- Zookeeper
- Schema Registry
- REST Proxy
- Kafka Connect
- KSQL
- Kafka Connect UI
- Kafka Topics UI
- Schema Registry UI
- Postgres
The docker-compose file does not run your code.
To start docker-compose, navigate to the starter directory containing `docker-compose.yaml` and run the following commands:
Note - Windows Users must execute this command inside of the Ubuntu 20.04 environment
$> cd starter
$> docker-compose up
Starting zookeeper ... done
Starting kafka0 ... done
Starting schema-registry ... done
Starting rest-proxy ... done
Starting connect ... done
Starting ksql ... done
Starting connect-ui ... done
Starting topics-ui ... done
Starting schema-registry-ui ... done
Starting postgres ... done
You will see a large amount of text print out in your terminal and continue to scroll. This is normal! This means your dependencies are up and running.
To check the status of your environment, you may run the following command at any time from a separate terminal instance:
$> docker-compose ps
Name Command State Ports
-----------------------------------------------------------------------------------------------------------------
starter_connect-ui_1 /run.sh Up 8000/tcp, 0.0.0.0:8084->8084/tcp
starter_connect_1 /etc/confluent/docker/run Up 0.0.0.0:8083->8083/tcp, 9092/tcp
starter_kafka0_1 /etc/confluent/docker/run Up 0.0.0.0:9092->9092/tcp
starter_ksql_1 /etc/confluent/docker/run Up 0.0.0.0:8088->8088/tcp
starter_postgres_1 docker-entrypoint.sh postgres Up 0.0.0.0:5432->5432/tcp
starter_rest-proxy_1 /etc/confluent/docker/run Up 0.0.0.0:8082->8082/tcp
starter_schema-registry-ui_1 /run.sh Up 8000/tcp, 0.0.0.0:8086->8086/tcp
starter_schema-registry_1 /etc/confluent/docker/run Up 0.0.0.0:8081->8081/tcp
starter_topics-ui_1 /run.sh Up 8000/tcp, 0.0.0.0:8085->8085/tcp
starter_zookeeper_1 /etc/confluent/docker/run Up 0.0.0.0:2181->2181/tcp, 2888/tcp, 3888/tcp
Now that your project’s dependencies are running in Docker Compose, we’re ready to get our project up and running.
Windows Users Only: You must first install `librdkafka-dev` in your WSL Linux. Run the following command in your Ubuntu terminal: `sudo apt-get install librdkafka-dev -y`
Now you should be able to follow all the instructions on the Project Directions page and complete your project.
When you are ready to stop Docker Compose you can run the following command:
$> docker-compose stop
Stopping starter_postgres_1 ... done
Stopping starter_schema-registry-ui_1 ... done
Stopping starter_topics-ui_1 ... done
Stopping starter_connect-ui_1 ... done
Stopping starter_ksql_1 ... done
Stopping starter_connect_1 ... done
Stopping starter_rest-proxy_1 ... done
Stopping starter_schema-registry_1 ... done
Stopping starter_kafka0_1 ... done
Stopping starter_zookeeper_1 ... done
If you would like to clean up the containers to reclaim disk space, as well as the volumes containing your data:
$> docker-compose rm -v
Going to remove starter_postgres_1, starter_schema-registry-ui_1, starter_topics-ui_1, starter_connect-ui_1, starter_ksql_1, starter_connect_1, starter_rest-proxy_1, starter_schema-registry_1, starter_kafka0_1, starter_zookeeper_1
Are you sure? [yN] y
Removing starter_postgres_1 ... done
Removing starter_schema-registry-ui_1 ... done
Removing starter_topics-ui_1 ... done
Removing starter_connect-ui_1 ... done
Removing starter_ksql_1 ... done
Removing starter_connect_1 ... done
Removing starter_rest-proxy_1 ... done
Removing starter_schema-registry_1 ... done
Removing starter_kafka0_1 ... done
Removing starter_zookeeper_1 ... done