From 04f8c96a6c0c1c93cd495f46fb33d6e44d6004ea Mon Sep 17 00:00:00 2001 From: lipenghui Date: Mon, 26 Apr 2021 08:27:44 +0800 Subject: [PATCH] Fix schema type check issue when use always compatible strategy (#10367) Related to #9797 ### Motivation Fix schema type check issue when use always compatible strategy. 1. For non-transitive strategy, only check schema type for the last schema 2. For transitive strategy, check all schema's type 3. Get schema by schema data should consider different schema types --- .../schema/SchemaRegistryServiceImpl.java | 76 +++--- .../SchemaTypeCompatibilityCheckTest.java | 225 ++++-------------- 2 files changed, 92 insertions(+), 209 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java index b06531acbcdcb..f3afd3dde6e5a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/SchemaRegistryServiceImpl.java @@ -140,15 +140,6 @@ public CompletableFuture putSchemaIfAbsent(String schemaId, Schem SchemaCompatibilityStrategy strategy) { return trimDeletedSchemaAndGetList(schemaId).thenCompose(schemaAndMetadataList -> getSchemaVersionBySchemaData(schemaAndMetadataList, schema).thenCompose(schemaVersion -> { - if (strategy != SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE && schemaAndMetadataList.size() > 0) { - for (SchemaAndMetadata metadata : schemaAndMetadataList) { - if (schema.getType() != metadata.schema.getType()) { - return FutureUtil.failedFuture(new IncompatibleSchemaException( - String.format("Incompatible schema: exists schema type %s, new schema type %s", - metadata.schema.getType(), schema.getType()))); - } - } - } if (schemaVersion != null) { return CompletableFuture.completedFuture(schemaVersion); } @@ -299,6 +290,9 @@ public CompletableFuture checkConsumerCompatibility(String schemaId, Schem public CompletableFuture getSchemaVersionBySchemaData( List schemaAndMetadataList, SchemaData schemaData) { + if (schemaAndMetadataList == null || schemaAndMetadataList.size() == 0) { + return CompletableFuture.completedFuture(null); + } final CompletableFuture completableFuture = new CompletableFuture<>(); SchemaVersion schemaVersion; if (isUsingAvroSchemaParser(schemaData.getType())) { @@ -309,14 +303,15 @@ public CompletableFuture getSchemaVersionBySchemaData( if (isUsingAvroSchemaParser(schemaData.getType())) { Schema.Parser existParser = new Schema.Parser(); Schema existSchema = existParser.parse(new String(schemaAndMetadata.schema.getData(), UTF_8)); - if (newSchema.equals(existSchema)) { + if (newSchema.equals(existSchema) && schemaAndMetadata.schema.getType() == schemaData.getType()) { schemaVersion = schemaAndMetadata.version; completableFuture.complete(schemaVersion); return completableFuture; } } else { if (Arrays.equals(hashFunction.hashBytes(schemaAndMetadata.schema.getData()).asBytes(), - hashFunction.hashBytes(schemaData.getData()).asBytes())) { + hashFunction.hashBytes(schemaData.getData()).asBytes()) + && schemaAndMetadata.schema.getType() == schemaData.getType()) { schemaVersion = schemaAndMetadata.version; completableFuture.complete(schemaVersion); return completableFuture; @@ -326,7 +321,8 @@ public CompletableFuture getSchemaVersionBySchemaData( } else { for (SchemaAndMetadata schemaAndMetadata : schemaAndMetadataList) { if (Arrays.equals(hashFunction.hashBytes(schemaAndMetadata.schema.getData()).asBytes(), - hashFunction.hashBytes(schemaData.getData()).asBytes())) { + hashFunction.hashBytes(schemaData.getData()).asBytes()) + && schemaAndMetadata.schema.getType() == schemaData.getType()) { schemaVersion = schemaAndMetadata.version; completableFuture.complete(schemaVersion); return completableFuture; @@ -339,14 +335,23 @@ public CompletableFuture getSchemaVersionBySchemaData( private CompletableFuture checkCompatibilityWithLatest(String schemaId, SchemaData schema, SchemaCompatibilityStrategy strategy) { + if (SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE == strategy) { + return CompletableFuture.completedFuture(null); + } return getSchema(schemaId).thenCompose(existingSchema -> { if (existingSchema != null && !existingSchema.schema.isDeleted()) { CompletableFuture result = new CompletableFuture<>(); - try { - checkCompatible(existingSchema, schema, strategy); - result.complete(null); - } catch (IncompatibleSchemaException e) { - result.completeExceptionally(e); + if (existingSchema.schema.getType() != schema.getType()) { + result.completeExceptionally(new IncompatibleSchemaException( + String.format("Incompatible schema: exists schema type %s, new schema type %s", + existingSchema.schema.getType(), schema.getType()))); + } else { + try { + checkCompatible(existingSchema, schema, strategy); + result.complete(null); + } catch (IncompatibleSchemaException e) { + result.completeExceptionally(e); + } } return result; } else { @@ -366,18 +371,35 @@ private CompletableFuture checkCompatibilityWithAll(SchemaData schema, SchemaCompatibilityStrategy strategy, List schemaAndMetadataList) { CompletableFuture result = new CompletableFuture<>(); - try { - compatibilityChecks.getOrDefault(schema.getType(), SchemaCompatibilityCheck.DEFAULT) - .checkCompatible(schemaAndMetadataList - .stream() - .map(schemaAndMetadata -> schemaAndMetadata.schema) - .collect(Collectors.toList()), schema, strategy); + if (strategy == SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE) { result.complete(null); - } catch (Exception e) { - if (e instanceof IncompatibleSchemaException) { - result.completeExceptionally(e); + } else { + SchemaAndMetadata breakSchema = null; + for (SchemaAndMetadata schemaAndMetadata : schemaAndMetadataList) { + if (schemaAndMetadata.schema.getType() != schema.getType()) { + breakSchema = schemaAndMetadata; + break; + } + } + if (breakSchema == null) { + try { + compatibilityChecks.getOrDefault(schema.getType(), SchemaCompatibilityCheck.DEFAULT) + .checkCompatible(schemaAndMetadataList + .stream() + .map(schemaAndMetadata -> schemaAndMetadata.schema) + .collect(Collectors.toList()), schema, strategy); + result.complete(null); + } catch (Exception e) { + if (e instanceof IncompatibleSchemaException) { + result.completeExceptionally(e); + } else { + result.completeExceptionally(new IncompatibleSchemaException(e)); + } + } } else { - result.completeExceptionally(new IncompatibleSchemaException(e)); + result.completeExceptionally(new IncompatibleSchemaException( + String.format("Incompatible schema: exists schema type %s, new schema type %s", + breakSchema.schema.getType(), schema.getType()))); } } return result; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaTypeCompatibilityCheckTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaTypeCompatibilityCheckTest.java index c24822f7ae759..bc711c567aa9f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaTypeCompatibilityCheckTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaTypeCompatibilityCheckTest.java @@ -30,14 +30,18 @@ import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy; import org.apache.pulsar.common.policies.data.TenantInfo; +import org.apache.pulsar.common.schema.SchemaInfo; import org.apache.pulsar.schema.Schemas; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; import java.util.Collections; +import java.util.List; +import java.util.UUID; import java.util.concurrent.TimeUnit; +import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertTrue; import static org.testng.Assert.expectThrows; @@ -172,101 +176,6 @@ public void structTypeConsumerConsumerUndefinedCompatible() throws Exception { assertTrue(t.getMessage().endsWith("Incompatible schema: exists schema type JSON, new schema type AVRO")); } - @Test - public void structTypeProducerProducerAlwaysCompatible() throws Exception { - admin.namespaces().setSchemaCompatibilityStrategy(namespaceName, SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE); - - String topicName = TopicName.get( - TopicDomain.persistent.value(), - PUBLIC_TENANT, - namespace, - "structTypeProducerProducerAlwaysCompatible" - ).toString(); - - pulsarClient.newProducer(Schema.JSON(Schemas.PersonOne.class)) - .topic(topicName) - .create(); - - pulsarClient.newProducer(Schema.AVRO(Schemas.PersonOne.class)) - .topic(topicName) - .create(); - } - - @Test - public void structTypeProducerConsumerAlwaysCompatible() throws Exception { - admin.namespaces().setSchemaCompatibilityStrategy(namespaceName, SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE); - - final String subName = "my-sub"; - String topicName = TopicName.get( - TopicDomain.persistent.value(), - PUBLIC_TENANT, - namespace, - "structTypeProducerConsumerAlwaysCompatible" - ).toString(); - - pulsarClient.newProducer(Schema.JSON(Schemas.PersonOne.class)) - .topic(topicName) - .create(); - - pulsarClient.newConsumer(Schema.AVRO(Schemas.PersonOne.class)) - .topic(topicName) - .subscriptionType(SubscriptionType.Shared) - .ackTimeout(1, TimeUnit.SECONDS) - .subscriptionName(subName) - .subscribe(); - } - - @Test - public void structTypeConsumerProducerAlwaysCompatible() throws Exception { - admin.namespaces().setSchemaCompatibilityStrategy(namespaceName, SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE); - - final String subName = "my-sub"; - String topicName = TopicName.get( - TopicDomain.persistent.value(), - PUBLIC_TENANT, - namespace, - "structTypeConsumerProducerAlwaysCompatible" - ).toString(); - - pulsarClient.newConsumer(Schema.JSON(Schemas.PersonOne.class)) - .topic(topicName) - .subscriptionType(SubscriptionType.Shared) - .ackTimeout(1, TimeUnit.SECONDS) - .subscriptionName(subName) - .subscribe(); - - pulsarClient.newProducer(Schema.AVRO(Schemas.PersonOne.class)) - .topic(topicName) - .create(); - } - - @Test - public void structTypeConsumerConsumerAlwaysCompatible() throws Exception { - admin.namespaces().setSchemaCompatibilityStrategy(namespaceName, SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE); - - final String subName = "my-sub"; - String topicName = TopicName.get( - TopicDomain.persistent.value(), - PUBLIC_TENANT, - namespace, - "structTypeConsumerConsumerAlwaysCompatible" - ).toString(); - - pulsarClient.newConsumer(Schema.JSON(Schemas.PersonOne.class)) - .topic(topicName) - .subscriptionType(SubscriptionType.Shared) - .ackTimeout(1, TimeUnit.SECONDS) - .subscriptionName(subName + "1") - .subscribe(); - - pulsarClient.newConsumer(Schema.AVRO(Schemas.PersonOne.class)) - .topic(topicName) - .subscriptionType(SubscriptionType.Shared) - .ackTimeout(1, TimeUnit.SECONDS) - .subscriptionName(subName + "2") - .subscribe(); - } - @Test public void primitiveTypeProducerProducerUndefinedCompatible() throws Exception { admin.namespaces().setSchemaCompatibilityStrategy(namespaceName, SchemaCompatibilityStrategy.UNDEFINED); @@ -371,98 +280,50 @@ public void primitiveTypeConsumerConsumerUndefinedCompatible() throws Exception } @Test - public void primitiveTypeProducerProducerAlwaysCompatible() throws Exception { + public void testAlwaysCompatible() throws Exception { admin.namespaces().setSchemaCompatibilityStrategy(namespaceName, SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE); - - String topicName = TopicName.get( + final String topicName = TopicName.get( TopicDomain.persistent.value(), PUBLIC_TENANT, namespace, - "primitiveTypeProducerProducerAlwaysCompatible" + "testAlwaysCompatible" + UUID.randomUUID().toString() ).toString(); - - pulsarClient.newProducer(Schema.INT32) - .topic(topicName) - .create(); - - pulsarClient.newProducer(Schema.STRING) - .topic(topicName) - .create(); - } - - @Test - public void primitiveTypeProducerConsumerAlwaysCompatible() throws Exception { - admin.namespaces().setSchemaCompatibilityStrategy(namespaceName, SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE); - - final String subName = "my-sub"; - String topicName = TopicName.get( - TopicDomain.persistent.value(), - PUBLIC_TENANT, - namespace, - "primitiveTypeProducerConsumerAlwaysCompatible" - ).toString(); - - pulsarClient.newProducer(Schema.INT32) - .topic(topicName) - .create(); - - pulsarClient.newConsumer(Schema.STRING) - .topic(topicName) - .subscriptionType(SubscriptionType.Shared) - .ackTimeout(1, TimeUnit.SECONDS) - .subscriptionName(subName + "2") - .subscribe(); - } - - @Test - public void primitiveTypeConsumerProducerAlwaysCompatible() throws Exception { - admin.namespaces().setSchemaCompatibilityStrategy(namespaceName, SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE); - - final String subName = "my-sub"; - String topicName = TopicName.get( - TopicDomain.persistent.value(), - PUBLIC_TENANT, - namespace, - "primitiveTypeConsumerProducerAlwaysCompatible" - ).toString(); - - pulsarClient.newConsumer(Schema.INT32) - .topic(topicName) - .subscriptionType(SubscriptionType.Shared) - .ackTimeout(1, TimeUnit.SECONDS) - .subscriptionName(subName) - .subscribe(); - - pulsarClient.newProducer(Schema.STRING) - .topic(topicName) - .create(); - } - - @Test - public void primitiveTypeConsumerConsumerAlwaysCompatible() throws Exception { - admin.namespaces().setSchemaCompatibilityStrategy(namespaceName, SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE); - - final String subName = "my-sub"; - String topicName = TopicName.get( - TopicDomain.persistent.value(), - PUBLIC_TENANT, - namespace, - "primitiveTypeConsumerConsumerAlwaysCompatible" - ).toString(); - - pulsarClient.newConsumer(Schema.INT32) - .topic(topicName) - .subscriptionType(SubscriptionType.Shared) - .ackTimeout(1, TimeUnit.SECONDS) - .subscriptionName(subName + "1") - .subscribe(); - - pulsarClient.newConsumer(Schema.STRING) - .topic(topicName) - .subscriptionType(SubscriptionType.Shared) - .ackTimeout(1, TimeUnit.SECONDS) - .subscriptionName(subName + "2") - .subscribe(); + Schema[] schemas = new Schema[] { + Schema.AVRO(Schemas.PersonOne.class), + Schema.AVRO(Schemas.PersonFour.class), + Schema.JSON(Schemas.PersonOne.class), + Schema.JSON(Schemas.PersonFour.class), + Schema.INT8, + Schema.INT16, + Schema.INT32, + Schema.INT64, + Schema.DATE, + Schema.BOOL, + Schema.DOUBLE, + Schema.STRING, + Schema.BYTES, + Schema.FLOAT, + Schema.INSTANT, + Schema.BYTEBUFFER, + Schema.TIME, + Schema.TIMESTAMP, + Schema.LOCAL_DATE, + Schema.LOCAL_DATE_TIME, + Schema.LOCAL_TIME + }; + + for (Schema schema : schemas) { + pulsarClient.newProducer(schema) + .topic(topicName) + .create(); + } + + for (Schema schema : schemas) { + pulsarClient.newConsumer(schema) + .topic(topicName) + .subscriptionName(UUID.randomUUID().toString()) + .subscribe(); + } } }