Skip to content

Commit

Permalink
[schema] store schema type correctly in schema registry (apache#3940)
Browse files Browse the repository at this point in the history
*Motivation*

Fixes apache#3925

We have 3 places of defining schema type enums. We kept adding
new schema type in pulsar-common. However we don't update the schema type
in wire protocol and schema storage.

This causes `SchemaType.NONE` is stored in SchemaRegistry.
It fails debeizum connector on restarting.

*Modifications*

Make sure all 3 places have consistent schema type definitions.
Record the correct schema type.
  • Loading branch information
sijie authored and merlimat committed Mar 29, 2019
1 parent 08c97f7 commit 7e7175d
Show file tree
Hide file tree
Showing 12 changed files with 187 additions and 111 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -772,30 +772,13 @@ protected void handleSubscribe(final CommandSubscribe subscribe) {
});
}

private static SchemaType getType(PulsarApi.Schema.Type protocolType) {
switch (protocolType) {
case None:
return SchemaType.NONE;
case String:
return SchemaType.STRING;
case Json:
return SchemaType.JSON;
case Protobuf:
return SchemaType.PROTOBUF;
case Avro:
return SchemaType.AVRO;
default:
return SchemaType.NONE;
}
}

private SchemaData getSchema(PulsarApi.Schema protocolSchema) {
return SchemaData.builder()
.data(protocolSchema.getSchemaData().toByteArray())
.isDeleted(false)
.timestamp(System.currentTimeMillis())
.user(Strings.nullToEmpty(originalPrincipal))
.type(getType(protocolSchema.getType()))
.type(Commands.getSchemaType(protocolSchema.getType()))
.props(protocolSchema.getPropertiesList().stream().collect(
Collectors.toMap(
PulsarApi.KeyValue::getKey,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,36 +164,19 @@ private CompletableFuture<Boolean> checkCompatibilityWithLatest(String schemaId,

interface Functions {
static SchemaType convertToDomainType(SchemaRegistryFormat.SchemaInfo.SchemaType type) {
switch (type) {
case NONE:
return SchemaType.NONE;
case STRING:
return SchemaType.STRING;
case JSON:
return SchemaType.JSON;
case PROTOBUF:
return SchemaType.PROTOBUF;
case AVRO:
return SchemaType.AVRO;
default:
if (type.getNumber() < 0) {
return SchemaType.NONE;
} else {
// the value of type in `SchemaType` is always 1 less than the value of type `SchemaInfo.SchemaType`
return SchemaType.valueOf(type.getNumber() - 1);
}
}

static SchemaRegistryFormat.SchemaInfo.SchemaType convertFromDomainType(SchemaType type) {
switch (type) {
case NONE:
return SchemaRegistryFormat.SchemaInfo.SchemaType.NONE;
case STRING:
return SchemaRegistryFormat.SchemaInfo.SchemaType.STRING;
case JSON:
return SchemaRegistryFormat.SchemaInfo.SchemaType.JSON;
case PROTOBUF:
return SchemaRegistryFormat.SchemaInfo.SchemaType.PROTOBUF;
case AVRO:
return SchemaRegistryFormat.SchemaInfo.SchemaType.AVRO;
default:
return SchemaRegistryFormat.SchemaInfo.SchemaType.NONE;
if (type.getValue() < 0) {
return SchemaRegistryFormat.SchemaInfo.SchemaType.NONE;
} else {
return SchemaRegistryFormat.SchemaInfo.SchemaType.valueOf(type.getValue() + 1);
}
}

Expand Down
11 changes: 11 additions & 0 deletions pulsar-broker/src/main/proto/SchemaRegistryFormat.proto
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,17 @@ message SchemaInfo {
JSON = 3;
PROTOBUF = 4;
AVRO = 5;
BOOLEAN = 6;
INT8 = 7;
INT16 = 8;
INT32 = 9;
INT64 = 10;
FLOAT = 11;
DOUBLE = 12;
DATE = 13;
TIME = 14;
TIMESTAMP = 15;
KEYVALUE = 16;
}
message KeyValuePair {
required string key = 1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,110 +20,159 @@

/**
* Types of supported schema for Pulsar messages
*
*
* <p>Ideally we should have just one single set of enum definitions
* for schema type. but we have 3 locations of defining schema types.
*
* <p>when you are adding a new schema type that whose
* schema info is required to be recorded in schema registry,
* add corresponding schema type into `pulsar-common/src/main/proto/PulsarApi.proto`
* and `pulsar-broker/src/main/proto/SchemaRegistryFormat.proto`.
*/
public enum SchemaType {
/**
* No schema defined
*/
NONE,
NONE(0),

/**
* boolean schema defined
* @since 2.3.0
* Simple String encoding with UTF-8
*/
BOOLEAN,
STRING(1),

/**
* Simple String encoding with UTF-8
* JSON object encoding and validation
*/
JSON(2),

/**
* Protobuf message encoding and decoding
*/
PROTOBUF(3),

/**
* Serialize and deserialize via avro
*/
AVRO(4),

/**
* boolean schema defined
* @since 2.3.0
*/
STRING,
BOOLEAN(5),

/**
* A 8-byte integer.
*/
INT8,
INT8(6),

/**
* A 16-byte integer.
*/
INT16,
INT16(7),

/**
* A 32-byte integer.
*/
INT32,
INT32(8),

/**
* A 64-byte integer.
*/
INT64,
INT64(9),

/**
* A float number.
*/
FLOAT,
FLOAT(10),

/**
* A double number
*/
DOUBLE,

/**
* A bytes array.
*/
BYTES,
DOUBLE(11),

/**
* Date
* @since 2.4.0
*/
DATE,
DATE(12),

/**
* Time
* @since 2.4.0
*/
TIME,
TIME(13),

/**
* Timestamp
* @since 2.4.0
*/
TIMESTAMP,
TIMESTAMP(14),

/**
* JSON object encoding and validation
* A Schema that contains Key Schema and Value Schema.
*/
JSON,
KEY_VALUE(15),

/**
* Protobuf message encoding and decoding
*/
PROTOBUF,
//
// Schemas that don't have schema info. the value should be negative.
//

/**
* Serialize and deserialize via avro
* A bytes array.
*/
AVRO,
BYTES(-1),

/**
* Auto Detect Schema Type.
*/
@Deprecated
AUTO,
AUTO(-2),

/**
* Auto Consume Type.
*/
AUTO_CONSUME,
AUTO_CONSUME(-3),

/**
* Auto Publish Type.
*/
AUTO_PUBLISH,

/**
* A Schema that contains Key Schema and Value Schema.
*/
KEY_VALUE
AUTO_PUBLISH(-4);

int value;

SchemaType(int value) {
this.value = value;
}

public int getValue() {
return this.value;
}

public static SchemaType valueOf(int value) {
switch (value) {
case 0: return NONE;
case 1: return STRING;
case 2: return JSON;
case 3: return PROTOBUF;
case 4: return AVRO;
case 5: return BOOLEAN;
case 6: return INT8;
case 7: return INT16;
case 8: return INT32;
case 9: return INT64;
case 10: return FLOAT;
case 11: return DOUBLE;
case 12: return DATE;
case 13: return TIME;
case 14: return TIMESTAMP;
case 15: return KEY_VALUE;
case -1: return BYTES;
case -2: return AUTO;
case -3: return AUTO_CONSUME;
case -4: return AUTO_PUBLISH;
default: return NONE;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -501,7 +501,7 @@ public void connectionOpened(final ClientCnx cnx) {
}

SchemaInfo si = schema.getSchemaInfo();
if (si != null && SchemaType.BYTES == si.getType()) {
if (si != null && (SchemaType.BYTES == si.getType() || SchemaType.NONE == si.getType())) {
// don't set schema for Schema.BYTES
si = null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -905,7 +905,8 @@ public void connectionOpened(final ClientCnx cnx) {
} else {
schemaInfo = schema.getSchemaInfo();
}
} else if (schema.getSchemaInfo().getType() == SchemaType.BYTES) {
} else if (schema.getSchemaInfo().getType() == SchemaType.BYTES
|| schema.getSchemaInfo().getType() == SchemaType.NONE) {
// don't set schema info for Schema.BYTES
schemaInfo = null;
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,14 @@ public static Schema<?> getSchema(SchemaInfo schemaInfo) {
return DoubleSchema.of();
case BYTES:
return BytesSchema.of();
case DATE:
return DateSchema.of();
case TIME:
return TimeSchema.of();
case TIMESTAMP:
return TimestampSchema.of();
case KEY_VALUE:
return KeyValueSchema.kvBytes();
case JSON:
case AVRO:
return GenericSchemaImpl.of(schemaInfo);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@
import org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData;
import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
import org.apache.pulsar.common.api.proto.PulsarApi.ProtocolVersion;
import org.apache.pulsar.common.api.proto.PulsarApi.Schema;
import org.apache.pulsar.common.api.proto.PulsarApi.ServerError;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
Expand Down Expand Up @@ -548,40 +549,22 @@ public static ByteBuf newProducer(String topic, long producerId, long requestId,
}

private static PulsarApi.Schema.Type getSchemaType(SchemaType type) {
switch (type) {
case NONE:
return PulsarApi.Schema.Type.None;
case STRING:
return PulsarApi.Schema.Type.String;
case JSON:
return PulsarApi.Schema.Type.Json;
case PROTOBUF:
return PulsarApi.Schema.Type.Protobuf;
case AVRO:
return PulsarApi.Schema.Type.Avro;
default:
return PulsarApi.Schema.Type.None;
if (type.getValue() < 0) {
return Schema.Type.None;
} else {
return Schema.Type.valueOf(type.getValue());
}
}

public static SchemaType getSchemaType(PulsarApi.Schema.Type type) {
switch (type) {
case None:
return SchemaType.NONE;
case String:
return SchemaType.STRING;
case Json:
return SchemaType.JSON;
case Protobuf:
return SchemaType.PROTOBUF;
case Avro:
return SchemaType.AVRO;
default:
return SchemaType.NONE;
if (type.getNumber() < 0) {
// this is unexpected
return SchemaType.NONE;
} else {
return SchemaType.valueOf(type.getNumber());
}
}


private static PulsarApi.Schema getSchema(SchemaInfo schemaInfo) {
PulsarApi.Schema.Builder builder = PulsarApi.Schema.newBuilder()
.setName(schemaInfo.getName())
Expand Down
Loading

0 comments on commit 7e7175d

Please sign in to comment.