Skip to content

Commit

Permalink
[Schema] Support consume multiple schema types messages by AutoConsum…
Browse files Browse the repository at this point in the history
…eSchema (apache#10604)

Based on the PR apache#10573

### Motivation

Support consuming messages of multiple schema types with AutoConsumeSchema.
  • Loading branch information
gaoran10 authored May 23, 2021
1 parent b6ca040 commit f5e10a9
Show file tree
Hide file tree
Showing 11 changed files with 331 additions and 90 deletions.
182 changes: 182 additions & 0 deletions pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import static org.testng.Assert.fail;
import static org.testng.internal.junit.ArrayAsserts.assertArrayEquals;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.Sets;

import java.nio.charset.StandardCharsets;
Expand All @@ -37,6 +38,8 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

import lombok.Cleanup;
Expand All @@ -53,9 +56,11 @@
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.client.impl.schema.KeyValueSchema;
import org.apache.pulsar.client.impl.schema.generic.GenericJsonRecord;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
Expand Down Expand Up @@ -781,4 +786,181 @@ public void testNullKey() throws Exception {
assertEquals("foo", message.getValue());
}

/**
 * Checks that one {@code AUTO_CONSUME} consumer can read a topic whose messages were written
 * with several different schemas.
 *
 * <p>Each {@code generateDataByDifferentSchema} call publishes one value to its own topic and
 * forwards the resulting raw message to the shared "auto_produce_topic" through
 * {@code autoProducer}. Afterwards a single auto-consume consumer reads that shared topic from
 * the earliest position and every received message is validated by
 * {@code checkSchemaForAutoSchema}.
 *
 * @throws Exception if namespace setup, producing or consuming fails
 */
public void testConsumeMultipleSchemaMessages() throws Exception {
    final String namespace = "test-namespace-" + randomName(16);
    final String ns = PUBLIC_TENANT + "/" + namespace;
    admin.namespaces().createNamespace(ns, Sets.newHashSet(CLUSTER_NAME));
    admin.namespaces().setSchemaCompatibilityStrategy(ns, SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE);

    final String autoProducerTopic = getTopicName(ns, "auto_produce_topic");
    final AtomicInteger totalMsgCnt = new AtomicInteger(0);

    // try-with-resources guarantees the shared producer is closed even when an assertion fails.
    try (Producer<byte[]> autoProducer = pulsarClient.newProducer(Schema.AUTO_PRODUCE_BYTES())
            .topic(autoProducerTopic)
            .create()) {

        // Explicit charset so the payload does not depend on the platform default encoding.
        generateDataByDifferentSchema(ns, "bytes_schema", Schema.BYTES,
                "bytes value".getBytes(StandardCharsets.UTF_8), autoProducer, totalMsgCnt);
        generateDataByDifferentSchema(ns, "string_schema", Schema.STRING, "string value",
                autoProducer, totalMsgCnt);
        generateDataByDifferentSchema(ns, "bool_schema", Schema.BOOL, true,
                autoProducer, totalMsgCnt);
        generateDataByDifferentSchema(ns, "json_one_schema", Schema.JSON(Schemas.PersonOne.class),
                new Schemas.PersonOne(1), autoProducer, totalMsgCnt);
        generateDataByDifferentSchema(ns, "json_three_schema", Schema.JSON(Schemas.PersonThree.class),
                new Schemas.PersonThree(3, "ran"), autoProducer, totalMsgCnt);
        generateDataByDifferentSchema(ns, "json_four_schema", Schema.JSON(Schemas.PersonFour.class),
                new Schemas.PersonFour(4, "tang", 18), autoProducer, totalMsgCnt);
        generateDataByDifferentSchema(ns, "avro_one_schema", Schema.AVRO(Schemas.PersonOne.class),
                new Schemas.PersonOne(10), autoProducer, totalMsgCnt);
        generateDataByDifferentSchema(ns, "k_one_v_three_schema_separate",
                Schema.KeyValue(Schema.JSON(Schemas.PersonOne.class),
                        Schema.JSON(Schemas.PersonThree.class), KeyValueEncodingType.SEPARATED),
                new KeyValue<>(new Schemas.PersonOne(1), new Schemas.PersonThree(3, "kv-separate")),
                autoProducer, totalMsgCnt);
        generateDataByDifferentSchema(ns, "k_one_v_four_schema_inline",
                Schema.KeyValue(Schema.JSON(Schemas.PersonOne.class),
                        Schema.JSON(Schemas.PersonFour.class), KeyValueEncodingType.INLINE),
                new KeyValue<>(new Schemas.PersonOne(10), new Schemas.PersonFour(30, "kv-inline", 20)),
                autoProducer, totalMsgCnt);
        generateDataByDifferentSchema(ns, "k_int_v_three_schema_separate",
                Schema.KeyValue(Schema.INT32, Schema.JSON(Schemas.PersonThree.class),
                        KeyValueEncodingType.SEPARATED),
                new KeyValue<>(100, new Schemas.PersonThree(40, "kv-separate")),
                autoProducer, totalMsgCnt);

        // The consumer is created only after all messages were forwarded, so it has to start
        // from the earliest position to see them.
        try (Consumer<GenericRecord> autoConsumer = pulsarClient.newConsumer(Schema.AUTO_CONSUME())
                .topic(autoProducerTopic)
                .subscriptionName("test")
                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                .subscribe()) {

            final int expectedMessages = totalMsgCnt.get();
            for (int i = 0; i < expectedMessages; i++) {
                Message<GenericRecord> message = autoConsumer.receive(5, TimeUnit.SECONDS);
                if (message == null) {
                    Assert.fail("Failed to receive multiple schema message.");
                }
                log.info("auto consumer get native object class: {}, value: {}",
                        message.getValue().getNativeObject().getClass(), message.getValue().getNativeObject());
                checkSchemaForAutoSchema(message);
            }
        }
    }
}

/**
 * Builds a topic name by joining the namespace and the base topic with a single {@code /}.
 *
 * @param ns        namespace part placed before the separator
 * @param baseTopic topic part placed after the separator
 * @return {@code ns + "/" + baseTopic}
 */
private String getTopicName(String ns, String baseTopic) {
    // String.join renders null elements as "null", exactly like string concatenation does.
    return String.join("/", ns, baseTopic);
}

/**
 * Publishes one value with the given schema to its own topic, reads it back through an
 * {@code AUTO_CONSUME} consumer and forwards the raw message to {@code autoProducer}.
 *
 * <p>The forwarded message carries the original properties (including the "baseTopic"
 * property set here) and is sent with {@code AUTO_PRODUCE_BYTES} wrapping the reader schema of
 * the consumed message. For key/value schemas with {@code SEPARATED} encoding the key bytes are
 * forwarded as well.
 *
 * @param ns           namespace in which the per-schema topic is created
 * @param baseTopic    local topic name; also stored in the "baseTopic" message property
 * @param schema       schema used to produce {@code data}
 * @param data         value to publish
 * @param autoProducer producer that receives the forwarded raw message
 * @param totalMsgCnt  counter incremented once per published message
 * @throws PulsarClientException if producing, consuming or closing a client resource fails
 */
private void generateDataByDifferentSchema(String ns,
                                           String baseTopic,
                                           Schema schema,
                                           Object data,
                                           Producer<?> autoProducer,
                                           AtomicInteger totalMsgCnt) throws PulsarClientException {
    String topic = getTopicName(ns, baseTopic);

    // try-with-resources: the producer and consumer are released even when an assertion below
    // fails. The consumer is closed first, then the producer.
    try (Producer<Object> producer = pulsarClient.newProducer(schema)
            .topic(topic)
            .create()) {
        producer.newMessage().value(data).property("baseTopic", baseTopic).send();
        totalMsgCnt.incrementAndGet();

        try (Consumer<GenericRecord> consumer = pulsarClient.newConsumer(Schema.AUTO_CONSUME())
                .topic(topic)
                .subscriptionName("test")
                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                .subscribe()) {

            Message<GenericRecord> message = consumer.receive(5, TimeUnit.SECONDS);
            if (message == null) {
                Assert.fail("Failed to receive message for topic " + topic);
            }
            if (!message.getReaderSchema().isPresent()) {
                Assert.fail("Failed to get reader schema for topic " + topic);
            }
            // Result intentionally unused: the call checks that the value can be decoded.
            message.getValue();

            Schema<?> readerSchema = message.getReaderSchema().get();
            boolean separatedKeyValue = readerSchema instanceof KeyValueSchema
                    && ((KeyValueSchema<?, ?>) readerSchema)
                            .getKeyValueEncodingType().equals(KeyValueEncodingType.SEPARATED);
            if (separatedKeyValue) {
                // With SEPARATED encoding the key is not part of the payload, so forward it too.
                autoProducer.newMessage(Schema.AUTO_PRODUCE_BYTES(readerSchema))
                        .keyBytes(message.getKeyBytes())
                        .value(message.getData())
                        .properties(message.getProperties())
                        .send();
            } else {
                autoProducer.newMessage(Schema.AUTO_PRODUCE_BYTES(readerSchema))
                        .properties(message.getProperties())
                        .value(message.getData())
                        .send();
            }
        }
    }
}

/**
 * Validates a message received by the auto-consume consumer against the value that was
 * published for the topic named in its "baseTopic" property.
 *
 * <p>Fails the test when the reader schema is absent, when the "baseTopic" property is missing,
 * or when the property names a topic this method has no expectation for.
 *
 * @param message message read from the multi-schema topic
 */
private void checkSchemaForAutoSchema(Message<GenericRecord> message) {
    if (!message.getReaderSchema().isPresent()) {
        Assert.fail("Failed to get reader schema for auto consume multiple schema topic.");
    }
    Object nativeObject = message.getValue().getNativeObject();
    String baseTopic = message.getProperty("baseTopic");
    // switch on a null String throws a bare NullPointerException; fail with a clear message instead.
    Assert.assertNotNull(baseTopic, "Message is missing the 'baseTopic' property.");

    JsonNode jsonNode;
    KeyValue<?, ?> kv;
    switch (baseTopic) {
        case "bytes_schema":
            // Decode with an explicit charset rather than the platform default.
            Assert.assertEquals(new String((byte[]) nativeObject, StandardCharsets.UTF_8), "bytes value");
            break;
        case "string_schema":
            Assert.assertEquals((String) nativeObject, "string value");
            break;
        case "bool_schema":
            Assert.assertEquals(nativeObject, Boolean.TRUE);
            break;
        case "json_one_schema":
            jsonNode = (JsonNode) nativeObject;
            Assert.assertEquals(jsonNode.get("id").intValue(), 1);
            break;
        case "json_three_schema":
            jsonNode = (JsonNode) nativeObject;
            Assert.assertEquals(jsonNode.get("id").intValue(), 3);
            Assert.assertEquals(jsonNode.get("name").textValue(), "ran");
            break;
        case "json_four_schema":
            jsonNode = (JsonNode) nativeObject;
            Assert.assertEquals(jsonNode.get("id").intValue(), 4);
            Assert.assertEquals(jsonNode.get("name").textValue(), "tang");
            Assert.assertEquals(jsonNode.get("age").intValue(), 18);
            break;
        case "avro_one_schema":
            org.apache.avro.generic.GenericRecord genericRecord =
                    (org.apache.avro.generic.GenericRecord) nativeObject;
            Assert.assertEquals(genericRecord.get("id"), 10);
            break;
        case "k_one_v_three_schema_separate":
            // Wildcard cast: the type arguments cannot be checked at runtime anyway.
            kv = (KeyValue<?, ?>) nativeObject;
            jsonNode = ((GenericJsonRecord) kv.getKey()).getJsonNode();
            Assert.assertEquals(jsonNode.get("id").intValue(), 1);
            jsonNode = ((GenericJsonRecord) kv.getValue()).getJsonNode();
            Assert.assertEquals(jsonNode.get("id").intValue(), 3);
            Assert.assertEquals(jsonNode.get("name").textValue(), "kv-separate");
            break;
        case "k_one_v_four_schema_inline":
            kv = (KeyValue<?, ?>) nativeObject;
            jsonNode = ((GenericJsonRecord) kv.getKey()).getJsonNode();
            Assert.assertEquals(jsonNode.get("id").intValue(), 10);
            jsonNode = ((GenericJsonRecord) kv.getValue()).getJsonNode();
            Assert.assertEquals(jsonNode.get("id").intValue(), 30);
            Assert.assertEquals(jsonNode.get("name").textValue(), "kv-inline");
            Assert.assertEquals(jsonNode.get("age").intValue(), 20);
            break;
        case "k_int_v_three_schema_separate":
            kv = (KeyValue<?, ?>) nativeObject;
            Assert.assertEquals(kv.getKey(), 100);
            jsonNode = ((GenericJsonRecord) kv.getValue()).getJsonNode();
            Assert.assertEquals(jsonNode.get("id").intValue(), 40);
            Assert.assertEquals(jsonNode.get("name").textValue(), "kv-separate");
            break;
        default:
            // An unknown topic means a message went unverified; do not let it pass silently.
            Assert.fail("Unexpected baseTopic property: " + baseTopic);
    }
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
*/
package org.apache.pulsar.client.impl;

import static com.google.common.base.Preconditions.checkNotNull;
import static java.nio.charset.StandardCharsets.UTF_8;

import com.google.common.annotations.VisibleForTesting;
Expand Down Expand Up @@ -52,6 +51,7 @@
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.api.proto.SingleMessageMetadata;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.schema.BytesSchemaVersion;
import org.apache.pulsar.common.schema.KeyValueEncodingType;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
Expand Down Expand Up @@ -401,7 +401,7 @@ public Optional<Schema<?>> getReaderSchema() {
.atSchemaVersion(schemaVersion));
} else if (schema instanceof AbstractSchema) {
byte[] schemaVersion = getSchemaVersion();
return Optional.of(((AbstractSchema) schema)
return Optional.of(((AbstractSchema<?>) schema)
.atSchemaVersion(schemaVersion));
} else {
return Optional.of(schema);
Expand All @@ -419,11 +419,15 @@ public byte[] getSchemaVersion() {

private void ensureSchemaIsLoaded() {
if (schema instanceof AutoConsumeSchema) {
((AutoConsumeSchema) schema).fetchSchemaIfNeeded();
((AutoConsumeSchema) schema).fetchSchemaIfNeeded(BytesSchemaVersion.of(getSchemaVersion()));
}
}

private SchemaInfo getSchemaInfo() {
ensureSchemaIsLoaded();
if (schema instanceof AutoConsumeSchema) {
return ((AutoConsumeSchema) schema).getSchemaInfo(getSchemaVersion());
}
return schema.getSchemaInfo();
}

Expand All @@ -449,7 +453,7 @@ public T getValue() {

private KeyValueSchema getKeyValueSchema() {
if (schema instanceof AutoConsumeSchema) {
return (KeyValueSchema) ((AutoConsumeSchema) schema).getInternalSchema();
return (KeyValueSchema) ((AutoConsumeSchema) schema).getInternalSchema(getSchemaVersion());
} else {
return (KeyValueSchema) schema;
}
Expand All @@ -476,7 +480,7 @@ private T getKeyValueBySchemaVersion() {
(org.apache.pulsar.common.schema.KeyValue) kvSchema.decode(getKeyBytes(), getData(), schemaVersion);
if (schema instanceof AutoConsumeSchema) {
return (T) AutoConsumeSchema.wrapPrimitiveObject(keyValue,
schema.getSchemaInfo().getType(), schemaVersion);
((AutoConsumeSchema) schema).getSchemaInfo(schemaVersion).getType(), schemaVersion);
} else {
return (T) keyValue;
}
Expand All @@ -492,7 +496,7 @@ private T getKeyValue() {
(org.apache.pulsar.common.schema.KeyValue) kvSchema.decode(getKeyBytes(), getData(), null);
if (schema instanceof AutoConsumeSchema) {
return (T) AutoConsumeSchema.wrapPrimitiveObject(keyValue,
schema.getSchemaInfo().getType(), null);
((AutoConsumeSchema) schema).getSchemaInfo(getSchemaVersion()).getType(), null);
} else {
return (T) keyValue;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -987,7 +987,7 @@ private SchemaInfoProvider newSchemaProvider(String topicName) {
return new MultiVersionSchemaInfoProvider(TopicName.get(topicName), this);
}

private LoadingCache<String, SchemaInfoProvider> getSchemaProviderLoadingCache() {
protected LoadingCache<String, SchemaInfoProvider> getSchemaProviderLoadingCache() {
return schemaProviderLoadingCache;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ public TypedMessageBuilder<T> key(String key) {

@Override
public TypedMessageBuilder<T> keyBytes(byte[] key) {
if (schema.getSchemaInfo().getType() == SchemaType.KEY_VALUE) {
if (schema instanceof KeyValueSchema && schema.getSchemaInfo().getType() == SchemaType.KEY_VALUE) {
KeyValueSchema kvSchema = (KeyValueSchema) schema;
checkArgument(!(kvSchema.getKeyValueEncodingType() == KeyValueEncodingType.SEPARATED),
"This method is not allowed to set keys when in encoding type is SEPARATED");
Expand All @@ -149,7 +149,8 @@ public TypedMessageBuilder<T> value(T value) {
msgMetadata.setNullValue(true);
return this;
}
if (schema.getSchemaInfo() != null && schema.getSchemaInfo().getType() == SchemaType.KEY_VALUE) {
if (value instanceof org.apache.pulsar.common.schema.KeyValue
&& schema.getSchemaInfo() != null && schema.getSchemaInfo().getType() == SchemaType.KEY_VALUE) {
KeyValueSchema kvSchema = (KeyValueSchema) schema;
org.apache.pulsar.common.schema.KeyValue kv = (org.apache.pulsar.common.schema.KeyValue) value;
if (kvSchema.getKeyValueEncodingType() == KeyValueEncodingType.SEPARATED) {
Expand Down
Loading

0 comments on commit f5e10a9

Please sign in to comment.