diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java index eba8ec074411c..fb241a259e9ed 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java @@ -19,10 +19,13 @@ package org.apache.pulsar.schema; import com.google.common.collect.Sets; +import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.schema.GenericRecord; import org.apache.pulsar.client.api.schema.SchemaDefinition; import org.apache.pulsar.common.naming.TopicDomain; import org.apache.pulsar.common.naming.TopicName; @@ -38,6 +41,7 @@ import static org.apache.pulsar.schema.compatibility.SchemaCompatibilityCheckTest.randomName; import static org.junit.Assert.assertEquals; +@Slf4j public class SchemaTest extends MockedPulsarServiceBaseTest { private final static String CLUSTER_NAME = "test"; @@ -133,4 +137,56 @@ public void testMultiTopicSetSchemaProvider() throws Exception { producer.close(); consumer.close(); } + + @Test + public void testBytesSchemaDeserialize() throws Exception { + final String tenant = PUBLIC_TENANT; + final String namespace = "test-namespace-" + randomName(16); + final String topicName = "test-bytes-schema"; + + final String topic = TopicName.get( + TopicDomain.persistent.value(), + tenant, + namespace, + topicName).toString(); + + admin.namespaces().createNamespace( + tenant + "/" + namespace, + Sets.newHashSet(CLUSTER_NAME)); + + admin.topics().createPartitionedTopic(topic, 2); + admin.schemas().createSchema(topic, Schema.JSON(Schemas.BytesRecord.class).getSchemaInfo()); + + Producer producer = pulsarClient + .newProducer(Schema.JSON(Schemas.BytesRecord.class)) + .topic(topic) + .create(); + + Schemas.BytesRecord bytesRecord = new Schemas.BytesRecord(); + bytesRecord.setId(1); + bytesRecord.setName("Tom"); + bytesRecord.setAddress("test".getBytes()); + + Consumer consumer = pulsarClient.newConsumer(Schema.AUTO_CONSUME()) + .subscriptionName("test-sub") + .topic(topic) + .subscribe(); + + Consumer consumer1 = pulsarClient.newConsumer(Schema.JSON(Schemas.BytesRecord.class)) + .subscriptionName("test-sub1") + .topic(topic) + .subscribe(); + + producer.send(bytesRecord); + + Message message = consumer.receive(); + Message message1 = consumer1.receive(); + + assertEquals(message.getValue().getField("address").getClass(), + message1.getValue().getAddress().getClass()); + + producer.close(); + consumer.close(); + consumer1.close(); + } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/schema/Schemas.java b/pulsar-broker/src/test/java/org/apache/pulsar/schema/Schemas.java index df02574bf8afc..101ab9ca1f434 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/schema/Schemas.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/schema/Schemas.java @@ -28,14 +28,14 @@ public class Schemas { @Data @NoArgsConstructor @AllArgsConstructor - public static class PersonOne{ + public static class PersonOne { int id; } @Data @AllArgsConstructor @NoArgsConstructor - public static class PersonTwo{ + public static class PersonTwo { int id; @AvroDefault("\"Tom\"") @@ -45,7 +45,7 @@ public static class PersonTwo{ @Data @AllArgsConstructor @NoArgsConstructor - public static class PersonThree{ + public static class PersonThree { int id; String name; @@ -54,11 +54,20 @@ public static class PersonThree{ @Data @AllArgsConstructor @NoArgsConstructor - public static class PersonFour{ + public static class PersonFour { int id; String name; int age; } + + @Data + @AllArgsConstructor + @NoArgsConstructor + public static class BytesRecord { + int id; + String name; + byte[] address; + } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericJsonReader.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericJsonReader.java index 8abfc02ef165e..f0b2c86508b3b 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericJsonReader.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericJsonReader.java @@ -25,6 +25,7 @@ import org.apache.pulsar.client.api.schema.GenericRecord; import org.apache.pulsar.client.api.schema.SchemaReader; +import org.apache.pulsar.common.schema.SchemaInfo; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,22 +40,35 @@ public class GenericJsonReader implements SchemaReader { private final ObjectMapper objectMapper; private final byte[] schemaVersion; private final List fields; - public GenericJsonReader(List fields){ + private SchemaInfo schemaInfo; + + public GenericJsonReader(List fields, SchemaInfo schemaInfo){ this.fields = fields; this.schemaVersion = null; this.objectMapper = new ObjectMapper(); + this.schemaInfo = schemaInfo; + } + + public GenericJsonReader(List fields){ + this(fields, null); } public GenericJsonReader(byte[] schemaVersion, List fields){ + this(schemaVersion, fields, null); + } + + public GenericJsonReader(byte[] schemaVersion, List fields, SchemaInfo schemaInfo){ this.objectMapper = new ObjectMapper(); this.fields = fields; this.schemaVersion = schemaVersion; + this.schemaInfo = schemaInfo; } + @Override public GenericJsonRecord read(byte[] bytes, int offset, int length) { try { JsonNode jn = objectMapper.readTree(new String(bytes, offset, length, UTF_8)); - return new GenericJsonRecord(schemaVersion, fields, jn); + return new GenericJsonRecord(schemaVersion, fields, jn, schemaInfo); } catch (IOException ioe) { throw new SchemaSerializationException(ioe); } @@ -64,7 +78,7 @@ public GenericJsonRecord read(byte[] bytes, int offset, int length) { public GenericRecord read(InputStream inputStream) { try { JsonNode jn = objectMapper.readTree(inputStream); - return new GenericJsonRecord(schemaVersion, fields, jn); + return new GenericJsonRecord(schemaVersion, fields, jn, schemaInfo); } catch (IOException ioe) { throw new SchemaSerializationException(ioe); } finally { diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericJsonRecord.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericJsonRecord.java index ae8de8c0dde6f..708b10e3dfd09 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericJsonRecord.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericJsonRecord.java @@ -19,24 +19,37 @@ package org.apache.pulsar.client.impl.schema.generic; import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.Lists; +import java.io.IOException; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; +import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.client.api.schema.Field; +import org.apache.pulsar.common.schema.SchemaInfo; /** * Generic json record. */ +@Slf4j public class GenericJsonRecord extends VersionedGenericRecord { private final JsonNode jn; + private final SchemaInfo schemaInfo; GenericJsonRecord(byte[] schemaVersion, List fields, JsonNode jn) { + this(schemaVersion, fields, jn, null); + } + + GenericJsonRecord(byte[] schemaVersion, + List fields, + JsonNode jn, SchemaInfo schemaInfo) { super(schemaVersion, fields); this.jn = jn; + this.schemaInfo = schemaInfo; } public JsonNode getJsonNode() { @@ -52,7 +65,7 @@ public Object getField(String fieldName) { .stream() .map(f -> new Field(f, idx.getAndIncrement())) .collect(Collectors.toList()); - return new GenericJsonRecord(schemaVersion, fields, fn); + return new GenericJsonRecord(schemaVersion, fields, fn, schemaInfo); } else if (fn.isBoolean()) { return fn.asBoolean(); } else if (fn.isFloatingPointNumber()) { @@ -65,8 +78,53 @@ public Object getField(String fieldName) { } } else if (fn.isNumber()) { return fn.numberValue(); + } else if (fn.isBinary()) { + try { + return fn.binaryValue(); + } catch (IOException e) { + return fn.asText(); + } + } else if (isBinaryValue(fieldName)) { + try { + return fn.binaryValue(); + } catch (IOException e) { + return fn.asText(); + } } else { return fn.asText(); } } + + private boolean isBinaryValue(String fieldName) { + boolean isBinary = false; + + do { + if (schemaInfo == null) { + break; + } + + try { + org.apache.avro.Schema schema = parseAvroSchema(schemaInfo.getSchemaDefinition()); + org.apache.avro.Schema.Field field = schema.getField(fieldName); + ObjectMapper objectMapper = new ObjectMapper(); + JsonNode jsonNode = objectMapper.readTree(field.schema().toString()); + for (JsonNode node : jsonNode) { + JsonNode jn = node.get("type"); + if (jn != null && ("bytes".equals(jn.asText()) || "byte".equals(jn.asText()))) { + isBinary = true; + } + } + } catch (Exception e) { + log.error("parse schemaInfo failed. ", e); + } + } while (false); + + return isBinary; + } + + private org.apache.avro.Schema parseAvroSchema(String schemaJson) { + final org.apache.avro.Schema.Parser parser = new org.apache.avro.Schema.Parser(); + parser.setValidateDefaults(false); + return parser.parse(schemaJson); + } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericJsonSchema.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericJsonSchema.java index 7bde6708526aa..2f4db38d6dcfd 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericJsonSchema.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericJsonSchema.java @@ -44,7 +44,7 @@ public GenericJsonSchema(SchemaInfo schemaInfo) { boolean useProvidedSchemaAsReaderSchema) { super(schemaInfo, useProvidedSchemaAsReaderSchema); setWriter(new GenericJsonWriter()); - setReader(new GenericJsonReader(fields)); + setReader(new GenericJsonReader(fields, schemaInfo)); } @Override @@ -64,7 +64,7 @@ protected SchemaReader loadReader(BytesSchemaVersion schemaVersio readerSchema.getFields() .stream() .map(f -> new Field(f.name(), f.pos())) - .collect(Collectors.toList())); + .collect(Collectors.toList()), schemaInfo); } else { log.warn("No schema found for version({}), use latest schema : {}", SchemaUtils.getStringSchemaVersion(schemaVersion.get()),