kafka-to-s3

Streaming data from Kafka to S3 using Kafka Connect

This uses Docker Compose to run the Kafka Connect worker.

  1. Create the S3 bucket and make a note of its region
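
    If you use the AWS CLI, a minimal sketch (the bucket name is a placeholder; pick your own globally-unique name):

    aws s3 mb s3://my-kafka-to-s3-demo --region us-east-1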

  2. Obtain your access key pair

  3. Update aws_credentials with your access key pair

    • Alternatively, uncomment the environment lines for AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY in the Docker Compose file and set the values there instead
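
    The file follows the standard AWS credentials file layout (assuming the demo mounts aws_credentials as the Connect worker's ~/.aws/credentials); the values below are placeholders:

    [default]
    aws_access_key_id=AKIAXXXXXXXXXXXXXXXX
    aws_secret_access_key=XXXXXXXXXXXXXXXXXXXXXXXXXXXX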

  4. Bring the Docker Compose up

    docker-compose up -d
  5. Make sure everything is up and running

    $ docker-compose ps
         Name                  Command               State                    Ports
    ---------------------------------------------------------------------------------------------
    broker            /etc/confluent/docker/run   Up             0.0.0.0:9092->9092/tcp
    kafka-connect     bash -c #                   Up (healthy)   0.0.0.0:8083->8083/tcp, 9092/tcp
                      echo "Installing ...
    ksqldb            /usr/bin/docker/run         Up             0.0.0.0:8088->8088/tcp
    schema-registry   /etc/confluent/docker/run   Up             0.0.0.0:8081->8081/tcp
    zookeeper         /etc/confluent/docker/run   Up             2181/tcp, 2888/tcp, 3888/tcp
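
    You can also check that the Connect worker's REST API is reachable and that the S3 sink plugin was installed (jq is assumed to be available locally; expect io.confluent.connect.s3.S3SinkConnector in the list):

    curl -s http://localhost:8083/connector-plugins | jq '.[].class'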
  6. Create the Sink connector

    curl -i -X PUT -H "Accept:application/json" \
        -H "Content-Type:application/json" http://localhost:8083/connectors/sink-s3-voluble/config \
        -d '{
            "connector.class": "io.confluent.connect.s3.S3SinkConnector",
            "key.converter": "org.apache.kafka.connect.storage.StringConverter",
            "tasks.max": "1",
            "topics": "cats",
            "s3.region": "us-east-1",
            "s3.bucket.name": "rmoff-voluble-test",
            "flush.size": "65536",
            "storage.class": "io.confluent.connect.s3.storage.S3Storage",
            "format.class": "io.confluent.connect.s3.format.avro.AvroFormat",
            "schema.generator.class": "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
            "schema.compatibility": "NONE",
            "partitioner.class": "io.confluent.connect.storage.partitioner.DefaultPartitioner",
            "transforms": "AddMetadata",
            "transforms.AddMetadata.type": "org.apache.kafka.connect.transforms.InsertField$Value",
            "transforms.AddMetadata.offset.field": "_offset",
            "transforms.AddMetadata.partition.field": "_partition"
        }'

    Things to customise for your environment:

    • topics : the source topic(s) you want to send to S3

    • key.converter : match the serialisation of your source data (see here)

    • value.converter : match the serialisation of your source data (see here)

    • transforms : remove this if you don’t want partition and offset added to each message
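
    Once created, check that the connector and its task report a RUNNING state (again assuming jq):

    curl -s http://localhost:8083/connectors/sink-s3-voluble/status | jq '.'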


If you want to create the data generator and view the data in ksqlDB:

docker exec -it ksqldb ksql http://ksqldb:8088

Then, from the ksqlDB prompt, create the Voluble data generator connector:

CREATE SOURCE CONNECTOR s WITH (
  'connector.class' = 'io.mdrogalis.voluble.VolubleSourceConnector',

  'genkp.owners.with' = '#{Internet.uuid}',
  'genv.owners.name.with' = '#{Name.full_name}',
  'genv.owners.creditCardNumber.with' = '#{Finance.credit_card}',

  'genk.cats.name.with' = '#{FunnyName.name}',
  'genv.cats.owner.matching' = 'owners.key',

  'genk.diets.catName.matching' = 'cats.key.name',
  'genv.diets.dish.with' = '#{Food.vegetables}',
  'genv.diets.measurement.with' = '#{Food.measurements}',
  'genv.diets.size.with' = '#{Food.measurement_sizes}',

  'genk.adopters.name.sometimes.with' = '#{Name.full_name}',
  'genk.adopters.name.sometimes.matching' = 'adopters.key.name',
  'genv.adopters.jobTitle.with' = '#{Job.title}',
  'attrk.adopters.name.matching.rate' = '0.05',
  'topic.adopters.tombstone.rate' = '0.10',

  'global.history.records.max' = '100000'
);
SHOW TOPICS;
PRINT cats;
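
To confirm that data is arriving in S3, list the bucket contents. With the connector's DefaultPartitioner and the default topics.dir, objects should appear under topics/<topic>/partition=<n>/; note that with flush.size set to 65536, an object is only written once that many records have accumulated for a partition. Substitute your own bucket name:

aws s3 ls s3://rmoff-voluble-test/topics/cats/ --recursive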

References