This repo contains a sample project that can be used to start off your own source connector for Kafka Connect. This is work is tracked by HUDI-2324
The first thing you need to do to start using this connector is building it. In order to do that, you need to install the following dependencies:
- Java 1.8+
- Apache Maven
- Install kcat
- Install jq.
brew install jq
After installing these dependencies, follow steps based on your requirement.
For runtime dependencies, we encourage using the confluent HDFS connector jars. We have tested our setup with version 10.1.0
(essentially, hadoop-common
dependency version 2.10.1 is required which comes as part of confluent HDFS connector).
Either use confluent-hub to install the connector or download it from here.
You can install the confluent-hub command-line tool by downloading Confluent Platform from here.
Copy the entire folder to the classpath that will be used by the Hudi Kafka Connector.
# Points CONFLUENT_DIR to Confluent Platform installation
export CONFLUENT_DIR=/path/to/confluent_install_dir
mkdir -p /usr/local/share/kafka/plugins
$CONFLUENT_DIR/bin/confluent-hub install confluentinc/kafka-connect-hdfs:10.1.0
cp -r $CONFLUENT_DIR/share/confluent-hub-components/confluentinc-kafka-connect-hdfs/* /usr/local/share/kafka/plugins/
Now, build the packaged jar that contains all the hudi classes, including the Hudi Kafka Connector. And copy it to the
plugin path that contains all the other jars (/usr/local/share/kafka/plugins/lib
)
cd $HUDI_DIR
mvn package -DskipTests -pl packaging/hudi-kafka-connect-bundle -am
mkdir -p /usr/local/share/kafka/plugins/lib
cp $HUDI_DIR/packaging/hudi-kafka-connect-bundle/target/hudi-kafka-connect-bundle-0.13.0-SNAPSHOT.jar /usr/local/share/kafka/plugins/lib
If the Hudi Sink Connector writes to a target Hudi table on Amazon S3, you need two
additional jars, hadoop-aws-2.10.1.jar
and aws-java-sdk-bundle-1.11.271.jar
, in the plugins/lib
folder. You may
download them using the following commands. Note that, when you specify the target table path on S3, you need to use
s3a://
prefix.
cd /usr/local/share/kafka/plugins/lib
wget https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-bundle/1.11.271/aws-java-sdk-bundle-1.11.271.jar
wget https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/2.10.1/hadoop-aws-2.10.1.jar
Set up a Kafka broker locally. Download the latest apache kafka from here. Once downloaded and built, run the Zookeeper server and Kafka server using the command line tools. The servers should be ready in one to two minutes after executing the commands.
export KAFKA_HOME=/path/to/kafka_install_dir
cd $KAFKA_HOME
# Run the following commands in separate terminals to keep them running
./bin/zookeeper-server-start.sh ./config/zookeeper.properties
./bin/kafka-server-start.sh ./config/server.properties
Wait until the kafka cluster is up and running.
Hudi leverages schema registry to obtain the latest schema when writing records. While it supports most popular schema registries, we use Confluent schema registry. Download the latest confluent platform and run the schema registry service.
NOTE: You must change the port from 8081
(default) to 8082
to avoid conflict, i.e.,
using listeners=http://0.0.0.0:8082
in the properties file etc/schema-registry/schema-registry.properties
.
cd $CONFLUENT_DIR
./bin/kafka-configs --bootstrap-server localhost:9092 --entity-type topics --entity-name _schemas --alter --add-config cleanup.policy=compact
# Make sure you have changed the listener port as above
./bin/schema-registry-start etc/schema-registry/schema-registry.properties
The control topic should only have 1
partition, since its used to coordinate the Hudi write transactions across the
multiple Connect tasks.
cd $KAFKA_HOME
# The following command is expected to throw an error if the control topic does not exist.
# "Error while executing topic command : Topic 'hudi-control-topic' does not exist as expected"
./bin/kafka-topics.sh --delete --topic hudi-control-topic --bootstrap-server localhost:9092
./bin/kafka-topics.sh --create --topic hudi-control-topic --partitions 1 --replication-factor 1 --bootstrap-server localhost:9092
Open a terminal to execute the following command:
cd $HUDI_DIR/hudi-kafka-connect/demo/
bash setupKafka.sh -n <total_kafka_messages>
To generate data for long-running tests, you can add -b
option to specify the number of batches of data
to generate, with each batch containing a number of messages and idle time between batches, as follows:
bash setupKafka.sh -n <num_kafka_messages_per_batch> -b <num_batches>
The Kafka connect is a distributed platform, with the ability to run one or more workers (each running multiple tasks) that parallely process the records from the Kafka partitions for the same topic. We provide a properties file with default properties to start a Hudi connector.
Note that if multiple workers need to be run, the webserver needs to be reconfigured for subsequent workers to ensure successful running of the workers.
cd $KAFKA_HOME
./bin/connect-distributed.sh $HUDI_DIR/hudi-kafka-connect/demo/connect-distributed.properties
Ensure that the plugin.path
property points to the location where all connect plugins are installed.
For this doc, it is /usr/local/share/kafka/plugins
. If your plugins are installed at a different location,
then please change the above property in $HUDI_DIR/hudi-kafka-connect/demo/connect-distributed.properties
.
Once the Connector has started, it will not run the Sink, until the Hudi sink is added using the web api. The following curl APIs can be used to delete and add a new Hudi Sink. Again, a default configuration is provided for the Hudi Sink, that can be changed based on the desired properties.
# The following command is expected to throw an error if the Hudi Sink Connector has not been added yet.
# {"error_code":404,"message":"Connector hudi-sink not found"}
curl -X DELETE http://localhost:8083/connectors/hudi-sink
curl -X POST -H "Content-Type:application/json" -d @$HUDI_DIR/hudi-kafka-connect/demo/config-sink.json http://localhost:8083/connectors
Now, you should see that the connector is created and tasks are running.
> curl -X GET -H "Content-Type:application/json" http://localhost:8083/connectors
["hudi-sink"]
> curl -X GET -H "Content-Type:application/json" http://localhost:8083/connectors/hudi-sink/status | jq
% Total % Received % Xferd Average Speed Time Time Time Current
Dload Upload Total Spent Left Speed
100 329 100 329 0 0 21096 0 --:--:-- --:--:-- --:--:-- 36555
{
"name": "hudi-sink",
"connector": {
"state": "RUNNING",
"worker_id": "127.0.0.1:8083"
},
"tasks": [
{
"id": 0,
"state": "RUNNING",
"worker_id": "127.0.0.1:8083"
},
{
"id": 1,
"state": "RUNNING",
"worker_id": "127.0.0.1:8083"
},
{
"id": 2,
"state": "RUNNING",
"worker_id": "127.0.0.1:8083"
},
{
"id": 3,
"state": "RUNNING",
"worker_id": "127.0.0.1:8083"
}
],
"type": "sink"
}
And, you should see your Hudi table created, which you can query using Spark/Flink. Note: HUDI-2325 tracks Hive sync, which will unlock pretty much every other query engine.
> ls -a /tmp/hoodie/hudi-test-topic
. .hoodie partition_1 partition_3
.. partition_0 partition_2 partition_4
> ls -lt /tmp/hoodie/hudi-test-topic/.hoodie
total 72
-rw-r--r-- 1 user wheel 346 Sep 14 10:32 hoodie.properties
-rw-r--r-- 1 user wheel 0 Sep 13 23:18 20210913231805.inflight
-rw-r--r-- 1 user wheel 0 Sep 13 23:18 20210913231805.commit.requested
-rw-r--r-- 1 user wheel 9438 Sep 13 21:45 20210913214351.commit
-rw-r--r-- 1 user wheel 0 Sep 13 21:43 20210913214351.inflight
-rw-r--r-- 1 user wheel 0 Sep 13 21:43 20210913214351.commit.requested
-rw-r--r-- 1 user wheel 18145 Sep 13 21:43 20210913214114.commit
-rw-r--r-- 1 user wheel 0 Sep 13 21:41 20210913214114.inflight
-rw-r--r-- 1 user wheel 0 Sep 13 21:41 20210913214114.commit.requested
drwxr-xr-x 2 user wheel 64 Sep 13 21:41 archived
> ls -l /tmp/hoodie/hudi-test-topic/partition_0
total 5168
-rw-r--r-- 1 user wheel 439332 Sep 13 21:43 2E0E6DB44ACC8479059574A2C71C7A7E-0_0-0-0_20210913214114.parquet
-rw-r--r-- 1 user wheel 440179 Sep 13 21:42 3B56FAAAE2BDD04E480C1CBACD463D3E-0_0-0-0_20210913214114.parquet
-rw-r--r-- 1 user wheel 437097 Sep 13 21:45 3B56FAAAE2BDD04E480C1CBACD463D3E-0_0-0-0_20210913214351.parquet
-rw-r--r-- 1 user wheel 440219 Sep 13 21:42 D5AEE453699D5D9623D704C1CF399C8C-0_0-0-0_20210913214114.parquet
-rw-r--r-- 1 user wheel 437035 Sep 13 21:45 D5AEE453699D5D9623D704C1CF399C8C-0_0-0-0_20210913214351.parquet
-rw-r--r-- 1 user wheel 440214 Sep 13 21:43 E200FA75DCD1CED60BE86BCE6BF5D23A-0_0-0-0_20210913214114.parquet
When using Merge-On-Read (MOR) as the table type, async compaction and clustering can be scheduled when the Sink is
running. Inline compaction and clustering are disabled by default due to performance reason. By default, async
compaction scheduling is enabled, and you can disable it by setting hoodie.kafka.compaction.async.enable
to false
.
Async clustering scheduling is disabled by default, and you can enable it by setting hoodie.clustering.async.enabled
to true
.
The Sink only schedules the compaction and clustering if necessary and does not execute them for performance. You need to execute the scheduled compaction and clustering using separate Spark jobs or Hudi CLI.
After the compaction is scheduled, you can see the requested compaction instant (20211111111410.compaction.requested
)
below:
> ls -l /tmp/hoodie/hudi-test-topic/.hoodie
total 280
-rw-r--r-- 1 user wheel 21172 Nov 11 11:09 20211111110807.deltacommit
-rw-r--r-- 1 user wheel 0 Nov 11 11:08 20211111110807.deltacommit.inflight
-rw-r--r-- 1 user wheel 0 Nov 11 11:08 20211111110807.deltacommit.requested
-rw-r--r-- 1 user wheel 22458 Nov 11 11:11 20211111110940.deltacommit
-rw-r--r-- 1 user wheel 0 Nov 11 11:09 20211111110940.deltacommit.inflight
-rw-r--r-- 1 user wheel 0 Nov 11 11:09 20211111110940.deltacommit.requested
-rw-r--r-- 1 user wheel 21445 Nov 11 11:13 20211111111110.deltacommit
-rw-r--r-- 1 user wheel 0 Nov 11 11:11 20211111111110.deltacommit.inflight
-rw-r--r-- 1 user wheel 0 Nov 11 11:11 20211111111110.deltacommit.requested
-rw-r--r-- 1 user wheel 24943 Nov 11 11:14 20211111111303.deltacommit
-rw-r--r-- 1 user wheel 0 Nov 11 11:13 20211111111303.deltacommit.inflight
-rw-r--r-- 1 user wheel 0 Nov 11 11:13 20211111111303.deltacommit.requested
-rw-r--r-- 1 user wheel 9885 Nov 11 11:14 20211111111410.compaction.requested
-rw-r--r-- 1 user wheel 21192 Nov 11 11:15 20211111111411.deltacommit
-rw-r--r-- 1 user wheel 0 Nov 11 11:14 20211111111411.deltacommit.inflight
-rw-r--r-- 1 user wheel 0 Nov 11 11:14 20211111111411.deltacommit.requested
-rw-r--r-- 1 user wheel 0 Nov 11 11:15 20211111111530.deltacommit.inflight
-rw-r--r-- 1 user wheel 0 Nov 11 11:15 20211111111530.deltacommit.requested
drwxr-xr-x 2 user wheel 64 Nov 11 11:08 archived
-rw-r--r-- 1 user wheel 387 Nov 11 11:08 hoodie.properties
Then you can run async compaction job with HoodieCompactor
and spark-submit
by:
spark-submit \
--class org.apache.hudi.utilities.HoodieCompactor \
hudi/packaging/hudi-utilities-bundle/target/hudi-utilities-bundle_2.11-0.13.0-SNAPSHOT.jar \
--base-path /tmp/hoodie/hudi-test-topic \
--table-name hudi-test-topic \
--schema-file /Users/user/repo/hudi/docker/demo/config/schema.avsc \
--instant-time 20211111111410 \
--parallelism 2 \
--spark-memory 1g
Note that you don't have to provide the instant time through --instant-time
. In that case, the earliest scheduled
compaction is going to be executed.
Alternatively, you can use Hudi CLI to execute compaction:
hudi-> connect --path /tmp/hoodie/hudi-test-topic
hudi:hudi-test-topic-> compactions show all
╔═════════════════════════╤═══════════╤═══════════════════════════════╗
║ Compaction Instant Time │ State │ Total FileIds to be Compacted ║
╠═════════════════════════╪═══════════╪═══════════════════════════════╣
║ 20211111111410 │ REQUESTED │ 12 ║
╚═════════════════════════╧═══════════╧═══════════════════════════════╝
compaction validate --instant 20211111111410
compaction run --compactionInstant 20211111111410 --parallelism 2 --schemaFilePath /Users/user/repo/hudi/docker/demo/config/schema.avsc
Similarly, you can see the requested clustering instant (20211111111813.replacecommit.requested
) after it is scheduled
by the Sink:
> ls -l /tmp/hoodie/hudi-test-topic/.hoodie
total 736
-rw-r--r-- 1 user wheel 24943 Nov 11 11:14 20211111111303.deltacommit
-rw-r--r-- 1 user wheel 0 Nov 11 11:13 20211111111303.deltacommit.inflight
-rw-r--r-- 1 user wheel 0 Nov 11 11:13 20211111111303.deltacommit.requested
-rw-r--r-- 1 user wheel 18681 Nov 11 11:17 20211111111410.commit
-rw-r--r-- 1 user wheel 0 Nov 11 11:17 20211111111410.compaction.inflight
-rw-r--r-- 1 user wheel 9885 Nov 11 11:14 20211111111410.compaction.requested
-rw-r--r-- 1 user wheel 21192 Nov 11 11:15 20211111111411.deltacommit
-rw-r--r-- 1 user wheel 0 Nov 11 11:14 20211111111411.deltacommit.inflight
-rw-r--r-- 1 user wheel 0 Nov 11 11:14 20211111111411.deltacommit.requested
-rw-r--r-- 1 user wheel 22460 Nov 11 11:17 20211111111530.deltacommit
-rw-r--r-- 1 user wheel 0 Nov 11 11:15 20211111111530.deltacommit.inflight
-rw-r--r-- 1 user wheel 0 Nov 11 11:15 20211111111530.deltacommit.requested
-rw-r--r-- 1 user wheel 21357 Nov 11 11:18 20211111111711.deltacommit
-rw-r--r-- 1 user wheel 0 Nov 11 11:17 20211111111711.deltacommit.inflight
-rw-r--r-- 1 user wheel 0 Nov 11 11:17 20211111111711.deltacommit.requested
-rw-r--r-- 1 user wheel 6516 Nov 11 11:18 20211111111813.replacecommit.requested
-rw-r--r-- 1 user wheel 26070 Nov 11 11:20 20211111111815.deltacommit
-rw-r--r-- 1 user wheel 0 Nov 11 11:18 20211111111815.deltacommit.inflight
-rw-r--r-- 1 user wheel 0 Nov 11 11:18 20211111111815.deltacommit.requested
Then you can run async clustering job with HoodieClusteringJob
and spark-submit
by:
spark-submit \
--class org.apache.hudi.utilities.HoodieClusteringJob \
hudi/packaging/hudi-utilities-bundle/target/hudi-utilities-bundle_2.11-0.13.0-SNAPSHOT.jar \
--props clusteringjob.properties \
--mode execute \
--base-path /tmp/hoodie/hudi-test-topic \
--table-name sample_table \
--instant-time 20211111111813 \
--spark-memory 1g
Sample clusteringjob.properties
:
hoodie.datasource.write.recordkey.field=volume
hoodie.datasource.write.partitionpath.field=date
hoodie.streamer.schemaprovider.registry.url=http://localhost:8081/subjects/hudi-test-topic/versions/latest
hoodie.clustering.plan.strategy.target.file.max.bytes=1073741824
hoodie.clustering.plan.strategy.small.file.limit=629145600
hoodie.clustering.execution.strategy.class=org.apache.hudi.client.clustering.run.strategy.SparkSortAndSizeExecutionStrategy
hoodie.clustering.plan.strategy.sort.columns=volume
hoodie.write.concurrency.mode=SINGLE_WRITER
Note that you don't have to provide the instant time through --instant-time
. In that case, the earliest scheduled
clustering is going to be executed.
In this section we explain how one can test syncing of the Hudi table with Hive server/ Hive Metastore, that enable querying via Hive, Presto etc.
To ease the deployment of HDFS, Hive Server, Hive Metastore etc. for testing hive sync, we use the docker containers from the hudi docker demo. Refer to this link for the setup. Additionally, the docker deploys kafka and zookeeper too, so you do not need to run them explicitly in this setup.
Essentially, follow the steps listed here:
/etc/hosts : The demo references many services running in container by the hostname. Add the following settings to /etc/hosts
127.0.0.1 adhoc-1
127.0.0.1 adhoc-2
127.0.0.1 namenode
127.0.0.1 datanode1
127.0.0.1 hiveserver
127.0.0.1 hivemetastore
127.0.0.1 kafkabroker
127.0.0.1 sparkmaster
127.0.0.1 zookeeper
Bring up the docker containers
cd $HUDI_DIR/docker
./setup_demo.sh
Firstly, (re)-install a different connector that is configured to write the Hudi table to Hdfs instead of local filesystem.
# The following command is expected to throw an error if the Hudi Sink Connector has not been added yet.
# {"error_code":404,"message":"Connector hudi-sink not found"}
curl -X DELETE http://localhost:8083/connectors/hudi-sink
curl -X POST -H "Content-Type:application/json" -d @$HUDI_DIR/hudi-kafka-connect/demo/config-sink-hive.json http://localhost:8083/connectors
After running the connector, you can query the hive server using the following steps:
docker exec -it adhoc-2 /bin/bash
beeline -u jdbc:hive2://hiveserver:10000 \
--hiveconf hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat \
--hiveconf hive.stats.autogather=false
# List Tables
0: jdbc:hive2://hiveserver:10000> show tables;
+---------------------+--+
| tab_name |
+---------------------+--+
| huditesttopic_ro |
| huditesttopic_rt |
+---------------------+--+
3 rows selected (1.199 seconds)
0: jdbc:hive2://hiveserver:10000>
# Look at partitions that were added
0: jdbc:hive2://hiveserver:10000> show partitions huditesttopic_ro;
+-------------------+--+
| partition |
+-------------------+--+
| date=partition_0 |
| date=partition_1 |
| date=partition_2 |
| date=partition_3 |
| date=partition_4 |
+-------------------+--+
1 row selected (0.24 seconds)
0: jdbc:hive2://hiveserver:10000> select `_hoodie_commit_time`, symbol, ts, volume, open, close from huditesttopic_ro;
+----------------------+---------+----------------------+---------+------------+-----------+--+
| _hoodie_commit_time | symbol | ts | volume | open | close |
+----------------------+---------+----------------------+---------+------------+-----------+--+
| 20180924222155 | GOOG | 2018-08-31 09:59:00 | 6330 | 1230.5 | 1230.02 |
| 20180924222155 | GOOG | 2018-08-31 10:29:00 | 3391 | 1230.1899 | 1230.085 |
+----------------------+---------+----------------------+---------+------------+-----------+--+
Current Limitation:
The Hudi Kafka Connect sink uses Merge-On-Read
by default, and inserts/ appends the kafka records
directly to the log file(s). Asynchronously, compaction service can be executed to merge the log files into base file (Parquet format).
Generally, we support both Read-Optimized that reads only parquet base files and Snapshot queries that read and merge
records across base and log files. However, currently there is a limitation where we are not able to read records from
only log files. Hence, the queries for Hudi Kafka Connect will only work after compaction merges the records into base files. Alternatively,
users have the option to reconfigure the table type to COPY_ON_WRITE
in config-sink.json.
If during the execution of Hudi Sink connector, you see this error:
Caused by: javax.security.auth.login.LoginException: java.lang.NullPointerException: invalid null input: name
, is very likely that your Kafka Connect service was started by an unnamed user. To see if this is your case,
ssh into your Kafka Connect container/server and run:
whoami
If you receive a message like this whoami: cannot find name for user ID ...
, you'll need to change the service user starting Kafka Connect.
If you are using Docker images, modify your Dockerfile.
To change the service user of your docker image, add this to your Dockerfile:
USER <name of an existing service user>
To create a new service user for your docker image, add this to your Dockerfile:
RUN useradd kafka-conn-service -r
USER kafka-conn-service