Skip to content

Commit

Permalink
Update demo for 5.4 beta / #crunchconf
Browse files Browse the repository at this point in the history
  • Loading branch information
rmoff committed Oct 17, 2019
1 parent a40f240 commit a77e4e1
Show file tree
Hide file tree
Showing 6 changed files with 259 additions and 253 deletions.
1 change: 1 addition & 0 deletions kafka-connect-zero-to-hero/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
kafka-connect-zero-to-hero/data/jars
1 change: 1 addition & 0 deletions kafka-connect-zero-to-hero/data/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
data/jars
Original file line number Diff line number Diff line change
Expand Up @@ -75,14 +75,14 @@ Trigger the MySQL data generator with:

[source,bash]
----
docker-compose exec mysql /data/02_populate_more_orders.sh
docker exec mysql /data/02_populate_more_orders.sh
----

Look at the new rows!

[source,bash]
----
watch -n 2 -x docker-compose exec mysql bash -c 'echo "SELECT * FROM ORDERS ORDER BY CREATE_TS DESC LIMIT 1 \G" | mysql -u root -p$MYSQL_ROOT_PASSWORD demo'
watch -n 1 -x docker exec -t mysql bash -c 'echo "SELECT * FROM ORDERS ORDER BY CREATE_TS DESC LIMIT 1 \G" | mysql -u root -p$MYSQL_ROOT_PASSWORD demo'
----

==== Create the connector
Expand All @@ -105,7 +105,7 @@ curl -i -X PUT -H "Content-Type:application/json" \
"decimal.handling.mode": "double",
"include.schema.changes": "true",
"transforms": "unwrap,addTopicPrefix",
"transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.addTopicPrefix.type":"org.apache.kafka.connect.transforms.RegexRouter",
"transforms.addTopicPrefix.regex":"(.*)",
"transforms.addTopicPrefix.replacement":"mysql-debezium-$1"
Expand All @@ -122,14 +122,6 @@ curl -s "http://localhost:8083/connectors?expand=info&expand=status" | \
column -s : -t| sed 's/\"//g'| sort
----

[source,bash]
----
# Apache Kafka <=2.2
curl -s "http://localhost:8083/connectors?expand=info&expand=status"|jq '. | to_entries[] | [ .value.info.type, .key, .value.status.connector.state,.value.status.tasks[].state,.value.info.config."connector.class"]|join(":|:")' | \
column -s : -t| sed 's/\"//g'| sort
curl -s "http://localhost:8083/connectors"| jq '.[]'| xargs -I{connector_name} curl -s "http://localhost:8083/connectors/"{connector_name}"/status"| jq -c -M '[.name,.connector.state,.tasks[].state]|join(":|:")'| column -s : -t| sed 's/\"//g'| sort
----

Output should be

[source,bash]
Expand All @@ -141,7 +133,17 @@ http://localhost:9021/[View the topic in Confluent Control Center] or from the C

[source,bash]
----
docker-compose exec -T kafka-connect-01 \
docker run --net host --rm edenhill/kafkacat:1.5.0 \
-b localhost:9092 \
            -r http://localhost:8081 \
-s avro \
-t mysql-debezium-asgard.demo.ORDERS \
-C -o -10 -q | jq '.'
----

[source,bash]
----
docker exec -t kafka-connect-01 \
kafka-avro-console-consumer \
--bootstrap-server kafka:29092 \
--property schema.registry.url=http://schema-registry:8081 \
Expand All @@ -150,7 +152,6 @@ docker-compose exec -T kafka-connect-01 \

Show Kafka Consumer and MySQL side by side.


=== Stream data from Kafka to Elasticsearch

[source,bash]
Expand Down Expand Up @@ -201,9 +202,9 @@ http://localhost:5601/app/kibana#/discover?_g=(refreshInterval:(pause:!f,value:5
[source,bash]
----
curl -s http://localhost:9200/mysql-debezium-asgard.demo.orders/_search \
-H 'content-type: application/json' \
-d '{ "size": 1, "sort": [ { "CREATE_TS": { "order": "desc" } } ] }' |\
jq '.'
-H 'content-type: application/json' \
-d '{ "size": 1, "sort": [ { "CREATE_TS": { "order": "desc" } } ] }' |\
jq '.hits.hits[]._source'
----

[NOTE]
Expand All @@ -222,5 +223,5 @@ If you want to set the Elasticsearch document id to match the key of the source

=== View in Neo4j

View in http://localhost:7474/browser/[Neo4j browser]
View in http://localhost:7474/browser/[Neo4j browser] (login `neo4j`/`connect`)

254 changes: 127 additions & 127 deletions kafka-connect-zero-to-hero/docker-compose-local.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@
version: '2'
services:
zookeeper:
image: confluentinc/cp-zookeeper:5.3.0
image: confluentinc/cp-zookeeper:5.4.0-beta1
container_name: zookeeper
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000

kafka:
image: confluentinc/cp-enterprise-kafka:5.3.0
image: confluentinc/cp-enterprise-kafka:5.4.0-beta1
container_name: kafka
depends_on:
- zookeeper
Expand Down Expand Up @@ -49,7 +49,7 @@ services:
CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'

schema-registry:
image: confluentinc/cp-schema-registry:5.3.0
image: confluentinc/cp-schema-registry:5.4.0-beta1
container_name: schema-registry
ports:
- "8081:8081"
Expand All @@ -61,7 +61,7 @@ services:
SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: zookeeper:2181

kafka-connect-01:
image: confluentinc/cp-kafka-connect:5.3.0
image: confluentinc/cp-kafka-connect:5.4.0-beta1
container_name: kafka-connect-01
depends_on:
- zookeeper
Expand Down Expand Up @@ -92,137 +92,137 @@ services:
# Interceptor config
CONNECT_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
CONNECT_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
CLASSPATH: /usr/share/java/monitoring-interceptors/monitoring-interceptors-5.3.0-SNAPSHOT.jar
CLASSPATH: /usr/share/java/monitoring-interceptors/monitoring-interceptors-5.4.0-beta1-SNAPSHOT.jar
volumes:
- /Users/Robin/cp/kafka-connect-zero-to-hero__jars:/usr/share/confluent-hub-components/local
- ${PWD}/data/jars:/usr/share/confluent-hub-components/local
# In the command section, $ are replaced with $$ to avoid the error 'Invalid interpolation format for "command" option'

kafka-connect-02:
image: confluentinc/cp-kafka-connect:5.3.0
container_name: kafka-connect-02
depends_on:
- zookeeper
- kafka
- schema-registry
ports:
- 18083:8083
environment:
CONNECT_BOOTSTRAP_SERVERS: "kafka:29092"
CONNECT_REST_PORT: 8083
CONNECT_GROUP_ID: kafka-connect
CONNECT_CONFIG_STORAGE_TOPIC: _connect-configs
CONNECT_OFFSET_STORAGE_TOPIC: _connect-offsets
CONNECT_STATUS_STORAGE_TOPIC: _connect-status
CONNECT_KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'
CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'
CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
CONNECT_REST_ADVERTISED_HOST_NAME: "kafka-connect-02"
CONNECT_LOG4J_ROOT_LOGLEVEL: "INFO"
CONNECT_LOG4J_LOGGERS: "org.apache.kafka.connect.runtime.rest=WARN,org.reflections=ERROR"
CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: "1"
CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: "1"
CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: "1"
CONNECT_PLUGIN_PATH: /usr/share/java,/usr/share/confluent-hub-components
# Interceptor config
CONNECT_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
CONNECT_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
CLASSPATH: /usr/share/java/monitoring-interceptors/monitoring-interceptors-5.3.0-SNAPSHOT.jar
volumes:
- /Users/Robin/cp/kafka-connect-zero-to-hero__jars:/usr/share/confluent-hub-components/local
# kafka-connect-02:
# image: confluentinc/cp-kafka-connect:5.4.0-beta1
# container_name: kafka-connect-02
# depends_on:
# - zookeeper
# - kafka
# - schema-registry
# ports:
# - 18083:8083
# environment:
# CONNECT_BOOTSTRAP_SERVERS: "kafka:29092"
# CONNECT_REST_PORT: 8083
# CONNECT_GROUP_ID: kafka-connect
# CONNECT_CONFIG_STORAGE_TOPIC: _connect-configs
# CONNECT_OFFSET_STORAGE_TOPIC: _connect-offsets
# CONNECT_STATUS_STORAGE_TOPIC: _connect-status
# CONNECT_KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
# CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'
# CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
# CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'
# CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
# CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
# CONNECT_REST_ADVERTISED_HOST_NAME: "kafka-connect-02"
# CONNECT_LOG4J_ROOT_LOGLEVEL: "INFO"
# CONNECT_LOG4J_LOGGERS: "org.apache.kafka.connect.runtime.rest=WARN,org.reflections=ERROR"
# CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: "1"
# CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: "1"
# CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: "1"
# CONNECT_PLUGIN_PATH: /usr/share/java,/usr/share/confluent-hub-components
# # Interceptor config
# CONNECT_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
# CONNECT_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
# CLASSPATH: /usr/share/java/monitoring-interceptors/monitoring-interceptors-5.4.0-beta1-SNAPSHOT.jar
# volumes:
# - /Users/Robin/cp/kafka-connect-zero-to-hero__jars:/usr/share/confluent-hub-components/local

ksql-server:
image: confluentinc/cp-ksql-server:5.3.0
container_name: ksql-server
ports:
- 8088:8088
depends_on:
- kafka
environment:
KSQL_BOOTSTRAP_SERVERS: kafka:29092
KSQL_LISTENERS: http://0.0.0.0:8088
KSQL_KSQL_SERVICE_ID: confluent_rmoff_01
KSQL_CUB_KAFKA_TIMEOUT: 300
KSQL_KSQL_SCHEMA_REGISTRY_URL: http://schema-registry:8081
# -v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v
# Useful settings for development/laptop use - modify as needed for Prod
KSQL_KSQL_COMMIT_INTERVAL_MS: 2000
KSQL_KSQL_SINK_PARTITIONS: 1
KSQL_KSQL_CACHE_MAX_BYTES_BUFFERING: 10000000
KSQL_KSQL_STREAMS_AUTO_OFFSET_RESET: earliest
# -v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v
# Producer Confluent Monitoring Interceptors for Control Center streams monitoring
KSQL_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
KSQL_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
# ksql-server:
# image: confluentinc/cp-ksql-server:5.4.0-beta1
# container_name: ksql-server
# ports:
# - 8088:8088
# depends_on:
# - kafka
# environment:
# KSQL_BOOTSTRAP_SERVERS: kafka:29092
# KSQL_LISTENERS: http://0.0.0.0:8088
# KSQL_KSQL_SERVICE_ID: confluent_rmoff_01
# KSQL_CUB_KAFKA_TIMEOUT: 300
# KSQL_KSQL_SCHEMA_REGISTRY_URL: http://schema-registry:8081
# # -v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v
# # Useful settings for development/laptop use - modify as needed for Prod
# KSQL_KSQL_COMMIT_INTERVAL_MS: 2000
# KSQL_KSQL_SINK_PARTITIONS: 1
# KSQL_KSQL_CACHE_MAX_BYTES_BUFFERING: 10000000
# KSQL_KSQL_STREAMS_AUTO_OFFSET_RESET: earliest
# # -v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v
# # Producer Confluent Monitoring Interceptors for Control Center streams monitoring
# KSQL_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
# KSQL_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"

ksql-cli:
image: confluentinc/cp-ksql-cli:5.3.0
container_name: ksql-cli
depends_on:
- ksql-server
entrypoint: /bin/sh
tty: true
# ksql-cli:
# image: confluentinc/cp-ksql-cli:5.4.0-beta1
# container_name: ksql-cli
# depends_on:
# - ksql-server
# entrypoint: /bin/sh
# tty: true

# Runs the Kafka KSQL data generator for ratings
datagen-ratings:
image: confluentinc/ksql-examples:5.1.2
container_name: datagen-ratings
depends_on:
- kafka
- schema-registry
command: "bash -c 'echo Waiting for Kafka to be ready... && \
cub kafka-ready -b kafka:29092 1 300 && \
echo Waiting for Confluent Schema Registry to be ready... && \
cub sr-ready schema-registry 8081 300 && \
echo Waiting a few seconds for topic creation to finish... && \
sleep 20 && \
ksql-datagen \
quickstart=ratings \
format=avro \
topic=ratings \
maxInterval=1000 \
bootstrap-server=kafka:29092 \
schemaRegistryUrl=http://schema-registry:8081 \
propertiesFile=/etc/ksql/datagen.properties'"
# # Runs the Kafka KSQL data generator for ratings
# datagen-ratings:
# image: confluentinc/ksql-examples:5.1.2
# container_name: datagen-ratings
# depends_on:
# - kafka
# - schema-registry
# command: "bash -c 'echo Waiting for Kafka to be ready... && \
# cub kafka-ready -b kafka:29092 1 300 && \
# echo Waiting for Confluent Schema Registry to be ready... && \
# cub sr-ready schema-registry 8081 300 && \
# echo Waiting a few seconds for topic creation to finish... && \
# sleep 20 && \
# ksql-datagen \
# quickstart=ratings \
# format=avro \
# topic=ratings \
# maxInterval=1000 \
# bootstrap-server=kafka:29092 \
# schemaRegistryUrl=http://schema-registry:8081 \
# propertiesFile=/etc/ksql/datagen.properties'"


control-center:
image: confluentinc/cp-enterprise-control-center:5.3.0
container_name: control-center
depends_on:
- zookeeper
- kafka
- schema-registry
- kafka-connect-01
- ksql-server
ports:
- "9021:9021"
environment:
CONTROL_CENTER_BOOTSTRAP_SERVERS: 'kafka:29092'
CONTROL_CENTER_ZOOKEEPER_CONNECT: 'zookeeper:2181'
CONTROL_CENTER_CONNECT-A_CONNECT_CLUSTER: 'http://kafka-connect-01:8083,http://kafka-connect-02:28083'
CONTROL_CENTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
CONTROL_CENTER_KSQL-A_KSQL_URL: "http://ksql-server:8088"
CONTROL_CENTER_KSQL-A_KSQL_ADVERTISED_URL: "http://localhost:8088"
CONFLUENT_METRICS_TOPIC_REPLICATION: 1
CONTROL_CENTER_REPLICATION_FACTOR: 1
CONTROL_CENTER_COMMAND_TOPIC_REPLICATION: 1
CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_REPLICATION: 1
CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
CONTROL_CENTER_INTERNAL_TOPICS_REPLICATION: 1
CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
CONTROL_CENTER_STREAMS_NUM_STREAM_THREADS: 1
CONTROL_CENTER_STREAMS_CACHE_MAX_BYTES_BUFFERING: 104857600
command:
- bash
- -c
- |
echo "Waiting two minutes for Kafka brokers to start and
necessary topics to be available"
sleep 120
/etc/confluent/docker/run
# control-center:
# image: confluentinc/cp-enterprise-control-center:5.4.0-beta1
# container_name: control-center
# depends_on:
# - zookeeper
# - kafka
# - schema-registry
# - kafka-connect-01
# - ksql-server
# ports:
# - "9021:9021"
# environment:
# CONTROL_CENTER_BOOTSTRAP_SERVERS: 'kafka:29092'
# CONTROL_CENTER_ZOOKEEPER_CONNECT: 'zookeeper:2181'
# CONTROL_CENTER_CONNECT-A_CONNECT_CLUSTER: 'http://kafka-connect-01:8083,http://kafka-connect-02:28083'
# CONTROL_CENTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
# CONTROL_CENTER_KSQL-A_KSQL_URL: "http://ksql-server:8088"
# CONTROL_CENTER_KSQL-A_KSQL_ADVERTISED_URL: "http://localhost:8088"
# CONFLUENT_METRICS_TOPIC_REPLICATION: 1
# CONTROL_CENTER_REPLICATION_FACTOR: 1
# CONTROL_CENTER_COMMAND_TOPIC_REPLICATION: 1
# CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_REPLICATION: 1
# CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
# CONTROL_CENTER_INTERNAL_TOPICS_REPLICATION: 1
# CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
# CONTROL_CENTER_STREAMS_NUM_STREAM_THREADS: 1
# CONTROL_CENTER_STREAMS_CACHE_MAX_BYTES_BUFFERING: 104857600
# command:
# - bash
# - -c
# - |
# echo "Waiting two minutes for Kafka brokers to start and
# necessary topics to be available"
# sleep 120
# /etc/confluent/docker/run

# Other systems
mysql:
Expand Down
Loading

0 comments on commit a77e4e1

Please sign in to comment.