diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java index df2b79b4207d2..e0c0ef14dda10 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java @@ -23,6 +23,7 @@ import java.util.Optional; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.common.api.EncryptionContext; public class TopicMessageImpl implements Message { @@ -175,4 +176,12 @@ public String getReplicatedFrom() { public Message getMessage() { return msg; } + + public Schema getSchema() { + if (this.msg instanceof MessageImpl) { + MessageImpl message = (MessageImpl) this.msg; + return message.getSchema(); + } + return null; + } } diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/SinkRecord.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/SinkRecord.java index 9626c7703416a..118044b5ca92e 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/SinkRecord.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/SinkRecord.java @@ -27,6 +27,7 @@ import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.impl.schema.AutoConsumeSchema; import org.apache.pulsar.client.impl.schema.KeyValueSchema; import org.apache.pulsar.functions.api.KVRecord; import org.apache.pulsar.functions.api.Record; @@ -100,7 +101,12 @@ public Schema getSchema() { } if (sourceRecord.getSchema() != null) { - return sourceRecord.getSchema(); + // unwrap actual schema + Schema schema = sourceRecord.getSchema(); + if (schema instanceof AutoConsumeSchema) { + schema = (Schema) ((AutoConsumeSchema) schema).getInternalSchema(); + } + return schema; } if (sourceRecord instanceof KVRecord) { diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java index 235c6ca9d75cd..e1ec9f69339f9 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java @@ -34,6 +34,7 @@ import org.apache.pulsar.client.api.*; import org.apache.pulsar.client.impl.MessageImpl; import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl; +import org.apache.pulsar.client.impl.TopicMessageImpl; import org.apache.pulsar.functions.api.Record; import org.apache.pulsar.common.functions.FunctionConfig; import org.apache.pulsar.common.util.Reflections; @@ -132,6 +133,9 @@ public void received(Consumer consumer, Message message) { if (message instanceof MessageImpl) { MessageImpl impl = (MessageImpl) message; schema = impl.getSchema(); + } else if (message instanceof TopicMessageImpl) { + TopicMessageImpl impl = (TopicMessageImpl) message; + schema = impl.getSchema(); } Record record = PulsarRecord.builder() .message(message) diff --git a/tests/docker-images/java-test-functions/src/main/java/org/apache/pulsar/tests/integration/io/TestGenericObjectSink.java b/tests/docker-images/java-test-functions/src/main/java/org/apache/pulsar/tests/integration/io/TestGenericObjectSink.java index 6b0da0b353f6b..c0c4ac227afe3 100644 --- a/tests/docker-images/java-test-functions/src/main/java/org/apache/pulsar/tests/integration/io/TestGenericObjectSink.java +++ b/tests/docker-images/java-test-functions/src/main/java/org/apache/pulsar/tests/integration/io/TestGenericObjectSink.java @@ -20,18 +20,13 @@ import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.client.api.schema.GenericObject; +import org.apache.pulsar.client.impl.schema.KeyValueSchema; import org.apache.pulsar.common.schema.KeyValue; import org.apache.pulsar.common.schema.SchemaType; -import org.apache.pulsar.functions.api.KVRecord; import org.apache.pulsar.functions.api.Record; import org.apache.pulsar.io.core.Sink; import org.apache.pulsar.io.core.SinkContext; -import org.apache.pulsar.io.core.Source; -import org.apache.pulsar.io.core.SourceContext; - -import java.util.HashMap; import java.util.Map; -import java.util.Optional; @Slf4j public class TestGenericObjectSink implements Sink { @@ -41,10 +36,32 @@ public void open(Map config, SinkContext sourceContext) throws E } public void write(Record record) { + + log.info("properties {}", record.getProperties()); log.info("received record {} {}", record, record.getClass()); log.info("schema {}", record.getSchema()); log.info("native schema {}", record.getSchema().getNativeSchema().orElse(null)); + String expectedRecordType = record.getProperties().getOrDefault("expectedType", "MISSING"); + if (!expectedRecordType.equals(record.getSchema().getSchemaInfo().getType().name())) { + throw new RuntimeException("Unexpected record type "+record.getSchema().getSchemaInfo().getType().name() +" is not "+expectedRecordType); + } + + log.info("value {}", record.getValue()); + log.info("value schema type {}", record.getValue().getSchemaType()); + log.info("value native object {}", record.getValue().getNativeObject()); + + if (record.getSchema().getSchemaInfo().getType() == SchemaType.KEY_VALUE) { + // assert that we are able to access the schema (leads to ClassCastException if there is a problem) + KeyValueSchema kvSchema = (KeyValueSchema) record.getSchema(); + log.info("key schema type {}", kvSchema.getKeySchema()); + log.info("value schema type {}", kvSchema.getValueSchema()); + log.info("key encoding {}", kvSchema.getKeyValueEncodingType()); + + KeyValue keyValue = (KeyValue) record.getValue().getNativeObject(); + log.info("kvkey {}", keyValue.getKey()); + log.info("kvvalue {}", keyValue.getValue()); + } log.info("value {}", record.getValue()); log.info("value schema type {}", record.getValue().getSchemaType()); log.info("value native object {}", record.getValue().getNativeObject()); diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/PulsarGenericObjectSinkTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/PulsarGenericObjectSinkTest.java index 25f177610fb47..1e5fb74bbef5a 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/PulsarGenericObjectSinkTest.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/PulsarGenericObjectSinkTest.java @@ -22,8 +22,10 @@ import lombok.Cleanup; import lombok.Data; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.Schema; @@ -36,8 +38,11 @@ import org.awaitility.Awaitility; import org.testng.annotations.Test; +import java.io.ByteArrayOutputStream; +import java.nio.charset.StandardCharsets; import java.util.Arrays; import java.util.List; +import java.util.concurrent.TimeUnit; import static org.apache.pulsar.tests.integration.functions.utils.CommandGenerator.JAVAJAR; import static org.testng.Assert.assertEquals; @@ -71,16 +76,29 @@ public static final class Pojo { private int field2; } + @Data + @Builder + public static final class PojoKey { + private String field1; + } + @Test(groups = {"sink"}) public void testGenericObjectSink() throws Exception { + + @Cleanup PulsarClient client = PulsarClient.builder() + .serviceUrl(container.getPlainTextServiceUrl()) + .build(); + // we are not using a parametrized test in order to save resources // we create N sinks, send the records and verify each sink // sinks execution happens in parallel List specs = Arrays.asList( new SinkSpec("test-kv-sink-input-string-" + randomName(8), "test-kv-sink-string-" + randomName(8), Schema.STRING, "foo"), - new SinkSpec("test-kv-sink-input-int-" + randomName(8), "test-kv-sink-int-" + randomName(8), Schema.INT32, 123), new SinkSpec("test-kv-sink-input-avro-" + randomName(8), "test-kv-sink-avro-" + randomName(8), Schema.AVRO(Pojo.class), Pojo.builder().field1("a").field2(2).build()), - new SinkSpec("test-kv-sink-input-json-" + randomName(8), "test-kv-sink-json-" + randomName(8), Schema.JSON(Pojo.class), Pojo.builder().field1("a").field2(2).build()) + new SinkSpec("test-kv-sink-input-kv-string-int-" + randomName(8), "test-kv-sink-input-kv-string-int-" + randomName(8), + Schema.KeyValue(Schema.STRING, Schema.INT32), new KeyValue<>("foo", 123)), + new SinkSpec("test-kv-sink-input-kv-avro-json-" + randomName(8), "test-kv-sink-input-kv-string-int-" + randomName(8), + Schema.KeyValue(Schema.AVRO(PojoKey.class), Schema.JSON(Pojo.class)), new KeyValue<>(PojoKey.builder().field1("a").build(), Pojo.builder().field1("a").field2(2).build())) ); // submit all sinks for (SinkSpec spec : specs) { @@ -93,10 +111,6 @@ public void testGenericObjectSink() throws Exception { getSinkStatus(spec.sinkName); } - @Cleanup PulsarClient client = PulsarClient.builder() - .serviceUrl(container.getPlainTextServiceUrl()) - .build(); - final int numRecords = 10; for (SinkSpec spec : specs) { @@ -104,21 +118,41 @@ public void testGenericObjectSink() throws Exception { .topic(spec.outputTopicName) .create(); for (int i = 0; i < numRecords; i++) { - producer.send(spec.testValue); + MessageId messageId = producer.newMessage() + .value(spec.testValue) + .property("expectedType", spec.schema.getSchemaInfo().getType().toString()) + .send(); + log.info("sent message {} {} with ID {}", spec.testValue, spec.schema.getSchemaInfo().getType().toString(), messageId); } } // wait that all sinks processed all records without errors try (PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(container.getHttpServiceUrl()).build()) { - for (SinkSpec spec : specs) { - Awaitility.await().ignoreExceptions().untilAsserted(() -> { + try { + log.info("waiting for sink {}", spec.sinkName); + for (int i = 0; i < 120; i++) { + SinkStatus status = admin.sinks().getSinkStatus("public", "default", spec.sinkName); + log.info("sink {} status {}", spec.sinkName, status); + assertEquals(status.getInstances().size(), 1); + SinkStatus.SinkInstanceStatus instance = status.getInstances().get(0); + if (instance.getStatus().numWrittenToSink >= numRecords) { + break; + } + assertTrue(instance.getStatus().numRestarts > 1, "Sink was restarted, probably an error occurred"); + Thread.sleep(1000); + } + SinkStatus status = admin.sinks().getSinkStatus("public", "default", spec.sinkName); + log.info("sink {} status {}", spec.sinkName, status); assertEquals(status.getInstances().size(), 1); - assertTrue(status.getInstances().get(0).getStatus().numReadFromPulsar >= numRecords); + assertTrue(status.getInstances().get(0).getStatus().numWrittenToSink >= numRecords); assertTrue(status.getInstances().get(0).getStatus().numSinkExceptions == 0); assertTrue(status.getInstances().get(0).getStatus().numSystemExceptions == 0); - }); + log.info("sink {} is okay", spec.sinkName); + } finally { + dumpSinkLogs(spec); + } } } @@ -129,6 +163,18 @@ public void testGenericObjectSink() throws Exception { } } + private void dumpSinkLogs(SinkSpec spec) { + try { + String logFile = "/pulsar/logs/functions/public/default/" + spec.sinkName + "/" + spec.sinkName + "-0.log"; + String logs = container.copyFileFromContainer(logFile, (inputStream) -> { + return IOUtils.toString(inputStream, "utf-8"); + }); + log.info("Sink {} logs {}", spec.sinkName, logs); + } catch (Throwable err) { + log.info("Cannot download sink {} logs", spec.sinkName, err); + } + } + private void submitSinkConnector(String sinkName, String inputTopicName, String className, @@ -169,6 +215,8 @@ private void getSinkStatus(String sinkName) throws Exception { "--namespace", "default", "--name", sinkName ); + log.info(result.getStdout()); + log.info(result.getStderr()); assertTrue(result.getStdout().contains("\"running\" : true")); }