Skip to content

Commit

Permalink
[fix][elasticsearch-sink] Handle Avro collections native types (GenericData.Array and Utf8 map keys) (apache#15430)
Browse files Browse the repository at this point in the history
  • Loading branch information
nicoloboschi authored May 25, 2022
1 parent edd712c commit b7e1510
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@
import org.apache.avro.Conversions;
import org.apache.avro.Schema;
import org.apache.avro.data.TimeConversions;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericFixed;
import org.apache.avro.generic.GenericRecord;

/**
* Convert an AVRO GenericRecord to a JsonNode.
*/
Expand Down Expand Up @@ -87,18 +87,26 @@ public static JsonNode toJson(Schema schema, Object value) {
case ARRAY: {
Schema elementSchema = schema.getElementType();
ArrayNode arrayNode = jsonNodeFactory.arrayNode();
for (Object elem : (Object[]) value) {
Object[] iterable;
if (value instanceof GenericData.Array) {
iterable = ((GenericData.Array) value).toArray();
} else {
iterable = (Object[]) value;
}
for (Object elem : iterable) {
JsonNode fieldValue = toJson(elementSchema, elem);
arrayNode.add(fieldValue);
}
return arrayNode;
}
case MAP: {
Map<String, Object> map = (Map<String, Object>) value;
Map<Object, Object> map = (Map<Object, Object>) value;
ObjectNode objectNode = jsonNodeFactory.objectNode();
for (Map.Entry<String, Object> entry : map.entrySet()) {
for (Map.Entry<Object, Object> entry : map.entrySet()) {
JsonNode jsonNode = toJson(schema.getValueType(), entry.getValue());
objectNode.set(entry.getKey(), jsonNode);
// can be a String or org.apache.avro.util.Utf8
final String entryKey = entry.getKey() == null ? null : entry.getKey().toString();
objectNode.set(entryKey, jsonNode);
}
return objectNode;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import java.io.IOException;
import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Calendar;
import java.util.GregorianCalendar;
import java.util.TimeZone;
Expand All @@ -48,6 +49,8 @@ public class JsonConverterTests {

@Test
public void testAvroToJson() throws IOException {
Schema avroArraySchema = SchemaBuilder.array().items(SchemaBuilder.builder().stringType());
Schema mapUtf8Schema = SchemaBuilder.map().values(SchemaBuilder.builder().intType());
Schema schema = SchemaBuilder.record("record").fields()
.name("n").type().longType().longDefault(10)
.name("l").type().longType().longDefault(10)
Expand All @@ -60,7 +63,9 @@ public void testAvroToJson() throws IOException {
.name("fi").type().fixed("fi").size(3).fixedDefault(new byte[]{1,2,3})
.name("en").type().enumeration("en").symbols("a","b","c").enumDefault("b")
.name("array").type().optional().array().items(SchemaBuilder.builder().stringType())
.name("arrayavro").type().optional().array().items(SchemaBuilder.builder().stringType())
.name("map").type().optional().map().values(SchemaBuilder.builder().intType())
.name("maputf8").type().optional().map().values(SchemaBuilder.builder().intType())
.endRecord();
GenericRecord genericRecord = new GenericData.Record(schema);
genericRecord.put("n", null);
Expand All @@ -74,7 +79,9 @@ public void testAvroToJson() throws IOException {
genericRecord.put("fi", GenericData.get().createFixed(null, new byte[]{'a','b','c'}, schema.getField("fi").schema()));
genericRecord.put("en", GenericData.get().createEnum("b", schema.getField("en").schema()));
genericRecord.put("array", new String[] {"toto"});
genericRecord.put("arrayavro", new GenericData.Array<>(avroArraySchema, Arrays.asList("toto")));
genericRecord.put("map", ImmutableMap.of("a",10));
genericRecord.put("maputf8", ImmutableMap.of(new org.apache.avro.util.Utf8("a"),10));
JsonNode jsonNode = JsonConverter.toJson(genericRecord);
assertEquals(jsonNode.get("n"), NullNode.getInstance());
assertEquals(jsonNode.get("l").asLong(), 1L);
Expand All @@ -88,9 +95,14 @@ public void testAvroToJson() throws IOException {
assertEquals(jsonNode.get("s").asText(), "toto");
assertTrue(jsonNode.get("array").isArray());
assertEquals(jsonNode.get("array").iterator().next().asText(), "toto");
assertTrue(jsonNode.get("arrayavro").isArray());
assertEquals(jsonNode.get("arrayavro").iterator().next().asText(), "toto");
assertTrue(jsonNode.get("map").isObject());
assertEquals(jsonNode.get("map").elements().next().asText(), "10");
assertEquals(jsonNode.get("map").get("a").numberValue(), 10);
assertTrue(jsonNode.get("maputf8").isObject());
assertEquals(jsonNode.get("maputf8").elements().next().asText(), "10");
assertEquals(jsonNode.get("maputf8").get("a").numberValue(), 10);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,21 @@

import static org.testng.Assert.assertTrue;

import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch.core.SearchRequest;
import co.elastic.clients.elasticsearch.core.SearchResponse;
import co.elastic.clients.json.jackson.JacksonJsonpMapper;
import co.elastic.clients.transport.ElasticsearchTransport;
import co.elastic.clients.transport.rest_client.RestClientTransport;
import com.google.common.collect.ImmutableMap;
import lombok.AllArgsConstructor;
import lombok.Cleanup;
import lombok.Data;
Expand Down Expand Up @@ -60,6 +65,9 @@ public abstract class ElasticSearchSinkTester extends SinkTester<ElasticsearchCo
public static final class SimplePojo {
// Test payload type: the producer loop in this file builds one instance for the
// key and one for the value of each KeyValue message it sends.
// NOTE(review): instances are created positionally as
// (field1, field2, list1, set1, map1); the constructor is presumably generated
// from the field order (Lombok is imported) - confirm before reordering fields.

// Plain string fields; the producer fills them with "f1_<i>" and "f2_<i>".
private String field1;
private String field2;
// Collection-typed fields. The producer fills them with a two-element list
// (i, i + 1), a single-element set ((long) i) and a single-entry map.
private List<Integer> list1;
private Set<Long> set1;
private Map<String, String> map1;
}

/**
Expand Down Expand Up @@ -128,9 +136,20 @@ public void produceMessage(int numMessages, PulsarClient client,
for (int i = 0; i < numMessages; i++) {
String key = "key-" + i;
kvs.put(key, key);
final SimplePojo keyPojo = new SimplePojo(
"f1_" + i,
"f2_" + i,
Arrays.asList(i, i +1),
new HashSet<>(Arrays.asList((long) i)),
ImmutableMap.of("map1_k_" + i, "map1_kv_" + i));
final SimplePojo valuePojo = new SimplePojo(
"f1_" + i,
"f2_" + i,
Arrays.asList(i, i +1),
new HashSet<>(Arrays.asList((long) i)),
ImmutableMap.of("map1_v_" + i, "map1_vv_" + i));
producer.newMessage()
.value(new KeyValue<>(new SimplePojo("f1_" + i, "f2_" + i),
new SimplePojo("v1_" + i, "v2_" + i)))
.value(new KeyValue<>(keyPojo, valuePojo))
.send();
}

Expand Down

0 comments on commit b7e1510

Please sign in to comment.