Skip to content

Commit

Permalink
[FLINK-18002][json] Correct the behavior for ContainerNode as varchar…
Browse files Browse the repository at this point in the history
… type

This closes apache#12421
  • Loading branch information
libenchao committed Jul 13, 2020
1 parent f81f3a0 commit 3865f7b
Show file tree
Hide file tree
Showing 4 changed files with 114 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,11 @@ private TimestampData convertToTimestamp(JsonNode jsonNode) {
}

private StringData convertToString(JsonNode jsonNode) {
return StringData.fromString(jsonNode.asText());
if (jsonNode.isContainerNode()) {
return StringData.fromString(jsonNode.toString());
} else {
return StringData.fromString(jsonNode.asText());
}
}

private byte[] convertToBytes(JsonNode jsonNode) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ private Optional<DeserializationRuntimeConverter> createConverterForSimpleType(T
} else if (simpleTypeInfo == Types.BOOLEAN) {
return Optional.of(this::convertToBoolean);
} else if (simpleTypeInfo == Types.STRING) {
return Optional.of((mapper, jsonNode) -> jsonNode.asText());
return Optional.of(this::convertToString);
} else if (simpleTypeInfo == Types.INT) {
return Optional.of(this::convertToInt);
} else if (simpleTypeInfo == Types.LONG) {
Expand Down Expand Up @@ -381,6 +381,14 @@ private Optional<DeserializationRuntimeConverter> createConverterForSimpleType(T
}
}

private String convertToString(ObjectMapper mapper, JsonNode jsonNode) {
if (jsonNode.isContainerNode()) {
return jsonNode.toString();
} else {
return jsonNode.asText();
}
}

private boolean convertToBoolean(ObjectMapper mapper, JsonNode jsonNode) {
if (jsonNode.isBoolean()) {
// avoid redundant toString and parseBoolean, for better performance
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,9 @@
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;

import org.junit.Rule;
import org.junit.Assert;
import org.junit.Test;
import org.junit.rules.ExpectedException;

import java.io.IOException;
import java.math.BigDecimal;
import java.sql.Timestamp;
import java.time.LocalDate;
Expand Down Expand Up @@ -71,9 +69,6 @@
*/
public class JsonRowDataSerDeSchemaTest {

@Rule
public ExpectedException thrown = ExpectedException.none();

@Test
public void testSerDe() throws Exception {
byte tinyint = 'c';
Expand Down Expand Up @@ -326,22 +321,29 @@ public void testDeserializationMissingNode() throws Exception {
deserializationSchema = deserializationSchema = new JsonRowDataDeserializationSchema(
schema, WrapperTypeInfo.of(schema), true, false, TimestampFormat.ISO_8601);

thrown.expect(IOException.class);
thrown.expectMessage("Failed to deserialize JSON '{\"id\":123123123}'");
deserializationSchema.deserialize(serializedJson);
String errorMessage = "Failed to deserialize JSON '{\"id\":123123123}'.";
try {
deserializationSchema.deserialize(serializedJson);
Assert.fail("expecting exception message: " + errorMessage);
} catch (Throwable t) {
assertEquals(errorMessage, t.getMessage());
}

// ignore on parse error
deserializationSchema = new JsonRowDataDeserializationSchema(
schema, WrapperTypeInfo.of(schema), false, true, TimestampFormat.ISO_8601);
actual = convertToExternal(deserializationSchema.deserialize(serializedJson), dataType);
assertEquals(expected, actual);

thrown.expect(IllegalArgumentException.class);
thrown.expectMessage("JSON format doesn't support failOnMissingField and ignoreParseErrors are both enabled");
// failOnMissingField and ignoreParseErrors both enabled
//noinspection ConstantConditions
new JsonRowDataDeserializationSchema(
schema, WrapperTypeInfo.of(schema), true, true, TimestampFormat.ISO_8601);
errorMessage = "JSON format doesn't support failOnMissingField and ignoreParseErrors are both enabled.";
try {
// failOnMissingField and ignoreParseErrors both enabled
new JsonRowDataDeserializationSchema(
schema, WrapperTypeInfo.of(schema), true, true, TimestampFormat.ISO_8601);
Assert.fail("expecting exception message: " + errorMessage);
} catch (Throwable t) {
assertEquals(errorMessage, t.getMessage());
}
}

@Test
Expand Down Expand Up @@ -380,7 +382,7 @@ private void testIgnoreParseErrors(TestSpec spec) throws Exception {
// the parsing field should be null and no exception is thrown
JsonRowDataDeserializationSchema ignoreErrorsSchema = new JsonRowDataDeserializationSchema(
spec.rowType, WrapperTypeInfo.of(spec.rowType), false, true,
TimestampFormat.ISO_8601);
spec.timestampFormat);
Row expected;
if (spec.expected != null) {
expected = spec.expected;
Expand All @@ -400,8 +402,12 @@ private void testParseErrors(TestSpec spec) throws Exception {
spec.rowType, WrapperTypeInfo.of(spec.rowType), false, false,
spec.timestampFormat);

thrown.expectMessage(spec.errorMessage);
failingSchema.deserialize(spec.json.getBytes());
try {
failingSchema.deserialize(spec.json.getBytes());
Assert.fail("expecting exception " + spec.errorMessage);
} catch (Throwable t) {
assertEquals(t.getMessage(), spec.errorMessage);
}
}

private static List<TestSpec> testData = Arrays.asList(
Expand All @@ -418,7 +424,7 @@ private void testParseErrors(TestSpec spec) throws Exception {
TestSpec
.json("{\"id\":\"abc\"}")
.rowType(ROW(FIELD("id", INT())))
.expectErrorMessage("Failed to deserialize JSON '{\"id\":\"abc\"}'"),
.expectErrorMessage("Failed to deserialize JSON '{\"id\":\"abc\"}'."),

TestSpec
.json("{\"id\":112.013}")
Expand All @@ -428,84 +434,111 @@ private void testParseErrors(TestSpec spec) throws Exception {
TestSpec
.json("{\"id\":\"long\"}")
.rowType(ROW(FIELD("id", BIGINT())))
.expectErrorMessage("Failed to deserialize JSON '{\"id\":\"long\"}'"),
.expectErrorMessage("Failed to deserialize JSON '{\"id\":\"long\"}'."),

TestSpec
.json("{\"id\":\"112.013.123\"}")
.rowType(ROW(FIELD("id", FLOAT())))
.expectErrorMessage("Failed to deserialize JSON '{\"id\":\"112.013.123\"}'"),
.expectErrorMessage("Failed to deserialize JSON '{\"id\":\"112.013.123\"}'."),

TestSpec
.json("{\"id\":\"112.013.123\"}")
.rowType(ROW(FIELD("id", DOUBLE())))
.expectErrorMessage("Failed to deserialize JSON '{\"id\":\"112.013.123\"}'"),
.expectErrorMessage("Failed to deserialize JSON '{\"id\":\"112.013.123\"}'."),

TestSpec
.json("{\"id\":\"18:00:243\"}")
.rowType(ROW(FIELD("id", TIME())))
.expectErrorMessage("Failed to deserialize JSON '{\"id\":\"18:00:243\"}'"),
.expectErrorMessage("Failed to deserialize JSON '{\"id\":\"18:00:243\"}'."),

TestSpec
.json("{\"id\":\"18:00:243\"}")
.rowType(ROW(FIELD("id", TIME())))
.expectErrorMessage("Failed to deserialize JSON '{\"id\":\"18:00:243\"}'"),
.expectErrorMessage("Failed to deserialize JSON '{\"id\":\"18:00:243\"}'."),

TestSpec
.json("{\"id\":\"20191112\"}")
.rowType(ROW(FIELD("id", DATE())))
.expectErrorMessage("Failed to deserialize JSON '{\"id\":\"20191112\"}'"),
.expectErrorMessage("Failed to deserialize JSON '{\"id\":\"20191112\"}'."),

TestSpec
.json("{\"id\":\"20191112\"}")
.rowType(ROW(FIELD("id", DATE())))
.expectErrorMessage("Failed to deserialize JSON '{\"id\":\"20191112\"}'"),
.expectErrorMessage("Failed to deserialize JSON '{\"id\":\"20191112\"}'."),

TestSpec
.json("{\"id\":true}")
.rowType(ROW(FIELD("id", STRING())))
.expect(Row.of("true")),

TestSpec
.json("{\"id\":123.234}")
.rowType(ROW(FIELD("id", STRING())))
.expect(Row.of("123.234")),

TestSpec
.json("{\"id\":1234567}")
.rowType(ROW(FIELD("id", STRING())))
.expect(Row.of("1234567")),

TestSpec
.json("{\"id\":\"string field\"}")
.rowType(ROW(FIELD("id", STRING())))
.expect(Row.of("string field")),

TestSpec
.json("{\"id\":[\"array data1\",\"array data2\",123,234.345]}")
.rowType(ROW(FIELD("id", STRING())))
.expect(Row.of("[\"array data1\",\"array data2\",123,234.345]")),

TestSpec
.json("{\"id\":{\"k1\":123,\"k2\":234.234,\"k3\":\"string data\"}}")
.rowType(ROW(FIELD("id", STRING())))
.expect(Row.of("{\"k1\":123,\"k2\":234.234,\"k3\":\"string data\"}")),

TestSpec
.json("{\"id\":\"2019-11-12 18:00:12\"}")
.rowType(ROW(FIELD("id", TIMESTAMP(0))))
.timestampFormat(TimestampFormat.ISO_8601)
.expectErrorMessage("Failed to deserialize JSON '{\"id\":\"2019-11-12 18:00:12\"}'"),
.expectErrorMessage("Failed to deserialize JSON '{\"id\":\"2019-11-12 18:00:12\"}'."),

TestSpec
.json("{\"id\":\"2019-11-12T18:00:12\"}")
.rowType(ROW(FIELD("id", TIMESTAMP(0))))
.expectErrorMessage("Failed to deserialize JSON '{\"id\":\"2019-11-12T18:00:12\"}'"),
.expectErrorMessage("Failed to deserialize JSON '{\"id\":\"2019-11-12T18:00:12\"}'."),

TestSpec
.json("{\"id\":\"2019-11-12T18:00:12Z\"}")
.rowType(ROW(FIELD("id", TIMESTAMP(0))))
.expectErrorMessage("Failed to deserialize JSON '{\"id\":\"2019-11-12T18:00:12Z\"}'"),
.expectErrorMessage("Failed to deserialize JSON '{\"id\":\"2019-11-12T18:00:12Z\"}'."),

TestSpec
.json("{\"id\":\"2019-11-12T18:00:12Z\"}")
.rowType(ROW(FIELD("id", TIMESTAMP(0))))
.timestampFormat(TimestampFormat.ISO_8601)
.expectErrorMessage("Failed to deserialize JSON '{\"id\":\"2019-11-12T18:00:12Z\"}'"),
.expectErrorMessage("Failed to deserialize JSON '{\"id\":\"2019-11-12T18:00:12Z\"}'."),

TestSpec
.json("{\"id\":\"abc\"}")
.rowType(ROW(FIELD("id", DECIMAL(10, 3))))
.expectErrorMessage("Failed to deserialize JSON '{\"id\":\"abc\"}'"),
.expectErrorMessage("Failed to deserialize JSON '{\"id\":\"abc\"}'."),

TestSpec
.json("{\"row\":{\"id\":\"abc\"}}")
.rowType(ROW(FIELD("row", ROW(FIELD("id", BOOLEAN())))))
.expect(Row.of(new Row(1)))
.expectErrorMessage("Failed to deserialize JSON '{\"row\":{\"id\":\"abc\"}}'"),
.expect(Row.of(Row.of(false))),

TestSpec
.json("{\"array\":[123, \"abc\"]}")
.rowType(ROW(FIELD("array", ARRAY(INT()))))
.expect(Row.of((Object) new Integer[]{123, null}))
.expectErrorMessage("Failed to deserialize JSON '{\"array\":[123, \"abc\"]}'"),
.expectErrorMessage("Failed to deserialize JSON '{\"array\":[123, \"abc\"]}'."),

TestSpec
.json("{\"map\":{\"key1\":\"123\", \"key2\":\"abc\"}}")
.rowType(ROW(FIELD("map", MAP(STRING(), INT()))))
.expect(Row.of(createHashMap("key1", 123, "key2", null)))
.expectErrorMessage("Failed to deserialize JSON '{\"map\":{\"key1\":\"123\", \"key2\":\"abc\"}}'")


.expectErrorMessage("Failed to deserialize JSON '{\"map\":{\"key1\":\"123\", \"key2\":\"abc\"}}'.")
);

private static Map<String, Integer> createHashMap(String k1, Integer v1, String k2, Integer v2) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,36 @@ private void testParseErrors(TestSpec spec) {
.typeInfo(Types.ROW_NAMED(new String[]{"id"}, Types.LONG))
.expect(Row.of(112L)),

TestSpec
.json("{\"id\":true}")
.typeInfo(Types.ROW_NAMED(new String[]{"id"}, Types.STRING))
.expect(Row.of("true")),

TestSpec
.json("{\"id\":123.234}")
.typeInfo(Types.ROW_NAMED(new String[]{"id"}, Types.STRING))
.expect(Row.of("123.234")),

TestSpec
.json("{\"id\":1234567}")
.typeInfo(Types.ROW_NAMED(new String[]{"id"}, Types.STRING))
.expect(Row.of("1234567")),

TestSpec
.json("{\"id\":\"string field\"}")
.typeInfo(Types.ROW_NAMED(new String[]{"id"}, Types.STRING))
.expect(Row.of("string field")),

TestSpec
.json("{\"id\":[\"array data1\",\"array data2\",123,234.345]}")
.typeInfo(Types.ROW_NAMED(new String[]{"id"}, Types.STRING))
.expect(Row.of("[\"array data1\",\"array data2\",123,234.345]")),

TestSpec
.json("{\"id\":{\"k1\":123,\"k2\":234.234,\"k3\":\"string data\"}}")
.typeInfo(Types.ROW_NAMED(new String[]{"id"}, Types.STRING))
.expect(Row.of("{\"k1\":123,\"k2\":234.234,\"k3\":\"string data\"}")),

TestSpec
.json("{\"id\":\"long\"}")
.typeInfo(Types.ROW_NAMED(new String[]{"id"}, Types.LONG))
Expand Down

0 comments on commit 3865f7b

Please sign in to comment.