Skip to content

Commit

Permalink
[FLINK-18299][json] Fix the non SQL standard timestamp format in JSON…
Browse files Browse the repository at this point in the history
… format


The current timestamp format in JSON format is not SQL standard which uses RFC-3339. This commit changes the default behavior to parse/generate timestamp using SQL standard. Besides, it introduces an option "json.timestamp-format.standard" to have the ability to fallback to ISO standard. 

This closes apache#12661
  • Loading branch information
fsk119 authored and wuchong committed Jun 17, 2020
1 parent 1bacaee commit c42a2f9
Show file tree
Hide file tree
Showing 26 changed files with 404 additions and 112 deletions.
14 changes: 13 additions & 1 deletion docs/dev/table/connectors/formats/canal.md
Original file line number Diff line number Diff line change
Expand Up @@ -162,13 +162,25 @@ Format Options
<td>Specify what format to use, here should be <code>'canal-json'</code>.</td>
</tr>
<tr>
<td><h5>json.ignore-parse-errors</h5></td>
<td><h5>canal-json.ignore-parse-errors</h5></td>
<td>optional</td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Skip fields and rows with parse errors instead of failing.
Fields are set to null in case of errors.</td>
</tr>
<tr>
<td><h5>canal-json.timestamp-format.standard</h5></td>
<td>optional</td>
<td style="word-wrap: break-word;"><code>'SQL'</code></td>
<td>String</td>
<td>Specify the input and output timestamp format. Currently supported values are <code>'SQL'</code> and <code>'ISO-8601'</code>:
<ul>
<li>Option <code>'SQL'</code> will parse input timestamp in "yyyy-MM-dd HH:mm:ss.s{precision}" format, e.g '2020-12-30 12:13:14.123' and output timestamp in the same format.</li>
<li>Option <code>'ISO-8601'</code>will parse input timestamp in "yyyy-MM-ddTHH:mm:ss.s{precision}" format, e.g '2020-12-30T12:13:14.123' and output timestamp in the same format.</li>
</ul>
</td>
</tr>
</tbody>
</table>

Expand Down
14 changes: 13 additions & 1 deletion docs/dev/table/connectors/formats/canal.zh.md
Original file line number Diff line number Diff line change
Expand Up @@ -162,13 +162,25 @@ Format Options
<td>Specify what format to use, here should be <code>'canal-json'</code>.</td>
</tr>
<tr>
<td><h5>json.ignore-parse-errors</h5></td>
<td><h5>canal-json.ignore-parse-errors</h5></td>
<td>optional</td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Skip fields and rows with parse errors instead of failing.
Fields are set to null in case of errors.</td>
</tr>
<tr>
<td><h5>canal-json.timestamp-format.standard</h5></td>
<td>optional</td>
<td style="word-wrap: break-word;"><code>'SQL'</code></td>
<td>String</td>
<td>Specify the input and output timestamp format. Currently supported values are <code>'SQL'</code> and <code>'ISO-8601'</code>:
<ul>
<li>Option <code>'SQL'</code> will parse input timestamp in "yyyy-MM-dd HH:mm:ss.s{precision}" format, e.g '2020-12-30 12:13:14.123' and output timestamp in the same format.</li>
<li>Option <code>'ISO-8601'</code>will parse input timestamp in "yyyy-MM-ddTHH:mm:ss.s{precision}" format, e.g '2020-12-30T12:13:14.123' and output timestamp in the same format.</li>
</ul>
</td>
</tr>
</tbody>
</table>

Expand Down
14 changes: 13 additions & 1 deletion docs/dev/table/connectors/formats/debezium.md
Original file line number Diff line number Diff line change
Expand Up @@ -178,13 +178,25 @@ Format Options
This option indicates whether the Debezium JSON message includes the schema or not. </td>
</tr>
<tr>
<td><h5>json.ignore-parse-errors</h5></td>
<td><h5>debezium-json.ignore-parse-errors</h5></td>
<td>optional</td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Skip fields and rows with parse errors instead of failing.
Fields are set to null in case of errors.</td>
</tr>
<tr>
<td><h5>debezium-json.timestamp-format.standard</h5></td>
<td>optional</td>
<td style="word-wrap: break-word;"><code>'SQL'</code></td>
<td>String</td>
<td>Specify the input and output timestamp format. Currently supported values are <code>'SQL'</code> and <code>'ISO-8601'</code>:
<ul>
<li>Option <code>'SQL'</code> will parse input timestamp in "yyyy-MM-dd HH:mm:ss.s{precision}" format, e.g '2020-12-30 12:13:14.123' and output timestamp in the same format.</li>
<li>Option <code>'ISO-8601'</code>will parse input timestamp in "yyyy-MM-ddTHH:mm:ss.s{precision}" format, e.g '2020-12-30T12:13:14.123' and output timestamp in the same format.</li>
</ul>
</td>
</tr>
</tbody>
</table>

Expand Down
14 changes: 13 additions & 1 deletion docs/dev/table/connectors/formats/debezium.zh.md
Original file line number Diff line number Diff line change
Expand Up @@ -178,13 +178,25 @@ Format Options
This option indicates whether the Debezium JSON message includes the schema or not. </td>
</tr>
<tr>
<td><h5>json.ignore-parse-errors</h5></td>
<td><h5>debezium-json.ignore-parse-errors</h5></td>
<td>optional</td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Skip fields and rows with parse errors instead of failing.
Fields are set to null in case of errors.</td>
</tr>
<tr>
<td><h5>debezium-json.timestamp-format.standard</h5></td>
<td>optional</td>
<td style="word-wrap: break-word;"><code>'SQL'</code></td>
<td>String</td>
<td>Specify the input and output timestamp format. Currently supported values are <code>'SQL'</code> and <code>'ISO-8601'</code>:
<ul>
<li>Option <code>'SQL'</code> will parse input timestamp in "yyyy-MM-dd HH:mm:ss.s{precision}" format, e.g '2020-12-30 12:13:14.123' and output timestamp in the same format.</li>
<li>Option <code>'ISO-8601'</code>will parse input timestamp in "yyyy-MM-ddTHH:mm:ss.s{precision}" format, e.g '2020-12-30T12:13:14.123' and output timestamp in the same format.</li>
</ul>
</td>
</tr>
</tbody>
</table>

Expand Down
12 changes: 12 additions & 0 deletions docs/dev/table/connectors/formats/json.md
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,18 @@ Format Options
<td>Skip fields and rows with parse errors instead of failing.
Fields are set to null in case of errors.</td>
</tr>
<tr>
<td><h5>json.timestamp-format.standard</h5></td>
<td>optional</td>
<td style="word-wrap: break-word;"><code>'SQL'</code></td>
<td>String</td>
<td>Specify the input and output timestamp format. Currently supported values are <code>'SQL'</code> and <code>'ISO-8601'</code>:
<ul>
<li>Option <code>'SQL'</code> will parse input timestamp in "yyyy-MM-dd HH:mm:ss.s{precision}" format, e.g '2020-12-30 12:13:14.123' and output timestamp in the same format.</li>
<li>Option <code>'ISO-8601'</code>will parse input timestamp in "yyyy-MM-ddTHH:mm:ss.s{precision}" format, e.g '2020-12-30T12:13:14.123' and output timestamp in the same format.</li>
</ul>
</td>
</tr>
</tbody>
</table>

Expand Down
12 changes: 12 additions & 0 deletions docs/dev/table/connectors/formats/json.zh.md
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,18 @@ Format Options
<td>Skip fields and rows with parse errors instead of failing.
Fields are set to null in case of errors.</td>
</tr>
<tr>
<td><h5>json.timestamp-format.standard</h5></td>
<td>optional</td>
<td style="word-wrap: break-word;"><code>'SQL'</code></td>
<td>String</td>
<td>Specify the input and output timestamp format. Currently supported values are <code>'SQL'</code> and <code>'ISO-8601'</code>:
<ul>
<li>Option <code>'SQL'</code> will parse input timestamp in "yyyy-MM-dd HH:mm:ss.s{precision}" format, e.g '2020-12-30 12:13:14.123' and output timestamp in the same format.</li>
<li>Option <code>'ISO-8601'</code>will parse input timestamp in "yyyy-MM-ddTHH:mm:ss.s{precision}" format, e.g '2020-12-30T12:13:14.123' and output timestamp in the same format.</li>
</ul>
</td>
</tr>
</tbody>
</table>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,12 +107,12 @@ public void testWritingDocuments() throws Exception {
Map<String, Object> response = client.get(new GetRequest(index, myType, "1_2012-12-12T12:12:12")).actionGet().getSource();
Map<Object, Object> expectedMap = new HashMap<>();
expectedMap.put("a", 1);
expectedMap.put("b", "00:00:12Z");
expectedMap.put("b", "00:00:12");
expectedMap.put("c", "ABCDE");
expectedMap.put("d", 12.12d);
expectedMap.put("e", 2);
expectedMap.put("f", "2003-10-20");
expectedMap.put("g", "2012-12-12T12:12:12Z");
expectedMap.put("g", "2012-12-12 12:12:12");
assertThat(response, equalTo(expectedMap));
}

Expand Down Expand Up @@ -165,12 +165,12 @@ public void testWritingDocumentsFromTableApi() throws Exception {
.getSource();
Map<Object, Object> expectedMap = new HashMap<>();
expectedMap.put("a", 1);
expectedMap.put("b", "00:00:12Z");
expectedMap.put("b", "00:00:12");
expectedMap.put("c", "ABCDE");
expectedMap.put("d", 12.12d);
expectedMap.put("e", 2);
expectedMap.put("f", "2003-10-20");
expectedMap.put("g", "2012-12-12T12:12:12Z");
expectedMap.put("g", "2012-12-12 12:12:12");
assertThat(response, equalTo(expectedMap));
}

Expand Down Expand Up @@ -238,12 +238,12 @@ public void testWritingDocumentsNoPrimaryKey() throws Exception {
Map<String, Object> result = hits.getAt(0).getSourceAsMap();
Map<Object, Object> expectedMap = new HashMap<>();
expectedMap.put("a", 1);
expectedMap.put("b", "00:00:12Z");
expectedMap.put("b", "00:00:12");
expectedMap.put("c", "ABCDE");
expectedMap.put("d", 12.12d);
expectedMap.put("e", 2);
expectedMap.put("f", "2003-10-20");
expectedMap.put("g", "2012-12-12T12:12:12Z");
expectedMap.put("g", "2012-12-12 12:12:12");
assertThat(result, equalTo(expectedMap));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,12 +105,12 @@ public void testWritingDocuments() throws Exception {
Map<String, Object> response = client.get(new GetRequest(index, "1_2012-12-12T12:12:12")).actionGet().getSource();
Map<Object, Object> expectedMap = new HashMap<>();
expectedMap.put("a", 1);
expectedMap.put("b", "00:00:12Z");
expectedMap.put("b", "00:00:12");
expectedMap.put("c", "ABCDE");
expectedMap.put("d", 12.12d);
expectedMap.put("e", 2);
expectedMap.put("f", "2003-10-20");
expectedMap.put("g", "2012-12-12T12:12:12Z");
expectedMap.put("g", "2012-12-12 12:12:12");
assertThat(response, equalTo(expectedMap));
}

Expand Down Expand Up @@ -159,12 +159,12 @@ public void testWritingDocumentsFromTableApi() throws Exception {
Map<String, Object> response = client.get(new GetRequest(index, "1_2012-12-12T12:12:12")).actionGet().getSource();
Map<Object, Object> expectedMap = new HashMap<>();
expectedMap.put("a", 1);
expectedMap.put("b", "00:00:12Z");
expectedMap.put("b", "00:00:12");
expectedMap.put("c", "ABCDE");
expectedMap.put("d", 12.12d);
expectedMap.put("e", 2);
expectedMap.put("f", "2003-10-20");
expectedMap.put("g", "2012-12-12T12:12:12Z");
expectedMap.put("g", "2012-12-12 12:12:12");
assertThat(response, equalTo(expectedMap));
}

Expand Down Expand Up @@ -230,12 +230,12 @@ public void testWritingDocumentsNoPrimaryKey() throws Exception {
Map<String, Object> result = hits.getAt(0).getSourceAsMap();
Map<Object, Object> expectedMap = new HashMap<>();
expectedMap.put("a", 1);
expectedMap.put("b", "00:00:12Z");
expectedMap.put("b", "00:00:12");
expectedMap.put("c", "ABCDE");
expectedMap.put("d", 12.12d);
expectedMap.put("e", 2);
expectedMap.put("f", "2003-10-20");
expectedMap.put("g", "2012-12-12T12:12:12Z");
expectedMap.put("g", "2012-12-12 12:12:12");
assertThat(result, equalTo(expectedMap));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,14 +141,14 @@ public void testKafka() throws Exception {
String testAvroTopic = "test-avro-" + kafkaVersion + "-" + UUID.randomUUID().toString();
kafka.createTopic(1, 1, testJsonTopic);
String[] messages = new String[]{
"{\"rowtime\": \"2018-03-12T08:00:00Z\", \"user\": \"Alice\", \"event\": { \"type\": \"WARNING\", \"message\": \"This is a warning.\"}}",
"{\"rowtime\": \"2018-03-12T08:10:00Z\", \"user\": \"Alice\", \"event\": { \"type\": \"WARNING\", \"message\": \"This is a warning.\"}}",
"{\"rowtime\": \"2018-03-12T09:00:00Z\", \"user\": \"Bob\", \"event\": { \"type\": \"WARNING\", \"message\": \"This is another warning.\"}}",
"{\"rowtime\": \"2018-03-12T09:10:00Z\", \"user\": \"Alice\", \"event\": { \"type\": \"INFO\", \"message\": \"This is a info.\"}}",
"{\"rowtime\": \"2018-03-12T09:20:00Z\", \"user\": \"Steve\", \"event\": { \"type\": \"INFO\", \"message\": \"This is another info.\"}}",
"{\"rowtime\": \"2018-03-12T09:30:00Z\", \"user\": \"Steve\", \"event\": { \"type\": \"INFO\", \"message\": \"This is another info.\"}}",
"{\"rowtime\": \"2018-03-12T09:30:00Z\", \"user\": null, \"event\": { \"type\": \"WARNING\", \"message\": \"This is a bad message because the user is missing.\"}}",
"{\"rowtime\": \"2018-03-12T10:40:00Z\", \"user\": \"Bob\", \"event\": { \"type\": \"ERROR\", \"message\": \"This is an error.\"}}"
"{\"rowtime\": \"2018-03-12 08:00:00\", \"user\": \"Alice\", \"event\": { \"type\": \"WARNING\", \"message\": \"This is a warning.\"}}",
"{\"rowtime\": \"2018-03-12 08:10:00\", \"user\": \"Alice\", \"event\": { \"type\": \"WARNING\", \"message\": \"This is a warning.\"}}",
"{\"rowtime\": \"2018-03-12 09:00:00\", \"user\": \"Bob\", \"event\": { \"type\": \"WARNING\", \"message\": \"This is another warning.\"}}",
"{\"rowtime\": \"2018-03-12 09:10:00\", \"user\": \"Alice\", \"event\": { \"type\": \"INFO\", \"message\": \"This is a info.\"}}",
"{\"rowtime\": \"2018-03-12 09:20:00\", \"user\": \"Steve\", \"event\": { \"type\": \"INFO\", \"message\": \"This is another info.\"}}",
"{\"rowtime\": \"2018-03-12 09:30:00\", \"user\": \"Steve\", \"event\": { \"type\": \"INFO\", \"message\": \"This is another info.\"}}",
"{\"rowtime\": \"2018-03-12 09:30:00\", \"user\": null, \"event\": { \"type\": \"WARNING\", \"message\": \"This is a bad message because the user is missing.\"}}",
"{\"rowtime\": \"2018-03-12 10:40:00\", \"user\": \"Bob\", \"event\": { \"type\": \"ERROR\", \"message\": \"This is an error.\"}}"
};
kafka.sendMessages(testJsonTopic, messages);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import static org.apache.flink.formats.json.JsonFormatFactory.validateFormatOptions;
import static org.apache.flink.formats.json.JsonOptions.FAIL_ON_MISSING_FIELD;
import static org.apache.flink.formats.json.JsonOptions.IGNORE_PARSE_ERRORS;
import static org.apache.flink.formats.json.JsonOptions.TIMESTAMP_FORMAT;

/**
* Factory to build reader/writer to read/write json format file.
Expand All @@ -70,6 +71,7 @@ public Set<ConfigOption<?>> optionalOptions() {
Set<ConfigOption<?>> options = new HashSet<>();
options.add(FAIL_ON_MISSING_FIELD);
options.add(IGNORE_PARSE_ERRORS);
options.add(TIMESTAMP_FORMAT);
return options;
}

Expand All @@ -79,13 +81,15 @@ public Set<ConfigOption<?>> optionalOptions() {
validateFormatOptions(options);
boolean failOnMissingField = options.get(FAIL_ON_MISSING_FIELD);
boolean ignoreParseErrors = options.get(IGNORE_PARSE_ERRORS);
TimestampFormat timestampOption = JsonOptions.getTimestampFormat(options);

RowType formatRowType = context.getFormatRowType();
JsonRowDataDeserializationSchema deserializationSchema = new JsonRowDataDeserializationSchema(
formatRowType,
new GenericTypeInfo(GenericRowData.class),
failOnMissingField,
ignoreParseErrors);
ignoreParseErrors,
timestampOption);

String[] fieldNames = context.getSchema().getFieldNames();
List<String> projectFields = Arrays.stream(context.getProjectFields())
Expand Down Expand Up @@ -117,7 +121,8 @@ public Set<ConfigOption<?>> optionalOptions() {

@Override
public Optional<Encoder<RowData>> createEncoder(WriterContext context) {
return Optional.of(new JsonRowDataEncoder(new JsonRowDataSerializationSchema(context.getFormatRowType())));
return Optional.of(new JsonRowDataEncoder(new JsonRowDataSerializationSchema(context.getFormatRowType(),
JsonOptions.getTimestampFormat(context.getFormatOptions()))));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@

import static org.apache.flink.formats.json.JsonOptions.FAIL_ON_MISSING_FIELD;
import static org.apache.flink.formats.json.JsonOptions.IGNORE_PARSE_ERRORS;
import static org.apache.flink.formats.json.JsonOptions.TIMESTAMP_FORMAT;
import static org.apache.flink.formats.json.JsonOptions.TIMESTAMP_FORMAT_ENUM;

/**
* Table format factory for providing configured instances of JSON to RowData
Expand All @@ -64,6 +66,7 @@ public DecodingFormat<DeserializationSchema<RowData>> createDecodingFormat(

final boolean failOnMissingField = formatOptions.get(FAIL_ON_MISSING_FIELD);
final boolean ignoreParseErrors = formatOptions.get(IGNORE_PARSE_ERRORS);
TimestampFormat timestampOption = JsonOptions.getTimestampFormat(formatOptions);

return new DecodingFormat<DeserializationSchema<RowData>>() {
@Override
Expand All @@ -77,7 +80,9 @@ public DeserializationSchema<RowData> createRuntimeDecoder(
rowType,
rowDataTypeInfo,
failOnMissingField,
ignoreParseErrors);
ignoreParseErrors,
timestampOption
);
}

@Override
Expand All @@ -93,13 +98,15 @@ public EncodingFormat<SerializationSchema<RowData>> createEncodingFormat(
ReadableConfig formatOptions) {
FactoryUtil.validateFactoryOptions(this, formatOptions);

TimestampFormat timestampOption = JsonOptions.getTimestampFormat(formatOptions);

return new EncodingFormat<SerializationSchema<RowData>>() {
@Override
public SerializationSchema<RowData> createRuntimeEncoder(
DynamicTableSink.Context context,
DataType consumedDataType) {
final RowType rowType = (RowType) consumedDataType.getLogicalType();
return new JsonRowDataSerializationSchema(rowType);
return new JsonRowDataSerializationSchema(rowType, timestampOption);
}

@Override
Expand All @@ -124,6 +131,7 @@ public Set<ConfigOption<?>> optionalOptions() {
Set<ConfigOption<?>> options = new HashSet<>();
options.add(FAIL_ON_MISSING_FIELD);
options.add(IGNORE_PARSE_ERRORS);
options.add(TIMESTAMP_FORMAT);
return options;
}

Expand All @@ -134,11 +142,16 @@ public Set<ConfigOption<?>> optionalOptions() {
static void validateFormatOptions(ReadableConfig tableOptions) {
boolean failOnMissingField = tableOptions.get(FAIL_ON_MISSING_FIELD);
boolean ignoreParseErrors = tableOptions.get(IGNORE_PARSE_ERRORS);
String timestampFormat = tableOptions.get(TIMESTAMP_FORMAT);
if (ignoreParseErrors && failOnMissingField) {
throw new ValidationException(FAIL_ON_MISSING_FIELD.key()
+ " and "
+ IGNORE_PARSE_ERRORS.key()
+ " shouldn't both be true.");
}
if (!TIMESTAMP_FORMAT_ENUM.contains(timestampFormat)){
throw new ValidationException(String.format("Unsupported value '%s' for %s. Supported values are [SQL, ISO-8601].",
timestampFormat, TIMESTAMP_FORMAT.key()));
}
}
}
Loading

0 comments on commit c42a2f9

Please sign in to comment.