Skip to content

Commit

Permalink
[FLINK-11041][test] ReinterpretDataStreamAsKeyedStreamITCase source s…
Browse files Browse the repository at this point in the history
…hould hold checkpointing lock
  • Loading branch information
StefanRRichter authored and tillrohrmann committed Dec 11, 2018
1 parent 927936d commit 70b2029
Showing 1 changed file with 3 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,9 @@ public RandomTupleSource(int numEvents, int numKeys) {
public void run(SourceContext<Tuple2<Integer, Integer>> out) throws Exception {
Random random = new Random(42);
while (--remainingEvents >= 0) {
out.collect(new Tuple2<>(random.nextInt(numKeys), 1));
synchronized (out.getCheckpointLock()) {
out.collect(new Tuple2<>(random.nextInt(numKeys), 1));
}
}
}

Expand Down

0 comments on commit 70b2029

Please sign in to comment.