Skip to content

Commit

Permalink
fix json deserialize byte to string bug (apache#8140)
Browse files Browse the repository at this point in the history
Fix apache#7657 

### Motivation
In `GenericJsonRecord.java`, it deserialize byte to String.
```
public Object getField(String fieldName) {
        JsonNode fn = jn.get(fieldName);
        if (fn.isContainerNode()) {
            AtomicInteger idx = new AtomicInteger(0);
            List<Field> fields = Lists.newArrayList(fn.fieldNames())
                .stream()
                .map(f -> new Field(f, idx.getAndIncrement()))
                .collect(Collectors.toList());
            return new GenericJsonRecord(schemaVersion, fields, fn);
        } else if (fn.isBoolean()) {
            return fn.asBoolean();
        } else if (fn.isFloatingPointNumber()) {
            return fn.asDouble();
        } else if (fn.isBigInteger()) {
            if (fn.canConvertToLong()) {
                return fn.asLong();
            } else {
                return fn.asText();
            }
        } else if (fn.isNumber()) {
            return fn.numberValue();
        } else {
            return fn.asText();
        }
    }
```
### Changes
Add check the jsonNode binary type and convert to binaryValue instead of `asText`.
  • Loading branch information
hangc0276 authored Oct 5, 2020
1 parent b1dfd42 commit 66c3733
Show file tree
Hide file tree
Showing 5 changed files with 147 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,13 @@
package org.apache.pulsar.schema;

import com.google.common.collect.Sets;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
Expand All @@ -38,6 +41,7 @@
import static org.apache.pulsar.schema.compatibility.SchemaCompatibilityCheckTest.randomName;
import static org.junit.Assert.assertEquals;

@Slf4j
public class SchemaTest extends MockedPulsarServiceBaseTest {

private final static String CLUSTER_NAME = "test";
Expand Down Expand Up @@ -133,4 +137,56 @@ public void testMultiTopicSetSchemaProvider() throws Exception {
producer.close();
consumer.close();
}

@Test
public void testBytesSchemaDeserialize() throws Exception {
final String tenant = PUBLIC_TENANT;
final String namespace = "test-namespace-" + randomName(16);
final String topicName = "test-bytes-schema";

final String topic = TopicName.get(
TopicDomain.persistent.value(),
tenant,
namespace,
topicName).toString();

admin.namespaces().createNamespace(
tenant + "/" + namespace,
Sets.newHashSet(CLUSTER_NAME));

admin.topics().createPartitionedTopic(topic, 2);
admin.schemas().createSchema(topic, Schema.JSON(Schemas.BytesRecord.class).getSchemaInfo());

Producer<Schemas.BytesRecord> producer = pulsarClient
.newProducer(Schema.JSON(Schemas.BytesRecord.class))
.topic(topic)
.create();

Schemas.BytesRecord bytesRecord = new Schemas.BytesRecord();
bytesRecord.setId(1);
bytesRecord.setName("Tom");
bytesRecord.setAddress("test".getBytes());

Consumer<GenericRecord> consumer = pulsarClient.newConsumer(Schema.AUTO_CONSUME())
.subscriptionName("test-sub")
.topic(topic)
.subscribe();

Consumer<Schemas.BytesRecord> consumer1 = pulsarClient.newConsumer(Schema.JSON(Schemas.BytesRecord.class))
.subscriptionName("test-sub1")
.topic(topic)
.subscribe();

producer.send(bytesRecord);

Message<GenericRecord> message = consumer.receive();
Message<Schemas.BytesRecord> message1 = consumer1.receive();

assertEquals(message.getValue().getField("address").getClass(),
message1.getValue().getAddress().getClass());

producer.close();
consumer.close();
consumer1.close();
}
}
17 changes: 13 additions & 4 deletions pulsar-broker/src/test/java/org/apache/pulsar/schema/Schemas.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,14 @@ public class Schemas {
@Data
@NoArgsConstructor
@AllArgsConstructor
public static class PersonOne{
public static class PersonOne {
int id;
}

@Data
@AllArgsConstructor
@NoArgsConstructor
public static class PersonTwo{
public static class PersonTwo {
int id;

@AvroDefault("\"Tom\"")
Expand All @@ -45,7 +45,7 @@ public static class PersonTwo{
@Data
@AllArgsConstructor
@NoArgsConstructor
public static class PersonThree{
public static class PersonThree {
int id;

String name;
Expand All @@ -54,11 +54,20 @@ public static class PersonThree{
@Data
@AllArgsConstructor
@NoArgsConstructor
public static class PersonFour{
public static class PersonFour {
int id;

String name;

int age;
}

@Data
@AllArgsConstructor
@NoArgsConstructor
public static class BytesRecord {
int id;
String name;
byte[] address;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.api.schema.SchemaReader;

import org.apache.pulsar.common.schema.SchemaInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -39,22 +40,35 @@ public class GenericJsonReader implements SchemaReader<GenericRecord> {
private final ObjectMapper objectMapper;
private final byte[] schemaVersion;
private final List<Field> fields;
public GenericJsonReader(List<Field> fields){
private SchemaInfo schemaInfo;

public GenericJsonReader(List<Field> fields, SchemaInfo schemaInfo){
this.fields = fields;
this.schemaVersion = null;
this.objectMapper = new ObjectMapper();
this.schemaInfo = schemaInfo;
}

public GenericJsonReader(List<Field> fields){
this(fields, null);
}

public GenericJsonReader(byte[] schemaVersion, List<Field> fields){
this(schemaVersion, fields, null);
}

public GenericJsonReader(byte[] schemaVersion, List<Field> fields, SchemaInfo schemaInfo){
this.objectMapper = new ObjectMapper();
this.fields = fields;
this.schemaVersion = schemaVersion;
this.schemaInfo = schemaInfo;
}

@Override
public GenericJsonRecord read(byte[] bytes, int offset, int length) {
try {
JsonNode jn = objectMapper.readTree(new String(bytes, offset, length, UTF_8));
return new GenericJsonRecord(schemaVersion, fields, jn);
return new GenericJsonRecord(schemaVersion, fields, jn, schemaInfo);
} catch (IOException ioe) {
throw new SchemaSerializationException(ioe);
}
Expand All @@ -64,7 +78,7 @@ public GenericJsonRecord read(byte[] bytes, int offset, int length) {
public GenericRecord read(InputStream inputStream) {
try {
JsonNode jn = objectMapper.readTree(inputStream);
return new GenericJsonRecord(schemaVersion, fields, jn);
return new GenericJsonRecord(schemaVersion, fields, jn, schemaInfo);
} catch (IOException ioe) {
throw new SchemaSerializationException(ioe);
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,24 +19,37 @@
package org.apache.pulsar.client.impl.schema.generic;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.schema.Field;
import org.apache.pulsar.common.schema.SchemaInfo;

/**
* Generic json record.
*/
@Slf4j
public class GenericJsonRecord extends VersionedGenericRecord {

private final JsonNode jn;
private final SchemaInfo schemaInfo;

GenericJsonRecord(byte[] schemaVersion,
List<Field> fields,
JsonNode jn) {
this(schemaVersion, fields, jn, null);
}

GenericJsonRecord(byte[] schemaVersion,
List<Field> fields,
JsonNode jn, SchemaInfo schemaInfo) {
super(schemaVersion, fields);
this.jn = jn;
this.schemaInfo = schemaInfo;
}

public JsonNode getJsonNode() {
Expand All @@ -52,7 +65,7 @@ public Object getField(String fieldName) {
.stream()
.map(f -> new Field(f, idx.getAndIncrement()))
.collect(Collectors.toList());
return new GenericJsonRecord(schemaVersion, fields, fn);
return new GenericJsonRecord(schemaVersion, fields, fn, schemaInfo);
} else if (fn.isBoolean()) {
return fn.asBoolean();
} else if (fn.isFloatingPointNumber()) {
Expand All @@ -65,8 +78,53 @@ public Object getField(String fieldName) {
}
} else if (fn.isNumber()) {
return fn.numberValue();
} else if (fn.isBinary()) {
try {
return fn.binaryValue();
} catch (IOException e) {
return fn.asText();
}
} else if (isBinaryValue(fieldName)) {
try {
return fn.binaryValue();
} catch (IOException e) {
return fn.asText();
}
} else {
return fn.asText();
}
}

private boolean isBinaryValue(String fieldName) {
boolean isBinary = false;

do {
if (schemaInfo == null) {
break;
}

try {
org.apache.avro.Schema schema = parseAvroSchema(schemaInfo.getSchemaDefinition());
org.apache.avro.Schema.Field field = schema.getField(fieldName);
ObjectMapper objectMapper = new ObjectMapper();
JsonNode jsonNode = objectMapper.readTree(field.schema().toString());
for (JsonNode node : jsonNode) {
JsonNode jn = node.get("type");
if (jn != null && ("bytes".equals(jn.asText()) || "byte".equals(jn.asText()))) {
isBinary = true;
}
}
} catch (Exception e) {
log.error("parse schemaInfo failed. ", e);
}
} while (false);

return isBinary;
}

private org.apache.avro.Schema parseAvroSchema(String schemaJson) {
final org.apache.avro.Schema.Parser parser = new org.apache.avro.Schema.Parser();
parser.setValidateDefaults(false);
return parser.parse(schemaJson);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public GenericJsonSchema(SchemaInfo schemaInfo) {
boolean useProvidedSchemaAsReaderSchema) {
super(schemaInfo, useProvidedSchemaAsReaderSchema);
setWriter(new GenericJsonWriter());
setReader(new GenericJsonReader(fields));
setReader(new GenericJsonReader(fields, schemaInfo));
}

@Override
Expand All @@ -64,7 +64,7 @@ protected SchemaReader<GenericRecord> loadReader(BytesSchemaVersion schemaVersio
readerSchema.getFields()
.stream()
.map(f -> new Field(f.name(), f.pos()))
.collect(Collectors.toList()));
.collect(Collectors.toList()), schemaInfo);
} else {
log.warn("No schema found for version({}), use latest schema : {}",
SchemaUtils.getStringSchemaVersion(schemaVersion.get()),
Expand Down

0 comments on commit 66c3733

Please sign in to comment.