Skip to content

Commit

Permalink
Fix writing/encoding of GenericJsonRecord (apache#9608)
Browse files Browse the repository at this point in the history
* Fix writing/encoding of GenericJsonRecord

- Fixes apache#9605

* Add test for JSONSchema encode/decode round-trip
  • Loading branch information
lhotari authored Feb 19, 2021
1 parent eb8a464 commit 41e118a
Show file tree
Hide file tree
Showing 3 changed files with 99 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,11 @@
package org.apache.pulsar.client.impl.schema.generic;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import org.apache.pulsar.client.api.SchemaSerializationException;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.api.schema.SchemaWriter;

import java.io.IOException;

public class GenericJsonWriter implements SchemaWriter<GenericRecord> {

private final ObjectMapper objectMapper;
Expand All @@ -36,7 +35,7 @@ public GenericJsonWriter() {
@Override
public byte[] write(GenericRecord message) {
try {
return objectMapper.writeValueAsBytes(((GenericJsonRecord)message).getJsonNode().toString());
return objectMapper.writeValueAsBytes(((GenericJsonRecord)message).getJsonNode());
} catch (IOException ioe) {
throw new SchemaSerializationException(ioe);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,18 @@
*/
package org.apache.pulsar.client.impl.schema;

import java.util.Collections;
import java.util.List;

import static org.apache.pulsar.client.impl.schema.SchemaTestUtils.FOO_FIELDS;
import static org.apache.pulsar.client.impl.schema.SchemaTestUtils.SCHEMA_JSON_ALLOW_NULL;
import static org.apache.pulsar.client.impl.schema.SchemaTestUtils.SCHEMA_JSON_NOT_ALLOW_NULL;
import static org.testng.Assert.assertEquals;
import com.fasterxml.jackson.core.JsonProcessingException;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import java.util.Collections;
import java.util.List;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.avro.Schema;
import org.apache.pulsar.client.api.SchemaSerializationException;
Expand All @@ -33,15 +40,10 @@
import org.apache.pulsar.client.impl.schema.SchemaTestUtils.NestedBar;
import org.apache.pulsar.client.impl.schema.SchemaTestUtils.NestedBarList;
import org.apache.pulsar.common.schema.SchemaType;
import org.json.JSONException;
import org.skyscreamer.jsonassert.JSONAssert;
import org.testng.Assert;
import org.testng.annotations.Test;
import org.json.JSONException;

import static org.apache.pulsar.client.impl.schema.SchemaTestUtils.FOO_FIELDS;
import static org.apache.pulsar.client.impl.schema.SchemaTestUtils.SCHEMA_JSON_NOT_ALLOW_NULL;
import static org.apache.pulsar.client.impl.schema.SchemaTestUtils.SCHEMA_JSON_ALLOW_NULL;
import static org.testng.Assert.assertEquals;

@Slf4j
public class JSONSchemaTest {
Expand Down Expand Up @@ -332,4 +334,38 @@ public void testDecodeByteBuf() {
assertEquals(jsonSchema.decode(byteBuf), foo1);

}

@Data
@AllArgsConstructor
@NoArgsConstructor
private static class Seller {
public String state;
public String street;
public long zipCode;
}

@Data
@AllArgsConstructor
@NoArgsConstructor
private static class PC {
public String brand;
public String model;
public int year;
public GPU gpu;
public Seller seller;
}

private enum GPU {
AMD, NVIDIA
}

@Test
public void testEncodeAndDecodeObject() throws JsonProcessingException {
JSONSchema<PC> jsonSchema = JSONSchema.of(SchemaDefinition.<PC>builder().withPojo(PC.class).build());
PC pc = new PC("dell", "alienware", 2021, GPU.AMD,
new Seller("WA", "street", 98004));
byte[] encoded = jsonSchema.encode(pc);
PC roundtrippedPc = jsonSchema.decode(encoded);
assertEquals(roundtrippedPc, pc);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,23 @@
*/
package org.apache.pulsar.client.impl.schema.generic;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.testng.annotations.Test;

import java.util.Collections;

import static java.nio.charset.StandardCharsets.UTF_8;
import org.apache.pulsar.client.api.schema.Field;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.Collections;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.pulsar.client.api.schema.Field;
import org.apache.pulsar.client.api.schema.GenericSchema;
import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.client.impl.schema.JSONSchema;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.testng.annotations.Test;


public class GenericJsonRecordTest {
Expand Down Expand Up @@ -67,4 +73,43 @@ public void decodeLongField() throws Exception{
Object boolValue = record.getField("on");
assertTrue((boolean)boolValue);
}

@Data
@AllArgsConstructor
@NoArgsConstructor
private static class Seller {
public String state;
public String street;
public long zipCode;
}

@Data
@AllArgsConstructor
@NoArgsConstructor
private static class PC {
public String brand;
public String model;
public int year;
public GPU gpu;
public Seller seller;
}

private enum GPU {
AMD, NVIDIA
}

@Test
public void testEncodeAndDecodeObject() throws JsonProcessingException {
// test case from issue https://github.com/apache/pulsar/issues/9605
JSONSchema<PC> jsonSchema = JSONSchema.of(SchemaDefinition.<PC>builder().withPojo(PC.class).build());
GenericSchema genericJsonSchema = GenericJsonSchema.of(jsonSchema.getSchemaInfo());
PC pc = new PC("dell", "alienware", 2021, GPU.AMD,
new Seller("WA", "street", 98004));
JsonNode jsonNode = ObjectMapperFactory.getThreadLocal().valueToTree(pc);
GenericJsonRecord genericJsonRecord =
new GenericJsonRecord(null, null, jsonNode, genericJsonSchema.getSchemaInfo());
byte[] encoded = genericJsonSchema.encode(genericJsonRecord);
PC roundtrippedPc = jsonSchema.decode(encoded);
assertEquals(roundtrippedPc, pc);
}
}

0 comments on commit 41e118a

Please sign in to comment.