Pulsar IO - KafkaSource - allow to manage Avro Encoded messages (apache#9448)

### Motivation
Currently the KafkaSource only handles strings and byte arrays; it does not support records with a Schema.
In Kafka, messages can be encoded with Avro, and their schemas are managed by a Schema Registry (by Confluent®).

### Modifications

Summary of changes:
- allow the current KafkaSource (`KafkaBytesSource`) to deal with `io.confluent.kafka.serializers.KafkaAvroDeserializer` and copy the raw bytes to the Pulsar topic, setting the Schema appropriately
- this source supports Schema Evolution end-to-end (i.e. add fields to the original schema in the Kafka world and see the new fields in the Pulsar topic, without any reconfiguration or restart)
- add the Confluent® Schema Registry Client to the Kafka Connector NAR; its license is compatible with the Apache 2 license and we can redistribute it
- the Schema Registry Client is configured in the `consumerProperties` property of the source (usually you add `schema.registry.url`); see the sketch after this list
- add integration tests with Kafka and Schema Registry
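
For illustration, the relevant entries could look like the sketch below (example values only, expressed as Java properties; in the source config these entries go into the `consumerProperties` map):

```java
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;

// Example values only (host names are illustrative); these entries go into the
// source's consumerProperties map.
Properties consumerProperties = new Properties();
consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
        "io.confluent.kafka.serializers.KafkaAvroDeserializer");
// Picked up by the Confluent Schema Registry client:
consumerProperties.put("schema.registry.url", "http://schema-registry:8081");
```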

### Verifying this change

The patch introduces new integration tests, which launch a Kafka container together with a Confluent Schema Registry container.
eolivelli authored Mar 15, 2021
1 parent 5190af3 commit d52a1b0
Showing 10 changed files with 786 additions and 27 deletions.
2 changes: 2 additions & 0 deletions pom.xml
@@ -157,6 +157,8 @@ flexible messaging model and an intuitive client API.</description>
<jcip.version>1.0</jcip.version>
<prometheus-jmx.version>0.14.0</prometheus-jmx.version>
<confluent.version>5.3.2</confluent.version>
<kafka.confluent.schemaregistryclient.version>5.3.0</kafka.confluent.schemaregistryclient.version>
<kafka.confluent.avroserializer.version>5.3.0</kafka.confluent.avroserializer.version>
<kafka-avro-convert-jackson.version>1.9.13</kafka-avro-convert-jackson.version>
<aircompressor.version>0.16</aircompressor.version>
<asynchttpclient.version>2.12.1</asynchttpclient.version>
18 changes: 18 additions & 0 deletions pulsar-io/kafka/pom.xml
@@ -36,6 +36,7 @@
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-io-core</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
@@ -48,12 +49,29 @@
<artifactId>jackson-dataformat-yaml</artifactId>
</dependency>

<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka-client.version}</version>
</dependency>

<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-schema-registry</artifactId>
<version>${kafka.confluent.schemaregistryclient.version}</version>
</dependency>

<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-avro-serializer</artifactId>
<version>${kafka.confluent.avroserializer.version}</version>
</dependency>

</dependencies>

<build>
@@ -0,0 +1,109 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.io.kafka;

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.concurrent.ExecutionException;

@Slf4j
final class AvroSchemaCache {
private final LoadingCache<Integer, Schema<ByteBuffer>> cache = CacheBuilder
.newBuilder()
.maximumSize(100)
.build(new CacheLoader<Integer, Schema<ByteBuffer>>() {
@Override
public Schema<ByteBuffer> load(Integer schemaId) throws Exception {
return fetchSchema(schemaId);
}
});

private final SchemaRegistryClient schemaRegistryClient;

public AvroSchemaCache(SchemaRegistryClient schemaRegistryClient) {
this.schemaRegistryClient = schemaRegistryClient;
}

public Schema<ByteBuffer> get(int schemaId) {
try {
return cache.get(schemaId);
} catch (ExecutionException err) {
throw new RuntimeException(err.getCause());
}
}

private Schema<ByteBuffer> fetchSchema(int schemaId) {
try {
org.apache.avro.Schema schema = schemaRegistryClient.getById(schemaId);
String definition = schema.toString(false);
log.info("Schema {} definition {}", schemaId, definition);
SchemaInfo schemaInfo = SchemaInfo.builder()
.type(SchemaType.AVRO)
.name(schema.getName())
.properties(Collections.emptyMap())
.schema(definition.getBytes(StandardCharsets.UTF_8)
).build();
return new Schema<ByteBuffer>() {
@Override
public byte[] encode(ByteBuffer message) {
return getBytes(message);
}

@Override
public SchemaInfo getSchemaInfo() {
return schemaInfo;
}

@Override
public Schema<ByteBuffer> clone() {
return this;
}

@Override
public ByteBuffer decode(byte[] bytes, byte[] schemaVersion) {
throw new UnsupportedOperationException();
}
};
} catch (IOException | RestClientException e) {
throw new RuntimeException(e);
}
}


private static byte[] getBytes(ByteBuffer buffer) {
buffer.mark();
byte[] avroEncodedData = new byte[buffer.remaining()];
buffer.get(avroEncodedData);
buffer.reset();
return avroEncodedData;
}

}
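
For context, a minimal usage sketch of the cache (assumed variable names, not code from this commit). Confluent's wire format prefixes each Avro payload with a magic byte (0) and a 4-byte big-endian schema id; that id is what the cache translates into a Pulsar `Schema<ByteBuffer>` carrying the Avro definition fetched from the registry.

```java
// Sketch only: resolving the Pulsar schema for a Kafka record value.
// CachedSchemaRegistryClient comes from io.confluent.kafka.schemaregistry.client.
SchemaRegistryClient registryClient =
        new CachedSchemaRegistryClient("http://schema-registry:8081", 100);
AvroSchemaCache schemaCache = new AvroSchemaCache(registryClient);

ByteBuffer buffer = ByteBuffer.wrap(kafkaValue);   // raw value() of the ConsumerRecord
buffer.get();                                      // skip the magic byte (0)
int schemaId = buffer.getInt();                    // id registered in the Schema Registry
Schema<ByteBuffer> pulsarSchema = schemaCache.get(schemaId);
// pulsarSchema.getSchemaInfo() now carries the Avro definition, so the
// destination Pulsar topic can expose the same (possibly evolved) schema.
```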
@@ -0,0 +1,32 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.io.kafka;

import lombok.Value;

import java.nio.ByteBuffer;

/**
 * This is a wrapper around a ByteBuffer (the Avro-encoded record) and a schema id in the Kafka Schema Registry.
*/
@Value
public class BytesWithKafkaSchema {
private final ByteBuffer value;
private final int schemaId;
}
@@ -19,34 +19,35 @@

package org.apache.pulsar.io.kafka;

import java.util.Collections;
import java.util.Objects;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.core.PushSource;
import org.apache.pulsar.io.core.SourceContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Objects;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

/**
* Simple Kafka Source to transfer messages from a Kafka topic
* Simple Kafka Source to transfer messages from a Kafka topic.
*/
public abstract class KafkaAbstractSource<V> extends PushSource<V> {

private static final Logger LOG = LoggerFactory.getLogger(KafkaAbstractSource.class);

private volatile Consumer<String, byte[]> consumer;
private volatile Consumer<Object, Object> consumer;
private volatile boolean running = false;
private KafkaSourceConfig kafkaSourceConfig;
private Thread runnerThread;
@@ -116,19 +117,20 @@ public void close() throws InterruptedException {
LOG.info("Kafka source stopped.");
}

@SuppressWarnings("unchecked")
public void start() {
runnerThread = new Thread(() -> {
LOG.info("Starting kafka source");
LOG.info("Starting kafka source on {}", kafkaSourceConfig.getTopic());
consumer.subscribe(Collections.singletonList(kafkaSourceConfig.getTopic()));
LOG.info("Kafka source started.");
ConsumerRecords<String, byte[]> consumerRecords;
while (running) {
consumerRecords = consumer.poll(1000);
ConsumerRecords<Object, Object> consumerRecords = consumer.poll(1000);
CompletableFuture<?>[] futures = new CompletableFuture<?>[consumerRecords.count()];
int index = 0;
for (ConsumerRecord<String, byte[]> consumerRecord : consumerRecords) {
LOG.debug("Record received from kafka, key: {}. value: {}", consumerRecord.key(), consumerRecord.value());
KafkaRecord<V> record = new KafkaRecord<>(consumerRecord, extractValue(consumerRecord));
for (ConsumerRecord<Object, Object> consumerRecord : consumerRecords) {
KafkaRecord record = new KafkaRecord(consumerRecord,
extractValue(consumerRecord),
extractSchema(consumerRecord));
consume(record);
futures[index] = record.getCompletableFuture();
index++;
@@ -151,18 +153,25 @@ public void start() {
runnerThread.start();
}

public abstract V extractValue(ConsumerRecord<String, byte[]> record);
public Object extractValue(ConsumerRecord<Object, Object> consumerRecord) {
return consumerRecord.value();
}

public abstract Schema<V> extractSchema(ConsumerRecord<Object, Object> consumerRecord);

@Slf4j
static private class KafkaRecord<V> implements Record<V> {
private final ConsumerRecord<String, byte[]> record;
private final ConsumerRecord<String, ?> record;
private final V value;
private final Schema<V> schema;

@Getter
private final CompletableFuture<Void> completableFuture = new CompletableFuture<>();

public KafkaRecord(ConsumerRecord<String, byte[]> record,
V value) {
public KafkaRecord(ConsumerRecord<String,?> record, V value, Schema<V> schema) {
this.record = record;
this.value = value;
this.schema = schema;
}
@Override
public Optional<String> getPartitionId() {
@@ -188,5 +197,10 @@ public V getValue() {
public void ack() {
completableFuture.complete(null);
}

@Override
public Schema<V> getSchema() {
return schema;
}
}
}
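
To illustrate the new contract (a sketch, not the `KafkaBytesSource` from this commit): a concrete source now only needs to provide the Pulsar schema for each record, while `extractValue()` defaults to the deserialized Kafka value.

```java
// Hypothetical subclass: forwards the deserialized byte[] values unchanged
// and declares Schema.BYTES for every record.
public class RawBytesKafkaSource extends KafkaAbstractSource<byte[]> {
    @Override
    public Schema<byte[]> extractSchema(ConsumerRecord<Object, Object> consumerRecord) {
        return Schema.BYTES;
    }
}
```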