Document Kafka Source Schema management (apache#10084)
* Document Kafka Source Schema management

* Update site2/docs/io-kafka-source.md

Co-authored-by: Yu Liu <[email protected]>

* Update site2/docs/io-kafka-source.md

Co-authored-by: Jennifer Huang <[email protected]>

* fix

* Update site2/docs/io-kafka-source.md

Co-authored-by: Jennifer Huang <[email protected]>

* Apply suggestions from @Anonymitaet

Co-authored-by: Yu Liu <[email protected]>

Co-authored-by: Enrico Olivelli <[email protected]>
Co-authored-by: Yu Liu <[email protected]>
Co-authored-by: Jennifer Huang <[email protected]>
4 people authored Apr 1, 2021
1 parent 269513a commit 77cf09e
Showing 1 changed file with 35 additions and 8 deletions.
43 changes: 35 additions & 8 deletions site2/docs/io-kafka-source.md
@@ -17,19 +17,48 @@ The configuration of the Kafka source connector has the following properties.

| Name | Type | Required | Default | Description |
|------|------|----------|---------|-------------|
| `bootstrapServers` |String| true | " " (empty string) | A comma-separated list of host and port pairs for establishing the initial connection to the Kafka cluster. |
| `groupId` |String| true | " " (empty string) | A unique string that identifies the group of consumer processes to which this consumer belongs. |
| `fetchMinBytes` | long|false | 1 | The minimum number of bytes expected for each fetch response. |
| `autoCommitEnabled` | boolean |false | true | If set to true, the consumer's offset is periodically committed in the background.<br/><br/> If the process fails, this committed offset is used as the position from which a new consumer begins. |
| `autoCommitIntervalMs` | long|false | 5000 | The frequency in milliseconds that the consumer offsets are auto-committed to Kafka if `autoCommitEnabled` is set to true. |
| `heartbeatIntervalMs` | long| false | 3000 | The interval between heartbeats to the consumer when using Kafka's group management facilities. <br/><br/>**Note: `heartbeatIntervalMs` must be smaller than `sessionTimeoutMs`**.|
| `sessionTimeoutMs` | long|false | 30000 | The timeout used to detect consumer failures when using Kafka's group management facility. |
| `topic` | String|true | " " (empty string)| The Kafka topic from which messages are imported into Pulsar. |
| `consumerConfigProperties` | Map| false | " " (empty string) | The consumer configuration properties to be passed to consumers. <br/><br/>**Note: other properties specified in the connector configuration file take precedence over this configuration**. |
| `keyDeserializationClass` | String|false | org.apache.kafka.common.serialization.StringDeserializer | The deserializer class for Kafka consumers to deserialize keys.<br/> The deserializer is set by a specific implementation of [`KafkaAbstractSource`](https://github.com/apache/pulsar/blob/master/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java).
| `valueDeserializationClass` | String|false | org.apache.kafka.common.serialization.ByteArrayDeserializer | The deserializer class for Kafka consumers to deserialize values.
| `autoOffsetReset` | String | false | "earliest" | The default offset reset policy. |

### Schema Management

The Kafka source connector applies a schema to the Pulsar topic based on the data type present on the Kafka topic.
The data type is derived from the `keyDeserializationClass` and `valueDeserializationClass` configuration parameters.

If `valueDeserializationClass` is `org.apache.kafka.common.serialization.StringDeserializer`, the connector sets `Schema.STRING()` as the schema type on the Pulsar topic.

If `valueDeserializationClass` is `io.confluent.kafka.serializers.KafkaAvroDeserializer`, the connector downloads the AVRO schema from the Confluent Schema Registry® and sets it on the Pulsar topic.

In this case, you need to set `schema.registry.url` inside the `consumerConfigProperties` configuration entry of the source, as shown in the sketch below.
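
A minimal sketch of the `configs` section of a source configuration file for this case. The server addresses, topic name, and registry URL are placeholders; adapt them to your deployment:

```yaml
configs:
  bootstrapServers: "localhost:9092"
  groupId: "test-pulsar-io"
  topic: "my-avro-topic"
  valueDeserializationClass: "io.confluent.kafka.serializers.KafkaAvroDeserializer"
  consumerConfigProperties:
    schema.registry.url: "http://localhost:8081"
```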

If `keyDeserializationClass` is not `org.apache.kafka.common.serialization.StringDeserializer`, the key is not a String, and the Kafka source uses the KeyValue schema type with SEPARATED encoding.

Pulsar supports AVRO format for keys.

In this case, the Pulsar topic has the following properties:
- Schema: KeyValue schema with SEPARATED encoding
- Key: the content of the key of the Kafka message (base64-encoded)
- Value: the content of the value of the Kafka message
- KeySchema: the schema detected from `keyDeserializationClass`
- ValueSchema: the schema detected from `valueDeserializationClass`

Topic compaction and partition routing use the Pulsar key, which contains the Kafka key, so they are driven by the same value as on Kafka.

When you consume data from the Pulsar topic, use the `KeyValue` schema to decode the data properly.
To access the raw key, use the `Message#getKeyBytes()` API, as in the sketch below.
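
A minimal Java consumer sketch that reads both parts of the KeyValue as raw bytes; the service URL, topic, and subscription name are placeholders. In practice, match the key and value schemas to the configured deserializers:

```java
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.KeyValueEncodingType;

public class KafkaSourceReader {
    public static void main(String[] args) throws Exception {
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")
                .build();

        // KeyValue schema with SEPARATED encoding, matching what the
        // Kafka source connector sets on the topic.
        Consumer<KeyValue<byte[], byte[]>> consumer = client
                .newConsumer(Schema.KeyValue(Schema.BYTES, Schema.BYTES,
                        KeyValueEncodingType.SEPARATED))
                .topic("persistent://public/default/my-topic")
                .subscriptionName("my-subscription")
                .subscribe();

        Message<KeyValue<byte[], byte[]>> msg = consumer.receive();
        KeyValue<byte[], byte[]> kv = msg.getValue();
        System.out.println("Key bytes: " + kv.getKey().length);
        System.out.println("Value bytes: " + kv.getValue().length);

        // The raw key is also available directly on the message.
        byte[] rawKey = msg.getKeyBytes();
        System.out.println("Raw key bytes: " + rawKey.length);

        consumer.acknowledge(msg);
        client.close();
    }
}
```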

### Example

@@ -60,7 +89,7 @@ Before using the Kafka source connector, you need to create a configuration file

## Usage

Here is an example of using the Kafka source connector with the configuration file as shown previously.

1. Download a Kafka client and a Kafka connector.

@@ -95,7 +124,7 @@ Here is an example of using the Kafka source connector with the configuration file as shown previously.
5. Pull a Pulsar image and start Pulsar standalone.

```bash
$ docker pull apachepulsar/pulsar:{{pulsar:version}}

$ docker run -d -it --network kafka-pulsar -p 6650:6650 -p 8080:8080 -v $PWD/data:/pulsar/data --name pulsar-kafka-standalone apachepulsar/pulsar:{{pulsar:version}} bin/pulsar standalone
```
@@ -130,9 +159,8 @@ Here is an example of using the Kafka source connector with the configuration file as shown previously.
8. Copy the following files to Pulsar.

```bash
$ docker cp pulsar-io-kafka-{{pulsar:version}}.nar pulsar-kafka-standalone:/pulsar
$ docker cp kafkaSourceConfig.yaml pulsar-kafka-standalone:/pulsar/conf
$ docker cp pulsar-client.py pulsar-kafka-standalone:/pulsar/
$ docker cp kafka-producer.py pulsar-kafka-standalone:/pulsar/
```
@@ -143,7 +171,7 @@ Here is an example of using the Kafka source connector with the configuration file as shown previously.
$ docker exec -it pulsar-kafka-standalone /bin/bash

$ ./bin/pulsar-admin source localrun \
--archive ./pulsar-io-kafka-{{pulsar:version}}.nar \
--classname org.apache.pulsar.io.kafka.KafkaBytesSource \
--tenant public \
--namespace default \
@@ -168,4 +196,3 @@ Here is an example of using the Kafka source connector with the configuration file as shown previously.
```bash
Received message: 'hello world'
```
