Skip to content

Commit

Permalink
[schema] Introduce multi version generic record schema (apache#3670)
Browse files Browse the repository at this point in the history
*Motivation*

Currently AUTO_CONSUME only supports decoding records from latest schema.
All the schema versions are lost. It makes AUTO_CONSUME less useful in some use cases,
such as CDC. Because there is no way for the applications to know which version of schema
that a message is using.

In order to support multi-version schema, we need to propagate schema version from
message header through schema#decode method to the decoded record.

*Modifications*

- Introduce a new decode method `decode(byte[] data, byte[] schemaVersion)`. This allows the implementation
  to leverage the schema version.
- Introduce a method `supportSchemaVersioning` to tell which decode methods to use. Because most of the schema
  implementations such as primitive schemas and POJO based schema doesn't make any sense to use schema version.
- Introduce a SchemaProvider which returns a specific schema instance for a given schema version
- Implement a MultiVersionGenericRecordSchema which decode the messages based on schema version. All the records
  decoded by this schema will have schema version and its corresponding schema definitions.

*NOTES

This implementation only introduce the mechanism. But it doesn't wire the multi-version schema
with auto_consume schema. There will be a subsequent pull request on implementing a schema provider
that fetches and caches schemas from brokers.
  • Loading branch information
sijie authored Feb 26, 2019
1 parent 0aa249a commit 3c36705
Show file tree
Hide file tree
Showing 13 changed files with 312 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,14 +56,48 @@ default void validate(byte[] message) {
*/
byte[] encode(T message);

/**
* Returns whether this schema supports versioning.
*
* <p>Most of the schema implementations don't really support schema versioning, or it just doesn't
* make any sense to support schema versionings (e.g. primitive schemas). Only schema returns
* {@link GenericRecord} should support schema versioning.
*
* <p>If a schema implementation returns <tt>false</tt>, it should implement {@link #decode(byte[])};
* while a schema implementation returns <tt>true</tt>, it should implement {@link #decode(byte[], byte[])}
* instead.
*
* @return true if this schema implementation supports schema versioning; otherwise returns false.
*/
default boolean supportSchemaVersioning() {
return false;
}

/**
* Decode a byte array into an object using the schema definition and deserializer implementation
*
* @param bytes
* the byte array to decode
* @return the deserialized object
*/
T decode(byte[] bytes);
default T decode(byte[] bytes) {
// use `null` to indicate ignoring schema version
return decode(bytes, null);
}

/**
* Decode a byte array into an object using a given version.
*
* @param bytes
* the byte array to decode
* @param schemaVersion
* the schema version to decode the object. null indicates using latest version.
* @return the deserialized object
*/
default T decode(byte[] bytes, byte[] schemaVersion) {
// ignore version by default (most of the primitive schema implementations ignore schema version)
return decode(bytes);
}

/**
* @return an object that represents the Schema associated metadata
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,13 @@
*/
public interface GenericRecord {

/**
* Return schema version.
*
* @return schema version.
*/
byte[] getSchemaVersion();

/**
* Returns the list of fields associated with the record.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,9 +228,23 @@ public byte[] getData() {
}
}

private byte[] getSchemaVersion() {
if (msgMetadataBuilder.hasSchemaVersion()) {
return msgMetadataBuilder.getSchemaVersion().toByteArray();
} else {
return null;
}
}

@Override
public T getValue() {
return schema.decode(getData());
// check if the schema passed in from client supports schema versioning or not
// this is an optimization to only get schema version when necessary
if (schema.supportSchemaVersioning()) {
return schema.decode(getData(), getSchemaVersion());
} else {
return schema.decode(getData());
}
}

public long getSequenceId() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,10 @@ public byte[] encode(GenericRecord message) {
}

@Override
public GenericRecord decode(byte[] bytes) {
public GenericRecord decode(byte[] bytes, byte[] schemaVersion) {
ensureSchemaInitialized();

return schema.decode(bytes);
return schema.decode(bytes, schemaVersion);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,12 +63,12 @@ public byte[] encode(byte[] message) {
}

@Override
public byte[] decode(byte[] bytes) {
public byte[] decode(byte[] bytes, byte[] schemaVersion) {
ensureSchemaInitialized();

if (requireSchemaValidation) {
// verify the message can be detected by the underlying schema
schema.decode(bytes);
schema.decode(bytes, schemaVersion);
}

return bytes;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,31 +23,25 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.avro.util.Utf8;
import org.apache.pulsar.client.api.schema.Field;
import org.apache.pulsar.client.api.schema.GenericRecord;

/**
* A generic avro record.
*/
@Slf4j
class GenericAvroRecord implements GenericRecord {
class GenericAvroRecord extends VersionedGenericRecord {

private final org.apache.avro.Schema schema;
private final List<Field> fields;
private final org.apache.avro.generic.GenericRecord record;

GenericAvroRecord(org.apache.avro.Schema schema,
GenericAvroRecord(byte[] schemaVersion,
org.apache.avro.Schema schema,
List<Field> fields,
org.apache.avro.generic.GenericRecord record) {
super(schemaVersion, fields);
this.schema = schema;
this.fields = fields;
this.record = record;
}

@Override
public List<Field> getFields() {
return fields;
}

@Override
public Object getField(String fieldName) {
Object value = record.get(fieldName);
Expand All @@ -61,7 +55,7 @@ public Object getField(String fieldName) {
.stream()
.map(f -> new Field(f.name(), f.pos()))
.collect(Collectors.toList());
return new GenericAvroRecord(schema, fields, avroRecord);
return new GenericAvroRecord(schemaVersion, schema, fields, avroRecord);
} else {
return value;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,13 +66,13 @@ public synchronized byte[] encode(GenericRecord message) {
}

@Override
public GenericRecord decode(byte[] bytes) {
public GenericRecord decode(byte[] bytes, byte[] schemaVersion) {
try {
Decoder decoder = DecoderFactory.get().binaryDecoder(bytes, null);
org.apache.avro.generic.GenericRecord avroRecord = datumReader.read(
null,
decoder);
return new GenericAvroRecord(schema, fields, avroRecord);
return new GenericAvroRecord(schemaVersion, schema, fields, avroRecord);
} catch (IOException e) {
throw new SchemaSerializationException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,31 +24,25 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.pulsar.client.api.schema.Field;
import org.apache.pulsar.client.api.schema.GenericRecord;

/**
* Generic json record.
*/
class GenericJsonRecord implements GenericRecord {
class GenericJsonRecord extends VersionedGenericRecord {

private final List<Field> fields;
private final JsonNode jn;

GenericJsonRecord(List<Field> fields,
GenericJsonRecord(byte[] schemaVersion,
List<Field> fields,
JsonNode jn) {
this.fields = fields;
super(schemaVersion, fields);
this.jn = jn;
}

JsonNode getJsonNode() {
return jn;
}

@Override
public List<Field> getFields() {
return fields;
}

@Override
public Object getField(String fieldName) {
JsonNode fn = jn.get(fieldName);
Expand All @@ -58,7 +52,7 @@ public Object getField(String fieldName) {
.stream()
.map(f -> new Field(f, idx.getAndIncrement()))
.collect(Collectors.toList());
return new GenericJsonRecord(fields, fn);
return new GenericJsonRecord(schemaVersion, fields, fn);
} else if (fn.isBoolean()) {
return fn.asBoolean();
} else if (fn.isInt()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,10 @@ public byte[] encode(GenericRecord message) {
}

@Override
public GenericRecord decode(byte[] bytes) {
public GenericRecord decode(byte[] bytes, byte[] schemaVersion) {
try {
JsonNode jn = objectMapper.readTree(new String(bytes, UTF_8));
return new GenericJsonRecord(fields, jn);
return new GenericJsonRecord(schemaVersion, fields, jn);
} catch (IOException ioe) {
throw new SchemaSerializationException(ioe);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.impl.schema.generic;

import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.impl.schema.BytesSchema;
import org.apache.pulsar.common.schema.SchemaInfo;

/**
* A schema implementation that handles schema versioning.
*/
public class MultiVersionGenericSchema implements Schema<GenericRecord> {

private final SchemaProvider<GenericRecord> provider;

MultiVersionGenericSchema(SchemaProvider<GenericRecord> provider) {
this.provider = provider;
}

@Override
public byte[] encode(GenericRecord message) {
throw new UnsupportedOperationException("This schema implementation is only used for AUTO_CONSUME");
}

@Override
public boolean supportSchemaVersioning() {
return true;
}

@Override
public GenericRecord decode(byte[] bytes) {
return provider.getSchema(null).decode(bytes);
}

@Override
public GenericRecord decode(byte[] bytes, byte[] schemaVersion) {
return provider.getSchema(schemaVersion).decode(bytes, schemaVersion);
}

@Override
public SchemaInfo getSchemaInfo() {
// simulate it is a bytes schema
return BytesSchema.of().getSchemaInfo();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.impl.schema.generic;

import org.apache.pulsar.client.api.Schema;

/**
* Schema Provider.
*/
public interface SchemaProvider<T> {

/**
* Retrieve the schema instance of a given <tt>schemaVersion</tt>.
*
* @param schemaVersion schema version
* @return schema instance of the provided <tt>schemaVersion</tt>
*/
Schema<T> getSchema(byte[] schemaVersion);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.impl.schema.generic;

import java.util.List;
import org.apache.pulsar.client.api.schema.Field;
import org.apache.pulsar.client.api.schema.GenericRecord;

/**
* A generic record carrying schema version.
*/
abstract class VersionedGenericRecord implements GenericRecord {

protected final byte[] schemaVersion;
protected final List<Field> fields;

protected VersionedGenericRecord(byte[] schemaVersion,
List<Field> fields) {
this.schemaVersion = schemaVersion;
this.fields = fields;
}

@Override
public byte[] getSchemaVersion() {
return schemaVersion;
}

@Override
public List<Field> getFields() {
return fields;
}

}
Loading

0 comments on commit 3c36705

Please sign in to comment.