Skip to content

Commit

Permalink
added a websockets application to view the POOR_RATINGS topic
Browse files Browse the repository at this point in the history
  • Loading branch information
russau committed Jun 27, 2019
1 parent 2c0ad63 commit cb320b5
Show file tree
Hide file tree
Showing 11 changed files with 277 additions and 20 deletions.
15 changes: 15 additions & 0 deletions ksql-workshop/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -255,3 +255,18 @@ services:
environment:
xpack.security.enabled: "false"
discovery.type: "single-node"

websockets:
image: ksql-workshop-websockets
container_name: websockets
build: ./websockets

nginx:
image: nginx:1.15-alpine
container_name: nginx
depends_on:
- websockets
ports:
- 8080:8080
volumes:
- ./nginx/conf.d:/etc/nginx/conf.d
Binary file added ksql-workshop/images/websockets.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
50 changes: 40 additions & 10 deletions ksql-workshop/ksql-workshop-windows.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,9 @@ kibana /usr/local/bin/kibana-docker Up 0.0.0.0:5601->5601/t
ksql-cli /bin/sh Up
ksql-server /etc/confluent/docker/run Up 0.0.0.0:8088->8088/tcp
mysql docker-entrypoint.sh mysqld Up 0.0.0.0:3306->3306/tcp, 33060/tcp
nginx nginx -g daemon off; Up 80/tcp, 0.0.0.0:8080->8080/tcp
schema-registry /etc/confluent/docker/run Up 8081/tcp
websockets docker-entrypoint.sh bash ... Up 3000/tcp
zookeeper /etc/confluent/docker/run Up 2181/tcp, 2888/tcp, 3888/tcp
----
Expand Down Expand Up @@ -409,6 +411,14 @@ messages-per-sec: 0.33 total-messages: 1857 last-message: 6/20/1

_N.B. you can use the up arrow on your keyboard to cycle through KSQL command history for easy access and replay of previous commands. Ctrl-R also works for searching command history._

=== Web Application for the derived stream

We now have the derived data in a Kafka topic. A team in our organization is interested in building an application to view this derived data in a web application.

A sample NodeJS websockets application has been built from the code in link:websockets[]. You can view the application at http://localhost:8080.

image::images/websockets.png[]

=== Query the stream

The derived stream that we've created is just another stream that we can interact with in KSQL as any other. If you run a `SELECT` against the stream you'll see new messages arriving based on those coming from the source `ratings` topic:
Expand Down Expand Up @@ -1091,16 +1101,36 @@ To terminate the workshop environment, run `docker-compose down`:
[source,bash]
----
$ docker-compose down
Stopping ksql-workshop_ksql-server_1 ... done
Stopping ksql-workshop_datagen-ratings_1 ... done
Stopping ksql-workshop_schema-registry_1 ... done
Stopping ksql-workshop_kafka_1 ... done
Stopping ksql-workshop_zookeeper_1 ... done
Removing ksql-workshop_ksql-server_1 ... done
Removing ksql-workshop_datagen-ratings_1 ... done
Removing ksql-workshop_schema-registry_1 ... done
Removing ksql-workshop_kafka_1 ... done
Removing ksql-workshop_zookeeper_1 ... done
Stopping control-center ... done
Stopping datagen-ratings ... done
Stopping connect-debezium ... done
Stopping kafka-connect ... done
Stopping ksql-cli ... done
Stopping schema-registry ... done
Stopping kafkacat ... done
Stopping ksql-server ... done
Stopping kibana ... done
Stopping kafka ... done
Stopping nginx ... done
Stopping elasticsearch ... done
Stopping websockets ... done
Stopping zookeeper ... done
Stopping mysql ... done
Removing control-center ... done
Removing datagen-ratings ... done
Removing connect-debezium ... done
Removing kafka-connect ... done
Removing ksql-cli ... done
Removing schema-registry ... done
Removing kafkacat ... done
Removing ksql-server ... done
Removing kibana ... done
Removing kafka ... done
Removing nginx ... done
Removing elasticsearch ... done
Removing websockets ... done
Removing zookeeper ... done
Removing mysql ... done
Removing network ksql-workshop_default
----

Expand Down
50 changes: 40 additions & 10 deletions ksql-workshop/ksql-workshop.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,9 @@ kibana /usr/local/bin/kibana-docker Up 0.0.0.0:5601->5601/t
ksql-cli /bin/sh Up
ksql-server /etc/confluent/docker/run Up 0.0.0.0:8088->8088/tcp
mysql docker-entrypoint.sh mysqld Up 0.0.0.0:3306->3306/tcp, 33060/tcp
nginx nginx -g daemon off; Up 80/tcp, 0.0.0.0:8080->8080/tcp
schema-registry /etc/confluent/docker/run Up 8081/tcp
websockets docker-entrypoint.sh bash ... Up 3000/tcp
zookeeper /etc/confluent/docker/run Up 2181/tcp, 2888/tcp, 3888/tcp
----
Expand Down Expand Up @@ -404,6 +406,14 @@ messages-per-sec: 0.33 total-messages: 1857 last-message: 6/20/1

_N.B. you can use the up arrow on your keyboard to cycle through KSQL command history for easy access and replay of previous commands. Ctrl-R also works for searching command history._

=== Web Application for the derived stream

We now have the derived data in a Kafka topic. A team in our organization is interested in building an application to view this derived data in a web application.

A sample NodeJS websockets application has been built from the code in link:websockets[]. You can view the application at http://localhost:8080.

image::images/websockets.png[]

=== Query the stream

The derived stream that we've created is just another stream that we can interact with in KSQL as any other. If you run a `SELECT` against the stream you'll see new messages arriving based on those coming from the source `ratings` topic:
Expand Down Expand Up @@ -1086,16 +1096,36 @@ To terminate the workshop environment, run `docker-compose down`:
[source,bash]
----
$ docker-compose down
Stopping ksql-workshop_ksql-server_1 ... done
Stopping ksql-workshop_datagen-ratings_1 ... done
Stopping ksql-workshop_schema-registry_1 ... done
Stopping ksql-workshop_kafka_1 ... done
Stopping ksql-workshop_zookeeper_1 ... done
Removing ksql-workshop_ksql-server_1 ... done
Removing ksql-workshop_datagen-ratings_1 ... done
Removing ksql-workshop_schema-registry_1 ... done
Removing ksql-workshop_kafka_1 ... done
Removing ksql-workshop_zookeeper_1 ... done
Stopping control-center ... done
Stopping datagen-ratings ... done
Stopping connect-debezium ... done
Stopping kafka-connect ... done
Stopping ksql-cli ... done
Stopping schema-registry ... done
Stopping kafkacat ... done
Stopping ksql-server ... done
Stopping kibana ... done
Stopping kafka ... done
Stopping nginx ... done
Stopping elasticsearch ... done
Stopping websockets ... done
Stopping zookeeper ... done
Stopping mysql ... done
Removing control-center ... done
Removing datagen-ratings ... done
Removing connect-debezium ... done
Removing kafka-connect ... done
Removing ksql-cli ... done
Removing schema-registry ... done
Removing kafkacat ... done
Removing ksql-server ... done
Removing kibana ... done
Removing kafka ... done
Removing nginx ... done
Removing elasticsearch ... done
Removing websockets ... done
Removing zookeeper ... done
Removing mysql ... done
Removing network ksql-workshop_default
----

Expand Down
21 changes: 21 additions & 0 deletions ksql-workshop/nginx/conf.d/default.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
server {
listen 8080 default_server;
server_tokens off;

location / {
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header Host $host;

proxy_pass http://websockets:3000;

# enable WebSockets
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
}

error_page 500 502 503 504 /50x.html;
location = /50x.html {
root /usr/share/nginx/html;
}
}
12 changes: 12 additions & 0 deletions ksql-workshop/websockets/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
FROM node:dubnium-stretch
RUN mkdir -p /home/node/app/node_modules && \
chown -R node:node /home/node/app && \
apt-get update && \
apt-get install -y kafkacat
WORKDIR /home/node/app
COPY package*.json ./
USER node
RUN npm install
COPY --chown=node:node . .
EXPOSE 3000
CMD [ "bash", "start.sh" ]
4 changes: 4 additions & 0 deletions ksql-workshop/websockets/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
Cobbled together with code samples from:

* [socket.io](https://github.com/socketio/socket.io) for websocket comms between Kafka client and web browser.
* [kafka-avro](https://github.com/waldophotos/kafka-avro) for a Kafka client with Schema Registry support.
62 changes: 62 additions & 0 deletions ksql-workshop/websockets/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
// initialize kafka avro client
var KafkaAvro = require('kafka-avro');
var kafkaAvro = new KafkaAvro({
kafkaBroker: 'kafka:29092',
schemaRegistry: 'http://schema-registry:8081',
});
kafkaAvro.init()
.then(function() {
console.log('Ready to use');
});

// Setup basic express server
var express = require('express');
var app = express();
var path = require('path');
var server = require('http').createServer(app);
var io = require ('socket.io') (server);
var port = process.env.PORT || 3000;

server.listen(port, () => {
console.log('Server listening at port %d', port);
});

// Routing
app.use(express.static(path.join(__dirname, 'public')));

// log when we get a websocket connection
io.on('connection', (socket) => {
console.log('new connection, socket.id: ' + socket.id);
});

// single avro consumer
kafkaAvro.getConsumer({
'group.id': "server1",
'socket.keepalive.enable': true,
'enable.auto.commit': false,
})
.then(function(consumer) {
return new Promise(function (resolve, reject) {
consumer.on('ready', function() {
resolve(consumer);
});
consumer.connect({}, function(err) {
if (err) {
reject(err);
return;
}
resolve(consumer);
});
});
})
.then(function(consumer) {
// Subscribe to POOR_RATINGS
var topicName = 'POOR_RATINGS';
consumer.subscribe([topicName]);
consumer.consume();
consumer.on('data', function(rawData) {
// send the message to all the listeners
console.log(".");
io.sockets.emit('new message', rawData.parsed);
});
});
16 changes: 16 additions & 0 deletions ksql-workshop/websockets/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
{
"name": "ratings-websockets",
"version": "0.0.1",
"description": "Websockets for ksql-workshop",
"main": "index.js",
"scripts": {
"start": "node index.js"
},
"author": "[email protected]",
"license": "ISC",
"dependencies": {
"express": "^4.17.1",
"kafka-avro": "^1.2.0",
"socket.io": "^2.2.0"
}
}
61 changes: 61 additions & 0 deletions ksql-workshop/websockets/public/index.html
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
<!doctype html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Poor Ratings</title>
<style>
body {
font-family: 'Roboto', sans-serif;
}

.speech-bubble {
position: relative;
background: #ddddff;
border-radius: .4em;
padding: 10px;
margin: 10px;
}

.speech-bubble:after {
content: '';
position: absolute;
left: 10px;
top: 50%;
width: 0;
height: 0;
border: 20px solid transparent;
border-right-color: #ddddff;
border-left: 0;
border-bottom: 0;
margin-top: -10px;
margin-left: -20px;
}
</style>
</head>
<body>
<h2>Poor Ratings</h2>
<div id='main'>
</div>

<script src="https://code.jquery.com/jquery-1.10.2.min.js"></script>
<script src="/socket.io/socket.io.js"></script>
<script>
$(function(){

const socket = io({
transports: ['websocket']
});

socket.on('new message', (data) => {
var bubble = $("<div>", {
'class': "speech-bubble",
});
bubble.append(data.message.string);
for (var i=0; i<data.stars.int; i++)
bubble.append('⭐️');
$('#main').append(bubble);
});
});
</script>
</body>
</html>
6 changes: 6 additions & 0 deletions ksql-workshop/websockets/start.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
#!/usr/bin/env bash
until kafkacat -b kafka:29092 -L | grep POOR_RATINGS
do
sleep 1
done
npm start

0 comments on commit cb320b5

Please sign in to comment.