Skip to content

Commit

Permalink
Sink<GenericObject> unwrap internal AutoConsumeSchema and allow to ha…
Browse files Browse the repository at this point in the history
…ndle topics with KeyValue schema (apache#10211)
  • Loading branch information
eolivelli authored Apr 17, 2021
1 parent f986cfe commit 0f07dda
Show file tree
Hide file tree
Showing 5 changed files with 102 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.Optional;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.api.EncryptionContext;

public class TopicMessageImpl<T> implements Message<T> {
Expand Down Expand Up @@ -175,4 +176,12 @@ public String getReplicatedFrom() {
/**
 * Returns the message held by this wrapper.
 *
 * @return the value of the {@code msg} field
 */
public Message<T> getMessage() {
    return this.msg;
}

/**
 * Returns the schema of the wrapped message, when it can be obtained.
 *
 * @return the schema reported by the wrapped message if it is a {@code MessageImpl};
 *         {@code null} for any other message implementation
 */
public Schema<T> getSchema() {
    // The schema is read through MessageImpl, so any other implementation yields null.
    if (!(this.msg instanceof MessageImpl)) {
        return null;
    }
    MessageImpl wrapped = (MessageImpl) this.msg;
    return wrapped.getSchema();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
import org.apache.pulsar.client.impl.schema.KeyValueSchema;
import org.apache.pulsar.functions.api.KVRecord;
import org.apache.pulsar.functions.api.Record;
Expand Down Expand Up @@ -100,7 +101,12 @@ public Schema<T> getSchema() {
}

if (sourceRecord.getSchema() != null) {
return sourceRecord.getSchema();
// unwrap actual schema
Schema<T> schema = sourceRecord.getSchema();
if (schema instanceof AutoConsumeSchema) {
schema = (Schema<T>) ((AutoConsumeSchema) schema).getInternalSchema();
}
return schema;
}

if (sourceRecord instanceof KVRecord) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.pulsar.client.api.*;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
import org.apache.pulsar.client.impl.TopicMessageImpl;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.util.Reflections;
Expand Down Expand Up @@ -132,6 +133,9 @@ public void received(Consumer<T> consumer, Message<T> message) {
if (message instanceof MessageImpl) {
MessageImpl impl = (MessageImpl) message;
schema = impl.getSchema();
} else if (message instanceof TopicMessageImpl) {
TopicMessageImpl impl = (TopicMessageImpl) message;
schema = impl.getSchema();
}
Record<T> record = PulsarRecord.<T>builder()
.message(message)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,13 @@

import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.schema.GenericObject;
import org.apache.pulsar.client.impl.schema.KeyValueSchema;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.functions.api.KVRecord;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.core.Sink;
import org.apache.pulsar.io.core.SinkContext;
import org.apache.pulsar.io.core.Source;
import org.apache.pulsar.io.core.SourceContext;

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

@Slf4j
public class TestGenericObjectSink implements Sink<GenericObject> {
Expand All @@ -41,10 +36,32 @@ public void open(Map<String, Object> config, SinkContext sourceContext) throws E
}

public void write(Record<GenericObject> record) {

log.info("properties {}", record.getProperties());
log.info("received record {} {}", record, record.getClass());
log.info("schema {}", record.getSchema());
log.info("native schema {}", record.getSchema().getNativeSchema().orElse(null));

String expectedRecordType = record.getProperties().getOrDefault("expectedType", "MISSING");
if (!expectedRecordType.equals(record.getSchema().getSchemaInfo().getType().name())) {
throw new RuntimeException("Unexpected record type "+record.getSchema().getSchemaInfo().getType().name() +" is not "+expectedRecordType);
}

log.info("value {}", record.getValue());
log.info("value schema type {}", record.getValue().getSchemaType());
log.info("value native object {}", record.getValue().getNativeObject());

if (record.getSchema().getSchemaInfo().getType() == SchemaType.KEY_VALUE) {
// assert that we are able to access the schema (leads to ClassCastException if there is a problem)
KeyValueSchema kvSchema = (KeyValueSchema) record.getSchema();
log.info("key schema type {}", kvSchema.getKeySchema());
log.info("value schema type {}", kvSchema.getValueSchema());
log.info("key encoding {}", kvSchema.getKeyValueEncodingType());

KeyValue keyValue = (KeyValue) record.getValue().getNativeObject();
log.info("kvkey {}", keyValue.getKey());
log.info("kvvalue {}", keyValue.getValue());
}
log.info("value {}", record.getValue());
log.info("value schema type {}", record.getValue().getSchemaType());
log.info("value native object {}", record.getValue().getNativeObject());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@
import lombok.Cleanup;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
Expand All @@ -36,8 +38,11 @@
import org.awaitility.Awaitility;
import org.testng.annotations.Test;

import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;

import static org.apache.pulsar.tests.integration.functions.utils.CommandGenerator.JAVAJAR;
import static org.testng.Assert.assertEquals;
Expand Down Expand Up @@ -71,16 +76,29 @@ public static final class Pojo {
private int field2;
}

/**
 * Test POJO with a single string field, used as the key type of the Avro/JSON KeyValue sink spec.
 * Instances are created through the Lombok-generated builder.
 */
@Data
@Builder
public static final class PojoKey {
private String field1;
}

@Test(groups = {"sink"})
public void testGenericObjectSink() throws Exception {

@Cleanup PulsarClient client = PulsarClient.builder()
.serviceUrl(container.getPlainTextServiceUrl())
.build();

// we are not using a parametrized test in order to save resources
// we create N sinks, send the records and verify each sink
// sinks execution happens in parallel
List<SinkSpec> specs = Arrays.asList(
new SinkSpec("test-kv-sink-input-string-" + randomName(8), "test-kv-sink-string-" + randomName(8), Schema.STRING, "foo"),
new SinkSpec("test-kv-sink-input-int-" + randomName(8), "test-kv-sink-int-" + randomName(8), Schema.INT32, 123),
new SinkSpec("test-kv-sink-input-avro-" + randomName(8), "test-kv-sink-avro-" + randomName(8), Schema.AVRO(Pojo.class), Pojo.builder().field1("a").field2(2).build()),
new SinkSpec("test-kv-sink-input-json-" + randomName(8), "test-kv-sink-json-" + randomName(8), Schema.JSON(Pojo.class), Pojo.builder().field1("a").field2(2).build())
new SinkSpec("test-kv-sink-input-kv-string-int-" + randomName(8), "test-kv-sink-input-kv-string-int-" + randomName(8),
Schema.KeyValue(Schema.STRING, Schema.INT32), new KeyValue<>("foo", 123)),
new SinkSpec("test-kv-sink-input-kv-avro-json-" + randomName(8), "test-kv-sink-input-kv-string-int-" + randomName(8),
Schema.KeyValue(Schema.AVRO(PojoKey.class), Schema.JSON(Pojo.class)), new KeyValue<>(PojoKey.builder().field1("a").build(), Pojo.builder().field1("a").field2(2).build()))
);
// submit all sinks
for (SinkSpec spec : specs) {
Expand All @@ -93,32 +111,48 @@ public void testGenericObjectSink() throws Exception {
getSinkStatus(spec.sinkName);
}

@Cleanup PulsarClient client = PulsarClient.builder()
.serviceUrl(container.getPlainTextServiceUrl())
.build();

final int numRecords = 10;

for (SinkSpec spec : specs) {
@Cleanup Producer<Object> producer = client.newProducer(spec.schema)
.topic(spec.outputTopicName)
.create();
for (int i = 0; i < numRecords; i++) {
producer.send(spec.testValue);
MessageId messageId = producer.newMessage()
.value(spec.testValue)
.property("expectedType", spec.schema.getSchemaInfo().getType().toString())
.send();
log.info("sent message {} {} with ID {}", spec.testValue, spec.schema.getSchemaInfo().getType().toString(), messageId);
}
}

// wait until all sinks have processed all records without errors
try (PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(container.getHttpServiceUrl()).build()) {

for (SinkSpec spec : specs) {
Awaitility.await().ignoreExceptions().untilAsserted(() -> {
try {
log.info("waiting for sink {}", spec.sinkName);
for (int i = 0; i < 120; i++) {
SinkStatus status = admin.sinks().getSinkStatus("public", "default", spec.sinkName);
log.info("sink {} status {}", spec.sinkName, status);
assertEquals(status.getInstances().size(), 1);
SinkStatus.SinkInstanceStatus instance = status.getInstances().get(0);
if (instance.getStatus().numWrittenToSink >= numRecords) {
break;
}
assertTrue(instance.getStatus().numRestarts > 1, "Sink was restarted, probably an error occurred");
Thread.sleep(1000);
}

SinkStatus status = admin.sinks().getSinkStatus("public", "default", spec.sinkName);
log.info("sink {} status {}", spec.sinkName, status);
assertEquals(status.getInstances().size(), 1);
assertTrue(status.getInstances().get(0).getStatus().numReadFromPulsar >= numRecords);
assertTrue(status.getInstances().get(0).getStatus().numWrittenToSink >= numRecords);
assertTrue(status.getInstances().get(0).getStatus().numSinkExceptions == 0);
assertTrue(status.getInstances().get(0).getStatus().numSystemExceptions == 0);
});
log.info("sink {} is okay", spec.sinkName);
} finally {
dumpSinkLogs(spec);
}
}
}

Expand All @@ -129,6 +163,18 @@ public void testGenericObjectSink() throws Exception {
}
}

/**
 * Copies the log file of the given sink out of the container and writes its content to the test log.
 *
 * <p>This is a best-effort diagnostic aid: any failure while downloading or reading the file is
 * logged and otherwise ignored, so that it never replaces the outcome of the test itself.
 *
 * @param spec the sink whose {@code sinkName} identifies the log file to download
 */
private void dumpSinkLogs(SinkSpec spec) {
    try {
        String logFile = "/pulsar/logs/functions/public/default/" + spec.sinkName + "/" + spec.sinkName + "-0.log";
        // Decode with the Charset constant instead of looking the charset up by name.
        String logs = container.<String>copyFileFromContainer(logFile,
                inputStream -> IOUtils.toString(inputStream, StandardCharsets.UTF_8));
        log.info("Sink {} logs {}", spec.sinkName, logs);
    } catch (Throwable err) {
        // Deliberately broad: this method is invoked from a finally block and must not
        // mask the failure (or success) of the assertions that ran before it.
        log.info("Cannot download sink {} logs", spec.sinkName, err);
    }
}

private void submitSinkConnector(String sinkName,
String inputTopicName,
String className,
Expand Down Expand Up @@ -169,6 +215,8 @@ private void getSinkStatus(String sinkName) throws Exception {
"--namespace", "default",
"--name", sinkName
);
log.info(result.getStdout());
log.info(result.getStderr());
assertTrue(result.getStdout().contains("\"running\" : true"));
}

Expand Down

0 comments on commit 0f07dda

Please sign in to comment.