From 8003d08e5ca325867d2e825921f18ddda8d4e1d4 Mon Sep 17 00:00:00 2001 From: congbo <39078850+congbobo184@users.noreply.github.com> Date: Fri, 6 Mar 2020 14:28:30 +0800 Subject: [PATCH] Independent schema is set for each consumer generated by topic (#6356) ### Motivation Master Issue: #5454 When one Consumer subscribe multi topic, setSchemaInfoPorvider() will be covered by the consumer generated by the last topic. ### Modification clone schema for each consumer generated by topic. ### Verifying this change Add the schemaTest for it. --- .../JsonSchemaCompatibilityCheckTest.java | 5 + .../org/apache/pulsar/schema/SchemaTest.java | 136 ++++++++++++++++++ .../schema/{compatibility => }/Schemas.java | 2 +- .../SchemaCompatibilityCheckTest.java | 22 +-- .../org/apache/pulsar/client/api/Schema.java | 9 +- .../kafka/compat/PulsarKafkaSchema.java | 5 + .../kafka/compat/PulsarKafkaSchema.java | 5 + .../client/impl/MultiTopicsConsumerImpl.java | 7 +- .../pulsar/client/impl/PulsarClientImpl.java | 21 +-- .../client/impl/schema/AbstractSchema.java | 5 + .../client/impl/schema/AutoConsumeSchema.java | 14 ++ .../impl/schema/AutoProduceBytesSchema.java | 5 + .../pulsar/client/impl/schema/AvroSchema.java | 10 ++ .../client/impl/schema/KeyValueSchema.java | 5 + .../impl/schema/KeyValueSchemaInfo.java | 5 + .../schema/generic/GenericAvroSchema.java | 10 ++ .../schema/generic/GenericSchemaImpl.java | 1 - .../pulsar/functions/source/SerDeSchema.java | 4 + 18 files changed, 244 insertions(+), 27 deletions(-) create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java rename pulsar-broker/src/test/java/org/apache/pulsar/schema/{compatibility => }/Schemas.java (96%) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/JsonSchemaCompatibilityCheckTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/JsonSchemaCompatibilityCheckTest.java index 09a0c44fbf312..7befab0942f26 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/JsonSchemaCompatibilityCheckTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/schema/JsonSchemaCompatibilityCheckTest.java @@ -124,5 +124,10 @@ public static OldJSONSchema of(Class pojo, Map propert info.setSchema(mapper.writeValueAsBytes(schema)); return new OldJSONSchema<>(info, pojo, mapper); } + + @Override + public Schema clone() { + return this; + } } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java new file mode 100644 index 0000000000000..eba8ec074411c --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java @@ -0,0 +1,136 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.schema; + +import com.google.common.collect.Sets; +import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.schema.SchemaDefinition; +import org.apache.pulsar.common.naming.TopicDomain; +import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.policies.data.ClusterData; +import org.apache.pulsar.common.policies.data.TenantInfo; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import java.util.Collections; + +import static org.apache.pulsar.common.naming.TopicName.PUBLIC_TENANT; +import static org.apache.pulsar.schema.compatibility.SchemaCompatibilityCheckTest.randomName; +import static org.junit.Assert.assertEquals; + +public class SchemaTest extends MockedPulsarServiceBaseTest { + + private final static String CLUSTER_NAME = "test"; + + @BeforeMethod + @Override + public void setup() throws Exception { + super.internalSetup(); + + // Setup namespaces + admin.clusters().createCluster(CLUSTER_NAME, new ClusterData(pulsar.getBrokerServiceUrl())); + TenantInfo tenantInfo = new TenantInfo(); + tenantInfo.setAllowedClusters(Collections.singleton(CLUSTER_NAME)); + admin.tenants().createTenant(PUBLIC_TENANT, tenantInfo); + } + + @AfterMethod + @Override + public void cleanup() throws Exception { + super.internalCleanup(); + } + + @Test + public void testMultiTopicSetSchemaProvider() throws Exception { + final String tenant = PUBLIC_TENANT; + final String namespace = "test-namespace-" + randomName(16); + final String topicOne = "test-multi-version-schema-one"; + final String topicTwo = "test-multi-version-schema-two"; + final String fqtnOne = TopicName.get( + TopicDomain.persistent.value(), + tenant, + namespace, + topicOne + ).toString(); + + final String fqtnTwo = TopicName.get( + TopicDomain.persistent.value(), + tenant, + namespace, + topicTwo + ).toString(); + + + admin.namespaces().createNamespace( + tenant + "/" + namespace, + Sets.newHashSet(CLUSTER_NAME) + ); + + admin.topics().createPartitionedTopic(fqtnOne, 3); + admin.topics().createPartitionedTopic(fqtnTwo, 3); + + admin.schemas().createSchema(fqtnOne, Schema.AVRO( + SchemaDefinition.builder().withAlwaysAllowNull + (false).withSupportSchemaVersioning(true). + withPojo(Schemas.PersonOne.class).build()).getSchemaInfo()); + + admin.schemas().createSchema(fqtnOne, Schema.AVRO( + SchemaDefinition.builder().withAlwaysAllowNull + (false).withSupportSchemaVersioning(true). + withPojo(Schemas.PersonTwo.class).build()).getSchemaInfo()); + + admin.schemas().createSchema(fqtnTwo, Schema.AVRO( + SchemaDefinition.builder().withAlwaysAllowNull + (false).withSupportSchemaVersioning(true). + withPojo(Schemas.PersonTwo.class).build()).getSchemaInfo()); + + Producer producer = pulsarClient.newProducer(Schema.AVRO( + SchemaDefinition.builder().withAlwaysAllowNull + (false).withSupportSchemaVersioning(true). + withPojo(Schemas.PersonTwo.class).build())) + .topic(fqtnOne) + .create(); + + Schemas.PersonTwo personTwo = new Schemas.PersonTwo(); + personTwo.setId(1); + personTwo.setName("Tom"); + + + Consumer consumer = pulsarClient.newConsumer(Schema.AVRO( + SchemaDefinition.builder().withAlwaysAllowNull + (false).withSupportSchemaVersioning(true). + withPojo(Schemas.PersonTwo.class).build())) + .subscriptionName("test") + .topic(fqtnOne, fqtnTwo) + .subscribe(); + + producer.send(personTwo); + + Schemas.PersonTwo personConsume = consumer.receive().getValue(); + assertEquals("Tom", personConsume.getName()); + assertEquals(1, personConsume.getId()); + + producer.close(); + consumer.close(); + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/Schemas.java b/pulsar-broker/src/test/java/org/apache/pulsar/schema/Schemas.java similarity index 96% rename from pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/Schemas.java rename to pulsar-broker/src/test/java/org/apache/pulsar/schema/Schemas.java index 0978547b9d1e9..df02574bf8afc 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/Schemas.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/schema/Schemas.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pulsar.schema.compatibility; +package org.apache.pulsar.schema; import lombok.AllArgsConstructor; import lombok.Data; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaCompatibilityCheckTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaCompatibilityCheckTest.java index 4903a68cbb496..f94d28b92854c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaCompatibilityCheckTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaCompatibilityCheckTest.java @@ -35,6 +35,7 @@ import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy; import org.apache.pulsar.common.policies.data.TenantInfo; +import org.apache.pulsar.schema.Schemas; import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; @@ -46,7 +47,6 @@ import static org.apache.pulsar.common.naming.TopicName.PUBLIC_TENANT; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; @Slf4j @@ -141,7 +141,7 @@ public void testConsumerCompatibilityCheckCanReadLastTest(SchemaCompatibilityStr Schemas.PersonOne personOne = new Schemas.PersonOne(); - personOne.id = 1; + personOne.setId(1); producerOne.send(personOne); Message message = null; @@ -162,16 +162,16 @@ public void testConsumerCompatibilityCheckCanReadLastTest(SchemaCompatibilityStr .create(); Schemas.PersonTwo personTwo = new Schemas.PersonTwo(); - personTwo.id = 1; - personTwo.name = "Jerry"; + personTwo.setId(1); + personTwo.setName("Jerry"); producerTwo.send(personTwo); message = consumerThree.receive(); Schemas.PersonThree personThree = message.getValue(); consumerThree.acknowledge(message); - assertEquals(personThree.id, 1); - assertEquals(personThree.name, "Jerry"); + assertEquals(personThree.getId(), 1); + assertEquals(personThree.getName(), "Jerry"); consumerThree.close(); producerOne.close(); @@ -270,8 +270,8 @@ public void testIsAutoUpdateSchema(SchemaCompatibilityStrategy schemaCompatibili Schemas.PersonTwo personTwo = message.getValue(); consumerTwo.acknowledge(message); - assertEquals(personTwo.id, 2); - assertEquals(personTwo.name, "Lucy"); + assertEquals(personTwo.getId(), 2); + assertEquals(personTwo.getName(), "Lucy"); producer.close(); consumerTwo.close(); @@ -287,8 +287,8 @@ public void testIsAutoUpdateSchema(SchemaCompatibilityStrategy schemaCompatibili personTwo = message.getValue(); consumerTwo.acknowledge(message); - assertEquals(personTwo.id, 2); - assertEquals(personTwo.name, "Lucy"); + assertEquals(personTwo.getId(), 2); + assertEquals(personTwo.getName(), "Lucy"); consumerTwo.close(); producer.close(); @@ -338,7 +338,7 @@ public void testProducerSendWithOldSchemaAndConsumerCanRead(SchemaCompatibilityS Message message = consumerOne.receive(); personOne = message.getValue(); - assertEquals(10, personOne.id); + assertEquals(10, personOne.getId()); consumerOne.close(); producerOne.close(); diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java index bb0e85a33c754..0c871b35b03af 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Schema.java @@ -35,7 +35,7 @@ /** * Message schema definition. */ -public interface Schema { +public interface Schema extends Cloneable{ /** * Check if the message is a valid object for this schema. @@ -136,6 +136,13 @@ default void configureSchemaInfo(String topic, String componentName, // no-op } + /** + * Duplicates the schema. + * + * @return The duplicated schema. + */ + Schema clone(); + /** * Schema that doesn't perform any encoding on the message payloads. Accepts a byte array and it passes it through. */ diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarKafkaSchema.java b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarKafkaSchema.java index 807f482dd6c56..aef6dd16d7d09 100644 --- a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarKafkaSchema.java +++ b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarKafkaSchema.java @@ -74,4 +74,9 @@ public T decode(byte[] message) { public SchemaInfo getSchemaInfo() { return Schema.BYTES.getSchemaInfo(); } + + @Override + public Schema clone() { + return this; + } } diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka_0_9/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarKafkaSchema.java b/pulsar-client-kafka-compat/pulsar-client-kafka_0_9/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarKafkaSchema.java index 807f482dd6c56..aef6dd16d7d09 100644 --- a/pulsar-client-kafka-compat/pulsar-client-kafka_0_9/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarKafkaSchema.java +++ b/pulsar-client-kafka-compat/pulsar-client-kafka_0_9/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarKafkaSchema.java @@ -74,4 +74,9 @@ public T decode(byte[] message) { public SchemaInfo getSchemaInfo() { return Schema.BYTES.getSchemaInfo(); } + + @Override + public Schema clone() { + return this; + } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java index 5c2609573aa78..970e1343c3adb 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java @@ -800,16 +800,17 @@ private CompletableFuture subscribeAsync(String topicName, int numberParti private void subscribeTopicPartitions(CompletableFuture subscribeResult, String topicName, int numPartitions, boolean createIfDoesNotExist) { - client.preProcessSchemaBeforeSubscribe(client, schema, topicName).whenComplete((ignored, cause) -> { + client.preProcessSchemaBeforeSubscribe(client, schema, topicName).whenComplete((schema, cause) -> { if (null == cause) { - doSubscribeTopicPartitions(subscribeResult, topicName, numPartitions, createIfDoesNotExist); + doSubscribeTopicPartitions(schema, subscribeResult, topicName, numPartitions, createIfDoesNotExist); } else { subscribeResult.completeExceptionally(cause); } }); } - private void doSubscribeTopicPartitions(CompletableFuture subscribeResult, String topicName, int numPartitions, + private void doSubscribeTopicPartitions(Schema schema, + CompletableFuture subscribeResult, String topicName, int numPartitions, boolean createIfDoesNotExist) { if (log.isDebugEnabled()) { log.debug("Subscribe to topic {} metadata.partitions: {}", topicName, numPartitions); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java index 2fe8201aeb7d5..27a158d4813fd 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java @@ -335,7 +335,7 @@ public CompletableFuture> subscribeAsync(ConsumerConfigurationDa private CompletableFuture> singleTopicSubscribeAsync(ConsumerConfigurationData conf, Schema schema, ConsumerInterceptors interceptors) { return preProcessSchemaBeforeSubscribe(this, schema, conf.getSingleTopic()) - .thenCompose(ignored -> doSingleTopicSubscribeAsync(conf, schema, interceptors)); + .thenCompose(schemaClone -> doSingleTopicSubscribeAsync(conf, schemaClone, interceptors)); } private CompletableFuture> doSingleTopicSubscribeAsync(ConsumerConfigurationData conf, Schema schema, ConsumerInterceptors interceptors) { @@ -448,7 +448,7 @@ public CompletableFuture> createReaderAsync(ReaderConfigurationDa public CompletableFuture> createReaderAsync(ReaderConfigurationData conf, Schema schema) { return preProcessSchemaBeforeSubscribe(this, schema, conf.getTopicName()) - .thenCompose(ignored -> doCreateReaderAsync(conf, schema)); + .thenCompose(schemaClone -> doCreateReaderAsync(conf, schemaClone)); } CompletableFuture> doCreateReaderAsync(ReaderConfigurationData conf, Schema schema) { @@ -768,8 +768,8 @@ private LoadingCache getSchemaProviderLoadingCache() } @SuppressWarnings("unchecked") - protected CompletableFuture preProcessSchemaBeforeSubscribe(PulsarClientImpl pulsarClientImpl, - Schema schema, + protected CompletableFuture> preProcessSchemaBeforeSubscribe(PulsarClientImpl pulsarClientImpl, + Schema schema, String topicName) { if (schema != null && schema.supportSchemaVersioning()) { final SchemaInfoProvider schemaInfoProvider; @@ -779,11 +779,12 @@ protected CompletableFuture preProcessSchemaBeforeSubscribe(PulsarClientIm log.error("Failed to load schema info provider for topic {}", topicName, e); return FutureUtil.failedFuture(e.getCause()); } - + schema = schema.clone(); if (schema.requireFetchingSchemaInfo()) { + Schema finalSchema = schema; return schemaInfoProvider.getLatestSchema().thenCompose(schemaInfo -> { if (null == schemaInfo) { - if (!(schema instanceof AutoConsumeSchema)) { + if (!(finalSchema instanceof AutoConsumeSchema)) { // no schema info is found return FutureUtil.failedFuture( new PulsarClientException.NotFoundException( @@ -792,18 +793,18 @@ protected CompletableFuture preProcessSchemaBeforeSubscribe(PulsarClientIm } try { log.info("Configuring schema for topic {} : {}", topicName, schemaInfo); - schema.configureSchemaInfo(topicName, "topic", schemaInfo); + finalSchema.configureSchemaInfo(topicName, "topic", schemaInfo); } catch (RuntimeException re) { return FutureUtil.failedFuture(re); } - schema.setSchemaInfoProvider(schemaInfoProvider); - return CompletableFuture.completedFuture(null); + finalSchema.setSchemaInfoProvider(schemaInfoProvider); + return CompletableFuture.completedFuture(finalSchema); }); } else { schema.setSchemaInfoProvider(schemaInfoProvider); } } - return CompletableFuture.completedFuture(null); + return CompletableFuture.completedFuture(schema); } // diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AbstractSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AbstractSchema.java index f459d5cdc4dd7..10843281ef8c0 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AbstractSchema.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AbstractSchema.java @@ -61,4 +61,9 @@ T decode(ByteBuf byteBuf, byte[] schemaVersion) { // ignore version by default (most of the primitive schema implementations ignore schema version) return decode(byteBuf); } + + @Override + public Schema clone() { + return this; + } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java index 27e8e6e7aa085..049b0f55e5b98 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoConsumeSchema.java @@ -132,6 +132,20 @@ public void configureSchemaInfo(String topicName, } } + @Override + public Schema clone() { + Schema schema = Schema.AUTO_CONSUME(); + if (this.schema != null) { + schema.configureSchemaInfo(topicName, componentName, this.schema.getSchemaInfo()); + } else { + schema.configureSchemaInfo(topicName, componentName, null); + } + if (schemaInfoProvider != null) { + schema.setSchemaInfoProvider(schemaInfoProvider); + } + return schema; + } + private GenericSchema generateSchema(SchemaInfo schemaInfo) { if (schemaInfo.getType() != SchemaType.AVRO && schemaInfo.getType() != SchemaType.JSON) { diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoProduceBytesSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoProduceBytesSchema.java index dd5193f7a3519..7578ffafa4483 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoProduceBytesSchema.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AutoProduceBytesSchema.java @@ -95,4 +95,9 @@ public SchemaInfo getSchemaInfo() { return schema.getSchemaInfo(); } + + @Override + public Schema clone() { + return new AutoProduceBytesSchema<>(schema.clone()); + } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java index f4d130f5ba03b..fe801dd1ea48e 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/AvroSchema.java @@ -22,6 +22,7 @@ import org.apache.avro.Conversions; import org.apache.avro.data.TimeConversions; import org.apache.avro.reflect.ReflectData; +import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.schema.SchemaDefinition; import org.apache.pulsar.client.api.schema.SchemaReader; import org.apache.pulsar.client.impl.schema.reader.AvroReader; @@ -75,6 +76,15 @@ public boolean supportSchemaVersioning() { return true; } + @Override + public Schema clone() { + Schema schema = new AvroSchema<>(schemaInfo); + if (schemaInfoProvider != null) { + schema.setSchemaInfoProvider(schemaInfoProvider); + } + return schema; + } + public static AvroSchema of(SchemaDefinition schemaDefinition) { return new AvroSchema<>(parseSchemaInfo(schemaDefinition, SchemaType.AVRO)); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchema.java index 0e173d21d158f..b81a94706bbc8 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchema.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchema.java @@ -193,6 +193,11 @@ public void configureSchemaInfo(String topicName, } } + @Override + public Schema> clone() { + return KeyValueSchema.of(keySchema.clone(), valueSchema.clone(), keyValueEncodingType); + } + private void configureKeyValueSchemaInfo() { this.schemaInfo = KeyValueSchemaInfo.encodeKeyValueSchemaInfo( keySchema, valueSchema, keyValueEncodingType diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaInfo.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaInfo.java index 120f5a75bb661..95735261f494b 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaInfo.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/KeyValueSchemaInfo.java @@ -48,6 +48,11 @@ public byte[] encode(SchemaInfo si) { public SchemaInfo getSchemaInfo() { return BytesSchema.BYTES.getSchemaInfo(); } + + @Override + public Schema clone() { + return this; + } }; private static final String KEY_SCHEMA_NAME = "key.schema.name"; diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroSchema.java index 91220b4221ca7..98e646ea2ba50 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroSchema.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericAvroSchema.java @@ -54,6 +54,16 @@ public boolean supportSchemaVersioning() { return true; } + @Override + public org.apache.pulsar.client.api.Schema clone() { + org.apache.pulsar.client.api.Schema schema = + GenericAvroSchema.of(schemaInfo, useProvidedSchemaAsReaderSchema); + if (schemaInfoProvider != null) { + schema.setSchemaInfoProvider(schemaInfoProvider); + } + return schema; + } + @Override protected SchemaReader loadReader(BytesSchemaVersion schemaVersion) { SchemaInfo schemaInfo = getSchemaInfoByVersion(schemaVersion.get()); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericSchemaImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericSchemaImpl.java index f22c449bfe80c..7d18e52979a30 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericSchemaImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericSchemaImpl.java @@ -76,5 +76,4 @@ public static GenericSchemaImpl of(SchemaInfo schemaInfo, + schemaInfo.getType() + "'"); } } - } diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/SerDeSchema.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/SerDeSchema.java index 8d4bf1ff39a85..c1adfcc4b80c5 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/SerDeSchema.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/SerDeSchema.java @@ -49,4 +49,8 @@ public SchemaInfo getSchemaInfo() { return null; } + @Override + public Schema clone() { + return this; + } }