Skip to content

Commit

Permalink
GenericObject - support KeyValue in Message#getValue() (apache#10107)
Browse files Browse the repository at this point in the history
  • Loading branch information
eolivelli authored Apr 9, 2021
1 parent 6629b15 commit a5a94ed
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.KeyValueEncodingType;
import org.apache.pulsar.common.schema.SchemaType;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
Expand Down Expand Up @@ -301,6 +303,52 @@ public void testUseAutoConsumeWithSchemalessTopic() throws Exception {
consumer2.close();
}

@Test
public void testKeyValueSchema() throws Exception {
final String tenant = PUBLIC_TENANT;
final String namespace = "test-namespace-" + randomName(16);
final String topicName = "test-string-schema";

final String topic = TopicName.get(
TopicDomain.persistent.value(),
tenant,
namespace,
topicName).toString();

admin.namespaces().createNamespace(
tenant + "/" + namespace,
Sets.newHashSet(CLUSTER_NAME));

admin.topics().createPartitionedTopic(topic, 2);

Producer<KeyValue<String, Integer>> producer = pulsarClient
.newProducer(Schema.KeyValue(Schema.STRING, Schema.INT32, KeyValueEncodingType.INLINE))
.topic(topic)
.create();

producer.send(new KeyValue<>("foo", 123));

Consumer<KeyValue<String, Integer>> consumer = pulsarClient.newConsumer(Schema.KeyValue(Schema.STRING, Schema.INT32, KeyValueEncodingType.INLINE))
.subscriptionName("test-sub")
.topic(topic)
.subscribe();

Consumer<GenericRecord> consumer2 = pulsarClient.newConsumer(Schema.AUTO_CONSUME())
.subscriptionName("test-sub2")
.topic(topic)
.subscribe();

producer.send(new KeyValue<>("foo", 123));

Message<KeyValue<String, Integer>> message = consumer.receive();
Message<GenericRecord> message2 = consumer2.receive();
assertEquals(message.getValue(), message2.getValue().getNativeObject());

producer.close();
consumer.close();
consumer2.close();
}

@Test
public void testIsUsingAvroSchemaParser() {
for (SchemaType value : SchemaType.values()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
import org.apache.pulsar.client.impl.schema.KeyValueSchema;
import org.apache.pulsar.common.api.EncryptionContext;
import org.apache.pulsar.common.api.proto.BrokerEntryMetadata;
Expand Down Expand Up @@ -359,8 +360,16 @@ public T getValue() {
}
}

private KeyValueSchema getKeyValueSchema() {
if (schema instanceof AutoConsumeSchema) {
return (KeyValueSchema) ((AutoConsumeSchema) schema).getInternalSchema();
} else {
return (KeyValueSchema) schema;
}
}

private T getKeyValueBySchemaVersion() {
KeyValueSchema kvSchema = (KeyValueSchema) schema;
KeyValueSchema kvSchema = getKeyValueSchema();
byte[] schemaVersion = getSchemaVersion();
if (kvSchema.getKeyValueEncodingType() == KeyValueEncodingType.SEPARATED) {
return (T) kvSchema.decode(getKeyBytes(), getData(), schemaVersion);
Expand All @@ -370,7 +379,7 @@ private T getKeyValueBySchemaVersion() {
}

private T getKeyValue() {
KeyValueSchema kvSchema = (KeyValueSchema) schema;
KeyValueSchema kvSchema = getKeyValueSchema();
if (kvSchema.getKeyValueEncodingType() == KeyValueEncodingType.SEPARATED) {
return (T) kvSchema.decode(getKeyBytes(), getData(), null);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,4 +237,8 @@ protected GenericRecord adapt(Object value, byte[] schemaVersion) {
return GenericObjectWrapper.of(value,
this.schema.getSchemaInfo().getType(), schemaVersion);
}

public Schema<?> getInternalSchema() {
return schema;
}
}

0 comments on commit a5a94ed

Please sign in to comment.