Add Kafka Connect S3 sink example
rmoff committed Mar 17, 2020
1 parent f8b1f8a commit c43e052
Showing 3 changed files with 254 additions and 0 deletions.
120 changes: 120 additions & 0 deletions kafka-to-s3/README.adoc
= Streaming data from Kafka to S3 using Kafka Connect
Robin Moffatt <robin@confluent.io>
v1.00, 17 March 2020

This example uses Docker Compose to run the Kafka Connect worker, along with a Kafka broker, ZooKeeper, the Schema Registry, and ksqlDB.

1. Create the S3 bucket and make a note of its region
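+
For example, with the AWS CLI (a sketch -- substitute your own bucket name; `rmoff-voluble-test` and `us-east-1` just match the connector config below):
+
[source,bash]
----
aws s3 mb s3://rmoff-voluble-test --region us-east-1
----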
2. Obtain your AWS access key pair
3. Update `aws_credentials` with the access key pair
4. Bring up the Docker Compose stack
+
[source,bash]
----
docker-compose up -d
----
5. Make sure everything is up and running
+
[source,bash]
----
$ docker-compose ps
      Name                  Command                  State                     Ports
---------------------------------------------------------------------------------------------
broker            /etc/confluent/docker/run    Up             0.0.0.0:9092->9092/tcp
kafka-connect     bash -c #                    Up (healthy)   0.0.0.0:8083->8083/tcp, 9092/tcp
                    echo "Installing ...
ksqldb            /usr/bin/docker/run          Up             0.0.0.0:8088->8088/tcp
schema-registry   /etc/confluent/docker/run    Up             0.0.0.0:8081->8081/tcp
zookeeper         /etc/confluent/docker/run    Up             2181/tcp, 2888/tcp, 3888/tcp
----
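+
Note that the Kafka Connect worker installs its connector plugins at startup (see the `command` section of the Docker Compose file), so it can take a couple of minutes before its REST API is available. A minimal sketch of a way to block until it is (the port matches the Docker Compose above):
+
[source,bash]
----
# Wait until the Kafka Connect REST API is listening
while [ $(curl -s -o /dev/null -w %{http_code} http://localhost:8083/connectors) -ne 200 ]; do
  echo "Waiting for Kafka Connect to start ..."
  sleep 5
done
----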
6. Create the S3 sink connector
+
[source,bash]
----
curl -i -X PUT -H "Accept:application/json" \
    -H "Content-Type:application/json" http://localhost:8083/connectors/sink-s3-voluble/config \
    -d '
{
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "tasks.max": "1",
    "topics": "cats",
    "s3.region": "us-east-1",
    "s3.bucket.name": "rmoff-voluble-test",
    "flush.size": "65536",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "format.class": "io.confluent.connect.s3.format.avro.AvroFormat",
    "schema.generator.class": "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
    "schema.compatibility": "NONE",
    "partitioner.class": "io.confluent.connect.storage.partitioner.DefaultPartitioner",
    "transforms": "AddMetadata",
    "transforms.AddMetadata.type": "org.apache.kafka.connect.transforms.InsertField$Value",
    "transforms.AddMetadata.offset.field": "_offset",
    "transforms.AddMetadata.partition.field": "_partition"
}
'
----
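+
Once created, you can check the connector's state with the Kafka Connect REST API. A sketch (the `jq` filter is optional and assumes you have `jq` installed):
+
[source,bash]
----
curl -s http://localhost:8083/connectors/sink-s3-voluble/status | jq '.connector.state'
----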
+
Things to customise for your environment:
+
* `topics` : the source topic(s) you want to send to S3
* `s3.bucket.name` / `s3.region` : the bucket you created above, and its region
* `flush.size` : how many records to accumulate per topic partition before writing an object to S3
* `key.converter` : match the serialisation of your source data (see https://www.confluent.io/blog/kafka-connect-deep-dive-converters-serialization-explained/[here])
* `value.converter` : match the serialisation of your source data (see https://www.confluent.io/blog/kafka-connect-deep-dive-converters-serialization-explained/[here]); the worker default here is Avro
* `transforms` : remove this if you don't want the partition and offset added to each message
'''

If you want to run the data generator and view the data in ksqlDB, launch the ksqlDB CLI:

[source,bash]
----
docker exec -it ksqldb ksql http://ksqldb:8088
----
Then create the Voluble data generator source connector:

[source,sql]
----
CREATE SOURCE CONNECTOR s WITH (
    'connector.class'                       = 'io.mdrogalis.voluble.VolubleSourceConnector',
    'genkp.owners.with'                     = '#{Internet.uuid}',
    'genv.owners.name.with'                 = '#{Name.full_name}',
    'genv.owners.creditCardNumber.with'     = '#{Finance.credit_card}',
    'genk.cats.name.with'                   = '#{FunnyName.name}',
    'genv.cats.owner.matching'              = 'owners.key',
    'genk.diets.catName.matching'           = 'cats.key.name',
    'genv.diets.dish.with'                  = '#{Food.vegetables}',
    'genv.diets.measurement.with'           = '#{Food.measurements}',
    'genv.diets.size.with'                  = '#{Food.measurement_sizes}',
    'genk.adopters.name.sometimes.with'     = '#{Name.full_name}',
    'genk.adopters.name.sometimes.matching' = 'adopters.key.name',
    'genv.adopters.jobTitle.with'           = '#{Job.title}',
    'attrk.adopters.name.matching.rate'     = '0.05',
    'topic.adopters.tombstone.rate'         = '0.10',
    'global.history.records.max'            = '100000'
);
----
[source,sql]
----
SHOW TOPICS;
PRINT cats;
----
'''

== References

* https://rmoff.dev/crunch19-zero-to-hero-kafka-connect[From Zero to Hero with Kafka Connect]
* https://hub.confluent.io[Confluent Hub]
* https://docs.confluent.io/current/connect/kafka-connect-s3/index.html#connect-s3[S3 Sink connector docs]
* https://github.com/MichaelDrogalis/voluble[Voluble Source connector docs]
* https://www.confluent.io/blog/simplest-useful-kafka-connect-data-pipeline-world-thereabouts-part-3/[Single Message Transform blog]
* https://docs.confluent.io/current/connect/transforms/insertfield.html[InsertField] Single Message Transform
3 changes: 3 additions & 0 deletions kafka-to-s3/aws_credentials
[default]
aws_access_key_id = XXX
aws_secret_access_key = YYY
131 changes: 131 additions & 0 deletions kafka-to-s3/docker-compose.yml
---
version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:5.4.1
    container_name: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-kafka:5.4.1
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      # "`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-
      # An important note about accessing Kafka from clients on other machines:
      # -----------------------------------------------------------------------
      #
      # The config used here exposes port 9092 for _external_ connections to the broker,
      # i.e. those from _outside_ the Docker network. This could be from the host machine
      # running Docker, or maybe further afield if you've got a more complicated setup.
      # If the latter is true, you will need to change the value 'localhost' in
      # KAFKA_ADVERTISED_LISTENERS to one that is resolvable to the Docker host from those
      # remote clients.
      #
      # For connections _internal_ to the Docker network, such as from other services
      # and components, use broker:29092.
      #
      # See https://rmoff.net/2018/08/02/kafka-listeners-explained/ for details
      # "`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-
      #
      - 9092:9092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      # -v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v-v
      # Useful settings for development/laptop use - modify as needed for Prod
      # This one makes ksqlDB feel a bit more responsive when queries start running
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 100

  schema-registry:
    image: confluentinc/cp-schema-registry:5.4.1
    container_name: schema-registry
    depends_on:
      - zookeeper
      - broker
    ports:
      - 8081:8081
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: zookeeper:2181

  kafka-connect:
    image: confluentinc/cp-kafka-connect-base:5.4.1
    container_name: kafka-connect
    depends_on:
      - broker
      - schema-registry
    ports:
      - 8083:8083
    environment:
      CONNECT_BOOTSTRAP_SERVERS: "broker:29092"
      CONNECT_REST_ADVERTISED_HOST_NAME: "kafka-connect"
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: kafka-connect
      CONNECT_CONFIG_STORAGE_TOPIC: _kafka-connect-configs
      CONNECT_OFFSET_STORAGE_TOPIC: _kafka-connect-offsets
      CONNECT_STATUS_STORAGE_TOPIC: _kafka-connect-status
      CONNECT_KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'
      CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_LOG4J_ROOT_LOGLEVEL: "INFO"
      CONNECT_LOG4J_LOGGERS: "org.apache.kafka.connect.runtime.rest=WARN,org.reflections=ERROR"
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_PLUGIN_PATH: '/usr/share/java,/usr/share/confluent-hub-components/,/connectors/'
    # If you want to use the Confluent Hub installer to download components, but have them
    # available when running offline, spin up the stack once and then run:
    #   docker cp kafka-connect:/usr/share/confluent-hub-components ./connectors
    #   mv ./connectors/confluent-hub-components/* ./connectors
    #   rm -rf ./connectors/confluent-hub-components
    volumes:
      - $PWD/connectors:/connectors
      - $PWD/aws_credentials:/root/.aws/credentials
    # In the command section, $ is replaced with $$ to avoid the error
    # 'Invalid interpolation format for "command" option'
    command:
      - bash
      - -c
      - |
        #
        echo "Installing connector plugins"
        confluent-hub install --no-prompt confluentinc/kafka-connect-s3:5.4.1
        confluent-hub install --no-prompt mdrogalis/voluble:0.1.0
        #
        echo "Launching Kafka Connect worker"
        /etc/confluent/docker/run &
        #
        sleep infinity

  ksqldb:
    image: confluentinc/ksqldb-server:0.7.1
    hostname: ksqldb
    container_name: ksqldb
    depends_on:
      - broker
      - kafka-connect
    ports:
      - "8088:8088"
    environment:
      KSQL_LISTENERS: http://0.0.0.0:8088
      KSQL_BOOTSTRAP_SERVERS: broker:29092
      KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: "true"
      KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: "true"
      KSQL_KSQL_CONNECT_URL: http://kafka-connect:8083
      KSQL_KSQL_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      KSQL_KSQL_SERVICE_ID: confluent_rmoff_01
      KSQL_KSQL_HIDDEN_TOPICS: '^_.*'
