[FLINK-26092][table-runtime] Fix JSON_OBJECTAGG when emitting NULL
Previously, when the JSON aggregation was taking place and
JsonOnNull.NULL was selected, which means that we still want to emit
a `null` JSON node, i.e. `{.... "myField" : null ... }`, when no
value gets accumulated for a key, we used a null `StringData` object.
When `state.backend.changelog.enabled` is enabled, the contents of
the map accumulating the aggregated records get serialized, which
leads to an NPE, since `null` is not supported by
`StringDataSerializer`.

To solve this, we instead create a `StringData` with an empty
`byte[]`, which denotes the null, and when the aggregation ends and
we create the final JSON result, we check for a `byte[]` of length
`0` in order to write the JSON `null` node.
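For illustration, here is a minimal, self-contained sketch of the sentinel idea (not part of the commit; the class and method names below are made up, while `StringData.fromBytes`/`fromString`/`toBytes` are the Flink APIs used in the diff further down):

import org.apache.flink.table.data.StringData;

// Hypothetical sketch of the empty-byte[] sentinel; not part of the commit.
public class NullSentinelSketch {

    // Mirrors the idea behind NULL_STRING_DATA in the diff below: an empty
    // byte[] stands in for SQL NULL, so the accumulator map never holds a
    // Java null that StringDataSerializer cannot handle.
    static final StringData NULL_SENTINEL = StringData.fromBytes(new byte[] {});

    // The final JSON writer treats a zero-length byte[] as "emit a JSON null".
    static boolean representsJsonNull(StringData value) {
        return value.toBytes().length == 0;
    }

    public static void main(String[] args) {
        System.out.println(representsJsonNull(NULL_SENTINEL));               // true
        System.out.println(representsJsonNull(StringData.fromString("42")));  // false
    }
}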
matriv authored and rkhachatryan committed Mar 29, 2022
1 parent a4d194e commit 4c89959
Showing 2 changed files with 33 additions and 13 deletions.
@@ -18,7 +18,8 @@
 
 package org.apache.flink.table.planner.functions;
 
-import org.apache.flink.configuration.StateChangelogOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.StateBackendOptions;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.Schema;
@@ -56,6 +57,8 @@
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import static org.apache.flink.runtime.state.StateBackendLoader.HASHMAP_STATE_BACKEND_NAME;
+import static org.apache.flink.runtime.state.StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME;
 import static org.apache.flink.table.test.TableAssertions.assertThat;
 import static org.apache.flink.table.types.DataType.getFieldDataTypes;
 import static org.assertj.core.api.Assertions.assertThat;
@@ -204,25 +207,39 @@ TestSpec testResult(
             return this;
         }
 
-        private Executable createTestItemExecutable(TestItem testItem) {
+        private Executable createTestItemExecutable(TestItem testItem, String stateBackend) {
             return () -> {
+                Configuration conf = new Configuration();
+                conf.set(StateBackendOptions.STATE_BACKEND, stateBackend);
                 final TableEnvironment tEnv =
-                        TableEnvironment.create(EnvironmentSettings.inStreamingMode());
-                // see https://issues.apache.org/jira/browse/FLINK-26092
-                tEnv.getConfig().set(StateChangelogOptions.ENABLE_STATE_CHANGE_LOG, false);
+                        TableEnvironment.create(
+                                EnvironmentSettings.newInstance()
+                                        .inStreamingMode()
+                                        .withConfiguration(conf)
+                                        .build());
                 final Table sourceTable = asTable(tEnv, sourceRowType, sourceRows);
 
                 testItem.execute(tEnv, sourceTable);
             };
         }
 
         Stream<BuiltInFunctionTestBase.TestCase> getTestCases() {
-            return testItems.stream()
-                    .map(
-                            testItem ->
-                                    new BuiltInFunctionTestBase.TestCase(
-                                            testItem.toString(),
-                                            createTestItemExecutable(testItem)));
+            return Stream.concat(
+                    testItems.stream()
+                            .map(
+                                    testItem ->
+                                            new BuiltInFunctionTestBase.TestCase(
+                                                    testItem.toString(),
+                                                    createTestItemExecutable(
+                                                            testItem, HASHMAP_STATE_BACKEND_NAME))),
+                    testItems.stream()
+                            .map(
+                                    testItem ->
+                                            new BuiltInFunctionTestBase.TestCase(
+                                                    testItem.toString(),
+                                                    createTestItemExecutable(
+                                                            testItem,
+                                                            ROCKSDB_STATE_BACKEND_NAME))));
         }
 
         @Override
@@ -57,6 +57,7 @@ public class JsonObjectAggFunction
         extends BuiltInAggregateFunction<String, JsonObjectAggFunction.Accumulator> {
 
     private static final long serialVersionUID = 1L;
+    private static final StringData NULL_STRING_DATA = StringData.fromBytes(new byte[] {});
     private static final NullNode NULL_NODE = getNodeFactory().nullNode();
 
     private final transient List<DataType> argumentTypes;
@@ -107,7 +108,9 @@ public void accumulate(Accumulator acc, StringData keyData, @Nullable StringData
 
         if (valueData == null) {
            if (!skipNulls) {
-                acc.map.put(keyData, null);
+                // We cannot use null for StringData here, since it's not supported by the
+                // StringDataSerializer, instead use a StringData with an empty byte[]
+                acc.map.put(keyData, NULL_STRING_DATA);
            }
         } else {
            acc.map.put(keyData, valueData);
@@ -135,7 +138,7 @@ public String getValue(Accumulator acc) {
         for (final StringData key : acc.map.keys()) {
             final StringData value = acc.map.get(key);
             final JsonNode valueNode =
-                    value == null
+                    value.toBytes().length == 0
                             ? NULL_NODE
                             : getNodeFactory().rawValueNode(new RawValue(value.toString()));
 
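To see the end-to-end effect of the getValue change above, here is a hedged, standalone sketch (not from the commit) that uses plain Jackson instead of Flink's shaded copy; the class name and structure are hypothetical, and only the zero-length check mirrors the production code:

import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.fasterxml.jackson.databind.util.RawValue;
import org.apache.flink.table.data.StringData;

// Hypothetical demonstration class; not part of the commit.
public class GetValueSketch {
    public static void main(String[] args) {
        StringData key = StringData.fromString("myField");
        // The sentinel that accumulate() stores when a NULL value arrives.
        StringData value = StringData.fromBytes(new byte[] {});

        ObjectNode rootNode = JsonNodeFactory.instance.objectNode();
        if (value.toBytes().length == 0) {
            // Sentinel found: write an explicit JSON null node.
            rootNode.set(key.toString(), JsonNodeFactory.instance.nullNode());
        } else {
            // Otherwise embed the already-serialized JSON value as-is.
            rootNode.putRawValue(key.toString(), new RawValue(value.toString()));
        }
        System.out.println(rootNode); // prints {"myField":null}
    }
}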
