Skip to content

Commit

Permalink
Add KeyValueSchema to satisfy org.apache.kafka.connect.connector.Conn…
Browse files Browse the repository at this point in the history
…ectRecord (apache#2885)

Kafka org.apache.kafka.connect.connector.ConnectRecord has both key and value schema.
Currently we need a KeyValueSchema to achieve it and support debezium.
There will be another PR to do the convert between KeyValueSchema and org.apache.kafka.connect.data.Schema.
  • Loading branch information
jiazhai authored and sijie committed Oct 31, 2018
1 parent 649c6ca commit 2591ccd
Show file tree
Hide file tree
Showing 6 changed files with 287 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
*/
package org.apache.pulsar.client.api;

import static com.google.common.base.Preconditions.checkArgument;

import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
import org.apache.pulsar.client.impl.schema.AutoProduceBytesSchema;
Expand All @@ -28,12 +30,15 @@
import org.apache.pulsar.client.impl.schema.FloatSchema;
import org.apache.pulsar.client.impl.schema.IntSchema;
import org.apache.pulsar.client.impl.schema.JSONSchema;
import org.apache.pulsar.client.impl.schema.KeyValueSchema;
import org.apache.pulsar.client.impl.schema.LongSchema;
import org.apache.pulsar.client.impl.schema.ProtobufSchema;
import org.apache.pulsar.client.impl.schema.ShortSchema;
import org.apache.pulsar.client.impl.schema.StringSchema;
import org.apache.pulsar.client.impl.schema.generic.GenericSchema;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;

/**
* Message schema definition
Expand Down Expand Up @@ -104,6 +109,33 @@ static <T> Schema<T> JSON(Class<T> clazz) {
return JSONSchema.of(clazz);
}

/**
* Key Value Schema using passed in schema type, support JSON and AVRO currently.
*/
static <K, V> Schema<KeyValue<K, V>> KeyValue(Class<K> key, Class<V> value, SchemaType type) {
checkArgument(SchemaType.JSON == type || SchemaType.AVRO == type);
if (SchemaType.JSON == type) {
return new KeyValueSchema(JSONSchema.of(key), JSONSchema.of(value));
} else {
// AVRO
return new KeyValueSchema(AvroSchema.of(key), AvroSchema.of(value));
}
}

/**
* Key Value Schema whose underneath key and value schemas are JSONSchema.
*/
static <K, V> Schema<KeyValue<K, V>> KeyValue(Class<K> key, Class<V> value) {
return new KeyValueSchema(JSONSchema.of(key), JSONSchema.of(value));
}

/**
* Key Value Schema using passed in key and value schemas.
*/
static <K, V> Schema<KeyValue<K, V>> KeyValue(Schema<K> key, Schema<V> value) {
return new KeyValueSchema(key, value);
}

@Deprecated
static Schema<GenericRecord> AUTO() {
return AUTO_CONSUME();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.impl.schema;

import java.nio.ByteBuffer;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;

/**
* [Key, Value] pair schema definition
*/
@Slf4j
public class KeyValueSchema<K, V> implements Schema<KeyValue<K, V>> {
@Getter
private final Schema<K> keySchema;
@Getter
private final Schema<V> valueSchema;

// schemaInfo combined by KeySchemaInfo and ValueSchemaInfo:
// [keyInfo.length][keyInfo][valueInfo.length][ValueInfo]
private SchemaInfo schemaInfo;

public KeyValueSchema(Schema<K> keySchema,
Schema<V> valueSchema) {
this.keySchema = keySchema;
this.valueSchema = valueSchema;

// set schemaInfo
this.schemaInfo = new SchemaInfo()
.setName("KeyValue")
.setType(SchemaType.KEY_VALUE);

byte[] keySchemaInfo = keySchema.getSchemaInfo().getSchema();
byte[] valueSchemaInfo = valueSchema.getSchemaInfo().getSchema();

ByteBuffer byteBuffer = ByteBuffer.allocate(4 + keySchemaInfo.length + 4 + valueSchemaInfo.length);
byteBuffer.putInt(keySchemaInfo.length).put(keySchemaInfo)
.putInt(valueSchemaInfo.length).put(valueSchemaInfo);
this.schemaInfo.setSchema(byteBuffer.array());
}

// encode as bytes: [key.length][key.bytes][value.length][value.bytes]
public byte[] encode(KeyValue<K, V> message) {
byte[] keyBytes = keySchema.encode(message.getKey());
byte[] valueBytes = valueSchema.encode(message.getValue());

ByteBuffer byteBuffer = ByteBuffer.allocate(4 + keyBytes.length + 4 + valueBytes.length);
byteBuffer.putInt(keyBytes.length).put(keyBytes).putInt(valueBytes.length).put(valueBytes);
return byteBuffer.array();
}

public KeyValue<K, V> decode(byte[] bytes) {
ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
int keyLength = byteBuffer.getInt();
byte[] keyBytes = new byte[keyLength];
byteBuffer.get(keyBytes);

int valueLength = byteBuffer.getInt();
byte[] valueBytes = new byte[valueLength];
byteBuffer.get(valueBytes);

return new KeyValue<>(keySchema.decode(keyBytes), valueSchema.decode(valueBytes));
}

public SchemaInfo getSchemaInfo() {
return this.schemaInfo;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.schema;

import static org.testng.Assert.assertEquals;

import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.schema.AvroSchema;
import org.apache.pulsar.client.impl.schema.JSONSchema;
import org.apache.pulsar.client.impl.schema.KeyValueSchema;
import org.apache.pulsar.client.schema.SchemaTestUtils.Bar;
import org.apache.pulsar.client.schema.SchemaTestUtils.Color;
import org.apache.pulsar.client.schema.SchemaTestUtils.Foo;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.SchemaType;
import org.testng.Assert;
import org.testng.annotations.Test;

@Slf4j
public class KeyValueSchemaTest {

@Test
public void testAvroSchemaCreate() {
AvroSchema<Foo> fooSchema = AvroSchema.of(Foo.class);
AvroSchema<Bar> barSchema = AvroSchema.of(Bar.class);

Schema<KeyValue<Foo, Bar>> keyValueSchema1 = Schema.KeyValue(fooSchema, barSchema);
Schema<KeyValue<Foo, Bar>> keyValueSchema2 = Schema.KeyValue(Foo.class, Bar.class, SchemaType.AVRO);

assertEquals(keyValueSchema1.getSchemaInfo().getType(), SchemaType.KEY_VALUE);
assertEquals(keyValueSchema2.getSchemaInfo().getType(), SchemaType.KEY_VALUE);

assertEquals(((KeyValueSchema<Foo, Bar>)keyValueSchema1).getKeySchema().getSchemaInfo().getType(),
SchemaType.AVRO);
assertEquals(((KeyValueSchema<Foo, Bar>)keyValueSchema1).getValueSchema().getSchemaInfo().getType(),
SchemaType.AVRO);
assertEquals(((KeyValueSchema<Foo, Bar>)keyValueSchema2).getKeySchema().getSchemaInfo().getType(),
SchemaType.AVRO);
assertEquals(((KeyValueSchema<Foo, Bar>)keyValueSchema2).getValueSchema().getSchemaInfo().getType(),
SchemaType.AVRO);

String schemaInfo1 = new String(keyValueSchema1.getSchemaInfo().getSchema());
String schemaInfo2 = new String(keyValueSchema2.getSchemaInfo().getSchema());
assertEquals(schemaInfo1, schemaInfo2);
}

@Test
public void testJsonSchemaCreate() {
JSONSchema<Foo> fooSchema = JSONSchema.of(Foo.class);
JSONSchema<Bar> barSchema = JSONSchema.of(Bar.class);

Schema<KeyValue<Foo, Bar>> keyValueSchema1 = Schema.KeyValue(fooSchema, barSchema);
Schema<KeyValue<Foo, Bar>> keyValueSchema2 = Schema.KeyValue(Foo.class, Bar.class, SchemaType.JSON);
Schema<KeyValue<Foo, Bar>> keyValueSchema3 = Schema.KeyValue(Foo.class, Bar.class);

assertEquals(keyValueSchema1.getSchemaInfo().getType(), SchemaType.KEY_VALUE);
assertEquals(keyValueSchema2.getSchemaInfo().getType(), SchemaType.KEY_VALUE);
assertEquals(keyValueSchema3.getSchemaInfo().getType(), SchemaType.KEY_VALUE);

assertEquals(((KeyValueSchema<Foo, Bar>)keyValueSchema1).getKeySchema().getSchemaInfo().getType(),
SchemaType.JSON);
assertEquals(((KeyValueSchema<Foo, Bar>)keyValueSchema1).getValueSchema().getSchemaInfo().getType(),
SchemaType.JSON);
assertEquals(((KeyValueSchema<Foo, Bar>)keyValueSchema2).getKeySchema().getSchemaInfo().getType(),
SchemaType.JSON);
assertEquals(((KeyValueSchema<Foo, Bar>)keyValueSchema2).getValueSchema().getSchemaInfo().getType(),
SchemaType.JSON);
assertEquals(((KeyValueSchema<Foo, Bar>)keyValueSchema3).getKeySchema().getSchemaInfo().getType(),
SchemaType.JSON);
assertEquals(((KeyValueSchema<Foo, Bar>)keyValueSchema3).getValueSchema().getSchemaInfo().getType(),
SchemaType.JSON);

String schemaInfo1 = new String(keyValueSchema1.getSchemaInfo().getSchema());
String schemaInfo2 = new String(keyValueSchema2.getSchemaInfo().getSchema());
String schemaInfo3 = new String(keyValueSchema3.getSchemaInfo().getSchema());
assertEquals(schemaInfo1, schemaInfo2);
assertEquals(schemaInfo1, schemaInfo3);
}

@Test
public void testSchemaEncodeAndDecode() {
Schema keyValueSchema = Schema.KeyValue(Foo.class, Bar.class);

Bar bar = new Bar();
bar.setField1(true);

Foo foo = new Foo();
foo.setField1("field1");
foo.setField2("field2");
foo.setField3(3);
foo.setField4(bar);
foo.setColor(Color.RED);

byte[] encodeBytes = keyValueSchema.encode(new KeyValue(foo, bar));
Assert.assertTrue(encodeBytes.length > 0);

KeyValue<Foo, Bar> keyValue = (KeyValue<Foo, Bar>)keyValueSchema.decode(encodeBytes);
Foo fooBack = keyValue.getKey();
Bar barBack = keyValue.getValue();

assertEquals(foo, fooBack);
assertEquals(bar, barBack);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.common.schema;

/**
* A simple KeyValue class
*/
public class KeyValue<K, V> {
private final K key;
private final V value;

public KeyValue(K key, V value) {
this.key = key;
this.value = value;
}

public K getKey() {
return key;
}

public V getValue() {
return value;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -96,5 +96,10 @@ public enum SchemaType {
/**
* Auto Publish Type.
*/
AUTO_PUBLISH
AUTO_PUBLISH,

/**
* A Schema that contains Key Schema and Value Schema.
*/
KEY_VALUE
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,33 +18,19 @@
*/
package org.apache.pulsar.io.kafka.connect;

import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.kafka.connect.file.FileStreamSourceTask.FILENAME_FIELD;
import static org.apache.kafka.connect.file.FileStreamSourceTask.POSITION_FIELD;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;

import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
import java.io.File;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.connect.file.FileStreamSourceConnector;
import org.apache.kafka.connect.file.FileStreamSourceTask;
import org.apache.kafka.connect.runtime.TaskConfig;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.util.Callback;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.functions.api.Record;
import org.testng.annotations.AfterMethod;
Expand Down

0 comments on commit 2591ccd

Please sign in to comment.