[FLINK-19912][json] Fix JSON format fails to serialize map value with null keys

This closes apache#13972
wangxlong authored Nov 11, 2020
1 parent 0ab2389 commit ba7c232
Showing 28 changed files with 790 additions and 110 deletions.
22 changes: 21 additions & 1 deletion docs/dev/table/connectors/formats/canal.md
@@ -181,10 +181,30 @@ Format Options
<td>Specify the input and output timestamp format. Currently supported values are <code>'SQL'</code> and <code>'ISO-8601'</code>:
<ul>
<li>Option <code>'SQL'</code> will parse input timestamp in "yyyy-MM-dd HH:mm:ss.s{precision}" format, e.g '2020-12-30 12:13:14.123' and output timestamp in the same format.</li>
<li>Option <code>'ISO-8601'</code>will parse input timestamp in "yyyy-MM-ddTHH:mm:ss.s{precision}" format, e.g '2020-12-30T12:13:14.123' and output timestamp in the same format.</li>
<li>Option <code>'ISO-8601'</code> will parse input timestamp in "yyyy-MM-ddTHH:mm:ss.s{precision}" format, e.g '2020-12-30T12:13:14.123' and output timestamp in the same format.</li>
</ul>
</td>
</tr>
<tr>
<td><h5>canal-json.map-null-key.mode</h5></td>
<td>optional</td>
<td style="word-wrap: break-word;"><code>'FAIL'</code></td>
<td>String</td>
<td>Specifies how null keys in map data are handled during serialization. Currently supported values are <code>'FAIL'</code>, <code>'DROP'</code> and <code>'LITERAL'</code>:
<ul>
<li>Option <code>'FAIL'</code> throws an exception when a map with a null key is encountered.</li>
<li>Option <code>'DROP'</code> drops map entries that have a null key.</li>
<li>Option <code>'LITERAL'</code> replaces the null key with a string literal. The string literal is defined by the <code>canal-json.map-null-key.literal</code> option.</li>
</ul>
</td>
</tr>
<tr>
<td><h5>canal-json.map-null-key.literal</h5></td>
<td>optional</td>
<td style="word-wrap: break-word;">'null'</td>
<td>String</td>
<td>Specifies the string literal that replaces a null key when <code>'canal-json.map-null-key.mode'</code> is <code>'LITERAL'</code>.</td>
</tr>
<tr>
<td><h5>canal-json.database.include</h5></td>
<td>optional</td>
22 changes: 21 additions & 1 deletion docs/dev/table/connectors/formats/canal.zh.md
@@ -176,13 +176,33 @@ Format Options
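<td>optional</td>
<td style="word-wrap: break-word;"><code>'SQL'</code></td>
<td>String</td>
<td>Specifies the input and output timestamp format.  Currently supported values are <code>'SQL'</code> and <code>'ISO-8601'</code>:
<td>Specifies the input and output timestamp format. Currently supported values are <code>'SQL'</code> and <code>'ISO-8601'</code>:
<ul>
<li>Option <code>'SQL'</code> parses input timestamps in "yyyy-MM-dd HH:mm:ss.s{precision}" format, e.g. '2020-12-30 12:13:14.123', and outputs timestamps in the same format.</li>
<li>Option <code>'ISO-8601'</code> parses input timestamps in "yyyy-MM-ddTHH:mm:ss.s{precision}" format, e.g. '2020-12-30T12:13:14.123', and outputs timestamps in the same format.</li>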
</ul>
</td>
</tr>
<tr>
<td><h5>canal-json.map-null-key.mode</h5></td>
<td>optional</td>
<td style="word-wrap: break-word;"><code>'FAIL'</code></td>
<td>String</td>
<td>Specifies how null keys in map data are handled. Currently supported values are <code>'FAIL'</code>, <code>'DROP'</code> and <code>'LITERAL'</code>:
<ul>
<li>Option <code>'FAIL'</code> throws an exception when a map with a null key is encountered.</li>
<li>Option <code>'DROP'</code> drops map entries that have a null key.</li>
<li>Option <code>'LITERAL'</code> replaces null keys in the map with a string literal. The string literal is defined by <code>'canal-json.map-null-key.literal'</code>.</li>
</ul>
</td>
</tr>
<tr>
<td><h5>canal-json.map-null-key.literal</h5></td>
<td>optional</td>
<td style="word-wrap: break-word;">'null'</td>
<td>String</td>
<td>When <code>'canal-json.map-null-key.mode'</code> is LITERAL, specifies the string literal that replaces null keys in the map.</td>
</tr>
<tr>
<td><h5>canal-json.database.include</h5></td>
<td>optional</td>
21 changes: 21 additions & 0 deletions docs/dev/table/connectors/formats/debezium.md
@@ -201,6 +201,27 @@ Format Options
</ul>
</td>
</tr>
<tr>
<td><h5>debezium-json.map-null-key.mode</h5></td>
<td>optional</td>
<td style="word-wrap: break-word;"><code>'FAIL'</code></td>
<td>String</td>
<td>Specifies how null keys in map data are handled during serialization. Currently supported values are <code>'FAIL'</code>, <code>'DROP'</code> and <code>'LITERAL'</code>:
<ul>
<li>Option <code>'FAIL'</code> throws an exception when a map with a null key is encountered.</li>
<li>Option <code>'DROP'</code> drops map entries that have a null key.</li>
<li>Option <code>'LITERAL'</code> replaces the null key with a string literal. The string literal is defined by the <code>debezium-json.map-null-key.literal</code> option.</li>
</ul>
</td>
</tr>
<tr>
<td><h5>debezium-json.map-null-key.literal</h5></td>
<td>optional</td>
<td style="word-wrap: break-word;">'null'</td>
<td>String</td>
<td>Specifies the string literal that replaces a null key when <code>'debezium-json.map-null-key.mode'</code> is <code>'LITERAL'</code>.</td>
</tr>
<tr>
</tbody>
</table>
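
The three handling modes described in the option table can be illustrated with a small standalone sketch. It uses only the JDK and is not the code from this commit: the class `MapNullKeySketch`, the method `resolveNullKeys`, and the exception type are hypothetical names chosen for illustration, and the real serializer works on Flink's internal map data rather than on `java.util.Map`.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration of the FAIL / DROP / LITERAL semantics; not Flink code.
final class MapNullKeySketch {

    enum MapNullKeyMode { FAIL, DROP, LITERAL }

    // Returns a copy of the input in which a null key is handled according to the mode.
    static Map<String, String> resolveNullKeys(
            Map<String, String> input, MapNullKeyMode mode, String literal) {
        Map<String, String> result = new LinkedHashMap<>();
        for (Map.Entry<String, String> entry : input.entrySet()) {
            String key = entry.getKey();
            if (key == null) {
                switch (mode) {
                    case LITERAL:
                        // Replace the null key with the configured string literal.
                        key = literal;
                        break;
                    case DROP:
                        // Skip the entry entirely.
                        continue;
                    case FAIL:
                    default:
                        throw new IllegalStateException(
                                "Map contains a null key, which cannot be written as a JSON field name.");
                }
            }
            // Note: a literal that equals an existing key overwrites that entry here.
            result.put(key, entry.getValue());
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> map = new LinkedHashMap<>();
        map.put("a", "1");
        map.put(null, "2");
        System.out.println(resolveNullKeys(map, MapNullKeyMode.DROP, "null"));    // {a=1}
        System.out.println(resolveNullKeys(map, MapNullKeyMode.LITERAL, "null")); // {a=1, null=2}
    }
}
```

With `FAIL`, the same input raises an exception, which matches the documented default.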

20 changes: 20 additions & 0 deletions docs/dev/table/connectors/formats/debezium.zh.md
@@ -199,6 +199,26 @@ Format Options
</ul>
</td>
</tr>
<tr>
<td><h5>debezium-json.map-null-key.mode</h5></td>
<td>optional</td>
<td style="word-wrap: break-word;"><code>'FAIL'</code></td>
<td>String</td>
<td>Specifies how null keys in map data are handled. Currently supported values are <code>'FAIL'</code>, <code>'DROP'</code> and <code>'LITERAL'</code>:
<ul>
<li>Option <code>'FAIL'</code> throws an exception when a map with a null key is encountered.</li>
<li>Option <code>'DROP'</code> drops map entries that have a null key.</li>
<li>Option <code>'LITERAL'</code> replaces null keys in the map with a string literal. The string literal is defined by <code>'debezium-json.map-null-key.literal'</code>.</li>
</ul>
</td>
</tr>
<tr>
<td><h5>debezium-json.map-null-key.literal</h5></td>
<td>optional</td>
<td style="word-wrap: break-word;">'null'</td>
<td>String</td>
<td>When <code>'debezium-json.map-null-key.mode'</code> is LITERAL, specifies the string literal that replaces null keys in the map.</td>
</tr>
</tbody>
</table>

20 changes: 20 additions & 0 deletions docs/dev/table/connectors/formats/json.md
@@ -117,6 +117,26 @@ Format Options
</ul>
</td>
</tr>
<tr>
<td><h5>json.map-null-key.mode</h5></td>
<td>optional</td>
<td style="word-wrap: break-word;"><code>'FAIL'</code></td>
<td>String</td>
<td>Specifies how null keys in map data are handled during serialization. Currently supported values are <code>'FAIL'</code>, <code>'DROP'</code> and <code>'LITERAL'</code>:
<ul>
<li>Option <code>'FAIL'</code> throws an exception when a map with a null key is encountered.</li>
<li>Option <code>'DROP'</code> drops map entries that have a null key.</li>
<li>Option <code>'LITERAL'</code> replaces the null key with a string literal. The string literal is defined by the <code>json.map-null-key.literal</code> option.</li>
</ul>
</td>
</tr>
<tr>
<td><h5>json.map-null-key.literal</h5></td>
<td>optional</td>
<td style="word-wrap: break-word;">'null'</td>
<td>String</td>
<td>Specifies the string literal that replaces a null key when <code>'json.map-null-key.mode'</code> is <code>'LITERAL'</code>.</td>
</tr>
</tbody>
</table>
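
As a usage illustration, the two new options would sit in a table's `WITH` clause next to the format declaration. The sketch below only assembles such a DDL string with the JDK. The table name, column names, topic, and broker address are hypothetical placeholders, and the helper `buildDdl` is not part of Flink.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical helper that renders a CREATE TABLE statement; not part of Flink.
final class JsonSinkDdlSketch {

    // Renders the options in insertion order. Values are not escaped, so this
    // sketch assumes that no key or value contains a single quote.
    static String buildDdl(String tableName, Map<String, String> options) {
        String withClause = options.entrySet().stream()
                .map(e -> "  '" + e.getKey() + "' = '" + e.getValue() + "'")
                .collect(Collectors.joining(",\n"));
        return "CREATE TABLE " + tableName + " (\n"
                + "  id BIGINT,\n"
                + "  attributes MAP<STRING, STRING>\n"
                + ") WITH (\n" + withClause + "\n)";
    }

    // Example options: the topic and broker address are placeholders.
    static Map<String, String> sinkOptions() {
        Map<String, String> options = new LinkedHashMap<>();
        options.put("connector", "kafka");
        options.put("topic", "events");
        options.put("properties.bootstrap.servers", "localhost:9092");
        options.put("format", "json");
        // Write null map keys under the literal key 'null' instead of failing.
        options.put("json.map-null-key.mode", "LITERAL");
        options.put("json.map-null-key.literal", "null");
        return options;
    }

    public static void main(String[] args) {
        System.out.println(buildDdl("events_sink", sinkOptions()));
    }
}
```

The resulting string is the kind of statement that would typically be submitted through the SQL client or `TableEnvironment.executeSql`. Leaving both options out keeps the default `'FAIL'` behavior.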

20 changes: 20 additions & 0 deletions docs/dev/table/connectors/formats/json.zh.md
@@ -116,6 +116,26 @@ Format Options
</ul>
</td>
</tr>
<tr>
<td><h5>json.map-null-key.mode</h5></td>
<td>optional</td>
<td style="word-wrap: break-word;"><code>'FAIL'</code></td>
<td>String</td>
<td>Specifies how null keys in map data are handled. Currently supported values are <code>'FAIL'</code>, <code>'DROP'</code> and <code>'LITERAL'</code>:
<ul>
<li>Option <code>'FAIL'</code> throws an exception when a map with a null key is encountered.</li>
<li>Option <code>'DROP'</code> drops map entries that have a null key.</li>
<li>Option <code>'LITERAL'</code> replaces null keys in the map with a string literal. The string literal is defined by <code>'json.map-null-key.literal'</code>.</li>
</ul>
</td>
</tr>
<tr>
<td><h5>json.map-null-key.literal</h5></td>
<td>optional</td>
<td style="word-wrap: break-word;">'null'</td>
<td>String</td>
<td>When <code>'json.map-null-key.mode'</code> is LITERAL, specifies the string literal that replaces null keys in the map.</td>
</tr>
</tbody>
</table>

@@ -23,7 +23,6 @@
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.format.DecodingFormat;
import org.apache.flink.table.connector.format.EncodingFormat;
@@ -43,8 +42,11 @@

import static org.apache.flink.formats.json.JsonOptions.FAIL_ON_MISSING_FIELD;
import static org.apache.flink.formats.json.JsonOptions.IGNORE_PARSE_ERRORS;
import static org.apache.flink.formats.json.JsonOptions.MAP_NULL_KEY_LITERAL;
import static org.apache.flink.formats.json.JsonOptions.MAP_NULL_KEY_MODE;
import static org.apache.flink.formats.json.JsonOptions.TIMESTAMP_FORMAT;
import static org.apache.flink.formats.json.JsonOptions.TIMESTAMP_FORMAT_ENUM;
import static org.apache.flink.formats.json.JsonOptions.validateDecodingFormatOptions;
import static org.apache.flink.formats.json.JsonOptions.validateEncodingFormatOptions;

/**
* Table format factory for providing configured instances of JSON to RowData
@@ -61,7 +63,7 @@ public DecodingFormat<DeserializationSchema<RowData>> createDecodingFormat(
DynamicTableFactory.Context context,
ReadableConfig formatOptions) {
FactoryUtil.validateFactoryOptions(this, formatOptions);
validateFormatOptions(formatOptions);
validateDecodingFormatOptions(formatOptions);

final boolean failOnMissingField = formatOptions.get(FAIL_ON_MISSING_FIELD);
final boolean ignoreParseErrors = formatOptions.get(IGNORE_PARSE_ERRORS);
@@ -96,16 +98,23 @@ public EncodingFormat<SerializationSchema<RowData>> createEncodingFormat(
DynamicTableFactory.Context context,
ReadableConfig formatOptions) {
FactoryUtil.validateFactoryOptions(this, formatOptions);
validateEncodingFormatOptions(formatOptions);

TimestampFormat timestampOption = JsonOptions.getTimestampFormat(formatOptions);
JsonOptions.MapNullKeyMode mapNullKeyMode = JsonOptions.getMapNullKeyMode(formatOptions);
String mapNullKeyLiteral = formatOptions.get(MAP_NULL_KEY_LITERAL);

return new EncodingFormat<SerializationSchema<RowData>>() {
@Override
public SerializationSchema<RowData> createRuntimeEncoder(
DynamicTableSink.Context context,
DataType consumedDataType) {
final RowType rowType = (RowType) consumedDataType.getLogicalType();
return new JsonRowDataSerializationSchema(rowType, timestampOption);
return new JsonRowDataSerializationSchema(
rowType,
timestampOption,
mapNullKeyMode,
mapNullKeyLiteral);
}

@Override
@@ -131,26 +140,8 @@ public Set<ConfigOption<?>> optionalOptions() {
options.add(FAIL_ON_MISSING_FIELD);
options.add(IGNORE_PARSE_ERRORS);
options.add(TIMESTAMP_FORMAT);
options.add(MAP_NULL_KEY_MODE);
options.add(MAP_NULL_KEY_LITERAL);
return options;
}

// ------------------------------------------------------------------------
// Validation
// ------------------------------------------------------------------------

static void validateFormatOptions(ReadableConfig tableOptions) {
boolean failOnMissingField = tableOptions.get(FAIL_ON_MISSING_FIELD);
boolean ignoreParseErrors = tableOptions.get(IGNORE_PARSE_ERRORS);
String timestampFormat = tableOptions.get(TIMESTAMP_FORMAT);
if (ignoreParseErrors && failOnMissingField) {
throw new ValidationException(FAIL_ON_MISSING_FIELD.key()
+ " and "
+ IGNORE_PARSE_ERRORS.key()
+ " shouldn't both be true.");
}
if (!TIMESTAMP_FORMAT_ENUM.contains(timestampFormat)){
throw new ValidationException(String.format("Unsupported value '%s' for %s. Supported values are [SQL, ISO-8601].",
timestampFormat, TIMESTAMP_FORMAT.key()));
}
}
}
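
The hunk above removes the factory's own `validateFormatOptions` and calls `validateDecodingFormatOptions` and `validateEncodingFormatOptions` from `JsonOptions` instead. The bodies of those methods are not shown in this excerpt. As a rough, JDK-only sketch of what an encoding-side check on the null-key mode could look like, assuming options arrive as a plain string map: the class name, the use of `IllegalArgumentException`, and the case-insensitive matching are all choices of this sketch, not confirmed behavior of the commit.

```java
import java.util.Arrays;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical validation sketch; the actual JsonOptions code is not shown in this diff.
final class EncodingOptionsValidationSketch {

    enum MapNullKeyMode { FAIL, DROP, LITERAL }

    static final String MODE_KEY = "json.map-null-key.mode";

    private static final Set<String> SUPPORTED_MODES =
            Arrays.stream(MapNullKeyMode.values())
                    .map(Enum::name)
                    .collect(Collectors.toSet());

    // Returns the parsed mode, falling back to the documented default 'FAIL'.
    static MapNullKeyMode validate(Map<String, String> options) {
        String raw = options.getOrDefault(MODE_KEY, "FAIL");
        // Assumption of this sketch: values are matched case-insensitively.
        String normalized = raw.toUpperCase(Locale.ROOT);
        if (!SUPPORTED_MODES.contains(normalized)) {
            throw new IllegalArgumentException(String.format(
                    "Unsupported value '%s' for %s. Supported values are [FAIL, DROP, LITERAL].",
                    raw, MODE_KEY));
        }
        return MapNullKeyMode.valueOf(normalized);
    }

    public static void main(String[] args) {
        System.out.println(validate(Map.of(MODE_KEY, "DROP"))); // DROP
    }
}
```

Splitting validation by direction keeps decoding-only checks, such as the conflict between `FAIL_ON_MISSING_FIELD` and `IGNORE_PARSE_ERRORS`, away from sinks, and keeps null-key checks away from sources.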