Skip to content

Commit

Permalink
[FLINK-17080][java,table] Fix possible NPE in Utils#CollectHelper
Browse files Browse the repository at this point in the history
This closes apache#11688
  • Loading branch information
tsreaper authored Apr 15, 2020
1 parent ffb9878 commit 0fa0830
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 3 deletions.
9 changes: 7 additions & 2 deletions flink-java/src/main/java/org/apache/flink/api/java/Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,13 @@ public void writeRecord(T record) throws IOException {

@Override
public void close() {
// Important: should only be added in close method to minimize traffic of accumulators
getRuntimeContext().addAccumulator(id, accumulator);
// when the sink is up but not initialized and the job fails due to other operators,
// it is possible that close() is called when open() is not called,
// so we have to do this null check
if (accumulator != null) {
// Important: should only be added in close method to minimize traffic of accumulators
getRuntimeContext().addAccumulator(id, accumulator);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,11 @@ public static List<Row> collectToList(Table table) throws Exception {
tEnv.insertInto(sinkName, table);
JobExecutionResult executionResult = tEnv.execute(jobName);
ArrayList<byte[]> accResult = executionResult.getAccumulatorResult(id);
deserializedList = SerializedListAccumulator.deserializeList(accResult, serializer);
if (accResult != null) {
deserializedList = SerializedListAccumulator.deserializeList(accResult, serializer);
} else {
throw new RuntimeException("Could not retrieve table result. It is very likely that the job fails.");
}
} finally {
tEnv.dropTemporaryTable(sinkName);
}
Expand Down

0 comments on commit 0fa0830

Please sign in to comment.