Skip to content

Commit

Permalink
[Pulsar IO] ElasticSearch Sink: support Fixed and ENUM datatypes (apa…
Browse files Browse the repository at this point in the history
  • Loading branch information
nicoloboschi authored Jan 20, 2022
1 parent 76fc2fb commit dc20ef0
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.avro.Conversions;
import org.apache.avro.Schema;
import org.apache.avro.data.TimeConversions;
import org.apache.avro.generic.GenericFixed;
import org.apache.avro.generic.GenericRecord;

/**
Expand Down Expand Up @@ -64,6 +65,8 @@ public static JsonNode toJson(Schema schema, Object value) {
return jsonNodeFactory.nullNode();
}
switch(schema.getType()) {
case NULL: // this should not happen
return jsonNodeFactory.nullNode();
case INT:
return jsonNodeFactory.numberNode((Integer) value);
case LONG:
Expand All @@ -76,6 +79,9 @@ public static JsonNode toJson(Schema schema, Object value) {
return jsonNodeFactory.booleanNode((Boolean) value);
case BYTES:
return jsonNodeFactory.binaryNode((byte[]) value);
case FIXED:
return jsonNodeFactory.binaryNode(((GenericFixed) value).bytes());
case ENUM: // GenericEnumSymbol
case STRING:
return jsonNodeFactory.textNode(value.toString()); // can be a String or org.apache.avro.util.Utf8
case ARRAY: {
Expand Down Expand Up @@ -105,6 +111,8 @@ public static JsonNode toJson(Schema schema, Object value) {
}
return toJson(s, value);
}
// this case should not happen
return jsonNodeFactory.textNode(value.toString());
default:
throw new UnsupportedOperationException("Unknown AVRO schema type=" + schema.getType());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ public void testAvroToJson() throws IOException {
.name("d").type().doubleType().doubleDefault(10.0)
.name("f").type().floatType().floatDefault(10.0f)
.name("s").type().stringType().stringDefault("titi")
.name("fi").type().fixed("fi").size(3).fixedDefault(new byte[]{1,2,3})
.name("en").type().enumeration("en").symbols("a","b","c").enumDefault("b")
.name("array").type().optional().array().items(SchemaBuilder.builder().stringType())
.name("map").type().optional().map().values(SchemaBuilder.builder().intType())
.endRecord();
Expand All @@ -69,6 +71,8 @@ public void testAvroToJson() throws IOException {
genericRecord.put("d", 10.0);
genericRecord.put("f", 10.0f);
genericRecord.put("s", "toto");
genericRecord.put("fi", GenericData.get().createFixed(null, new byte[]{'a','b','c'}, schema.getField("fi").schema()));
genericRecord.put("en", GenericData.get().createEnum("b", schema.getField("en").schema()));
genericRecord.put("array", new String[] {"toto"});
genericRecord.put("map", ImmutableMap.of("a",10));
JsonNode jsonNode = JsonConverter.toJson(genericRecord);
Expand All @@ -77,6 +81,8 @@ public void testAvroToJson() throws IOException {
assertEquals(jsonNode.get("i").asInt(), 1);
assertEquals(jsonNode.get("b").asBoolean(), true);
assertEquals(jsonNode.get("bb").binaryValue(), "10".getBytes(StandardCharsets.UTF_8));
assertEquals(jsonNode.get("fi").binaryValue(), "abc".getBytes(StandardCharsets.UTF_8));
assertEquals(jsonNode.get("en").textValue(), "b");
assertEquals(jsonNode.get("d").asDouble(), 10.0);
assertEquals(jsonNode.get("f").numberValue(), 10.0f);
assertEquals(jsonNode.get("s").asText(), "toto");
Expand Down

0 comments on commit dc20ef0

Please sign in to comment.