Skip to content

Commit

Permalink
[schema] clean up the implementations of schema bc check (apache#3665)
Browse files Browse the repository at this point in the history
*Motivation*

There are a lot of duplications in schema bc check implementations.

*Modifications*

Removed duplicated code.
  • Loading branch information
sijie authored Feb 28, 2019
1 parent 77d4c28 commit 1dd7207
Show file tree
Hide file tree
Showing 4 changed files with 85 additions and 89 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.service.schema;

import static java.nio.charset.StandardCharsets.UTF_8;

import java.util.Arrays;
import org.apache.avro.Schema;
import org.apache.avro.SchemaValidationException;
import org.apache.avro.SchemaValidator;
import org.apache.avro.SchemaValidatorBuilder;
import org.apache.pulsar.common.schema.SchemaData;

/**
* The abstract implementation of {@link SchemaCompatibilityCheck} using Avro Schema.
*/
abstract class AvroSchemaBasedCompatibilityCheck implements SchemaCompatibilityCheck {

@Override
public boolean isCompatible(SchemaData from, SchemaData to, SchemaCompatibilityStrategy strategy) {
Schema.Parser fromParser = new Schema.Parser();
Schema fromSchema = fromParser.parse(new String(from.getData(), UTF_8));
Schema.Parser toParser = new Schema.Parser();
Schema toSchema = toParser.parse(new String(to.getData(), UTF_8));

SchemaValidator schemaValidator = createSchemaValidator(strategy, true);
try {
schemaValidator.validate(toSchema, Arrays.asList(fromSchema));
} catch (SchemaValidationException e) {
return false;
}
return true;
}

static SchemaValidator createSchemaValidator(SchemaCompatibilityStrategy compatibilityStrategy,
boolean onlyLatestValidator) {
final SchemaValidatorBuilder validatorBuilder = new SchemaValidatorBuilder();
switch (compatibilityStrategy) {
case BACKWARD:
return createLatestOrAllValidator(validatorBuilder.canReadStrategy(), onlyLatestValidator);
case FORWARD:
return createLatestOrAllValidator(validatorBuilder.canBeReadStrategy(), onlyLatestValidator);
case FULL:
return createLatestOrAllValidator(validatorBuilder.mutualReadStrategy(), onlyLatestValidator);
default:
return NeverSchemaValidator.INSTANCE;
}
}

static SchemaValidator createLatestOrAllValidator(SchemaValidatorBuilder validatorBuilder, boolean onlyLatest) {
return onlyLatest ? validatorBuilder.validateLatest() : validatorBuilder.validateAll();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,59 +18,16 @@
*/
package org.apache.pulsar.broker.service.schema;

import org.apache.avro.Schema;
import org.apache.avro.SchemaValidationException;
import org.apache.avro.SchemaValidator;
import org.apache.avro.SchemaValidatorBuilder;
import org.apache.pulsar.common.schema.SchemaData;
import org.apache.pulsar.common.schema.SchemaType;


import java.util.Arrays;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AvroSchemaCompatibilityCheck implements SchemaCompatibilityCheck {
private final static Logger log = LoggerFactory.getLogger(AvroSchemaCompatibilityCheck.class);
/**
* {@link SchemaCompatibilityCheck} for {@link SchemaType#AVRO}.
*/
public class AvroSchemaCompatibilityCheck extends AvroSchemaBasedCompatibilityCheck {

@Override
public SchemaType getSchemaType() {
return SchemaType.AVRO;
}

@Override
public boolean isCompatible(SchemaData from, SchemaData to, SchemaCompatibilityStrategy strategy) {
Schema.Parser fromParser = new Schema.Parser();
Schema fromSchema = fromParser.parse(new String(from.getData()));
Schema.Parser toParser = new Schema.Parser();
Schema toSchema = toParser.parse(new String(to.getData()));

SchemaValidator schemaValidator = createSchemaValidator(strategy, true);
try {
schemaValidator.validate(toSchema, Arrays.asList(fromSchema));
} catch (SchemaValidationException e) {
return false;
}
return true;
}

private static SchemaValidator createSchemaValidator(SchemaCompatibilityStrategy compatibilityStrategy,
boolean onlyLatestValidator) {
final SchemaValidatorBuilder validatorBuilder = new SchemaValidatorBuilder();
switch (compatibilityStrategy) {
case BACKWARD:
return createLatestOrAllValidator(validatorBuilder.canReadStrategy(), onlyLatestValidator);
case FORWARD:
return createLatestOrAllValidator(validatorBuilder.canBeReadStrategy(), onlyLatestValidator);
case FULL:
return createLatestOrAllValidator(validatorBuilder.mutualReadStrategy(), onlyLatestValidator);
default:
return NeverSchemaValidator.INSTANCE;
}
}

private static SchemaValidator createLatestOrAllValidator(SchemaValidatorBuilder validatorBuilder, boolean onlyLatest) {
return onlyLatest ? validatorBuilder.validateLatest() : validatorBuilder.validateAll();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,21 @@
*/
package org.apache.pulsar.broker.service.schema;

import static java.nio.charset.StandardCharsets.UTF_8;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.module.jsonSchema.JsonSchema;
import java.io.IOException;
import org.apache.avro.Schema;
import org.apache.avro.SchemaParseException;
import org.apache.avro.SchemaValidationException;
import org.apache.avro.SchemaValidator;
import org.apache.avro.SchemaValidatorBuilder;
import org.apache.pulsar.common.schema.SchemaData;
import org.apache.pulsar.common.schema.SchemaType;

import java.io.IOException;
import java.util.Arrays;

/**
* {@link SchemaCompatibilityCheck} for {@link SchemaType#JSON}.
*/
@SuppressWarnings("unused")
public class JsonSchemaCompatibilityCheck implements SchemaCompatibilityCheck {
public class JsonSchemaCompatibilityCheck extends AvroSchemaBasedCompatibilityCheck {

@Override
public SchemaType getSchemaType() {
Expand All @@ -44,7 +44,7 @@ public boolean isCompatible(SchemaData from, SchemaData to, SchemaCompatibilityS
if (isAvroSchema(from)) {
if (isAvroSchema(to)) {
// if both producer and broker have the schema in avro format
return isCompatibleAvroSchema(from, to, strategy);
return super.isCompatible(from, to, strategy);
} else if (isJsonSchema(to)) {
// if broker have the schema in avro format but producer sent a schema in the old json format
// allow old schema format for backwards compatiblity
Expand Down Expand Up @@ -74,21 +74,6 @@ public boolean isCompatible(SchemaData from, SchemaData to, SchemaCompatibilityS
}
}

private boolean isCompatibleAvroSchema(SchemaData from, SchemaData to, SchemaCompatibilityStrategy strategy) {
Schema.Parser fromParser = new Schema.Parser();
Schema fromSchema = fromParser.parse(new String(from.getData()));
Schema.Parser toParser = new Schema.Parser();
Schema toSchema = toParser.parse(new String(to.getData()));

SchemaValidator schemaValidator = createSchemaValidator(strategy, true);
try {
schemaValidator.validate(toSchema, Arrays.asList(fromSchema));
} catch (SchemaValidationException e) {
return false;
}
return true;
}

private ObjectMapper objectMapper;
private ObjectMapper getObjectMapper() {
if (objectMapper == null) {
Expand All @@ -112,7 +97,7 @@ private boolean isAvroSchema(SchemaData schemaData) {
try {

Schema.Parser fromParser = new Schema.Parser();
Schema fromSchema = fromParser.parse(new String(schemaData.getData()));
Schema fromSchema = fromParser.parse(new String(schemaData.getData(), UTF_8));
return true;
} catch (SchemaParseException e) {
return false;
Expand All @@ -129,22 +114,4 @@ private boolean isJsonSchema(SchemaData schemaData) {
}
}

private static SchemaValidator createSchemaValidator(SchemaCompatibilityStrategy compatibilityStrategy,
boolean onlyLatestValidator) {
final SchemaValidatorBuilder validatorBuilder = new SchemaValidatorBuilder();
switch (compatibilityStrategy) {
case BACKWARD:
return createLatestOrAllValidator(validatorBuilder.canReadStrategy(), onlyLatestValidator);
case FORWARD:
return createLatestOrAllValidator(validatorBuilder.canBeReadStrategy(), onlyLatestValidator);
case FULL:
return createLatestOrAllValidator(validatorBuilder.mutualReadStrategy(), onlyLatestValidator);
default:
return NeverSchemaValidator.INSTANCE;
}
}

private static SchemaValidator createLatestOrAllValidator(SchemaValidatorBuilder validatorBuilder, boolean onlyLatest) {
return onlyLatest ? validatorBuilder.validateLatest() : validatorBuilder.validateAll();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@

import org.apache.pulsar.common.schema.SchemaType;

/**
* The {@link SchemaCompatibilityCheck} implementation for {@link SchemaType#PROTOBUF}.
*/
public class ProtobufSchemaCompatibilityCheck extends AvroSchemaCompatibilityCheck {

@Override
Expand Down

0 comments on commit 1dd7207

Please sign in to comment.