Skip to content

Commit

Permalink
[Issue 13651][elastic-search] Fix Elasticsearch Sink Invalid type for…
Browse files Browse the repository at this point in the history
… uuid encoded as logical types (apache#13652)
  • Loading branch information
Vincent Royer authored Jan 10, 2022
1 parent 916b61d commit f0b3e3b
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -197,12 +197,7 @@ JsonNode toJson(Schema schema, Object value) {
new Conversions.UUIDConversion()) {
@Override
JsonNode toJson(Schema schema, Object value) {
if (!(value instanceof String)) {
throw new IllegalArgumentException("Invalid type for uuid, expected String but was "
+ value.getClass());
}
String uuidString = (String) value;
return jsonNodeFactory.textNode(uuidString);
return jsonNodeFactory.textNode(value == null ? null : value.toString());
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,20 @@
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.*;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificDatumWriter;
import org.testng.annotations.Test;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.Calendar;
import java.util.GregorianCalendar;
import java.util.TimeZone;
import java.util.UUID;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
Expand Down Expand Up @@ -80,8 +88,7 @@ public void testAvroToJson() throws IOException {
}

@Test
public void testLogicalTypesToJson() {
Schema decimalType = LogicalTypes.decimal(3,3).addToSchema(Schema.create(Schema.Type.BYTES));
public void testLogicalTypesToJson() throws IOException {
Schema dateType = LogicalTypes.date().addToSchema(Schema.create(Schema.Type.INT));
Schema timestampMillisType = LogicalTypes.timestampMillis().addToSchema(Schema.create(Schema.Type.LONG));
Schema timestampMicrosType = LogicalTypes.timestampMicros().addToSchema(Schema.create(Schema.Type.LONG));
Expand All @@ -90,7 +97,6 @@ public void testLogicalTypesToJson() {
Schema uuidType = LogicalTypes.uuid().addToSchema(Schema.create(Schema.Type.STRING));
Schema schema = SchemaBuilder.record("record")
.fields()
.name("amount").type(decimalType).noDefault()
.name("mydate").type(dateType).noDefault()
.name("tsmillis").type(timestampMillisType).noDefault()
.name("tsmicros").type(timestampMicrosType).noDefault()
Expand All @@ -100,24 +106,40 @@ public void testLogicalTypesToJson() {
.endRecord();

final long MILLIS_PER_DAY = 24 * 60 * 60 * 1000;
BigDecimal myDecimal = new BigDecimal("10.34");
BigDecimal myDecimal = new BigDecimal("100.003");
UUID myUuid = UUID.randomUUID();
Calendar calendar = new GregorianCalendar(TimeZone.getTimeZone("Europe/Copenhagen"));
GenericRecord genericRecord = new GenericData.Record(schema);
genericRecord.put("amount", myDecimal);
genericRecord.put("mydate", (int)calendar.toInstant().getEpochSecond());
genericRecord.put("tsmillis", calendar.getTimeInMillis());
genericRecord.put("tsmicros", calendar.getTimeInMillis() * 1000);
genericRecord.put("timemillis", (int)(calendar.getTimeInMillis() % MILLIS_PER_DAY));
genericRecord.put("timemicros", (calendar.getTimeInMillis() %MILLIS_PER_DAY) * 1000);
genericRecord.put("myuuid", myUuid.toString());
JsonNode jsonNode = JsonConverter.toJson(genericRecord);
assertEquals(new BigDecimal(jsonNode.get("amount").asText()), myDecimal);

GenericRecord genericRecord2 = deserialize(serialize(genericRecord, schema), schema);
JsonNode jsonNode = JsonConverter.toJson(genericRecord2);
assertEquals(jsonNode.get("mydate").asInt(), calendar.toInstant().getEpochSecond());
assertEquals(jsonNode.get("tsmillis").asInt(), (int)calendar.getTimeInMillis());
assertEquals(jsonNode.get("tsmicros").asLong(), calendar.getTimeInMillis() * 1000);
assertEquals(jsonNode.get("timemillis").asInt(), (int)(calendar.getTimeInMillis() % MILLIS_PER_DAY));
assertEquals(jsonNode.get("timemicros").asLong(), (calendar.getTimeInMillis() %MILLIS_PER_DAY) * 1000);
assertEquals(UUID.fromString(jsonNode.get("myuuid").asText()), myUuid);
}

public static byte[] serialize(GenericRecord record, Schema schema) throws IOException {
SpecificDatumWriter<GenericRecord> datumWriter = new SpecificDatumWriter<>(schema);
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
BinaryEncoder binaryEncoder = new EncoderFactory().binaryEncoder(byteArrayOutputStream, null);
datumWriter.write(record, binaryEncoder);
binaryEncoder.flush();
return byteArrayOutputStream.toByteArray();
}

public static GenericRecord deserialize(byte[] recordBytes, Schema schema) throws IOException {
DatumReader<GenericRecord> datumReader = new SpecificDatumReader<>(schema);
ByteArrayInputStream stream = new ByteArrayInputStream(recordBytes);
BinaryDecoder binaryDecoder = new DecoderFactory().binaryDecoder(stream, null);
return datumReader.read(null, binaryDecoder);
}
}

0 comments on commit f0b3e3b

Please sign in to comment.