Skip to content

Commit

Permalink
Add Schema.getNativeSchema (apache#10076)
Browse files Browse the repository at this point in the history
  • Loading branch information
eolivelli authored Apr 1, 2021
1 parent cdba1c0 commit 4128151
Show file tree
Hide file tree
Showing 8 changed files with 77 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

import org.apache.avro.reflect.ReflectData;
import org.apache.avro.Schema.Parser;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.common.schema.LongSchemaVersion;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.PulsarClientException.IncompatibleSchemaException;
Expand Down Expand Up @@ -523,7 +524,12 @@ public void testAutoConsume(boolean batching) throws Exception {
Message<GenericRecord> data = c.receive();
assertNotNull(data.getSchemaVersion());
assertEquals(data.getValue().getField("i"), i);
MessageImpl impl = (MessageImpl) data;

org.apache.avro.Schema avroSchema = (org.apache.avro.Schema) impl.getSchema().getNativeSchema().get();
assertNotNull(avroSchema);
}

}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.util.Date;
import java.util.Optional;

import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.api.schema.GenericSchema;
import org.apache.pulsar.client.api.schema.SchemaDefinition;
Expand Down Expand Up @@ -156,6 +158,15 @@ default void configureSchemaInfo(String topic, String componentName,
*/
Schema<byte[]> BYTES = DefaultImplementation.newBytesSchema();

/**
* Return the native schema that is wrapped by Pulsar API.
* For instance with this method you can access the Avro schema
* @return the internal schema or null if not present
*/
default Optional<Object> getNativeSchema() {
return Optional.empty();
}

/**
* ByteBuffer Schema.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.SchemaInfo;

import java.util.Optional;
import java.util.concurrent.ExecutionException;

import static com.google.common.base.Preconditions.checkState;
Expand Down Expand Up @@ -147,6 +148,16 @@ public Schema<GenericRecord> clone() {
return schema;
}

@Override
public Optional<Object> getNativeSchema() {
ensureSchemaInitialized();
if (schema == null) {
return Optional.empty();
} else {
return schema.getNativeSchema();
}
}

private GenericSchema generateSchema(SchemaInfo schemaInfo) {
// when using `AutoConsumeSchema`, we use the schema associated with the messages as schema reader
// to decode the messages.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import org.apache.avro.Schema;
import org.apache.pulsar.common.schema.SchemaInfo;

import java.util.Optional;

import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.pulsar.client.impl.schema.util.SchemaUtil.parseAvroSchema;

Expand All @@ -46,4 +48,9 @@ public AvroBaseStructSchema(SchemaInfo schemaInfo) {
public Schema getAvroSchema(){
return schema;
}

@Override
public Optional<Object> getNativeSchema() {
return Optional.of(schema);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Consumer;

/**
Expand Down Expand Up @@ -100,6 +101,11 @@ public Descriptors.Descriptor getProtobufNativeSchema() {
return ProtobufNativeSchemaUtils.deserialize(this.schemaInfo.getSchema());
}

@Override
public Optional<Object> getNativeSchema() {
return Optional.of(getProtobufNativeSchema());
}

public static <T extends GeneratedMessageV3> ProtobufNativeSchema<T> of(Class<T> pojo) {
return of(pojo, new HashMap<>());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.fail;
import static org.testng.AssertJUnit.assertSame;

import java.math.BigDecimal;
import java.time.Instant;
Expand Down Expand Up @@ -125,11 +126,17 @@ private static class JodaTimeLogicalType{
long timeMicros;
}

@Test
public void testGetNativeSchema() throws SchemaValidationException {
AvroSchema<StructWithAnnotations> schema2 = AvroSchema.of(StructWithAnnotations.class);
org.apache.avro.Schema avroSchema2 = (Schema) schema2.getNativeSchema().get();
assertSame(schema2.schema, avroSchema2);
}

@Test
public void testSchemaDefinition() throws SchemaValidationException {
org.apache.avro.Schema schema1 = ReflectData.get().getSchema(DefaultStruct.class);
AvroSchema<StructWithAnnotations> schema2 = AvroSchema.of(StructWithAnnotations.class);

String schemaDef1 = schema1.toString();
String schemaDef2 = new String(schema2.getSchemaInfo().getSchema(), UTF_8);
assertNotEquals(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import static org.apache.pulsar.client.impl.schema.SchemaTestUtils.SCHEMA_JSON_ALLOW_NULL;
import static org.apache.pulsar.client.impl.schema.SchemaTestUtils.SCHEMA_JSON_NOT_ALLOW_NULL;
import static org.testng.Assert.assertEquals;
import static org.testng.AssertJUnit.assertSame;

import com.fasterxml.jackson.core.JsonProcessingException;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
Expand All @@ -32,6 +34,7 @@
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.avro.Schema;
import org.apache.avro.SchemaValidationException;
import org.apache.pulsar.client.api.SchemaSerializationException;
import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.client.impl.schema.SchemaTestUtils.Bar;
Expand Down Expand Up @@ -368,4 +371,11 @@ public void testEncodeAndDecodeObject() throws JsonProcessingException {
PC roundtrippedPc = jsonSchema.decode(encoded);
assertEquals(roundtrippedPc, pc);
}

@Test
public void testGetNativeSchema() throws SchemaValidationException {
JSONSchema<PC> schema2 = JSONSchema.of(PC.class);
org.apache.avro.Schema avroSchema2 = (Schema) schema2.getNativeSchema().get();
assertSame(schema2.schema, avroSchema2);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.client.impl.schema;

import com.google.protobuf.Descriptors;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -29,6 +30,10 @@
import java.util.Collections;
import java.util.HashMap;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.AssertJUnit.assertSame;

@Slf4j
public class ProtobufNativeSchemaTest {

Expand All @@ -51,18 +56,18 @@ public void testEncodeAndDecode() {
byte[] bytes = protobufSchema.encode(testMessage);
org.apache.pulsar.client.schema.proto.Test.TestMessage message = protobufSchema.decode(bytes);

Assert.assertEquals(message.getStringField(), stringFieldValue);
assertEquals(message.getStringField(), stringFieldValue);
}

@Test
public void testSchema() {
ProtobufNativeSchema<org.apache.pulsar.client.schema.proto.Test.TestMessage> protobufSchema
= ProtobufNativeSchema.of(org.apache.pulsar.client.schema.proto.Test.TestMessage.class);

Assert.assertEquals(protobufSchema.getSchemaInfo().getType(), SchemaType.PROTOBUF_NATIVE);
assertEquals(protobufSchema.getSchemaInfo().getType(), SchemaType.PROTOBUF_NATIVE);

Assert.assertNotNull(ProtobufNativeSchemaUtils.deserialize(protobufSchema.getSchemaInfo().getSchema()));
Assert.assertEquals(new String(protobufSchema.getSchemaInfo().getSchema(), StandardCharsets.UTF_8), EXPECTED_SCHEMA_JSON);
assertNotNull(ProtobufNativeSchemaUtils.deserialize(protobufSchema.getSchemaInfo().getSchema()));
assertEquals(new String(protobufSchema.getSchemaInfo().getSchema(), StandardCharsets.UTF_8), EXPECTED_SCHEMA_JSON);
}

@Test
Expand Down Expand Up @@ -96,8 +101,16 @@ public void testDecodeByteBuf() {
ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer(bytes.length);
byteBuf.writeBytes(bytes);

Assert.assertEquals(testMessage, protobufSchema.decode(byteBuf));
assertEquals(testMessage, protobufSchema.decode(byteBuf));

}

@Test
public void testGetNativeSchema() {
ProtobufNativeSchema<org.apache.pulsar.client.schema.proto.Test.TestMessage> protobufSchema
= ProtobufNativeSchema.of(org.apache.pulsar.client.schema.proto.Test.TestMessage.class);
Descriptors.Descriptor nativeSchema = (Descriptors.Descriptor) protobufSchema.getNativeSchema().get();
assertNotNull(nativeSchema);
}

}

0 comments on commit 4128151

Please sign in to comment.