Skip to content

Commit

Permalink
Add compatibility check for primitive schema types (apache#5051)
Browse files Browse the repository at this point in the history
### Motivation
To fix apache#4831
  • Loading branch information
congbobo184 authored and sijie committed Aug 27, 2019
1 parent d22e014 commit 147c3c8
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -216,9 +216,13 @@ private boolean isCompatible(SchemaAndMetadata existingSchema, SchemaData newSch
SchemaCompatibilityStrategy strategy) {
HashCode existingHash = hashFunction.hashBytes(existingSchema.schema.getData());
HashCode newHash = hashFunction.hashBytes(newSchema.getData());
SchemaData existingSchemaData = existingSchema.schema;
if (existingSchemaData.getType().isPrimitive()) {
return newSchema.getType() == existingSchemaData.getType();
}
return newHash.equals(existingHash) ||
compatibilityChecks.getOrDefault(newSchema.getType(), SchemaCompatibilityCheck.DEFAULT)
.isCompatible(existingSchema.schema, newSchema, strategy);
.isCompatible(existingSchemaData, newSchema, strategy);
}

public CompletableFuture<Long> findSchemaVersion(String schemaId, SchemaData schemaData) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import static org.apache.pulsar.common.naming.TopicName.PUBLIC_TENANT;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotEquals;

import com.google.common.collect.Sets;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -42,6 +43,8 @@
import org.testng.annotations.Test;

import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.List;

/**
* Test Pulsar Schema.
Expand Down Expand Up @@ -260,5 +263,63 @@ public void testAutoConsumeSchemaSubscribeFirst() throws Exception {
producer.close();
}

@Test
public void testPrimitiveSchemaTypeCompatibilityCheck() {
List<Schema> schemas = new ArrayList<>();

schemas.add(Schema.STRING);
schemas.add(Schema.BYTES);
schemas.add(Schema.INT8);
schemas.add(Schema.INT16);
schemas.add(Schema.INT32);
schemas.add(Schema.INT64);
schemas.add(Schema.BOOL);
schemas.add(Schema.DOUBLE);
schemas.add(Schema.FLOAT);
schemas.add(Schema.DATE);
schemas.add(Schema.TIME);
schemas.add(Schema.TIMESTAMP);
schemas.add(null);


schemas.stream().forEach(schemaProducer -> {
schemas.stream().forEach(schemaConsumer -> {
try {
String topicName = schemaProducer.getSchemaInfo().getName() + schemaConsumer.getSchemaInfo().getName();
if (schemaProducer == null) {
client.newProducer()
.topic(topicName)
.create().close();
} else {
client.newProducer(schemaProducer)
.topic(topicName)
.create().close();
}

if (schemaConsumer == null) {
client.newConsumer()
.topic(topicName)
.subscriptionName("test")
.subscribe().close();
} else {
client.newConsumer(schemaConsumer)
.topic(topicName)
.subscriptionName("test")
.subscribe().close();
}

assertEquals(schemaProducer.getSchemaInfo().getType(),
schemaConsumer.getSchemaInfo().getType());

} catch (PulsarClientException e) {
assertNotEquals(schemaProducer.getSchemaInfo().getType(),
schemaConsumer.getSchemaInfo().getType());
}

});
});

}

}

0 comments on commit 147c3c8

Please sign in to comment.