Skip to content

Commit

Permalink
Improve response of Rest API for create schema. (apache#5103)
Browse files Browse the repository at this point in the history
Fixes apache#5094
### Motivation

Currently, can't get some detail messages about incompatible while use REST API to create a new schema. This PR let users to get the detail messages.

### Future more
Use pulsar-admin or pulsar-client also can get the detail messages about incompatible. This PR only handle the REST way.
  • Loading branch information
codelipenghui authored and sijie committed Sep 5, 2019
1 parent 53fb055 commit b196283
Show file tree
Hide file tree
Showing 11 changed files with 171 additions and 99 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,8 @@ public void postSchema(
)
).exceptionally(error -> {
if (error.getCause() instanceof IncompatibleSchemaException) {
response.resume(Response.status(Response.Status.CONFLICT).build());
response.resume(Response.status(Response.Status.CONFLICT.getStatusCode(),
error.getCause().getMessage()).build());
} else if (error instanceof InvalidSchemaDataException) {
response.resume(Response.status(
422, /* Unprocessable Entity */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.avro.SchemaValidationException;
import org.apache.avro.SchemaValidator;
import org.apache.avro.SchemaValidatorBuilder;
import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
import org.apache.pulsar.common.protocol.schema.SchemaData;
import lombok.extern.slf4j.Slf4j;

Expand All @@ -38,12 +39,12 @@
abstract class AvroSchemaBasedCompatibilityCheck implements SchemaCompatibilityCheck {

@Override
public boolean isCompatible(SchemaData from, SchemaData to, SchemaCompatibilityStrategy strategy) {
return isCompatible(Collections.singletonList(from), to, strategy);
public void checkCompatible(SchemaData from, SchemaData to, SchemaCompatibilityStrategy strategy) throws IncompatibleSchemaException {
checkCompatible(Collections.singletonList(from), to, strategy);
}

@Override
public boolean isCompatible(Iterable<SchemaData> from, SchemaData to, SchemaCompatibilityStrategy strategy) {
public void checkCompatible(Iterable<SchemaData> from, SchemaData to, SchemaCompatibilityStrategy strategy) throws IncompatibleSchemaException {
LinkedList<Schema> fromList = new LinkedList<>();
try {
for (SchemaData schemaData : from) {
Expand All @@ -56,15 +57,14 @@ public boolean isCompatible(Iterable<SchemaData> from, SchemaData to, SchemaComp
schemaValidator.validate(toSchema, fromList);
} catch (SchemaParseException e) {
log.error("Error during schema parsing: {}", e.getMessage(), e);
return false;
throw new IncompatibleSchemaException(e);
} catch (SchemaValidationException e) {
log.error("Error during schema compatibility check: {}", e.getMessage(), e);
return false;
throw new IncompatibleSchemaException(e);
}
return true;
}

static SchemaValidator createSchemaValidator(SchemaCompatibilityStrategy compatibilityStrategy) {
static SchemaValidator createSchemaValidator(SchemaCompatibilityStrategy compatibilityStrategy) throws IncompatibleSchemaException {
final SchemaValidatorBuilder validatorBuilder = new SchemaValidatorBuilder();
switch (compatibilityStrategy) {
case BACKWARD_TRANSITIVE:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,11 @@ public CompletableFuture<Boolean> isCompatible(String schemaId, SchemaData schem
return completedFuture(false);
}

@Override
public CompletableFuture<Void> checkCompatible(String schemaId, SchemaData schema, SchemaCompatibilityStrategy strategy) {
return completedFuture(null);
}

@Override
public SchemaVersion versionFromBytes(byte[] version) {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.io.IOException;
import org.apache.avro.Schema;
import org.apache.avro.SchemaParseException;
import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
import org.apache.pulsar.common.protocol.schema.SchemaData;
import org.apache.pulsar.common.schema.SchemaType;

Expand All @@ -40,37 +41,34 @@ public SchemaType getSchemaType() {
}

@Override
public boolean isCompatible(SchemaData from, SchemaData to, SchemaCompatibilityStrategy strategy) {
public void checkCompatible(SchemaData from, SchemaData to, SchemaCompatibilityStrategy strategy) throws IncompatibleSchemaException {
if (isAvroSchema(from)) {
if (isAvroSchema(to)) {
// if both producer and broker have the schema in avro format
return super.isCompatible(from, to, strategy);
super.checkCompatible(from, to, strategy);
} else if (isJsonSchema(to)) {
// if broker have the schema in avro format but producer sent a schema in the old json format
// allow old schema format for backwards compatiblity
return true;
} else {
// unknown schema format
return false;
throw new IncompatibleSchemaException("Unknown schema format");
}
} else if (isJsonSchema(from)){

if (isAvroSchema(to)) {
// if broker have the schema in old json format but producer sent a schema in the avro format
// return true and overwrite the old format
return true;
} else if (isJsonSchema(to)) {
// if both producer and broker have the schema in old json format
return isCompatibleJsonSchema(from, to);
isCompatibleJsonSchema(from, to);
} else {
// unknown schema format
return false;
throw new IncompatibleSchemaException("Unknown schema format");
}
} else {
// broker has schema format with unknown format
// maybe corrupted?
// return true to overwrite
return true;
}
}

Expand All @@ -82,14 +80,17 @@ private ObjectMapper getObjectMapper() {
return objectMapper;
}

private boolean isCompatibleJsonSchema(SchemaData from, SchemaData to) {
private void isCompatibleJsonSchema(SchemaData from, SchemaData to) throws IncompatibleSchemaException {
try {
ObjectMapper objectMapper = getObjectMapper();
JsonSchema fromSchema = objectMapper.readValue(from.getData(), JsonSchema.class);
JsonSchema toSchema = objectMapper.readValue(to.getData(), JsonSchema.class);
return fromSchema.getId().equals(toSchema.getId());
if (!fromSchema.getId().equals(toSchema.getId())) {
throw new IncompatibleSchemaException(String.format("Incompatible Schema from %s + to %s",
new String(from.getData(), UTF_8), new String(to.getData(), UTF_8)));
}
} catch (IOException e) {
return false;
throw new IncompatibleSchemaException(e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.service.schema;

import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
import org.apache.pulsar.client.impl.schema.KeyValueSchemaInfo;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.protocol.schema.SchemaData;
Expand Down Expand Up @@ -54,17 +55,17 @@ public SchemaType getSchemaType() {
}

@Override
public boolean isCompatible(SchemaData from, SchemaData to, SchemaCompatibilityStrategy strategy) {
return isCompatible(Collections.singletonList(from), to, strategy);
public void checkCompatible(SchemaData from, SchemaData to, SchemaCompatibilityStrategy strategy) throws IncompatibleSchemaException {
checkCompatible(Collections.singletonList(from), to, strategy);
}

@Override
public boolean isCompatible(Iterable<SchemaData> from, SchemaData to, SchemaCompatibilityStrategy strategy) {
public void checkCompatible(Iterable<SchemaData> from, SchemaData to, SchemaCompatibilityStrategy strategy) throws IncompatibleSchemaException {
if (strategy == SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE) {
return true;
return;
}
if (to.getType() != SchemaType.KEY_VALUE) {
return false;
throw new IncompatibleSchemaException("To schema is not a KEY_VALUE schema.");
}
LinkedList<SchemaData> fromKeyList = new LinkedList<>();
LinkedList<SchemaData> fromValueList = new LinkedList<>();
Expand All @@ -75,18 +76,23 @@ public boolean isCompatible(Iterable<SchemaData> from, SchemaData to, SchemaComp

for (SchemaData schemaData : from) {
if (schemaData.getType() != SchemaType.KEY_VALUE) {
return false;
throw new IncompatibleSchemaException("From schema is not a KEY_VALUE schema.");
}
fromKeyValue = decodeKeyValueSchemaData(schemaData);
if (fromKeyValue.getKey().getType() != toKeyType || fromKeyValue.getValue().getType() != toValueType) {
return false;
throw new IncompatibleSchemaException(String.format("Key schemas or Value schemas are different schema type, " +
"from key schema type is %s and to key schema is %s, from value schema is %s and to value schema is %s",
fromKeyValue.getKey().getType(),
toKeyType,
fromKeyValue.getValue().getType(),
toValueType));
}
fromKeyList.addFirst(fromKeyValue.getKey());
fromValueList.addFirst(fromKeyValue.getValue());
}
SchemaCompatibilityCheck keyCheck = checkers.getOrDefault(toKeyType, SchemaCompatibilityCheck.DEFAULT);
SchemaCompatibilityCheck valueCheck = checkers.getOrDefault(toValueType, SchemaCompatibilityCheck.DEFAULT);
return keyCheck.isCompatible(fromKeyList, toKeyValue.getKey(), strategy)
&& valueCheck.isCompatible(fromValueList, toKeyValue.getValue(), strategy);
keyCheck.checkCompatible(fromKeyList, toKeyValue.getKey(), strategy);
valueCheck.checkCompatible(fromValueList, toKeyValue.getValue(), strategy);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,12 @@
*/
package org.apache.pulsar.broker.service.schema;

import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
import org.apache.pulsar.common.protocol.schema.SchemaData;
import org.apache.pulsar.common.schema.SchemaType;

import java.util.Collections;

public interface SchemaCompatibilityCheck {

SchemaType getSchemaType();
Expand All @@ -30,9 +33,8 @@ public interface SchemaCompatibilityCheck {
* @param from the current schema i.e. schema that the broker has
* @param to the future schema i.e. the schema sent by the producer
* @param strategy the strategy to use when comparing schemas
* @return whether the schemas are compatible
*/
boolean isCompatible(SchemaData from, SchemaData to, SchemaCompatibilityStrategy strategy);
void checkCompatible(SchemaData from, SchemaData to, SchemaCompatibilityStrategy strategy) throws IncompatibleSchemaException;

/**
*
Expand All @@ -41,7 +43,25 @@ public interface SchemaCompatibilityCheck {
* @param strategy the strategy to use when comparing schemas
* @return whether the schemas are compatible
*/
boolean isCompatible(Iterable<SchemaData> from, SchemaData to, SchemaCompatibilityStrategy strategy);
void checkCompatible(Iterable<SchemaData> from, SchemaData to, SchemaCompatibilityStrategy strategy) throws IncompatibleSchemaException;

default boolean isCompatible(SchemaData from, SchemaData to, SchemaCompatibilityStrategy strategy) {
try {
checkCompatible(from, to, strategy);
return true;
} catch (Exception e) {
return false;
}
}

default boolean isCompatible(Iterable<SchemaData> from, SchemaData to, SchemaCompatibilityStrategy strategy) {
try {
checkCompatible(from, to, strategy);
return true;
} catch (IncompatibleSchemaException e) {
return false;
}
}

SchemaCompatibilityCheck DEFAULT = new SchemaCompatibilityCheck() {

Expand All @@ -51,20 +71,16 @@ public SchemaType getSchemaType() {
}

@Override
public boolean isCompatible(SchemaData from, SchemaData to, SchemaCompatibilityStrategy strategy) {
public void checkCompatible(SchemaData from, SchemaData to, SchemaCompatibilityStrategy strategy) throws IncompatibleSchemaException {
if (strategy == SchemaCompatibilityStrategy.ALWAYS_INCOMPATIBLE) {
return false;
} else {
return true;
throw new IncompatibleSchemaException("Schema compatibility strategy is ALWAYS_INCOMPATIBLE");
}
}

@Override
public boolean isCompatible(Iterable<SchemaData> from, SchemaData to, SchemaCompatibilityStrategy strategy) {
public void checkCompatible(Iterable<SchemaData> from, SchemaData to, SchemaCompatibilityStrategy strategy) throws IncompatibleSchemaException {
if (strategy == SchemaCompatibilityStrategy.ALWAYS_INCOMPATIBLE) {
return false;
} else {
return true;
throw new IncompatibleSchemaException("Schema compatibility strategy is ALWAYS_INCOMPATIBLE");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ CompletableFuture<SchemaVersion> putSchemaIfAbsent(String schemaId, SchemaData s
CompletableFuture<SchemaVersion> deleteSchema(String schemaId, String user);

CompletableFuture<Boolean> isCompatible(String schemaId, SchemaData schema,
SchemaCompatibilityStrategy strategy);

CompletableFuture<Void> checkCompatible(String schemaId, SchemaData schema,
SchemaCompatibilityStrategy strategy);

CompletableFuture<List<SchemaAndMetadata>> trimDeletedSchemaAndGetList(String schemaId);
Expand Down
Loading

0 comments on commit b196283

Please sign in to comment.