Support passing schema definition for JSON and AVRO schemas (apache#3766)

* Support passing schema definition for JSON and AVRO schemas

*Motivation*

Currently AVRO and JSON schemas are generated from POJOs directly.
Sometimes people would like to use pre-generated/pre-defined schemas,
so allowing a schema definition to be passed in clears up the confusion
around parsing schemas from POJOs.

*Modifications*

- Abstract a common base class `StructSchema` for AVRO/PROTOBUF/JSON
- Standardize on using Avro schema for defining schemas (we already did that; this change only makes it clearer)
- Add methods to pass a schema definition for JSON and AVRO schemas

*NOTES*

We don't support passing a schema definition for PROTOBUF. Since the protobuf schema only supports generated
messages as the POJO class, and the schema definition is generated from those messages, it doesn't make sense
to pass in a different schema definition.

* Add missing license header
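
For illustration, a minimal usage sketch of the new AVRO/JSON methods. The broker URL, topic name,
User POJO and schema definition string below are example values only, not part of this change:

import java.util.Collections;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;

public class SchemaDefinitionExample {

    // example POJO matching the schema definition below
    public static class User {
        public String name;
        public int age;
        public User() {}
        public User(String name, int age) { this.name = name; this.age = age; }
    }

    public static void main(String[] args) throws Exception {
        // pre-defined schema, expressed in avro schema syntax
        String definition =
            "{\"type\":\"record\",\"name\":\"User\",\"fields\":["
            + "{\"name\":\"name\",\"type\":[\"null\",\"string\"],\"default\":null},"
            + "{\"name\":\"age\",\"type\":\"int\"}]}";

        // AVRO: build the schema from the definition instead of extracting it from the POJO
        Schema<User> avroSchema = Schema.AVRO(definition);

        // JSON: still takes the POJO for (de)serialization, plus the definition and schema properties
        Schema<User> jsonSchema = Schema.JSON(User.class, definition, Collections.emptyMap());

        try (PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")
                .build();
             Producer<User> producer = client.newProducer(avroSchema)
                     .topic("my-topic")
                     .create()) {
            producer.send(new User("alice", 30));
        }
    }
}

JSON keeps the Class<T> parameter because the POJO is still needed for Jackson (de)serialization;
only the reported schema definition changes.
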
sijie authored and jiazhai committed Mar 14, 2019
1 parent 2d44850 commit da68b23
Showing 7 changed files with 263 additions and 61 deletions.
@@ -19,6 +19,8 @@
package org.apache.pulsar.client.api;

import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.Map;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.api.schema.GenericSchema;
import org.apache.pulsar.client.internal.DefaultImplementation;
@@ -176,6 +178,27 @@ static <T> Schema<T> AVRO(Class<T> clazz) {
return DefaultImplementation.newAvroSchema(clazz);
}

/**
* Create an Avro schema type using the provided Avro schema definition.
*
* @param schemaDefinition avro schema definition
* @return a Schema instance
*/
static <T> Schema<T> AVRO(String schemaDefinition) {
return AVRO(schemaDefinition, Collections.emptyMap());
}

/**
* Create an Avro schema type using the provided Avro schema definition.
*
* @param schemaDefinition avro schema definition
* @param properties pulsar schema properties
* @return a Schema instance
*/
static <T> Schema<T> AVRO(String schemaDefinition, Map<String, String> properties) {
return DefaultImplementation.newAvroSchema(schemaDefinition, properties);
}

/**
* Create a JSON schema type by extracting the fields of the specified class.
*
@@ -186,6 +209,20 @@ static <T> Schema<T> JSON(Class<T> clazz) {
return DefaultImplementation.newJSONSchema(clazz);
}

/**
* Create a JSON schema type for the specified class using the provided schema definition.
*
* @param clazz the POJO class to be used to extract the JSON schema
* @param schemaDefinition schema definition json string (using avro schema syntax)
* @param properties pulsar schema properties
* @return a Schema instance
*/
static <T> Schema<T> JSON(Class<T> clazz,
String schemaDefinition,
Map<String, String> properties) {
return DefaultImplementation.newJSONSchema(clazz, schemaDefinition, properties);
}

/**
* Key Value Schema using passed in schema type, support JSON and AVRO currently.
*/
@@ -188,6 +188,12 @@ public static <T> Schema<T> newAvroSchema(Class<T> clazz) {
.invoke(null, clazz));
}

public static <T> Schema<T> newAvroSchema(String schemaDefinition, Map<String, String> properties) {
return catchExceptions(
() -> (Schema<T>) getStaticMethod("org.apache.pulsar.client.impl.schema.AvroSchema", "of", String.class, Map.class)
.invoke(null, schemaDefinition, properties));
}

public static <T extends com.google.protobuf.GeneratedMessageV3> Schema<T> newProtobufSchema(Class<T> clazz) {
return catchExceptions(
() -> (Schema<T>) getStaticMethod("org.apache.pulsar.client.impl.schema.ProtobufSchema", "of", Class.class)
@@ -200,6 +206,14 @@ public static <T> Schema<T> newJSONSchema(Class<T> clazz) {
.invoke(null, clazz));
}

public static <T> Schema<T> newJSONSchema(Class<T> clazz,
String schemaDefinition,
Map<String, String> properties) {
return catchExceptions(
() -> (Schema<T>) getStaticMethod("org.apache.pulsar.client.impl.schema.JSONSchema", "of", Class.class, String.class, Map.class)
.invoke(null, clazz, schemaDefinition, properties));
}

public static Schema<GenericRecord> newAutoConsumeSchema() {
return catchExceptions(
() -> (Schema<GenericRecord>) newClassInstance("org.apache.pulsar.client.impl.schema.AutoConsumeSchema")
@@ -18,31 +18,27 @@
*/
package org.apache.pulsar.client.impl.schema;

import static java.nio.charset.StandardCharsets.UTF_8;

import lombok.extern.slf4j.Slf4j;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.reflect.ReflectData;
import org.apache.avro.reflect.ReflectDatumReader;
import org.apache.avro.reflect.ReflectDatumWriter;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SchemaSerializationException;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Collections;
import java.util.Map;

/**
* An AVRO schema implementation.
*/
@Slf4j
public class AvroSchema<T> implements Schema<T> {
public class AvroSchema<T> extends StructSchema<T> {

private SchemaInfo schemaInfo;
private org.apache.avro.Schema schema;
private ReflectDatumWriter<T> datumWriter;
private ReflectDatumReader<T> reader;
private BinaryEncoder encoder;
@@ -53,13 +49,10 @@ public class AvroSchema<T> implements Schema<T> {

private AvroSchema(org.apache.avro.Schema schema,
Map<String, String> properties) {
this.schema = schema;

this.schemaInfo = new SchemaInfo();
this.schemaInfo.setName("");
this.schemaInfo.setProperties(properties);
this.schemaInfo.setType(SchemaType.AVRO);
this.schemaInfo.setSchema(this.schema.toString().getBytes(UTF_8));
super(
SchemaType.AVRO,
schema,
properties);

this.byteArrayOutputStream = new ByteArrayOutputStream();
this.encoder = EncoderFactory.get().binaryEncoder(this.byteArrayOutputStream, this.encoder);
@@ -94,15 +87,6 @@ public T decode(byte[] bytes) {
}
}

@Override
public SchemaInfo getSchemaInfo() {
return this.schemaInfo;
}

private static <T> org.apache.avro.Schema createAvroSchema(Class<T> pojo) {
return ReflectData.AllowNull.get().getSchema(pojo);
}

public static <T> AvroSchema<T> of(Class<T> pojo) {
return new AvroSchema<>(createAvroSchema(pojo), Collections.emptyMap());
}
@@ -111,4 +95,15 @@ public static <T> AvroSchema<T> of(Class<T> pojo, Map<String, String> properties
return new AvroSchema<>(createAvroSchema(pojo), properties);
}

/**
* Create an Avro schema based on the provided schema definition.
*
* @param schemaDefinition avro schema definition
* @param properties schema properties
* @return avro schema instance
*/
public static <T> AvroSchema<T> of(String schemaDefinition, Map<String, String> properties) {
return new AvroSchema<>(parseAvroSchema(schemaDefinition), properties);
}

}
@@ -18,17 +18,13 @@
*/
package org.apache.pulsar.client.impl.schema;

import static java.nio.charset.StandardCharsets.UTF_8;

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.module.jsonSchema.JsonSchema;
import com.fasterxml.jackson.module.jsonSchema.JsonSchemaGenerator;
import lombok.extern.slf4j.Slf4j;
import org.apache.avro.reflect.ReflectData;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SchemaSerializationException;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
@@ -37,13 +33,11 @@
import java.util.Collections;
import java.util.Map;

/**
* A schema implementation to deal with JSON data.
*/
@Slf4j
public class JSONSchema<T> implements Schema<T>{

private final org.apache.avro.Schema schema;
private final SchemaInfo schemaInfo;
private final Class<T> pojo;
private Map<String, String> properties;
public class JSONSchema<T> extends StructSchema<T> {

// Cannot use org.apache.pulsar.common.util.ObjectMapperFactory.getThreadLocal() because it does not
// return shaded version of object mapper
@@ -54,18 +48,17 @@ public class JSONSchema<T> implements Schema<T>{
return mapper;
});

private final Class<T> pojo;
private final ObjectMapper objectMapper;

private JSONSchema(Class<T> pojo, Map<String, String> properties) {
private JSONSchema(Class<T> pojo,
org.apache.avro.Schema schema,
Map<String, String> properties) {
super(
SchemaType.JSON,
schema,
properties);
this.pojo = pojo;
this.properties = properties;

this.schema = ReflectData.AllowNull.get().getSchema(pojo);
this.schemaInfo = new SchemaInfo();
this.schemaInfo.setName("");
this.schemaInfo.setProperties(properties);
this.schemaInfo.setType(SchemaType.JSON);
this.schemaInfo.setSchema(this.schema.toString().getBytes(UTF_8));
this.objectMapper = JSON_MAPPER.get();
}

@@ -106,7 +99,7 @@ public SchemaInfo getBackwardsCompatibleJsonSchemaInfo() {
JsonSchema jsonBackwardsCompatibleSchema = schemaGen.generateSchema(pojo);
backwardsCompatibleSchemaInfo = new SchemaInfo();
backwardsCompatibleSchemaInfo.setName("");
backwardsCompatibleSchemaInfo.setProperties(properties);
backwardsCompatibleSchemaInfo.setProperties(schemaInfo.getProperties());
backwardsCompatibleSchemaInfo.setType(SchemaType.JSON);
backwardsCompatibleSchemaInfo.setSchema(objectMapper.writeValueAsBytes(jsonBackwardsCompatibleSchema));
} catch (JsonProcessingException ex) {
@@ -116,10 +109,24 @@ public SchemaInfo getBackwardsCompatibleJsonSchemaInfo() {
}

public static <T> JSONSchema<T> of(Class<T> pojo) {
return new JSONSchema<>(pojo, Collections.emptyMap());
return new JSONSchema<>(pojo, createAvroSchema(pojo), Collections.emptyMap());
}

public static <T> JSONSchema<T> of(Class<T> pojo, Map<String, String> properties) {
return new JSONSchema<>(pojo, properties);
return new JSONSchema<>(pojo, createAvroSchema(pojo), properties);
}

/**
* Create a JSON schema based on the provided schema definition.
*
* @param pojo pojo class
* @param schemaDefinition avro schema definition
* @param properties schema properties
* @return JSON schema instance
*/
public static <T> JSONSchema<T> of(Class<T> pojo,
String schemaDefinition,
Map<String, String> properties) {
return new JSONSchema<>(pojo, parseAvroSchema(schemaDefinition), properties);
}
}
@@ -18,16 +18,13 @@
*/
package org.apache.pulsar.client.impl.schema;

import static java.nio.charset.StandardCharsets.UTF_8;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.protobuf.Descriptors;
import com.google.protobuf.Parser;
import lombok.AllArgsConstructor;
import lombok.Getter;
import org.apache.avro.protobuf.ProtobufDatumReader;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SchemaSerializationException;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
@@ -40,9 +37,11 @@
import java.util.Map;
import java.util.function.Consumer;

public class ProtobufSchema<T extends com.google.protobuf.GeneratedMessageV3> implements Schema<T> {
/**
* A schema implementation to deal with protobuf generated messages.
*/
public class ProtobufSchema<T extends com.google.protobuf.GeneratedMessageV3> extends StructSchema<T> {

private SchemaInfo schemaInfo;
private Parser<T> tParser;
public static final String PARSING_INFO_PROPERTY = "__PARSING_INFO__";

@@ -57,25 +56,27 @@ public static class ProtoBufParsingInfo {
private final Map <String, Object> definition;
}

private static <T> org.apache.avro.Schema createProtobufAvroSchema(Class<T> pojo) {
ProtobufDatumReader datumReader = new ProtobufDatumReader(pojo);
return datumReader.getSchema();
}

private ProtobufSchema(Map<String, String> properties, Class<T> pojo) {
super(
SchemaType.PROTOBUF,
createProtobufAvroSchema(pojo),
properties);
// update properties with protobuf related properties
try {
T protoMessageInstance = (T) pojo.getMethod("getDefaultInstance").invoke(null);
tParser = (Parser<T>) protoMessageInstance.getParserForType();

this.schemaInfo = new SchemaInfo();
this.schemaInfo.setName("");

Map<String, String> allProperties = new HashMap<>();
allProperties.putAll(properties);
allProperties.putAll(schemaInfo.getProperties());
// set protobuf parsing info
allProperties.put(PARSING_INFO_PROPERTY, getParsingInfo(protoMessageInstance));

this.schemaInfo.setProperties(allProperties);
this.schemaInfo.setType(SchemaType.PROTOBUF);
ProtobufDatumReader datumReader = new ProtobufDatumReader(pojo);
org.apache.avro.Schema schema = datumReader.getSchema();
this.schemaInfo.setSchema(schema.toString().getBytes(UTF_8));

schemaInfo.setProperties(allProperties);
} catch (IllegalAccessException | InvocationTargetException | NoSuchMethodException e) {
throw new IllegalArgumentException(e);
}
