Skip to content

Commit

Permalink
Fix schema type check issue when use always compatible strategy (apac…
Browse files Browse the repository at this point in the history
…he#10367)

Related to apache#9797

### Motivation

Fix schema type check issue when use always compatible strategy.

1. For non-transitive strategy, only check schema type for the last schema
2. For transitive strategy, check all schema's type
3. Get schema by schema data should consider different schema types
  • Loading branch information
codelipenghui authored Apr 26, 2021
1 parent 306a448 commit 04f8c96
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 209 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -140,15 +140,6 @@ public CompletableFuture<SchemaVersion> putSchemaIfAbsent(String schemaId, Schem
SchemaCompatibilityStrategy strategy) {
return trimDeletedSchemaAndGetList(schemaId).thenCompose(schemaAndMetadataList ->
getSchemaVersionBySchemaData(schemaAndMetadataList, schema).thenCompose(schemaVersion -> {
if (strategy != SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE && schemaAndMetadataList.size() > 0) {
for (SchemaAndMetadata metadata : schemaAndMetadataList) {
if (schema.getType() != metadata.schema.getType()) {
return FutureUtil.failedFuture(new IncompatibleSchemaException(
String.format("Incompatible schema: exists schema type %s, new schema type %s",
metadata.schema.getType(), schema.getType())));
}
}
}
if (schemaVersion != null) {
return CompletableFuture.completedFuture(schemaVersion);
}
Expand Down Expand Up @@ -299,6 +290,9 @@ public CompletableFuture<Void> checkConsumerCompatibility(String schemaId, Schem
public CompletableFuture<SchemaVersion> getSchemaVersionBySchemaData(
List<SchemaAndMetadata> schemaAndMetadataList,
SchemaData schemaData) {
if (schemaAndMetadataList == null || schemaAndMetadataList.size() == 0) {
return CompletableFuture.completedFuture(null);
}
final CompletableFuture<SchemaVersion> completableFuture = new CompletableFuture<>();
SchemaVersion schemaVersion;
if (isUsingAvroSchemaParser(schemaData.getType())) {
Expand All @@ -309,14 +303,15 @@ public CompletableFuture<SchemaVersion> getSchemaVersionBySchemaData(
if (isUsingAvroSchemaParser(schemaData.getType())) {
Schema.Parser existParser = new Schema.Parser();
Schema existSchema = existParser.parse(new String(schemaAndMetadata.schema.getData(), UTF_8));
if (newSchema.equals(existSchema)) {
if (newSchema.equals(existSchema) && schemaAndMetadata.schema.getType() == schemaData.getType()) {
schemaVersion = schemaAndMetadata.version;
completableFuture.complete(schemaVersion);
return completableFuture;
}
} else {
if (Arrays.equals(hashFunction.hashBytes(schemaAndMetadata.schema.getData()).asBytes(),
hashFunction.hashBytes(schemaData.getData()).asBytes())) {
hashFunction.hashBytes(schemaData.getData()).asBytes())
&& schemaAndMetadata.schema.getType() == schemaData.getType()) {
schemaVersion = schemaAndMetadata.version;
completableFuture.complete(schemaVersion);
return completableFuture;
Expand All @@ -326,7 +321,8 @@ public CompletableFuture<SchemaVersion> getSchemaVersionBySchemaData(
} else {
for (SchemaAndMetadata schemaAndMetadata : schemaAndMetadataList) {
if (Arrays.equals(hashFunction.hashBytes(schemaAndMetadata.schema.getData()).asBytes(),
hashFunction.hashBytes(schemaData.getData()).asBytes())) {
hashFunction.hashBytes(schemaData.getData()).asBytes())
&& schemaAndMetadata.schema.getType() == schemaData.getType()) {
schemaVersion = schemaAndMetadata.version;
completableFuture.complete(schemaVersion);
return completableFuture;
Expand All @@ -339,14 +335,23 @@ public CompletableFuture<SchemaVersion> getSchemaVersionBySchemaData(

private CompletableFuture<Void> checkCompatibilityWithLatest(String schemaId, SchemaData schema,
SchemaCompatibilityStrategy strategy) {
if (SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE == strategy) {
return CompletableFuture.completedFuture(null);
}
return getSchema(schemaId).thenCompose(existingSchema -> {
if (existingSchema != null && !existingSchema.schema.isDeleted()) {
CompletableFuture<Void> result = new CompletableFuture<>();
try {
checkCompatible(existingSchema, schema, strategy);
result.complete(null);
} catch (IncompatibleSchemaException e) {
result.completeExceptionally(e);
if (existingSchema.schema.getType() != schema.getType()) {
result.completeExceptionally(new IncompatibleSchemaException(
String.format("Incompatible schema: exists schema type %s, new schema type %s",
existingSchema.schema.getType(), schema.getType())));
} else {
try {
checkCompatible(existingSchema, schema, strategy);
result.complete(null);
} catch (IncompatibleSchemaException e) {
result.completeExceptionally(e);
}
}
return result;
} else {
Expand All @@ -366,18 +371,35 @@ private CompletableFuture<Void> checkCompatibilityWithAll(SchemaData schema,
SchemaCompatibilityStrategy strategy,
List<SchemaAndMetadata> schemaAndMetadataList) {
CompletableFuture<Void> result = new CompletableFuture<>();
try {
compatibilityChecks.getOrDefault(schema.getType(), SchemaCompatibilityCheck.DEFAULT)
.checkCompatible(schemaAndMetadataList
.stream()
.map(schemaAndMetadata -> schemaAndMetadata.schema)
.collect(Collectors.toList()), schema, strategy);
if (strategy == SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE) {
result.complete(null);
} catch (Exception e) {
if (e instanceof IncompatibleSchemaException) {
result.completeExceptionally(e);
} else {
SchemaAndMetadata breakSchema = null;
for (SchemaAndMetadata schemaAndMetadata : schemaAndMetadataList) {
if (schemaAndMetadata.schema.getType() != schema.getType()) {
breakSchema = schemaAndMetadata;
break;
}
}
if (breakSchema == null) {
try {
compatibilityChecks.getOrDefault(schema.getType(), SchemaCompatibilityCheck.DEFAULT)
.checkCompatible(schemaAndMetadataList
.stream()
.map(schemaAndMetadata -> schemaAndMetadata.schema)
.collect(Collectors.toList()), schema, strategy);
result.complete(null);
} catch (Exception e) {
if (e instanceof IncompatibleSchemaException) {
result.completeExceptionally(e);
} else {
result.completeExceptionally(new IncompatibleSchemaException(e));
}
}
} else {
result.completeExceptionally(new IncompatibleSchemaException(e));
result.completeExceptionally(new IncompatibleSchemaException(
String.format("Incompatible schema: exists schema type %s, new schema type %s",
breakSchema.schema.getType(), schema.getType())));
}
}
return result;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,18 @@
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.schema.Schemas;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.expectThrows;

Expand Down Expand Up @@ -172,101 +176,6 @@ public void structTypeConsumerConsumerUndefinedCompatible() throws Exception {
assertTrue(t.getMessage().endsWith("Incompatible schema: exists schema type JSON, new schema type AVRO"));
}

@Test
public void structTypeProducerProducerAlwaysCompatible() throws Exception {
admin.namespaces().setSchemaCompatibilityStrategy(namespaceName, SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE);

String topicName = TopicName.get(
TopicDomain.persistent.value(),
PUBLIC_TENANT,
namespace,
"structTypeProducerProducerAlwaysCompatible"
).toString();

pulsarClient.newProducer(Schema.JSON(Schemas.PersonOne.class))
.topic(topicName)
.create();

pulsarClient.newProducer(Schema.AVRO(Schemas.PersonOne.class))
.topic(topicName)
.create();
}

@Test
public void structTypeProducerConsumerAlwaysCompatible() throws Exception {
admin.namespaces().setSchemaCompatibilityStrategy(namespaceName, SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE);

final String subName = "my-sub";
String topicName = TopicName.get(
TopicDomain.persistent.value(),
PUBLIC_TENANT,
namespace,
"structTypeProducerConsumerAlwaysCompatible"
).toString();

pulsarClient.newProducer(Schema.JSON(Schemas.PersonOne.class))
.topic(topicName)
.create();

pulsarClient.newConsumer(Schema.AVRO(Schemas.PersonOne.class))
.topic(topicName)
.subscriptionType(SubscriptionType.Shared)
.ackTimeout(1, TimeUnit.SECONDS)
.subscriptionName(subName)
.subscribe();
}

@Test
public void structTypeConsumerProducerAlwaysCompatible() throws Exception {
admin.namespaces().setSchemaCompatibilityStrategy(namespaceName, SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE);

final String subName = "my-sub";
String topicName = TopicName.get(
TopicDomain.persistent.value(),
PUBLIC_TENANT,
namespace,
"structTypeConsumerProducerAlwaysCompatible"
).toString();

pulsarClient.newConsumer(Schema.JSON(Schemas.PersonOne.class))
.topic(topicName)
.subscriptionType(SubscriptionType.Shared)
.ackTimeout(1, TimeUnit.SECONDS)
.subscriptionName(subName)
.subscribe();

pulsarClient.newProducer(Schema.AVRO(Schemas.PersonOne.class))
.topic(topicName)
.create();
}

@Test
public void structTypeConsumerConsumerAlwaysCompatible() throws Exception {
admin.namespaces().setSchemaCompatibilityStrategy(namespaceName, SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE);

final String subName = "my-sub";
String topicName = TopicName.get(
TopicDomain.persistent.value(),
PUBLIC_TENANT,
namespace,
"structTypeConsumerConsumerAlwaysCompatible"
).toString();

pulsarClient.newConsumer(Schema.JSON(Schemas.PersonOne.class))
.topic(topicName)
.subscriptionType(SubscriptionType.Shared)
.ackTimeout(1, TimeUnit.SECONDS)
.subscriptionName(subName + "1")
.subscribe();

pulsarClient.newConsumer(Schema.AVRO(Schemas.PersonOne.class))
.topic(topicName)
.subscriptionType(SubscriptionType.Shared)
.ackTimeout(1, TimeUnit.SECONDS)
.subscriptionName(subName + "2")
.subscribe();
}

@Test
public void primitiveTypeProducerProducerUndefinedCompatible() throws Exception {
admin.namespaces().setSchemaCompatibilityStrategy(namespaceName, SchemaCompatibilityStrategy.UNDEFINED);
Expand Down Expand Up @@ -371,98 +280,50 @@ public void primitiveTypeConsumerConsumerUndefinedCompatible() throws Exception
}

@Test
public void primitiveTypeProducerProducerAlwaysCompatible() throws Exception {
public void testAlwaysCompatible() throws Exception {
admin.namespaces().setSchemaCompatibilityStrategy(namespaceName, SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE);

String topicName = TopicName.get(
final String topicName = TopicName.get(
TopicDomain.persistent.value(),
PUBLIC_TENANT,
namespace,
"primitiveTypeProducerProducerAlwaysCompatible"
"testAlwaysCompatible" + UUID.randomUUID().toString()
).toString();

pulsarClient.newProducer(Schema.INT32)
.topic(topicName)
.create();

pulsarClient.newProducer(Schema.STRING)
.topic(topicName)
.create();
}

@Test
public void primitiveTypeProducerConsumerAlwaysCompatible() throws Exception {
admin.namespaces().setSchemaCompatibilityStrategy(namespaceName, SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE);

final String subName = "my-sub";
String topicName = TopicName.get(
TopicDomain.persistent.value(),
PUBLIC_TENANT,
namespace,
"primitiveTypeProducerConsumerAlwaysCompatible"
).toString();

pulsarClient.newProducer(Schema.INT32)
.topic(topicName)
.create();

pulsarClient.newConsumer(Schema.STRING)
.topic(topicName)
.subscriptionType(SubscriptionType.Shared)
.ackTimeout(1, TimeUnit.SECONDS)
.subscriptionName(subName + "2")
.subscribe();
}

@Test
public void primitiveTypeConsumerProducerAlwaysCompatible() throws Exception {
admin.namespaces().setSchemaCompatibilityStrategy(namespaceName, SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE);

final String subName = "my-sub";
String topicName = TopicName.get(
TopicDomain.persistent.value(),
PUBLIC_TENANT,
namespace,
"primitiveTypeConsumerProducerAlwaysCompatible"
).toString();

pulsarClient.newConsumer(Schema.INT32)
.topic(topicName)
.subscriptionType(SubscriptionType.Shared)
.ackTimeout(1, TimeUnit.SECONDS)
.subscriptionName(subName)
.subscribe();

pulsarClient.newProducer(Schema.STRING)
.topic(topicName)
.create();
}

@Test
public void primitiveTypeConsumerConsumerAlwaysCompatible() throws Exception {
admin.namespaces().setSchemaCompatibilityStrategy(namespaceName, SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE);

final String subName = "my-sub";
String topicName = TopicName.get(
TopicDomain.persistent.value(),
PUBLIC_TENANT,
namespace,
"primitiveTypeConsumerConsumerAlwaysCompatible"
).toString();

pulsarClient.newConsumer(Schema.INT32)
.topic(topicName)
.subscriptionType(SubscriptionType.Shared)
.ackTimeout(1, TimeUnit.SECONDS)
.subscriptionName(subName + "1")
.subscribe();

pulsarClient.newConsumer(Schema.STRING)
.topic(topicName)
.subscriptionType(SubscriptionType.Shared)
.ackTimeout(1, TimeUnit.SECONDS)
.subscriptionName(subName + "2")
.subscribe();
Schema<?>[] schemas = new Schema[] {
Schema.AVRO(Schemas.PersonOne.class),
Schema.AVRO(Schemas.PersonFour.class),
Schema.JSON(Schemas.PersonOne.class),
Schema.JSON(Schemas.PersonFour.class),
Schema.INT8,
Schema.INT16,
Schema.INT32,
Schema.INT64,
Schema.DATE,
Schema.BOOL,
Schema.DOUBLE,
Schema.STRING,
Schema.BYTES,
Schema.FLOAT,
Schema.INSTANT,
Schema.BYTEBUFFER,
Schema.TIME,
Schema.TIMESTAMP,
Schema.LOCAL_DATE,
Schema.LOCAL_DATE_TIME,
Schema.LOCAL_TIME
};

for (Schema<?> schema : schemas) {
pulsarClient.newProducer(schema)
.topic(topicName)
.create();
}

for (Schema<?> schema : schemas) {
pulsarClient.newConsumer(schema)
.topic(topicName)
.subscriptionName(UUID.randomUUID().toString())
.subscribe();
}
}

}

0 comments on commit 04f8c96

Please sign in to comment.