Skip to content

Commit

Permalink
[schema] Introduce GenericSchema interface (apache#3683)
Browse files Browse the repository at this point in the history
*Motivation*

In order to introduce `GenericRecordBuilder`, we need to know the fields in a `GenericSchema`.
Otherwise, there is no way for us to build a GenericRecord.

*Modifications*

This proposal refactors current generic schema by introducing a `GenericSchema`. This generic schema
provides interfaces to retrieve the fields of a `GenericRecordSchema`.

*Additionally*

This proposal adding the primitive schemas into `Schema` class. So people can program primitive schemas
using Schema interface rather than specific implementations.
  • Loading branch information
sijie authored Feb 28, 2019
1 parent 8c181cd commit f4d5662
Show file tree
Hide file tree
Showing 12 changed files with 203 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
*/
package org.apache.pulsar.client.api;

import java.nio.ByteBuffer;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.api.schema.GenericSchema;
import org.apache.pulsar.client.internal.DefaultImplementation;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.SchemaInfo;
Expand Down Expand Up @@ -109,11 +111,51 @@ default T decode(byte[] bytes, byte[] schemaVersion) {
*/
Schema<byte[]> BYTES = DefaultImplementation.newBytesSchema();

/**
* ByteBuffer Schema.
*/
Schema<ByteBuffer> BYTEBUFFER = DefaultImplementation.newByteBufferSchema();

/**
* Schema that can be used to encode/decode messages whose values are String. The payload is encoded with UTF-8.
*/
Schema<String> STRING = DefaultImplementation.newStringSchema();

/**
* INT8 Schema
*/
Schema<Byte> INT8 = DefaultImplementation.newByteSchema();

/**
* INT16 Schema
*/
Schema<Short> INT16 = DefaultImplementation.newShortSchema();

/**
* INT32 Schema
*/
Schema<Integer> INT32 = DefaultImplementation.newIntSchema();

/**
* INT64 Schema
*/
Schema<Long> INT64 = DefaultImplementation.newLongSchema();

/**
* Boolean Schema
*/
Schema<Boolean> BOOL = DefaultImplementation.newBooleanSchema();

/**
* Float Schema
*/
Schema<Float> FLOAT = DefaultImplementation.newFloatSchema();

/**
* Double Schema
*/
Schema<Double> DOUBLE = DefaultImplementation.newDoubleSchema();

/**
* Create a Protobuf schema type by extracting the fields of the specified class.
*
Expand Down Expand Up @@ -208,4 +250,15 @@ static Schema<?> getSchema(SchemaInfo schemaInfo) {
return DefaultImplementation.getSchema(schemaInfo);
}

/**
* Returns a generic schema of existing schema info.
*
* <p>Only supports AVRO and JSON.
*
* @param schemaInfo schema info
* @return a generic schema instance
*/
static GenericSchema generic(SchemaInfo schemaInfo) {
return DefaultImplementation.getGenericSchema(schemaInfo);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.api.schema;

import java.util.List;
import org.apache.pulsar.client.api.Schema;

/**
* A schema that serializes and deserializes between {@link GenericRecord} and bytes.
*/
public interface GenericSchema extends Schema<GenericRecord> {

/**
* Returns the list of fields.
*
* @return the list of fields of generic record.
*/
List<Field> getFields();

}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@

import java.lang.reflect.Constructor;
import java.lang.reflect.Method;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.Map;
import java.util.function.Supplier;

Expand All @@ -36,6 +38,7 @@
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.api.schema.GenericSchema;
import org.apache.pulsar.client.api.schema.RecordSchemaBuilder;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.SchemaInfo;
Expand Down Expand Up @@ -125,6 +128,60 @@ public static Schema<String> newStringSchema() {
.newInstance());
}

public static Schema<String> newStringSchema(Charset charset) {
return catchExceptions(
() -> (Schema<String>) getConstructor("org.apache.pulsar.client.impl.schema.StringSchema", Charset.class)
.newInstance(charset));
}

public static Schema<Byte> newByteSchema() {
return catchExceptions(
() -> (Schema<Byte>) newClassInstance("org.apache.pulsar.client.impl.schema.ByteSchema")
.newInstance());
}

public static Schema<Short> newShortSchema() {
return catchExceptions(
() -> (Schema<Short>) newClassInstance("org.apache.pulsar.client.impl.schema.ShortSchema")
.newInstance());
}

public static Schema<Integer> newIntSchema() {
return catchExceptions(
() -> (Schema<Integer>) newClassInstance("org.apache.pulsar.client.impl.schema.IntSchema")
.newInstance());
}

public static Schema<Long> newLongSchema() {
return catchExceptions(
() -> (Schema<Long>) newClassInstance("org.apache.pulsar.client.impl.schema.LongSchema")
.newInstance());
}

public static Schema<Boolean> newBooleanSchema() {
return catchExceptions(
() -> (Schema<Boolean>) newClassInstance("org.apache.pulsar.client.impl.schema.BooleanSchema")
.newInstance());
}

public static Schema<ByteBuffer> newByteBufferSchema() {
return catchExceptions(
() -> (Schema<ByteBuffer>) newClassInstance("org.apache.pulsar.client.impl.schema.ByteBufferSchema")
.newInstance());
}

public static Schema<Float> newFloatSchema() {
return catchExceptions(
() -> (Schema<Float>) newClassInstance("org.apache.pulsar.client.impl.schema.FloatSchema")
.newInstance());
}

public static Schema<Double> newDoubleSchema() {
return catchExceptions(
() -> (Schema<Double>) newClassInstance("org.apache.pulsar.client.impl.schema.DoubleSchema")
.newInstance());
}

public static <T> Schema<T> newAvroSchema(Class<T> clazz) {
return catchExceptions(
() -> (Schema<T>) getStaticMethod("org.apache.pulsar.client.impl.schema.AvroSchema", "of", Class.class)
Expand Down Expand Up @@ -173,6 +230,12 @@ public static Schema<?> getSchema(SchemaInfo schemaInfo) {
"getSchema", SchemaInfo.class).invoke(null, schemaInfo));
}

public static GenericSchema getGenericSchema(SchemaInfo schemaInfo) {
return catchExceptions(
() -> (GenericSchema) getStaticMethod("org.apache.pulsar.client.impl.schema.generic.GenericSchemaImpl",
"of", SchemaInfo.class).invoke(null, schemaInfo));
}

public static RecordSchemaBuilder newRecordSchemaBuilder(String name) {
return catchExceptions(
() -> (RecordSchemaBuilder) getConstructor("org.apache.pulsar.client.impl.schema.RecordSchemaBuilderImpl",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@
import org.apache.pulsar.client.impl.conf.ReaderConfigurationData;
import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
import org.apache.pulsar.client.impl.schema.AutoProduceBytesSchema;
import org.apache.pulsar.client.impl.schema.generic.GenericSchema;
import org.apache.pulsar.client.impl.schema.generic.GenericSchemaImpl;
import org.apache.pulsar.client.util.ExecutorProvider;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace.Mode;
Expand Down Expand Up @@ -296,7 +296,7 @@ private <T> CompletableFuture<Consumer<T>> singleTopicSubscribeAsync(ConsumerCon
return lookup.getSchema(TopicName.get(conf.getSingleTopic()))
.thenCompose(schemaInfoOptional -> {
if (schemaInfoOptional.isPresent() && schemaInfoOptional.get().getType() == SchemaType.AVRO) {
GenericSchema genericSchema = GenericSchema.of(schemaInfoOptional.get());
GenericSchemaImpl genericSchema = GenericSchemaImpl.of(schemaInfoOptional.get());
log.info("Auto detected schema for topic {} : {}",
conf.getSingleTopic(), new String(schemaInfoOptional.get().getSchema(), UTF_8));
autoConsumeSchema.setSchema(genericSchema);
Expand Down Expand Up @@ -436,7 +436,7 @@ public <T> CompletableFuture<Reader<T>> createReaderAsync(ReaderConfigurationDat
return lookup.getSchema(TopicName.get(conf.getTopicName()))
.thenCompose(schemaInfoOptional -> {
if (schemaInfoOptional.isPresent() && schemaInfoOptional.get().getType() == SchemaType.AVRO) {
GenericSchema genericSchema = GenericSchema.of(schemaInfoOptional.get());
GenericSchemaImpl genericSchema = GenericSchemaImpl.of(schemaInfoOptional.get());
log.info("Auto detected schema for topic {} : {}",
conf.getTopicName(), new String(schemaInfoOptional.get().getSchema(), UTF_8));
autoConsumeSchema.setSchema(genericSchema);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@

import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.impl.schema.generic.GenericSchema;
import org.apache.pulsar.client.impl.schema.generic.GenericSchemaImpl;
import org.apache.pulsar.common.schema.SchemaInfo;

/**
Expand Down Expand Up @@ -89,7 +89,7 @@ public static Schema<?> getSchema(SchemaInfo schemaInfo) {
return BytesSchema.of();
case JSON:
case AVRO:
return GenericSchema.of(schemaInfo);
return GenericSchemaImpl.of(schemaInfo);
default:
throw new IllegalArgumentException("Retrieve schema instance from schema info for type '"
+ schemaInfo.getType() + "' is not supported yet");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
/**
* A generic avro schema.
*/
class GenericAvroSchema extends GenericSchema {
class GenericAvroSchema extends GenericSchemaImpl {

private final GenericDatumWriter<org.apache.avro.generic.GenericRecord> datumWriter;
private BinaryEncoder encoder;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
/**
* A generic json schema.
*/
class GenericJsonSchema extends GenericSchema {
class GenericJsonSchema extends GenericSchemaImpl {

private final ObjectMapper objectMapper;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,20 +23,21 @@
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.schema.Field;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.api.schema.GenericSchema;
import org.apache.pulsar.common.schema.SchemaInfo;

import static java.nio.charset.StandardCharsets.UTF_8;

/**
* A generic schema representation.
*/
public abstract class GenericSchema implements Schema<GenericRecord> {
public abstract class GenericSchemaImpl implements GenericSchema {

protected final org.apache.avro.Schema schema;
protected final List<Field> fields;
protected final SchemaInfo schemaInfo;

protected GenericSchema(SchemaInfo schemaInfo) {
protected GenericSchemaImpl(SchemaInfo schemaInfo) {
this.schemaInfo = schemaInfo;
this.schema = new org.apache.avro.Schema.Parser().parse(
new String(schemaInfo.getSchema(), UTF_8)
Expand All @@ -51,6 +52,11 @@ public org.apache.avro.Schema getAvroSchema() {
return schema;
}

@Override
public List<Field> getFields() {
return fields;
}

@Override
public SchemaInfo getSchemaInfo() {
return schemaInfo;
Expand All @@ -62,7 +68,7 @@ public SchemaInfo getSchemaInfo() {
* @param schemaInfo schema info
* @return a generic schema instance
*/
public static GenericSchema of(SchemaInfo schemaInfo) {
public static GenericSchemaImpl of(SchemaInfo schemaInfo) {
switch (schemaInfo.getType()) {
case AVRO:
return new GenericAvroSchema(schemaInfo);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,17 +30,8 @@
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.schema.ByteBufSchema;
import org.apache.pulsar.client.impl.schema.ByteBufferSchema;
import org.apache.pulsar.client.impl.schema.ByteSchema;
import org.apache.pulsar.client.impl.schema.BytesSchema;
import org.apache.pulsar.client.impl.schema.DoubleSchema;
import org.apache.pulsar.client.impl.schema.FloatSchema;
import org.apache.pulsar.client.impl.schema.IntSchema;
import org.apache.pulsar.client.impl.schema.LongSchema;
import org.apache.pulsar.client.impl.schema.ShortSchema;
import org.apache.pulsar.client.impl.schema.StringSchema;
import org.apache.pulsar.common.schema.SchemaType;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

/**
Expand All @@ -49,7 +40,7 @@
@Slf4j
public class PrimitiveSchemaTest {

final private Map<Schema, List<Object>> testData = new HashMap() {
private static final Map<Schema, List<Object>> testData = new HashMap() {
{
put(BooleanSchema.of(), Arrays.asList(false, true));
put(StringSchema.utf8(), Arrays.asList("my string"));
Expand All @@ -65,8 +56,28 @@ public class PrimitiveSchemaTest {
}
};

@Test
public void allSchemasShouldSupportNull() {
private static final Map<Schema, List<Object>> testData2 = new HashMap() {
{
put(Schema.BOOL, Arrays.asList(false, true));
put(Schema.STRING, Arrays.asList("my string"));
put(Schema.INT8, Arrays.asList((byte) 32767, (byte) -32768));
put(Schema.INT16, Arrays.asList((short) 32767, (short) -32768));
put(Schema.INT32, Arrays.asList((int) 423412424, (int) -41243432));
put(Schema.INT64, Arrays.asList(922337203685477580L, -922337203685477581L));
put(Schema.FLOAT, Arrays.asList(5678567.12312f, -5678567.12341f));
put(Schema.DOUBLE, Arrays.asList(5678567.12312d, -5678567.12341d));
put(Schema.BYTES, Arrays.asList("my string".getBytes(UTF_8)));
put(Schema.BYTEBUFFER, Arrays.asList(ByteBuffer.allocate(10).put("my string".getBytes(UTF_8))));
}
};

@DataProvider(name = "schemas")
public Object[][] schemas() {
return new Object[][] { { testData }, { testData2 } };
}

@Test(dataProvider = "schemas")
public void allSchemasShouldSupportNull(Map<Schema, List<Object>> testData) {
for (Schema<?> schema : testData.keySet()) {
assertNull(schema.encode(null),
"Should support null in " + schema.getSchemaInfo().getName() + " serialization");
Expand All @@ -75,8 +86,8 @@ public void allSchemasShouldSupportNull() {
}
}

@Test
public void allSchemasShouldRoundtripInput() {
@Test(dataProvider = "schemas")
public void allSchemasShouldRoundtripInput(Map<Schema, List<Object>> testData) {
for (Map.Entry<Schema, List<Object>> test : testData.entrySet()) {
log.info("Test schema {}", test.getKey());
for (Object value : test.getValue()) {
Expand Down
Loading

0 comments on commit f4d5662

Please sign in to comment.