Skip to content

Commit

Permalink
[Pulsar IO][Issue 5633]Support avro schema for debezium connector (ap…
Browse files Browse the repository at this point in the history
…ache#6034)

Fixes apache#5633 


### Motivation

Currently, some users want to support Avro schema in debezium, so this pr supports this feature.
For Kafka's Avro schema, it depends on the Avro 1.8 version, but Avro version has just been upgraded to 1.9 in pulsar, so shade is needed to avoid finding `addProp` function

### Modifications

* Add a package `kafka-connect-avro-converter-shaded`
* Add class KafkaSchema to converter Kafka's Avro schema to pulsar's schema

### Verifying this change 

Unit test and integration tests
  • Loading branch information
tuteng authored Apr 30, 2020
1 parent 466b0b8 commit e08be96
Show file tree
Hide file tree
Showing 21 changed files with 645 additions and 91 deletions.
128 changes: 128 additions & 0 deletions kafka-connect-avro-converter-shaded/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<artifactId>pulsar</artifactId>
<groupId>org.apache.pulsar</groupId>
<version>2.6.0-SNAPSHOT</version>
<relativePath>..</relativePath>
</parent>

<artifactId>kafka-connect-avro-converter-shaded</artifactId>
<name>Apache Pulsar :: Kafka Connect Avro Converter shaded</name>

<dependencies>
<!-- confluent connect avro converter -->
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-connect-avro-converter</artifactId>
<version>${confluent.version}</version>
</dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-core-asl</artifactId>
<version>${kafka-avro-convert-jackson.version}</version>
</dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-mapper-asl</artifactId>
<version>${kafka-avro-convert-jackson.version}</version>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
</execution>
</executions>
<configuration>
<createDependencyReducedPom>true</createDependencyReducedPom>
<promoteTransitiveDependencies>true</promoteTransitiveDependencies>

<artifactSet>
<includes>
<include>io.confluent:*</include>
<include>io.confluent:kafka-avro-serializer</include>
<include>io.confluent:kafka-schema-registry-client</include>
<include>io.confluent:common-config</include>
<include>io.confluent:common-utils</include>
<include>org.apache.avro:*</include>

<include>org.codehaus.jackson:jackson-core-asl</include>
<include>org.codehaus.jackson:jackson-mapper-asl</include>
<include>com.thoughtworks.paranamer:paranamer</include>
<include>org.xerial.snappy:snappy-java</include>
<include>org.apache.commons:commons-compress</include>
<include>org.tukaani:xz</include>
</includes>
</artifactSet>
<relocations>
<relocation>
<pattern>io.confluent</pattern>
<shadedPattern>org.apache.pulsar.kafka.shade.io.confluent</shadedPattern>
</relocation>
<relocation>
<pattern>org.apache.avro</pattern>
<shadedPattern>org.apache.pulsar.kafka.shade.avro</shadedPattern>
</relocation>
<relocation>
<pattern>org.codehaus.jackson</pattern>
<shadedPattern>org.apache.pulsar.kafka.shade.org.codehaus.jackson</shadedPattern>
</relocation>
<relocation>
<pattern>com.thoughtworks.paranamer</pattern>
<shadedPattern>org.apache.pulsar.kafka.shade.com.thoughtworks.paranamer</shadedPattern>
</relocation>
<relocation>
<pattern>org.xerial.snappy</pattern>
<shadedPattern>org.apache.pulsar.kafka.shade.org.xerial.snappy</shadedPattern>
</relocation>
<relocation>
<pattern>org.apache.commons</pattern>
<shadedPattern>org.apache.pulsar.kafka.shade.org.apache.commons</shadedPattern>
</relocation>
<relocation>
<pattern>org.tukaani</pattern>
<shadedPattern>org.apache.pulsar.kafka.shade.org.tukaani</shadedPattern>
</relocation>
</relocations>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
<transformer implementation="org.apache.maven.plugins.shade.resource.PluginXmlResourceTransformer" />
</transformers>
</configuration>
</plugin>
</plugins>
</build>

</project>
10 changes: 10 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,9 @@ flexible messaging model and an intuitive client API.</description>
<!-- connector-related modules -->
<module>pulsar-io</module>

<!-- kafka connect avro converter shaded, because version mismatch -->
<module>kafka-connect-avro-converter-shaded</module>

<!-- examples -->
<module>examples</module>

Expand Down Expand Up @@ -208,6 +211,8 @@ flexible messaging model and an intuitive client API.</description>
<guava.version>25.1-jre</guava.version>
<jcip.version>1.0</jcip.version>
<prometheus-jmx.version>0.12.0</prometheus-jmx.version>
<confluent.version>5.3.2</confluent.version>
<kafka-avro-convert-jackson.version>1.9.13</kafka-avro-convert-jackson.version>

<!-- test dependencies -->
<cassandra.version>3.6.0</cassandra.version>
Expand All @@ -220,6 +225,7 @@ flexible messaging model and an intuitive client API.</description>
<javassist.version>3.25.0-GA</javassist.version>
<failsafe.version>2.3.1</failsafe.version>
<skyscreamer.version>1.5.0</skyscreamer.version>
<confluent.version>5.2.2</confluent.version>

<!-- Plugin dependencies -->
<protobuf-maven-plugin.version>0.6.1</protobuf-maven-plugin.version>
Expand Down Expand Up @@ -1715,5 +1721,9 @@ flexible messaging model and an intuitive client API.</description>
<id>spring-plugins-release</id>
<url>https://repo.spring.io/plugins-release/</url>
</repository>
<repository>
<id>confluent</id>
<url>http://packages.confluent.io/maven/</url>
</repository>
</repositories>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.pulsar.client.api.schema.SchemaInfoProvider;
import org.apache.pulsar.client.api.schema.SchemaReader;
import org.apache.pulsar.client.api.schema.SchemaWriter;
import org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema;
import org.apache.pulsar.common.protocol.schema.BytesSchemaVersion;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
Expand Down Expand Up @@ -78,6 +79,11 @@ public SchemaReader<T> load(BytesSchemaVersion schemaVersion) {
protected StructSchema(SchemaInfo schemaInfo) {
this.schema = parseAvroSchema(new String(schemaInfo.getSchema(), UTF_8));
this.schemaInfo = schemaInfo;

if (schemaInfo.getProperties().containsKey(GenericAvroSchema.OFFSET_PROP)) {
this.schema.addProp(GenericAvroSchema.OFFSET_PROP,
schemaInfo.getProperties().get(GenericAvroSchema.OFFSET_PROP));
}
}

public org.apache.avro.Schema getAvroSchema() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ public class GenericAvroReader implements SchemaReader<GenericRecord> {
private final List<Field> fields;
private final Schema schema;
private final byte[] schemaVersion;
private int offset;

public GenericAvroReader(Schema schema) {
this(null, schema, null);
}
Expand All @@ -65,12 +67,22 @@ public GenericAvroReader(Schema writerSchema, Schema readerSchema, byte[] schema
}
this.byteArrayOutputStream = new ByteArrayOutputStream();
this.encoder = EncoderFactory.get().binaryEncoder(this.byteArrayOutputStream, encoder);

if (schema.getObjectProp(GenericAvroSchema.OFFSET_PROP) != null) {
this.offset = Integer.parseInt(schema.getObjectProp(GenericAvroSchema.OFFSET_PROP).toString());
} else {
this.offset = 0;
}

}

@Override
public GenericAvroRecord read(byte[] bytes, int offset, int length) {
try {
Decoder decoder = DecoderFactory.get().binaryDecoder(bytes, offset, length, null);
if (offset == 0 && this.offset > 0) {
offset = this.offset;
}
Decoder decoder = DecoderFactory.get().binaryDecoder(bytes, offset, length - offset, null);
org.apache.avro.generic.GenericRecord avroRecord =
(org.apache.avro.generic.GenericRecord)reader.read(
null,
Expand Down Expand Up @@ -101,5 +113,9 @@ public GenericRecord read(InputStream inputStream) {
}
}

public int getOffset() {
return offset;
}

private static final Logger log = LoggerFactory.getLogger(GenericAvroReader.class);
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
@Slf4j
public class GenericAvroSchema extends GenericSchemaImpl {

public final static String OFFSET_PROP = "__AVRO_READ_OFFSET__";

public GenericAvroSchema(SchemaInfo schemaInfo) {
this(schemaInfo, true);
}
Expand Down Expand Up @@ -73,6 +75,8 @@ protected SchemaReader<GenericRecord> loadReader(BytesSchemaVersion schemaVersio
schemaInfo);
Schema writerSchema = parseAvroSchema(schemaInfo.getSchemaDefinition());
Schema readerSchema = useProvidedSchemaAsReaderSchema ? schema : writerSchema;
readerSchema.addProp(OFFSET_PROP, schemaInfo.getProperties().getOrDefault(OFFSET_PROP, "0"));

return new GenericAvroReader(
writerSchema,
readerSchema,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

import static org.testng.Assert.assertEquals;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.api.schema.SchemaDefinition;
Expand All @@ -38,18 +40,22 @@ public class GenericAvroReaderTest {
private AvroSchema fooSchemaNotNull;
private AvroSchema fooSchema;
private AvroSchema fooV2Schema;

private AvroSchema fooOffsetSchema;

@BeforeMethod
public void setup() {
fooSchema = AvroSchema.of(Foo.class);

fooV2Schema = AvroSchema.of(FooV2.class);
fooSchemaNotNull = AvroSchema.of(SchemaDefinition
.builder()
.withAlwaysAllowNull(false)
.withPojo(Foo.class)
.build());

fooOffsetSchema = AvroSchema.of(Foo.class);
fooOffsetSchema.getAvroSchema().addProp(GenericAvroSchema.OFFSET_PROP, 5);

foo = new Foo();
foo.setField1("foo1");
foo.setField2("bar1");
Expand Down Expand Up @@ -83,4 +89,20 @@ public void testGenericAvroReaderByReaderSchema() {
assertEquals(genericRecordByReaderSchema.getField("field3"), 10);
}

@Test
public void testOffsetSchema() {
byte[] fooBytes = fooOffsetSchema.encode(foo);
ByteBuf byteBuf = Unpooled.buffer();
byteBuf.writeByte(0);
byteBuf.writeInt(10);
byteBuf.writeBytes(fooBytes);

GenericAvroReader reader = new GenericAvroReader(fooOffsetSchema.getAvroSchema());
assertEquals(reader.getOffset(), 5);
GenericRecord record = reader.read(byteBuf.array());
assertEquals(record.getField("field1"), "foo1");
assertEquals(record.getField("field2"), "bar1");
assertEquals(record.getField("fieldUnableNull"), "notNull");
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.functions.api;

import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.schema.KeyValueEncodingType;

/**
* key value schema record.
*/
public interface KVRecord<K, V> extends Record {

Schema<K> getKeySchema();

Schema<V> getValueSchema();

KeyValueEncodingType getKeyValueEncodingType();

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.pulsar.functions.api;

import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Schema;

import java.util.Collections;
import java.util.Map;
Expand All @@ -43,6 +44,10 @@ default Optional<String> getKey() {
return Optional.empty();
}

default Schema<T> getSchema() {
return null;
}

/**
* Retrieves the actual data of the record.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,13 @@
import lombok.AllArgsConstructor;
import lombok.Data;

import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.schema.KeyValueSchema;
import org.apache.pulsar.functions.api.KVRecord;
import org.apache.pulsar.functions.api.Record;

@Slf4j
@Data
@AllArgsConstructor
public class SinkRecord<T> implements Record<T> {
Expand Down Expand Up @@ -81,4 +86,24 @@ public void fail() {
public Optional<String> getDestinationTopic() {
return sourceRecord.getDestinationTopic();
}

@Override
public Schema<T> getSchema() {
if (sourceRecord == null) {
return null;
}

if (sourceRecord.getSchema() != null) {
return sourceRecord.getSchema();
}

if (sourceRecord instanceof KVRecord) {
KVRecord kvRecord = (KVRecord) sourceRecord;
return KeyValueSchema.of(kvRecord.getKeySchema(), kvRecord.getValueSchema(),
kvRecord.getKeyValueEncodingType());
}

return null;
}

}
Loading

0 comments on commit e08be96

Please sign in to comment.