Skip to content

Commit

Permalink
adding avro schema (apache#1917)
Browse files Browse the repository at this point in the history
* adding avro schema

* improving implementation

* finishing implementation

* remove unnecessary newlines

* fixing poms

* adding avro schema check

* add missing license header

* Add types to proto definitions

* adding compatibility unit tests

* shade avro dependencies

* add shading to pulsar client kafka
  • Loading branch information
jerrypeng authored and merlimat committed Jun 11, 2018
1 parent 8ab6c34 commit 2dae33d
Show file tree
Hide file tree
Showing 20 changed files with 735 additions and 2 deletions.
1 change: 1 addition & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ flexible messaging model and an intuitive client API.</description>
<kafka-client.version>0.10.2.1</kafka-client.version>
<rabbitmq-client.version>5.1.1</rabbitmq-client.version>
<aws-sdk.version>1.11.297</aws-sdk.version>
<avro.version>1.8.2</avro.version>

<!-- test dependencies -->
<disruptor.version>3.4.0</disruptor.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -455,7 +455,8 @@ public class ServiceConfiguration implements PulsarConfiguration {

private String schemaRegistryStorageClassName = "org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorageFactory";
private Set<String> schemaRegistryCompatibilityCheckers = Sets.newHashSet(
"org.apache.pulsar.broker.service.schema.JsonSchemaCompatibilityCheck"
"org.apache.pulsar.broker.service.schema.JsonSchemaCompatibilityCheck",
"org.apache.pulsar.broker.service.schema.AvroSchemaCompatibilityCheck"
);

/**** --- WebSocket --- ****/
Expand Down
29 changes: 29 additions & 0 deletions pulsar-broker-shaded/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,14 @@
<include>org.apache.httpcomponents:httpclient</include>
<include>commons-logging:commons-logging</include>
<include>org.apache.httpcomponents:httpcore</include>
<include>org.apache.avro:avro</include>
<!-- Avro transitive dependencies-->
<include>org.codehaus.jackson:jackson-core-asl</include>
<include>org.codehaus.jackson:jackson-mapper-asl</include>
<include>com.thoughtworks.paranamer:paranamer</include>
<include>org.xerial.snappy:snappy-java</include>
<include>org.apache.commons:commons-compress</include>
<include>org.tukaani:xz</include>
</includes>
</artifactSet>
<filters>
Expand Down Expand Up @@ -311,6 +319,27 @@
<pattern>org.apache.http</pattern>
<shadedPattern>org.apache.pulsar.shade.org.apache.http</shadedPattern>
</relocation>
<relocation>
<pattern>org.apache.avro</pattern>
<shadedPattern>org.apache.pulsar.shade.org.apache.avro</shadedPattern>
</relocation>
<!-- Avro transitive dependencies-->
<relocation>
<pattern>org.codehaus.jackson</pattern>
<shadedPattern>org.apache.pulsar.shade.org.codehaus.jackson</shadedPattern>
</relocation>
<relocation>
<pattern>com.thoughtworks.paranamer</pattern>
<shadedPattern>org.apache.pulsar.shade.com.thoughtworks.paranamer</shadedPattern>
</relocation>
<relocation>
<pattern>org.xerial.snappy</pattern>
<shadedPattern>org.apache.pulsar.shade.org.xerial.snappy</shadedPattern>
</relocation>
<relocation>
<pattern>org.tukaani</pattern>
<shadedPattern>org.apache.pulsar.shade.org.tukaani</shadedPattern>
</relocation>
</relocations>
</configuration>
</execution>
Expand Down
6 changes: 6 additions & 0 deletions pulsar-broker/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,12 @@
<artifactId>java-semver</artifactId>
</dependency>

<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>${avro.version}</version>
</dependency>

<!-- aspectJ dependencies -->
<dependency>
<groupId>org.aspectj</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -682,6 +682,10 @@ private static SchemaType getType(PulsarApi.Schema.Type protocolType) {
return SchemaType.STRING;
case Json:
return SchemaType.JSON;
case Protobuf:
return SchemaType.PROTOBUF;
case Avro:
return SchemaType.AVRO;
default:
return SchemaType.NONE;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.service.schema;

import java.nio.charset.StandardCharsets;
import java.util.Arrays;

import org.apache.avro.Schema;
import org.apache.avro.SchemaValidationException;
import org.apache.avro.SchemaValidator;
import org.apache.avro.SchemaValidatorBuilder;
import org.apache.pulsar.common.schema.SchemaData;
import org.apache.pulsar.common.schema.SchemaType;

/**
 * {@link SchemaCompatibilityCheck} implementation for Avro schemas.
 *
 * <p>Delegates the actual compatibility decision to Avro's {@link SchemaValidator}, configured
 * according to the {@link CompatibilityStrategy} chosen at construction time (defaults to
 * {@link CompatibilityStrategy#FULL}). Only the latest stored schema is validated against.
 */
public class AvroSchemaCompatibilityCheck implements SchemaCompatibilityCheck {

    private final CompatibilityStrategy compatibilityStrategy;

    /** Creates a checker enforcing full (backward AND forward) compatibility. */
    public AvroSchemaCompatibilityCheck() {
        this(CompatibilityStrategy.FULL);
    }

    /**
     * Creates a checker enforcing the given compatibility strategy.
     *
     * @param compatibilityStrategy direction(s) of schema evolution to enforce
     */
    public AvroSchemaCompatibilityCheck(CompatibilityStrategy compatibilityStrategy) {
        this.compatibilityStrategy = compatibilityStrategy;
    }

    @Override
    public SchemaType getSchemaType() {
        return SchemaType.AVRO;
    }

    /**
     * Checks whether the new schema is compatible with the existing one under this checker's
     * strategy.
     *
     * @param from the existing (stored) schema, as serialized Avro schema JSON
     * @param to the new schema being proposed, as serialized Avro schema JSON
     * @return true if {@code to} is compatible with {@code from}, false otherwise
     */
    @Override
    public boolean isCompatible(SchemaData from, SchemaData to) {
        // Avro schema definitions are JSON text; decode explicitly as UTF-8 instead of relying
        // on the platform default charset (which varies between broker environments).
        Schema.Parser fromParser = new Schema.Parser();
        Schema fromSchema = fromParser.parse(new String(from.getData(), StandardCharsets.UTF_8));
        Schema.Parser toParser = new Schema.Parser();
        Schema toSchema = toParser.parse(new String(to.getData(), StandardCharsets.UTF_8));

        SchemaValidator schemaValidator = createSchemaValidator(this.compatibilityStrategy, true);
        try {
            schemaValidator.validate(toSchema, Arrays.asList(fromSchema));
        } catch (SchemaValidationException e) {
            // Avro reports incompatibility by throwing; translate to a boolean result.
            return false;
        }
        return true;
    }

    /** Direction(s) in which schema evolution must remain compatible. */
    public enum CompatibilityStrategy {
        /** The new schema can read data written with the previous schema. */
        BACKWARD,
        /** The previous schema can read data written with the new schema. */
        FORWARD,
        /** Both backward and forward compatible. */
        FULL
    }

    /**
     * Builds an Avro {@link SchemaValidator} for the given strategy.
     *
     * @param compatibilityStrategy direction(s) of compatibility to enforce
     * @param onlyLatestValidator validate only against the latest schema (true) or all (false)
     */
    private static SchemaValidator createSchemaValidator(CompatibilityStrategy compatibilityStrategy,
                                                         boolean onlyLatestValidator) {
        final SchemaValidatorBuilder validatorBuilder = new SchemaValidatorBuilder();
        switch (compatibilityStrategy) {
            case BACKWARD:
                // New schema must be able to read data written with the old schema.
                return createLatestOrAllValidator(validatorBuilder.canReadStrategy(), onlyLatestValidator);
            case FORWARD:
                // Old schema must be able to read data written with the new schema.
                return createLatestOrAllValidator(validatorBuilder.canBeReadStrategy(), onlyLatestValidator);
            default:
                // FULL: compatible in both directions.
                return createLatestOrAllValidator(validatorBuilder.mutualReadStrategy(), onlyLatestValidator);
        }
    }

    /** Finishes the builder, validating against only the latest schema or against all of them. */
    private static SchemaValidator createLatestOrAllValidator(SchemaValidatorBuilder validatorBuilder, boolean onlyLatest) {
        return onlyLatest ? validatorBuilder.validateLatest() : validatorBuilder.validateAll();
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ private SchemaRegistryFormat.SchemaInfo deleted(String schemaId, String user) {
}

private CompletableFuture<Boolean> checkCompatibilityWithLatest(String schemaId, SchemaData schema) {

return getSchema(schemaId).thenApply(storedSchema ->
(storedSchema == null) ||
compatibilityChecks.getOrDefault(
Expand All @@ -154,6 +155,10 @@ static SchemaType convertToDomainType(SchemaRegistryFormat.SchemaInfo.SchemaType
return SchemaType.STRING;
case JSON:
return SchemaType.JSON;
case PROTOBUF:
return SchemaType.PROTOBUF;
case AVRO:
return SchemaType.AVRO;
default:
return SchemaType.NONE;
}
Expand All @@ -167,6 +172,10 @@ static SchemaRegistryFormat.SchemaInfo.SchemaType convertFromDomainType(SchemaTy
return SchemaRegistryFormat.SchemaInfo.SchemaType.STRING;
case JSON:
return SchemaRegistryFormat.SchemaInfo.SchemaType.JSON;
case PROTOBUF:
return SchemaRegistryFormat.SchemaInfo.SchemaType.PROTOBUF;
case AVRO:
return SchemaRegistryFormat.SchemaInfo.SchemaType.AVRO;
default:
return SchemaRegistryFormat.SchemaInfo.SchemaType.NONE;
}
Expand Down
2 changes: 2 additions & 0 deletions pulsar-broker/src/main/proto/SchemaRegistryFormat.proto
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ message SchemaInfo {
NONE = 1;
STRING = 2;
JSON = 3;
PROTOBUF = 4;
AVRO = 5;
}
message KeyValuePair {
required string key = 1;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.service.schema;

import org.apache.pulsar.common.schema.SchemaData;
import org.apache.pulsar.common.schema.SchemaType;
import org.testng.Assert;
import org.testng.annotations.Test;

/**
 * Unit tests for {@link AvroSchemaCompatibilityCheck}, covering the BACKWARD, FORWARD and FULL
 * compatibility strategies against a set of progressively evolved record schemas.
 */
public class AvroSchemaCompatibilityCheckTest {

    // Base schema: a single required string field "field1".
    private static final String schemaJson1 =
            "{\"type\":\"record\",\"name\":\"DefaultTest\",\"namespace\":\"org.apache.pulsar.broker.service.schema" +
                    ".AvroSchemaCompatibilityCheckTest$\",\"fields\":[{\"name\":\"field1\",\"type\":\"string\"}]}";
    private static final SchemaData schemaData1 = getSchemaData(schemaJson1);

    // schema1 plus "field2" WITH a default value.
    private static final String schemaJson2 =
            "{\"type\":\"record\",\"name\":\"DefaultTest\",\"namespace\":\"org.apache.pulsar.broker.service.schema" +
                    ".AvroSchemaCompatibilityCheckTest$\",\"fields\":[{\"name\":\"field1\",\"type\":\"string\"}," +
                    "{\"name\":\"field2\",\"type\":\"string\",\"default\":\"foo\"}]}";
    private static final SchemaData schemaData2 = getSchemaData(schemaJson2);

    // schema1 plus "field2" WITHOUT a default value.
    private static final String schemaJson3 =
            "{\"type\":\"record\",\"name\":\"DefaultTest\",\"namespace\":\"org" +
                    ".apache.pulsar.broker.service.schema.AvroSchemaCompatibilityCheckTest$\"," +
                    "\"fields\":[{\"name\":\"field1\",\"type\":\"string\"},{\"name\":\"field2\",\"type\":\"string\"}]}";
    private static final SchemaData schemaData3 = getSchemaData(schemaJson3);

    // schema1 with "field1" renamed to "field1_v2" (old name kept as an alias).
    private static final String schemaJson4 =
            "{\"type\":\"record\",\"name\":\"DefaultTest\",\"namespace\":\"org.apache.pulsar.broker.service.schema" +
                    ".AvroSchemaCompatibilityCheckTest$\",\"fields\":[{\"name\":\"field1_v2\",\"type\":\"string\"," +
                    "\"aliases\":[\"field1\"]}]}";
    private static final SchemaData schemaData4 = getSchemaData(schemaJson4);

    // schema1 with "field1" widened to a union of ["null","string"].
    private static final String schemaJson5 =
            "{\"type\":\"record\",\"name\":\"DefaultTest\",\"namespace\":\"org.apache.pulsar.broker.service.schema" +
                    ".AvroSchemaCompatibilityCheckTest$\",\"fields\":[{\"name\":\"field1\",\"type\":[\"null\"," +
                    "\"string\"]}]}";
    private static final SchemaData schemaData5 = getSchemaData(schemaJson5);

    // schema5 with "int" added to the union.
    private static final String schemaJson6 =
            "{\"type\":\"record\",\"name\":\"DefaultTest\",\"namespace\":\"org.apache.pulsar.broker.service.schema" +
                    ".AvroSchemaCompatibilityCheckTest$\",\"fields\":[{\"name\":\"field1\",\"type\":[\"null\"," +
                    "\"string\",\"int\"]}]}";
    private static final SchemaData schemaData6 = getSchemaData(schemaJson6);

    // schema2 plus "field3" WITH a default value.
    private static final String schemaJson7 =
            "{\"type\":\"record\",\"name\":\"DefaultTest\",\"namespace\":\"org.apache.pulsar.broker.service.schema" +
                    ".AvroSchemaCompatibilityCheckTest$\",\"fields\":[{\"name\":\"field1\",\"type\":\"string\"}," +
                    "{\"name\":\"field2\",\"type\":\"string\",\"default\":\"foo\"},{\"name\":\"field3\"," +
                    "\"type\":\"string\",\"default\":\"bar\"}]}";
    private static final SchemaData schemaData7 = getSchemaData(schemaJson7);

    /**
     * Make sure a new schema is backwards compatible with the latest stored schema.
     */
    @Test
    public void testBackwardCompatibility() {

        AvroSchemaCompatibilityCheck avroSchemaCompatibilityCheck = new AvroSchemaCompatibilityCheck(
                AvroSchemaCompatibilityCheck.CompatibilityStrategy.BACKWARD
        );

        // adding a field with default is backwards compatible
        Assert.assertTrue(avroSchemaCompatibilityCheck.isCompatible(schemaData1, schemaData2),
                "adding a field with default is backwards compatible");
        // adding a field without default is NOT backwards compatible
        Assert.assertFalse(avroSchemaCompatibilityCheck.isCompatible(schemaData1, schemaData3),
                "adding a field without default is NOT backwards compatible");
        // Modifying a field name is not backwards compatible
        Assert.assertFalse(avroSchemaCompatibilityCheck.isCompatible(schemaData1, schemaData4),
                "Modifying a field name is not backwards compatible");
        // evolving field to a union is backwards compatible
        Assert.assertTrue(avroSchemaCompatibilityCheck.isCompatible(schemaData1, schemaData5),
                "evolving field to a union is backwards compatible");
        // removing a field from a union is NOT backwards compatible
        Assert.assertFalse(avroSchemaCompatibilityCheck.isCompatible(schemaData5, schemaData1),
                "removing a field from a union is NOT backwards compatible");
        // adding a field to a union is backwards compatible
        Assert.assertTrue(avroSchemaCompatibilityCheck.isCompatible(schemaData5, schemaData6),
                "adding a field to a union is backwards compatible");
        // removing a field a union is NOT backwards compatible
        Assert.assertFalse(avroSchemaCompatibilityCheck.isCompatible(schemaData6, schemaData5),
                "removing a field a union is NOT backwards compatible");
    }

    /**
     * Make sure the latest stored schema is forward-compatible with new schemas.
     */
    @Test
    public void testForwardCompatibility() {

        AvroSchemaCompatibilityCheck avroSchemaCompatibilityCheck = new AvroSchemaCompatibilityCheck(
                AvroSchemaCompatibilityCheck.CompatibilityStrategy.FORWARD
        );

        Assert.assertTrue(avroSchemaCompatibilityCheck.isCompatible(schemaData1, schemaData2),
                "adding a field is forward compatible");
        Assert.assertTrue(avroSchemaCompatibilityCheck.isCompatible(schemaData1, schemaData3),
                "adding a field is forward compatible");
        Assert.assertTrue(avroSchemaCompatibilityCheck.isCompatible(schemaData2, schemaData3),
                "adding a field is forward compatible");
        // note: this direction (field removed relative to "from") is also forward compatible
        Assert.assertTrue(avroSchemaCompatibilityCheck.isCompatible(schemaData3, schemaData2),
                "adding a field is forward compatible");
        Assert.assertTrue(avroSchemaCompatibilityCheck.isCompatible(schemaData2, schemaData7),
                "removing fields is forward compatible");
        Assert.assertTrue(avroSchemaCompatibilityCheck.isCompatible(schemaData2, schemaData1),
                "removing fields with defaults forward compatible");
    }

    /**
     * Make sure a new schema is both forward- and backward-compatible with the latest stored
     * schema (FULL strategy).
     */
    @Test
    public void testFullCompatibility() {
        AvroSchemaCompatibilityCheck avroSchemaCompatibilityCheck = new AvroSchemaCompatibilityCheck(
                AvroSchemaCompatibilityCheck.CompatibilityStrategy.FULL
        );
        Assert.assertTrue(avroSchemaCompatibilityCheck.isCompatible(schemaData1, schemaData2),
                "adding a field with default fully compatible");
        Assert.assertFalse(avroSchemaCompatibilityCheck.isCompatible(schemaData1, schemaData3),
                "adding a field without default is not fully compatible");
        Assert.assertFalse(avroSchemaCompatibilityCheck.isCompatible(schemaData3, schemaData1),
                "adding a field without default is not fully compatible");
    }

    /** Wraps raw Avro schema JSON into a {@link SchemaData} of type AVRO. */
    private static SchemaData getSchemaData(String schemaJson) {
        return SchemaData.builder().data(schemaJson.getBytes()).type(SchemaType.AVRO).build();
    }
}
Loading

0 comments on commit 2dae33d

Please sign in to comment.