[schema] AutoConsume should use the schema associated with messages as both writer and reader schema (apache#4325)

* [schema] AutoConsume should use the schema associated with messages for both writer and reader schema

*Motivation*

AutoConsume should use the schema associated with each message, as both the writer and the reader schema, when decoding it.

*Modifications*

- provide a flag to enable or disable using the provided schema as the reader schema
- for the AUTO_CONSUME schema, disable using the provided schema as the reader schema, so that the right
  schema version is used to decode messages into the right generic records (see the sketch after this commit message)
- provide a few utility methods for displaying schema data

* Handle 64-byte schema versions

* Addressed review comments
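Background for this change (an editorial aside, not part of the commit): Avro decoding always needs the writer schema, i.e. the schema the bytes were actually produced with, while the reader schema only controls how the decoded record is projected. A minimal plain-Avro sketch, with two hypothetical versions of a User record:

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;

import java.io.ByteArrayOutputStream;

public class WriterReaderSchemaSketch {
    public static void main(String[] args) throws Exception {
        // Hypothetical v1 (writer) and v2 (reader) schemas; v2 adds a field with a default.
        Schema v1 = new Schema.Parser().parse(
                "{\"type\":\"record\",\"name\":\"User\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"}]}");
        Schema v2 = new Schema.Parser().parse(
                "{\"type\":\"record\",\"name\":\"User\",\"fields\":["
                        + "{\"name\":\"name\",\"type\":\"string\"},"
                        + "{\"name\":\"age\",\"type\":\"int\",\"default\":-1}]}");

        // Encode a record with the v1 (writer) schema.
        GenericRecord written = new GenericData.Record(v1);
        written.put("name", "alice");
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
        new GenericDatumWriter<GenericRecord>(v1).write(written, encoder);
        encoder.flush();

        // Decoding always requires the writer schema (v1); the reader schema (v2 here)
        // only shapes the resulting record, e.g. filling "age" with its default.
        GenericDatumReader<GenericRecord> reader = new GenericDatumReader<>(v1, v2);
        GenericRecord decoded = reader.read(null,
                DecoderFactory.get().binaryDecoder(out.toByteArray(), null));
        System.out.println(decoded); // {"name": "alice", "age": -1}
    }
}

Before this commit the provided (latest) schema was always used as the reader schema; with it, AUTO_CONSUME uses the schema attached to each message on both sides, so the generic records expose exactly the fields of the version the message was written with.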
sijie authored and srkukarni committed May 22, 2019
1 parent 527995b commit bf06ef3
Showing 11 changed files with 245 additions and 35 deletions.
@@ -18,6 +18,9 @@
*/
package org.apache.pulsar.common.schema;

import static java.nio.charset.StandardCharsets.UTF_8;

import java.util.Base64;
import java.util.Collections;
import java.util.Map;

@@ -52,4 +55,19 @@ public class SchemaInfo {
* Additional properties of the schema definition (implementation defined)
*/
private Map<String, String> properties = Collections.emptyMap();

public String getSchemaDefinition() {
if (null == schema) {
return "";
}

switch (type) {
case AVRO:
case JSON:
case PROTOBUF:
return new String(schema, UTF_8);
default:
return Base64.getEncoder().encodeToString(schema);
}
}
}
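A small usage sketch of the new helper (not part of the commit; the values, and the Lombok-generated no-arg constructor and setters on SchemaInfo, are assumptions): the text-based schema types come back as their UTF-8 definition, everything else as Base64.

import static java.nio.charset.StandardCharsets.UTF_8;

import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;

public class SchemaDefinitionSketch {
    public static void main(String[] args) {
        // Hypothetical Avro schema definition.
        String avroDef = "{\"type\":\"record\",\"name\":\"User\",\"fields\":"
                + "[{\"name\":\"name\",\"type\":\"string\"}]}";

        SchemaInfo avroInfo = new SchemaInfo();   // assumes the Lombok-generated no-arg constructor
        avroInfo.setName("user");
        avroInfo.setType(SchemaType.AVRO);
        avroInfo.setSchema(avroDef.getBytes(UTF_8));
        System.out.println(avroInfo.getSchemaDefinition()); // the JSON definition, as-is

        SchemaInfo bytesInfo = new SchemaInfo();
        bytesInfo.setName("raw");
        bytesInfo.setType(SchemaType.BYTES);
        bytesInfo.setSchema(new byte[] {0x01, 0x02});
        System.out.println(bytesInfo.getSchemaDefinition()); // Base64: "AQI="
    }
}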
@@ -730,13 +730,16 @@ protected void preProcessSchemaBeforeSubscribe(PulsarClientImpl pulsarClientImpl

if (schema instanceof AutoConsumeSchema) {
SchemaInfo schemaInfo = schemaInfoProvider.getLatestSchema();
-if (schemaInfo.getType() != SchemaType.AVRO){
+if (schemaInfo.getType() != SchemaType.AVRO && schemaInfo.getType() != SchemaType.JSON){
throw new RuntimeException("Currently schema detection only works for topics with avro schemas");

}
-GenericSchema genericSchema = GenericSchemaImpl.of(schemaInfoProvider.getLatestSchema());

+// when using `AutoConsumeSchema`, we use the schema associated with the messages as schema reader
+// to decode the messages.
+GenericSchema genericSchema = GenericSchemaImpl.of(
+schemaInfoProvider.getLatestSchema(), false /*useProvidedSchemaAsReaderSchema*/);
log.info("Auto detected schema for topic {} : {}",
-topicName, new String(schemaInfo.getSchema(), UTF_8));
+topicName, schemaInfo.getSchemaDefinition());
((AutoConsumeSchema) schema).setSchema(genericSchema);
}
schema.setSchemaInfoProvider(schemaInfoProvider);
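From the public API side this is what the change enables, as a hedged sketch (service URL, topic, subscription and field names are hypothetical, and the topic is assumed to already carry an Avro or JSON schema):

import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.schema.GenericRecord;

public class AutoConsumeSketch {
    public static void main(String[] args) throws Exception {
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650") // hypothetical
                .build();

        // AUTO_CONSUME discovers the topic schema (Avro or JSON after this change)
        // and hands back GenericRecords instead of raw bytes.
        Consumer<GenericRecord> consumer = client.newConsumer(Schema.AUTO_CONSUME())
                .topic("persistent://public/default/users") // hypothetical
                .subscriptionName("auto-consume-demo")
                .subscribe();

        Message<GenericRecord> msg = consumer.receive();
        GenericRecord record = msg.getValue();
        // The fields follow the writer schema version attached to this particular message.
        System.out.println(record.getField("name")); // hypothetical field
        consumer.acknowledge(msg);

        consumer.close();
        client.close();
    }
}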
@@ -102,11 +102,16 @@ public static <T> AvroSchema<T> of(Class<T> pojo, Map<String, String> properties
protected SchemaReader<T> loadReader(byte[] schemaVersion) {
SchemaInfo schemaInfo = schemaInfoProvider.getSchemaByVersion(schemaVersion);
if (schemaInfo != null) {
-return new AvroReader<>(parseAvroSchema(new String(schemaInfo.getSchema())), schema);
+log.info("Load schema reader for version({}), schema is : {}",
+SchemaUtils.getStringSchemaVersion(schemaVersion),
+schemaInfo.getSchemaDefinition());
+return new AvroReader<>(parseAvroSchema(schemaInfo.getSchemaDefinition()), schema);
} else {
+log.warn("No schema found for version({}), use latest schema : {}",
+SchemaUtils.getStringSchemaVersion(schemaVersion),
+this.schemaInfo.getSchemaDefinition());
return reader;
}
}


}
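As an aside, for the text-based schema types the new getSchemaDefinition() call is equivalent to the UTF-8 decoding it replaces; a small illustrative helper (class and method names are hypothetical):

import static java.nio.charset.StandardCharsets.UTF_8;

import org.apache.avro.Schema;
import org.apache.pulsar.common.schema.SchemaInfo;

final class SchemaDefinitionEquivalence {
    // For AVRO/JSON/PROTOBUF SchemaInfo, getSchemaDefinition() is just the UTF-8
    // decoding of the stored bytes, so both forms parse to the same Avro schema.
    static Schema parse(SchemaInfo schemaInfo) {
        String viaBytes = new String(schemaInfo.getSchema(), UTF_8); // old form
        String viaDefinition = schemaInfo.getSchemaDefinition();     // new form
        assert viaBytes.equals(viaDefinition);
        return new Schema.Parser().parse(viaDefinition);
    }
}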
@@ -27,7 +27,6 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.client.api.schema.SchemaReader;
-import org.apache.pulsar.client.api.schema.SchemaWriter;
import org.apache.pulsar.client.impl.schema.reader.JsonReader;
import org.apache.pulsar.client.impl.schema.writer.JsonWriter;
import org.apache.pulsar.common.schema.SchemaInfo;
@@ -27,7 +27,6 @@
import org.apache.avro.protobuf.ProtobufData;
import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.client.api.schema.SchemaReader;
-import org.apache.pulsar.client.api.schema.SchemaWriter;
import org.apache.pulsar.client.impl.schema.reader.ProtobufReader;
import org.apache.pulsar.client.impl.schema.writer.ProtobufWriter;
import org.apache.pulsar.common.schema.SchemaInfo;
@@ -96,7 +95,8 @@ public void accept(Descriptors.FieldDescriptor fieldDescriptor) {

@Override
protected SchemaReader<T> loadReader(byte[] schemaVersion) {
-throw new RuntimeException("ProtobufSchema don't support schema versioning"); }
+throw new RuntimeException("ProtobufSchema don't support schema versioning");
+}

public static <T extends com.google.protobuf.GeneratedMessageV3> ProtobufSchema<T> of(Class<T> pojo) {
return of(pojo, new HashMap<>());
@@ -22,15 +22,17 @@
import io.netty.buffer.ByteBufUtil;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Base64;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.common.schema.SchemaVersion;

/**
* Utils for schemas.
*/
-final class SchemaUtils {
+public final class SchemaUtils {

private SchemaUtils() {}

@@ -149,4 +151,22 @@ public static Object toAvroObject(Object value) {
}
}

public static String getStringSchemaVersion(byte[] schemaVersionBytes) {
if (null == schemaVersionBytes) {
return "NULL";
} else if (
// the length of schema version is 8 bytes post 2.4.0
schemaVersionBytes.length == Long.BYTES
// the length of schema version is 64 bytes before 2.4.0
|| schemaVersionBytes.length == Long.SIZE) {
ByteBuffer bb = ByteBuffer.wrap(schemaVersionBytes);
return String.valueOf(bb.getLong());
} else if (schemaVersionBytes.length == 0) {
return "EMPTY";
} else {
return Base64.getEncoder().encodeToString(schemaVersionBytes);
}

}

}
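A quick sketch of what the new helper renders (the version values are hypothetical); the 8-byte form is decoded as a long, anything unrecognized falls back to Base64:

import java.nio.ByteBuffer;

import org.apache.pulsar.client.impl.schema.SchemaUtils;

public class SchemaVersionSketch {
    public static void main(String[] args) {
        // An 8-byte version as written post 2.4.0 (hypothetical value 3).
        byte[] longVersion = ByteBuffer.allocate(Long.BYTES).putLong(3L).array();
        System.out.println(SchemaUtils.getStringSchemaVersion(longVersion));  // 3

        System.out.println(SchemaUtils.getStringSchemaVersion(null));         // NULL
        System.out.println(SchemaUtils.getStringSchemaVersion(new byte[0]));  // EMPTY

        // Any other length (e.g. 3 bytes) is shown as Base64.
        System.out.println(SchemaUtils.getStringSchemaVersion(new byte[] {1, 2, 3})); // AQID
    }
}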
@@ -131,6 +131,12 @@ public void setSchemaInfoProvider(SchemaInfoProvider schemaInfoProvider) {
this.schemaInfoProvider = schemaInfoProvider;
}

/**
* Load the schema reader for reading messages encoded by the given schema version.
*
* @param schemaVersion the provided schema version
* @return the schema reader for decoding messages encoded by the provided schema version.
*/
protected abstract SchemaReader<T> loadReader(byte[] schemaVersion);

protected void setWriter(SchemaWriter<T> writer) {
@@ -18,18 +18,27 @@
*/
package org.apache.pulsar.client.impl.schema.generic;

import lombok.extern.slf4j.Slf4j;
import org.apache.avro.Schema;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.api.schema.GenericRecordBuilder;
import org.apache.pulsar.client.api.schema.SchemaReader;
import org.apache.pulsar.client.impl.schema.SchemaUtils;
import org.apache.pulsar.common.schema.SchemaInfo;

/**
* A generic avro schema.
*/
@Slf4j
public class GenericAvroSchema extends GenericSchemaImpl {

public GenericAvroSchema(SchemaInfo schemaInfo) {
-super(schemaInfo);
+this(schemaInfo, true);
}

GenericAvroSchema(SchemaInfo schemaInfo,
boolean useProvidedSchemaAsReaderSchema) {
super(schemaInfo, useProvidedSchemaAsReaderSchema);
setReader(new GenericAvroReader(schema));
setWriter(new GenericAvroWriter(schema));
}
@@ -48,11 +57,19 @@ public boolean supportSchemaVersioning() {
protected SchemaReader<GenericRecord> loadReader(byte[] schemaVersion) {
SchemaInfo schemaInfo = schemaInfoProvider.getSchemaByVersion(schemaVersion);
if (schemaInfo != null) {
log.info("Load schema reader for version({}), schema is : {}",
SchemaUtils.getStringSchemaVersion(schemaVersion),
schemaInfo.getSchemaDefinition());
Schema writerSchema = parseAvroSchema(schemaInfo.getSchemaDefinition());
Schema readerSchema = useProvidedSchemaAsReaderSchema ? schema : writerSchema;
return new GenericAvroReader(
-parseAvroSchema(new String(schemaInfo.getSchema())),
-schema,
+writerSchema,
+readerSchema,
schemaVersion);
} else {
log.warn("No schema found for version({}), use latest schema : {}",
SchemaUtils.getStringSchemaVersion(schemaVersion),
this.schemaInfo.getSchemaDefinition());
return reader;
}
}
@@ -20,21 +20,28 @@

import java.util.stream.Collectors;

import lombok.extern.slf4j.Slf4j;
import org.apache.avro.Schema;
import org.apache.pulsar.client.api.schema.Field;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.api.schema.GenericRecordBuilder;
import org.apache.pulsar.client.api.schema.SchemaReader;
import org.apache.pulsar.client.impl.schema.SchemaUtils;
import org.apache.pulsar.common.schema.SchemaInfo;

-import static java.nio.charset.StandardCharsets.UTF_8;

/**
* A generic json schema.
*/
@Slf4j
class GenericJsonSchema extends GenericSchemaImpl {

public GenericJsonSchema(SchemaInfo schemaInfo) {
-super(schemaInfo);
+this(schemaInfo, true);
}

GenericJsonSchema(SchemaInfo schemaInfo,
boolean useProvidedSchemaAsReaderSchema) {
super(schemaInfo, useProvidedSchemaAsReaderSchema);
setWriter(new GenericJsonWriter());
setReader(new GenericJsonReader(fields));
}
@@ -43,12 +50,24 @@ public GenericJsonSchema(SchemaInfo schemaInfo) {
protected SchemaReader<GenericRecord> loadReader(byte[] schemaVersion) {
SchemaInfo schemaInfo = schemaInfoProvider.getSchemaByVersion(schemaVersion);
if (schemaInfo != null) {
log.info("Load schema reader for version({}), schema is : {}",
SchemaUtils.getStringSchemaVersion(schemaVersion),
schemaInfo.getSchemaDefinition());
Schema readerSchema;
if (useProvidedSchemaAsReaderSchema) {
readerSchema = schema;
} else {
readerSchema = parseAvroSchema(schemaInfo.getSchemaDefinition());
}
return new GenericJsonReader(schemaVersion,
-(parseAvroSchema(new String(schemaInfo.getSchema(), UTF_8)).getFields()
+readerSchema.getFields()
.stream()
.map(f -> new Field(f.name(), f.pos()))
-.collect(Collectors.toList())));
+.collect(Collectors.toList()));
} else {
log.warn("No schema found for version({}), use latest schema : {}",
SchemaUtils.getStringSchemaVersion(schemaVersion),
this.schemaInfo.getSchemaDefinition());
return reader;
}
}
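An end-to-end sketch of the JSON case this commit enables (POJO, topic, subscription and values are hypothetical): produce with a JSON schema and read the same topic with AUTO_CONSUME.

import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.schema.GenericRecord;

public class JsonAutoConsumeSketch {

    // Hypothetical POJO carried by the topic.
    public static class User {
        public String name;
        public int age;

        public User() {
        }

        public User(String name, int age) {
            this.name = name;
            this.age = age;
        }
    }

    public static void main(String[] args) throws Exception {
        String topic = "persistent://public/default/users-json"; // hypothetical

        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650") // hypothetical
                .build();

        Producer<User> producer = client.newProducer(Schema.JSON(User.class))
                .topic(topic)
                .create();
        producer.send(new User("alice", 30));

        // AUTO_CONSUME looks up the topic's JSON schema and decodes into GenericRecords.
        Consumer<GenericRecord> consumer = client.newConsumer(Schema.AUTO_CONSUME())
                .topic(topic)
                .subscriptionName("json-auto-consume")
                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                .subscribe();

        Message<GenericRecord> msg = consumer.receive();
        System.out.println(msg.getValue().getField("name")); // alice

        consumer.acknowledge(msg);
        producer.close();
        consumer.close();
        client.close();
    }
}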
@@ -33,14 +33,20 @@
public abstract class GenericSchemaImpl extends StructSchema<GenericRecord> implements GenericSchema<GenericRecord> {

protected final List<Field> fields;
// the flag controls whether to use the provided schema as reader schema
// to decode the messages. In `AUTO_CONSUME` mode, setting this flag to `false`
// allows decoding the messages using the schema associated with the messages.
protected final boolean useProvidedSchemaAsReaderSchema;

-protected GenericSchemaImpl(SchemaInfo schemaInfo) {
+protected GenericSchemaImpl(SchemaInfo schemaInfo,
+boolean useProvidedSchemaAsReaderSchema) {
super(schemaInfo);

this.fields = schema.getFields()
.stream()
.map(f -> new Field(f.name(), f.pos()))
.collect(Collectors.toList());
this.useProvidedSchemaAsReaderSchema = useProvidedSchemaAsReaderSchema;
}

@Override
@@ -55,11 +61,16 @@ public List<Field> getFields() {
* @return a generic schema instance
*/
public static GenericSchemaImpl of(SchemaInfo schemaInfo) {
return of(schemaInfo, true);
}

public static GenericSchemaImpl of(SchemaInfo schemaInfo,
boolean useProvidedSchemaAsReaderSchema) {
switch (schemaInfo.getType()) {
case AVRO:
-return new GenericAvroSchema(schemaInfo);
+return new GenericAvroSchema(schemaInfo, useProvidedSchemaAsReaderSchema);
case JSON:
-return new GenericJsonSchema(schemaInfo);
+return new GenericJsonSchema(schemaInfo, useProvidedSchemaAsReaderSchema);
default:
throw new UnsupportedOperationException("Generic schema is not supported on schema type "
+ schemaInfo.getType() + "'");
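A short sketch of the two factory modes (the class and factory come from this diff; the caller is hypothetical): the single-argument overload keeps the previous behaviour, while AUTO_CONSUME now passes false so the reader schema follows each message's writer schema.

import org.apache.pulsar.client.impl.schema.generic.GenericSchemaImpl;
import org.apache.pulsar.common.schema.SchemaInfo;

final class GenericSchemaFactorySketch {

    // Previous (and still default) behaviour: the provided schema is also the reader schema.
    static GenericSchemaImpl latestAsReader(SchemaInfo schemaInfo) {
        return GenericSchemaImpl.of(schemaInfo);
    }

    // AUTO_CONSUME behaviour after this commit: decode each message with its own writer schema.
    static GenericSchemaImpl writerAsReader(SchemaInfo schemaInfo) {
        return GenericSchemaImpl.of(schemaInfo, false /* useProvidedSchemaAsReaderSchema */);
    }
}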