diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataDeserializationSchema.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataDeserializationSchema.java index d66ecce22c869..956ca5baab291 100644 --- a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataDeserializationSchema.java +++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDataDeserializationSchema.java @@ -304,7 +304,11 @@ private TimestampData convertToTimestamp(JsonNode jsonNode) { } private StringData convertToString(JsonNode jsonNode) { - return StringData.fromString(jsonNode.asText()); + if (jsonNode.isContainerNode()) { + return StringData.fromString(jsonNode.toString()); + } else { + return StringData.fromString(jsonNode.asText()); + } } private byte[] convertToBytes(JsonNode jsonNode) { diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDeserializationSchema.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDeserializationSchema.java index cc9b55d589b0a..f9a6395dfa73e 100644 --- a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDeserializationSchema.java +++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDeserializationSchema.java @@ -347,7 +347,7 @@ private Optional createConverterForSimpleType(T } else if (simpleTypeInfo == Types.BOOLEAN) { return Optional.of(this::convertToBoolean); } else if (simpleTypeInfo == Types.STRING) { - return Optional.of((mapper, jsonNode) -> jsonNode.asText()); + return Optional.of(this::convertToString); } else if (simpleTypeInfo == Types.INT) { return Optional.of(this::convertToInt); } else if (simpleTypeInfo == Types.LONG) { @@ -381,6 +381,14 @@ private Optional createConverterForSimpleType(T } } + private String convertToString(ObjectMapper mapper, JsonNode jsonNode) { + if (jsonNode.isContainerNode()) { + 
return jsonNode.toString(); + } else { + return jsonNode.asText(); + } + } + private boolean convertToBoolean(ObjectMapper mapper, JsonNode jsonNode) { if (jsonNode.isBoolean()) { // avoid redundant toString and parseBoolean, for better performance diff --git a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDataSerDeSchemaTest.java b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDataSerDeSchemaTest.java index cab427f11170b..7b561aaf0c426 100644 --- a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDataSerDeSchemaTest.java +++ b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDataSerDeSchemaTest.java @@ -30,11 +30,9 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode; -import org.junit.Rule; +import org.junit.Assert; import org.junit.Test; -import org.junit.rules.ExpectedException; -import java.io.IOException; import java.math.BigDecimal; import java.sql.Timestamp; import java.time.LocalDate; @@ -71,9 +69,6 @@ */ public class JsonRowDataSerDeSchemaTest { - @Rule - public ExpectedException thrown = ExpectedException.none(); - @Test public void testSerDe() throws Exception { byte tinyint = 'c'; @@ -326,9 +321,13 @@ public void testDeserializationMissingNode() throws Exception { deserializationSchema = deserializationSchema = new JsonRowDataDeserializationSchema( schema, WrapperTypeInfo.of(schema), true, false, TimestampFormat.ISO_8601); - thrown.expect(IOException.class); - thrown.expectMessage("Failed to deserialize JSON '{\"id\":123123123}'"); - deserializationSchema.deserialize(serializedJson); + String errorMessage = "Failed to deserialize JSON '{\"id\":123123123}'."; + try { + deserializationSchema.deserialize(serializedJson); + Assert.fail("expecting exception message: " + errorMessage); + } catch (Throwable t) { + 
assertEquals(errorMessage, t.getMessage()); + } // ignore on parse error deserializationSchema = new JsonRowDataDeserializationSchema( @@ -336,12 +335,15 @@ public void testDeserializationMissingNode() throws Exception { actual = convertToExternal(deserializationSchema.deserialize(serializedJson), dataType); assertEquals(expected, actual); - thrown.expect(IllegalArgumentException.class); - thrown.expectMessage("JSON format doesn't support failOnMissingField and ignoreParseErrors are both enabled"); - // failOnMissingField and ignoreParseErrors both enabled - //noinspection ConstantConditions - new JsonRowDataDeserializationSchema( - schema, WrapperTypeInfo.of(schema), true, true, TimestampFormat.ISO_8601); + errorMessage = "JSON format doesn't support failOnMissingField and ignoreParseErrors are both enabled."; + try { + // failOnMissingField and ignoreParseErrors both enabled + new JsonRowDataDeserializationSchema( + schema, WrapperTypeInfo.of(schema), true, true, TimestampFormat.ISO_8601); + Assert.fail("expecting exception message: " + errorMessage); + } catch (Throwable t) { + assertEquals(errorMessage, t.getMessage()); + } } @Test @@ -380,7 +382,7 @@ private void testIgnoreParseErrors(TestSpec spec) throws Exception { // the parsing field should be null and no exception is thrown JsonRowDataDeserializationSchema ignoreErrorsSchema = new JsonRowDataDeserializationSchema( spec.rowType, WrapperTypeInfo.of(spec.rowType), false, true, - TimestampFormat.ISO_8601); + spec.timestampFormat); Row expected; if (spec.expected != null) { expected = spec.expected; @@ -400,8 +402,12 @@ private void testParseErrors(TestSpec spec) throws Exception { spec.rowType, WrapperTypeInfo.of(spec.rowType), false, false, spec.timestampFormat); - thrown.expectMessage(spec.errorMessage); - failingSchema.deserialize(spec.json.getBytes()); + try { + failingSchema.deserialize(spec.json.getBytes()); + Assert.fail("expecting exception " + spec.errorMessage); + } catch (Throwable t) { + 
assertEquals(spec.errorMessage, t.getMessage()); + } } private static List testData = Arrays.asList( @@ -418,7 +424,7 @@ private void testParseErrors(TestSpec spec) throws Exception { TestSpec .json("{\"id\":\"abc\"}") .rowType(ROW(FIELD("id", INT()))) - .expectErrorMessage("Failed to deserialize JSON '{\"id\":\"abc\"}'"), + .expectErrorMessage("Failed to deserialize JSON '{\"id\":\"abc\"}'."), TestSpec .json("{\"id\":112.013}") @@ -428,84 +434,111 @@ private void testParseErrors(TestSpec spec) throws Exception { TestSpec .json("{\"id\":\"long\"}") .rowType(ROW(FIELD("id", BIGINT()))) - .expectErrorMessage("Failed to deserialize JSON '{\"id\":\"long\"}'"), + .expectErrorMessage("Failed to deserialize JSON '{\"id\":\"long\"}'."), TestSpec .json("{\"id\":\"112.013.123\"}") .rowType(ROW(FIELD("id", FLOAT()))) - .expectErrorMessage("Failed to deserialize JSON '{\"id\":\"112.013.123\"}'"), + .expectErrorMessage("Failed to deserialize JSON '{\"id\":\"112.013.123\"}'."), TestSpec .json("{\"id\":\"112.013.123\"}") .rowType(ROW(FIELD("id", DOUBLE()))) - .expectErrorMessage("Failed to deserialize JSON '{\"id\":\"112.013.123\"}'"), + .expectErrorMessage("Failed to deserialize JSON '{\"id\":\"112.013.123\"}'."), TestSpec .json("{\"id\":\"18:00:243\"}") .rowType(ROW(FIELD("id", TIME()))) - .expectErrorMessage("Failed to deserialize JSON '{\"id\":\"18:00:243\"}'"), + .expectErrorMessage("Failed to deserialize JSON '{\"id\":\"18:00:243\"}'."), TestSpec .json("{\"id\":\"18:00:243\"}") .rowType(ROW(FIELD("id", TIME()))) - .expectErrorMessage("Failed to deserialize JSON '{\"id\":\"18:00:243\"}'"), + .expectErrorMessage("Failed to deserialize JSON '{\"id\":\"18:00:243\"}'."), TestSpec .json("{\"id\":\"20191112\"}") .rowType(ROW(FIELD("id", DATE()))) - .expectErrorMessage("Failed to deserialize JSON '{\"id\":\"20191112\"}'"), + .expectErrorMessage("Failed to deserialize JSON '{\"id\":\"20191112\"}'."), TestSpec .json("{\"id\":\"20191112\"}") .rowType(ROW(FIELD("id", DATE()))) - 
.expectErrorMessage("Failed to deserialize JSON '{\"id\":\"20191112\"}'"), + .expectErrorMessage("Failed to deserialize JSON '{\"id\":\"20191112\"}'."), + + TestSpec + .json("{\"id\":true}") + .rowType(ROW(FIELD("id", STRING()))) + .expect(Row.of("true")), + + TestSpec + .json("{\"id\":123.234}") + .rowType(ROW(FIELD("id", STRING()))) + .expect(Row.of("123.234")), + + TestSpec + .json("{\"id\":1234567}") + .rowType(ROW(FIELD("id", STRING()))) + .expect(Row.of("1234567")), + + TestSpec + .json("{\"id\":\"string field\"}") + .rowType(ROW(FIELD("id", STRING()))) + .expect(Row.of("string field")), + + TestSpec + .json("{\"id\":[\"array data1\",\"array data2\",123,234.345]}") + .rowType(ROW(FIELD("id", STRING()))) + .expect(Row.of("[\"array data1\",\"array data2\",123,234.345]")), + + TestSpec + .json("{\"id\":{\"k1\":123,\"k2\":234.234,\"k3\":\"string data\"}}") + .rowType(ROW(FIELD("id", STRING()))) + .expect(Row.of("{\"k1\":123,\"k2\":234.234,\"k3\":\"string data\"}")), TestSpec .json("{\"id\":\"2019-11-12 18:00:12\"}") .rowType(ROW(FIELD("id", TIMESTAMP(0)))) .timestampFormat(TimestampFormat.ISO_8601) - .expectErrorMessage("Failed to deserialize JSON '{\"id\":\"2019-11-12 18:00:12\"}'"), + .expectErrorMessage("Failed to deserialize JSON '{\"id\":\"2019-11-12 18:00:12\"}'."), TestSpec .json("{\"id\":\"2019-11-12T18:00:12\"}") .rowType(ROW(FIELD("id", TIMESTAMP(0)))) - .expectErrorMessage("Failed to deserialize JSON '{\"id\":\"2019-11-12T18:00:12\"}'"), + .expectErrorMessage("Failed to deserialize JSON '{\"id\":\"2019-11-12T18:00:12\"}'."), TestSpec .json("{\"id\":\"2019-11-12T18:00:12Z\"}") .rowType(ROW(FIELD("id", TIMESTAMP(0)))) - .expectErrorMessage("Failed to deserialize JSON '{\"id\":\"2019-11-12T18:00:12Z\"}'"), + .expectErrorMessage("Failed to deserialize JSON '{\"id\":\"2019-11-12T18:00:12Z\"}'."), TestSpec .json("{\"id\":\"2019-11-12T18:00:12Z\"}") .rowType(ROW(FIELD("id", TIMESTAMP(0)))) .timestampFormat(TimestampFormat.ISO_8601) - 
.expectErrorMessage("Failed to deserialize JSON '{\"id\":\"2019-11-12T18:00:12Z\"}'"), + .expectErrorMessage("Failed to deserialize JSON '{\"id\":\"2019-11-12T18:00:12Z\"}'."), TestSpec .json("{\"id\":\"abc\"}") .rowType(ROW(FIELD("id", DECIMAL(10, 3)))) - .expectErrorMessage("Failed to deserialize JSON '{\"id\":\"abc\"}'"), + .expectErrorMessage("Failed to deserialize JSON '{\"id\":\"abc\"}'."), TestSpec .json("{\"row\":{\"id\":\"abc\"}}") .rowType(ROW(FIELD("row", ROW(FIELD("id", BOOLEAN()))))) - .expect(Row.of(new Row(1))) - .expectErrorMessage("Failed to deserialize JSON '{\"row\":{\"id\":\"abc\"}}'"), + .expect(Row.of(Row.of(false))), TestSpec .json("{\"array\":[123, \"abc\"]}") .rowType(ROW(FIELD("array", ARRAY(INT())))) .expect(Row.of((Object) new Integer[]{123, null})) - .expectErrorMessage("Failed to deserialize JSON '{\"array\":[123, \"abc\"]}'"), + .expectErrorMessage("Failed to deserialize JSON '{\"array\":[123, \"abc\"]}'."), TestSpec .json("{\"map\":{\"key1\":\"123\", \"key2\":\"abc\"}}") .rowType(ROW(FIELD("map", MAP(STRING(), INT())))) .expect(Row.of(createHashMap("key1", 123, "key2", null))) - .expectErrorMessage("Failed to deserialize JSON '{\"map\":{\"key1\":\"123\", \"key2\":\"abc\"}}'") - - + .expectErrorMessage("Failed to deserialize JSON '{\"map\":{\"key1\":\"123\", \"key2\":\"abc\"}}'.") ); private static Map createHashMap(String k1, Integer v1, String k2, Integer v2) { diff --git a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDeserializationSchemaTest.java b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDeserializationSchemaTest.java index cba7fcefd2cb9..438a18c20d1ae 100644 --- a/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDeserializationSchemaTest.java +++ b/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDeserializationSchemaTest.java @@ -313,6 +313,36 @@ private void testParseErrors(TestSpec spec) { 
.typeInfo(Types.ROW_NAMED(new String[]{"id"}, Types.LONG)) .expect(Row.of(112L)), + TestSpec + .json("{\"id\":true}") + .typeInfo(Types.ROW_NAMED(new String[]{"id"}, Types.STRING)) + .expect(Row.of("true")), + + TestSpec + .json("{\"id\":123.234}") + .typeInfo(Types.ROW_NAMED(new String[]{"id"}, Types.STRING)) + .expect(Row.of("123.234")), + + TestSpec + .json("{\"id\":1234567}") + .typeInfo(Types.ROW_NAMED(new String[]{"id"}, Types.STRING)) + .expect(Row.of("1234567")), + + TestSpec + .json("{\"id\":\"string field\"}") + .typeInfo(Types.ROW_NAMED(new String[]{"id"}, Types.STRING)) + .expect(Row.of("string field")), + + TestSpec + .json("{\"id\":[\"array data1\",\"array data2\",123,234.345]}") + .typeInfo(Types.ROW_NAMED(new String[]{"id"}, Types.STRING)) + .expect(Row.of("[\"array data1\",\"array data2\",123,234.345]")), + + TestSpec + .json("{\"id\":{\"k1\":123,\"k2\":234.234,\"k3\":\"string data\"}}") + .typeInfo(Types.ROW_NAMED(new String[]{"id"}, Types.STRING)) + .expect(Row.of("{\"k1\":123,\"k2\":234.234,\"k3\":\"string data\"}")), + TestSpec .json("{\"id\":\"long\"}") .typeInfo(Types.ROW_NAMED(new String[]{"id"}, Types.LONG))