Skip to content

Commit

Permalink
[FLINK-2382] fix Flink live accumulators for TwoInputStreamTask
Browse files Browse the repository at this point in the history
  • Loading branch information
mxm committed Jul 21, 2015
1 parent 0f589aa commit 1d373a7
Showing 1 changed file with 6 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.List;

import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.streaming.api.graph.StreamEdge;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
Expand Down Expand Up @@ -66,6 +67,10 @@ public void registerInputOutput() {

inputProcessor = new StreamTwoInputProcessor<IN1, IN2>(inputList1, inputList2, inputDeserializer1, inputDeserializer2, getExecutionConfig().areTimestampsEnabled());

AccumulatorRegistry registry = getEnvironment().getAccumulatorRegistry();
AccumulatorRegistry.Reporter reporter = registry.getReadWriteReporter();
inputProcessor.setReporter(reporter);

inputProcessor.registerTaskEventListener(getCheckpointBarrierListener(), CheckpointBarrier.class);
}

Expand Down Expand Up @@ -107,7 +112,7 @@ public void invoke() throws Exception {
LOG.warn("Exception while closing operator.", t);
}
}

throw e;
}
finally {
Expand Down

0 comments on commit 1d373a7

Please sign in to comment.