Commit

[pulsar-io] KCA: properly handle KeyValue that getNativeObject() returns: corrected type + support for KeyValue<GenericRecord, GenericRecord> (apache#15025)
dlg99 authored Apr 14, 2022
1 parent f9cfc3e commit d76b5d4
Showing 4 changed files with 193 additions and 28 deletions.

KafkaConnectSink.java

@@ -51,9 +51,9 @@
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.api.schema.GenericObject;
 import org.apache.pulsar.client.api.schema.KeyValueSchema;
+import org.apache.pulsar.common.schema.KeyValue;
 import org.apache.pulsar.common.schema.SchemaType;
 import org.apache.pulsar.functions.api.Record;
-import org.apache.pulsar.io.core.KeyValue;
 import org.apache.pulsar.io.core.Sink;
 import org.apache.pulsar.io.core.SinkContext;
 import org.apache.pulsar.io.kafka.connect.schema.KafkaConnectData;
@@ -255,11 +255,21 @@ protected SinkRecord toSinkRecord(Record<GenericObject> sourceRecord) {
                 && sourceRecord.getSchema().getSchemaInfo() != null
                 && sourceRecord.getSchema().getSchemaInfo().getType() == SchemaType.KEY_VALUE) {
             KeyValueSchema kvSchema = (KeyValueSchema) sourceRecord.getSchema();
-            KeyValue kv = (KeyValue) sourceRecord.getValue().getNativeObject();
             keySchema = PulsarSchemaToKafkaSchema.getKafkaConnectSchema(kvSchema.getKeySchema());
             valueSchema = PulsarSchemaToKafkaSchema.getKafkaConnectSchema(kvSchema.getValueSchema());
-            key = kv.getKey();
-            value = kv.getValue();
+
+            Object nativeObject = sourceRecord.getValue().getNativeObject();
+
+            if (nativeObject instanceof KeyValue) {
+                KeyValue kv = (KeyValue) nativeObject;
+                key = KafkaConnectData.getKafkaConnectData(kv.getKey(), keySchema);
+                value = KafkaConnectData.getKafkaConnectData(kv.getValue(), valueSchema);
+            } else if (nativeObject != null) {
+                throw new IllegalStateException("Cannot extract KeyValue data from " + nativeObject.getClass());
+            } else {
+                key = null;
+                value = null;
+            }
         } else {
             if (sourceRecord.getMessage().get().hasBase64EncodedKey()) {
                 key = sourceRecord.getMessage().get().getKeyBytes();
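
Note: the corrected branch applies when a topic uses a KeyValue schema -- getNativeObject() returns org.apache.pulsar.common.schema.KeyValue, not the org.apache.pulsar.io.core.KeyValue the old cast assumed. A minimal producer-side sketch of a message that exercises the new KeyValue<GenericRecord, GenericRecord> path (service URL, topic name, and POJO classes are illustrative, not part of this commit):

    import org.apache.pulsar.client.api.Producer;
    import org.apache.pulsar.client.api.PulsarClient;
    import org.apache.pulsar.client.api.Schema;
    import org.apache.pulsar.common.schema.KeyValue;

    public class KvProducerSketch {
        public static class MyKey { public int id; }          // illustrative POJO
        public static class MyValue { public String name; }   // illustrative POJO

        public static void main(String[] args) throws Exception {
            PulsarClient client = PulsarClient.builder()
                    .serviceUrl("pulsar://localhost:6650")     // assumed local broker
                    .build();

            // Avro-backed KeyValue schema: a sink consuming this topic with an
            // AUTO_CONSUME schema sees getNativeObject() return a
            // KeyValue<GenericRecord, GenericRecord>, the case handled above.
            Producer<KeyValue<MyKey, MyValue>> producer = client
                    .newProducer(Schema.KeyValue(Schema.AVRO(MyKey.class), Schema.AVRO(MyValue.class)))
                    .topic("kv-avro-topic")                    // illustrative topic
                    .create();

            MyKey key = new MyKey();
            key.id = 11;
            MyValue value = new MyValue();
            value.name = "value";
            producer.newMessage().value(new KeyValue<>(key, value)).send();

            producer.close();
            client.close();
        }
    }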

KafkaConnectData.java

@@ -32,6 +32,7 @@
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.Struct;
 import org.apache.kafka.connect.errors.DataException;
+import org.apache.pulsar.client.api.schema.GenericRecord;

 public class KafkaConnectData {
     public static Object getKafkaConnectData(Object nativeObject, Schema kafkaSchema) {
@@ -49,6 +50,10 @@ public static Object getKafkaConnectData(Object nativeObject, Schema kafkaSchema
         } else if (nativeObject instanceof GenericData.Record) {
             GenericData.Record avroRecord = (GenericData.Record) nativeObject;
             return avroAsConnectData(avroRecord, kafkaSchema);
+        } else if (nativeObject instanceof GenericRecord) {
+            // Pulsar's GenericRecord
+            GenericRecord pulsarGenericRecord = (GenericRecord) nativeObject;
+            return pulsarGenericRecordAsConnectData(pulsarGenericRecord, kafkaSchema);
         }

         return nativeObject;
@@ -69,6 +74,21 @@ static Object avroAsConnectData(GenericData.Record avroRecord, Schema kafkaSchem
         return struct;
     }

+    static Object pulsarGenericRecordAsConnectData(GenericRecord genericRecord, Schema kafkaSchema) {
+        if (kafkaSchema == null) {
+            if (genericRecord == null) {
+                return null;
+            }
+            throw new DataException("Don't know how to convert " + genericRecord + " to Connect data (schema is null).");
+        }
+
+        Struct struct = new Struct(kafkaSchema);
+        for (Field field : kafkaSchema.fields()) {
+            struct.put(field, getKafkaConnectData(genericRecord.getField(field.name()), field.schema()));
+        }
+        return struct;
+    }
+
     // with some help of
     // https://github.com/apache/kafka/blob/trunk/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java
     static Object jsonAsConnectData(JsonNode jsonNode, Schema kafkaSchema) {
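
Note: the new pulsarGenericRecordAsConnectData mirrors avroAsConnectData: it walks the Kafka Connect schema's fields, pulls each value out of the Pulsar GenericRecord by field name, and converts it recursively via getKafkaConnectData. A hedged sketch of the Struct this produces for the three-field record used in the tests (the Connect schema shape is an assumption about what PulsarSchemaToKafkaSchema derives):

    import org.apache.kafka.connect.data.Schema;
    import org.apache.kafka.connect.data.SchemaBuilder;
    import org.apache.kafka.connect.data.Struct;

    public class StructSketch {
        public static void main(String[] args) {
            // Assumed Connect equivalent of the test's StructWithAnnotations record.
            Schema connectSchema = SchemaBuilder.struct()
                    .field("field1", Schema.INT32_SCHEMA)
                    .field("field2", Schema.STRING_SCHEMA)
                    .field("field3", Schema.INT64_SCHEMA)
                    .build();

            // The conversion effectively performs one struct.put(...) per field:
            Struct struct = new Struct(connectSchema)
                    .put("field1", 10)
                    .put("field2", "value")
                    .put("field3", 100L);

            struct.validate();  // throws DataException on a missing or mistyped field
            System.out.println(struct);
        }
    }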

KafkaConnectSinkTest.java

@@ -33,17 +33,19 @@
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.api.schema.Field;
 import org.apache.pulsar.client.api.schema.GenericObject;
 import org.apache.pulsar.client.api.schema.GenericRecord;
 import org.apache.pulsar.client.api.schema.SchemaDefinition;
 import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.client.impl.MessageImpl;
 import org.apache.pulsar.client.impl.schema.AvroSchema;
 import org.apache.pulsar.client.impl.schema.JSONSchema;
+import org.apache.pulsar.client.impl.schema.generic.GenericAvroRecord;
 import org.apache.pulsar.client.util.MessageIdUtils;
+import org.apache.pulsar.common.schema.KeyValue;
 import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.functions.source.PulsarRecord;
-import org.apache.pulsar.io.core.KeyValue;
 import org.apache.pulsar.io.core.SinkContext;
 import org.mockito.Mockito;
 import org.mockito.invocation.InvocationOnMock;
@@ -58,12 +60,14 @@
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
+import java.util.AbstractMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;

 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.doAnswer;
@@ -328,19 +332,29 @@ private SinkRecord recordSchemaTest(Object value, Schema schema, Object expected
         ObjectMapper om = new ObjectMapper();
         Map<String, Object> result = om.readValue(lines.get(0), new TypeReference<Map<String, Object>>(){});

-        assertEquals(result.get("key"), expectedKey);
-        assertEquals(result.get("value"), expected);
-        assertEquals(result.get("keySchema"), expectedKeySchema);
-        assertEquals(result.get("valueSchema"), expectedSchema);
+        assertEquals(expectedKey, result.get("key"));
+        assertEquals(expected, result.get("value"));
+        assertEquals(expectedKeySchema, result.get("keySchema"));
+        assertEquals(expectedSchema, result.get("valueSchema"));

         SinkRecord sinkRecord = sink.toSinkRecord(record);
         return sinkRecord;
     }

     private GenericRecord getGenericRecord(Object value, Schema schema) {
         final GenericRecord rec;
-        if(value instanceof GenericRecord) {
+        if (value instanceof GenericRecord) {
             rec = (GenericRecord) value;
+        } else if (value instanceof org.apache.avro.generic.GenericRecord) {
+            org.apache.avro.generic.GenericRecord avroRecord =
+                    (org.apache.avro.generic.GenericRecord) value;
+            org.apache.avro.Schema avroSchema = (org.apache.avro.Schema) schema.getNativeSchema().get();
+            List<Field> fields = avroSchema.getFields()
+                    .stream()
+                    .map(f -> new Field(f.name(), f.pos()))
+                    .collect(Collectors.toList());
+
+            return new GenericAvroRecord(new byte[]{ 1 }, avroSchema, fields, avroRecord);
         } else {
             rec = MockGenericObjectWrapper.builder()
                     .nativeObject(value)
@@ -502,7 +516,7 @@ public void unknownRecordSchemaTest() throws Exception {
     }

     @Test
-    public void KeyValueSchemaTest() throws Exception {
+    public void schemaKeyValueSchemaTest() throws Exception {
         KeyValue<Integer, String> kv = new KeyValue<>(11, "value");
         SinkRecord sinkRecord = recordSchemaTest(kv, Schema.KeyValue(Schema.INT32, Schema.STRING), 11, "INT32", "value", "STRING");
         String val = (String) sinkRecord.value();
@@ -511,6 +525,114 @@ public void KeyValueSchemaTest() throws Exception {
         Assert.assertEquals(key, 11);
     }

+    @Test
+    public void schemaKeyValueAvroSchemaTest() throws Exception {
+        AvroSchema<PulsarSchemaToKafkaSchemaTest.StructWithAnnotations> pulsarAvroSchema
+                = AvroSchema.of(PulsarSchemaToKafkaSchemaTest.StructWithAnnotations.class);
+
+        final GenericData.Record key = new GenericData.Record(pulsarAvroSchema.getAvroSchema());
+        key.put("field1", 11);
+        key.put("field2", "key");
+        key.put("field3", 101L);
+
+        final GenericData.Record value = new GenericData.Record(pulsarAvroSchema.getAvroSchema());
+        value.put("field1", 10);
+        value.put("field2", "value");
+        value.put("field3", 100L);
+
+        Map<String, Object> expectedKey = new LinkedHashMap<>();
+        expectedKey.put("field1", 11);
+        expectedKey.put("field2", "key");
+        // integer is coming back from ObjectMapper
+        expectedKey.put("field3", 101);
+
+        Map<String, Object> expectedValue = new LinkedHashMap<>();
+        expectedValue.put("field1", 10);
+        expectedValue.put("field2", "value");
+        // integer is coming back from ObjectMapper
+        expectedValue.put("field3", 100);
+
+        KeyValue<GenericRecord, GenericRecord> kv = new KeyValue<>(getGenericRecord(key, pulsarAvroSchema),
+                getGenericRecord(value, pulsarAvroSchema));
+
+        SinkRecord sinkRecord = recordSchemaTest(kv, Schema.KeyValue(pulsarAvroSchema, pulsarAvroSchema),
+                expectedKey, "STRUCT", expectedValue, "STRUCT");
+
+        Struct outValue = (Struct) sinkRecord.value();
+        Assert.assertEquals((int)outValue.get("field1"), 10);
+        Assert.assertEquals((String)outValue.get("field2"), "value");
+        Assert.assertEquals((long)outValue.get("field3"), 100L);
+
+        Struct outKey = (Struct) sinkRecord.key();
+        Assert.assertEquals((int)outKey.get("field1"), 11);
+        Assert.assertEquals((String)outKey.get("field2"), "key");
+        Assert.assertEquals((long)outKey.get("field3"), 101L);
+    }
+
+    @Test
+    public void nullKeyValueSchemaTest() throws Exception {
+        props.put("kafkaConnectorSinkClass", SchemaedFileStreamSinkConnector.class.getCanonicalName());
+
+        KafkaConnectSink sink = new KafkaConnectSink();
+        sink.open(props, context);
+
+        Message msg = mock(MessageImpl.class);
+        // value is null
+        when(msg.getValue()).thenReturn(null);
+        when(msg.getKey()).thenReturn("key");
+        when(msg.hasKey()).thenReturn(true);
+        when(msg.getMessageId()).thenReturn(new MessageIdImpl(1, 0, 0));
+
+        final AtomicInteger status = new AtomicInteger(0);
+        Record<GenericObject> record = PulsarRecord.<String>builder()
+                .topicName("fake-topic")
+                .message(msg)
+                .schema(Schema.KeyValue(Schema.INT32, Schema.STRING))
+                .ackFunction(status::incrementAndGet)
+                .failFunction(status::decrementAndGet)
+                .build();
+
+        sink.write(record);
+        sink.flush();
+
+        // expect fail
+        assertEquals(status.get(), -1);
+
+        sink.close();
+    }
+
+    @Test
+    public void wrongKeyValueSchemaTest() throws Exception {
+        props.put("kafkaConnectorSinkClass", SchemaedFileStreamSinkConnector.class.getCanonicalName());
+
+        KafkaConnectSink sink = new KafkaConnectSink();
+        sink.open(props, context);
+
+        Message msg = mock(MessageImpl.class);
+        // value is of a wrong/unsupported type
+        when(msg.getValue()).thenReturn(new AbstractMap.SimpleEntry<>(11, "value"));
+        when(msg.getKey()).thenReturn("key");
+        when(msg.hasKey()).thenReturn(true);
+        when(msg.getMessageId()).thenReturn(new MessageIdImpl(1, 0, 0));
+
+        final AtomicInteger status = new AtomicInteger(0);
+        Record<GenericObject> record = PulsarRecord.<String>builder()
+                .topicName("fake-topic")
+                .message(msg)
+                .schema(Schema.KeyValue(Schema.INT32, Schema.STRING))
+                .ackFunction(status::incrementAndGet)
+                .failFunction(status::decrementAndGet)
+                .build();
+
+        sink.write(record);
+        sink.flush();
+
+        // expect fail
+        assertEquals(status.get(), -1);
+
+        sink.close();
+    }
+
     @Test
     public void offsetTest() throws Exception {
         final AtomicLong entryId = new AtomicLong(0L);

SchemaedFileStreamSinkTask.java

@@ -21,6 +21,7 @@

 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.Lists;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.connect.data.Field;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.Struct;
@@ -37,6 +38,7 @@
  * A FileStreamSinkTask for testing that writes data other than just a value, i.e.:
  * key, value, key and value schemas.
  */
+@Slf4j
 public class SchemaedFileStreamSinkTask extends FileStreamSinkTask {

     @Override
@@ -49,32 +51,28 @@ public void put(Collection<SinkRecord> sinkRecords) {
                         ? new String((byte[]) record.value(), StandardCharsets.US_ASCII)
                         : record.value();

+                Object key = record.keySchema() == Schema.BYTES_SCHEMA
+                        ? new String((byte[]) record.key(), StandardCharsets.US_ASCII)
+                        : record.key();
+
                 Map<String, Object> recOut = Maps.newHashMap();
                 recOut.put("keySchema", record.keySchema().type().toString());
                 recOut.put("valueSchema", record.valueSchema().type().toString());
-                recOut.put("key", record.key());
-                if (val instanceof Struct) {
-                    Map<String, Object> map = Maps.newHashMap();
-                    Struct struct = (Struct)val;
-
-                    // no recursion needed for tests
-                    for (Field f: struct.schema().fields()) {
-                        map.put(f.name(), struct.get(f));
-                    }
-
-                    recOut.put("value", map);
-                } else {
-                    recOut.put("value", val);
-                }
+                recOut.put("key", toWritableValue(key));
+                recOut.put("value", toWritableValue(val));

                 ObjectMapper om = new ObjectMapper();
                 try {
+                    String valueAsString = om.writeValueAsString(recOut);
+
+                    log.info("FileSink writing {}", valueAsString);
+
                     SinkRecord toSink = new SinkRecord(record.topic(),
                             record.kafkaPartition(),
                             record.keySchema(),
-                            record.key(),
-                            Schema.STRING_SCHEMA,
-                            om.writeValueAsString(recOut),
+                            "", // blank key, real one is serialized with recOut
+                            Schema.STRING_SCHEMA,
+                            valueAsString,
                             record.kafkaOffset(),
                             record.timestamp(),
                             record.timestampType());
@@ -87,4 +85,19 @@ public void put(Collection<SinkRecord> sinkRecords) {
         super.put(out);
     }

+    private Object toWritableValue(Object val) {
+        if (val instanceof Struct) {
+            Map<String, Object> map = Maps.newHashMap();
+            Struct struct = (Struct) val;
+
+            // no recursion needed for tests
+            for (Field f: struct.schema().fields()) {
+                map.put(f.name(), struct.get(f));
+            }
+            return map;
+        } else {
+            return val;
+        }
+    }
+
 }
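
Note: with the logging and key handling above, each sunk record now lands in the file as a single JSON line. Based on the expectations in schemaKeyValueAvroSchemaTest, a line would look roughly like this (illustrative; field order depends on the HashMap):

    {"keySchema":"STRUCT","valueSchema":"STRUCT","key":{"field1":11,"field2":"key","field3":101},"value":{"field1":10,"field2":"value","field3":100}}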
