Skip to content

Commit

Permalink
Make KeyValueSchema an interface visible in the public Schema API (ap…
Browse files Browse the repository at this point in the history
…ache#10888)

* Make KeyValueSchema an interface visible in the public Schema API
- allow users of pulsar-client-api to use KeyValueSchema
- move KeyValueSchema implementation to KeyValueSchemaImpl
- introduce a new interface KeyValueSchema
  • Loading branch information
eolivelli authored Jun 10, 2021
1 parent c75d45b commit 18f2f4a
Show file tree
Hide file tree
Showing 23 changed files with 250 additions and 183 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,33 +20,23 @@

import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageRouter;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.TopicMetadata;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.api.schema.GenericSchema;
import org.apache.pulsar.client.api.schema.RecordSchemaBuilder;
import org.apache.pulsar.client.api.schema.SchemaBuilder;
import org.apache.pulsar.client.impl.schema.KeyValueSchema;
import org.apache.pulsar.client.impl.schema.KeyValueSchemaImpl;
import org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.KeyValueEncodingType;
import org.apache.pulsar.common.schema.SchemaType;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

import java.util.concurrent.CompletableFuture;

import static org.testng.Assert.assertEquals;

/**
Expand Down Expand Up @@ -83,15 +73,15 @@ public void keyValueAutoConsumeTest() throws Exception {

@Cleanup
Producer<KeyValue<GenericRecord, GenericRecord>> producer = pulsarClient
.newProducer(KeyValueSchema.of(schema, schema))
.newProducer(KeyValueSchemaImpl.of(schema, schema))
.topic(topic)
.create();

producer.newMessage().value(new KeyValue<>(key, value)).send();

@Cleanup
Consumer<KeyValue<GenericRecord, GenericRecord>> consumer = pulsarClient
.newConsumer(KeyValueSchema.of(Schema.AUTO_CONSUME(), Schema.AUTO_CONSUME()))
.newConsumer(KeyValueSchemaImpl.of(Schema.AUTO_CONSUME(), Schema.AUTO_CONSUME()))
.topic(topic)
.subscriptionName("test")
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TopicMetadata;
import org.apache.pulsar.client.impl.schema.KeyValueSchema;
import org.apache.pulsar.client.impl.schema.KeyValueSchemaImpl;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.KeyValueEncodingType;
import org.testng.Assert;
Expand Down Expand Up @@ -173,14 +173,14 @@ public void keyValueNullInlineTest(String topic, int partitions)

@Cleanup
Producer<KeyValue<String, String>> producer = pulsarClient
.newProducer(KeyValueSchema.of(Schema.STRING, Schema.STRING))
.newProducer(KeyValueSchemaImpl.of(Schema.STRING, Schema.STRING))
.topic(topic)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
.create();

@Cleanup
Consumer<KeyValue<String, String>> consumer = pulsarClient
.newConsumer(KeyValueSchema.of(Schema.STRING, Schema.STRING))
.newConsumer(KeyValueSchemaImpl.of(Schema.STRING, Schema.STRING))
.topic(topic)
.subscriptionName("test")
.subscribe();
Expand Down Expand Up @@ -220,7 +220,7 @@ public void keyValueNullSeparatedTest(String topic, int partitions)

@Cleanup
Producer<KeyValue<String, String>> producer = pulsarClient
.newProducer(KeyValueSchema.of(Schema.STRING, Schema.STRING, KeyValueEncodingType.SEPARATED))
.newProducer(KeyValueSchemaImpl.of(Schema.STRING, Schema.STRING, KeyValueEncodingType.SEPARATED))
.topic(topic)
// The default SinglePartition routing mode will be affected by the key when the KeyValueEncodingType is
// SEPARATED so we need to define a message router to guarantee the message order.
Expand All @@ -234,7 +234,7 @@ public int choosePartition(Message<?> msg, TopicMetadata metadata) {

@Cleanup
Consumer<KeyValue<String, String>> consumer = pulsarClient
.newConsumer(KeyValueSchema.of(Schema.STRING, Schema.STRING, KeyValueEncodingType.SEPARATED))
.newConsumer(KeyValueSchemaImpl.of(Schema.STRING, Schema.STRING, KeyValueEncodingType.SEPARATED))
.topic(topic)
.subscriptionName("test")
.subscribe();
Expand Down
Loading

0 comments on commit 18f2f4a

Please sign in to comment.