Skip to content

Commit

Permalink
[FLINK-7728] [DataStream] Simplify BufferedValveOutputHandler used in…
Browse files Browse the repository at this point in the history
… StatusWatermarkValveTest

The previous implementation was overly complicated. Having separate
buffers for the StreamStatus and Watermarks is not required for our
tests. Also, that design doesn't allow checking the order StreamStatus /
Watermarks are emitted from a single input to the valve.

This commit reworks it by buffering both StreamStatus and Watermarks in
a shared queue.
  • Loading branch information
tzulitai authored and aljoscha committed Oct 2, 2017
1 parent c81a7e5 commit 6481564
Showing 1 changed file with 59 additions and 141 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.flink.streaming.api.watermark.Watermark;

import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
import org.junit.Test;

import java.util.concurrent.BlockingQueue;
Expand Down Expand Up @@ -57,21 +58,13 @@ public void testAllInputChannelsStartAsActive() {
// ------------------------------------------------------------------------

valve.inputStreamStatus(StreamStatus.IDLE, 3);
assertTrue(valveOutput.hasNoOutputWatermarks());
assertTrue(valveOutput.hasNoOutputStreamStatuses());

valve.inputStreamStatus(StreamStatus.IDLE, 0);
assertTrue(valveOutput.hasNoOutputWatermarks());
assertTrue(valveOutput.hasNoOutputStreamStatuses());

valve.inputStreamStatus(StreamStatus.IDLE, 1);
assertTrue(valveOutput.hasNoOutputWatermarks());
assertTrue(valveOutput.hasNoOutputStreamStatuses());
assertEquals(null, valveOutput.popLastSeenOutput());

valve.inputStreamStatus(StreamStatus.IDLE, 2);
assertEquals(StreamStatus.IDLE, valveOutput.popLastOutputStreamStatus());
assertTrue(valveOutput.hasNoOutputWatermarks());
assertTrue(valveOutput.hasNoOutputStreamStatuses());
assertEquals(StreamStatus.IDLE, valveOutput.popLastSeenOutput());
assertEquals(null, valveOutput.popLastSeenOutput());
}

/**
Expand All @@ -86,73 +79,57 @@ public void testOneInputValve() {
// start off with an ACTIVE status; since the valve should initially start as ACTIVE,
// no state change is toggled, therefore no stream status should be emitted
valve.inputStreamStatus(StreamStatus.ACTIVE, 0);
assertTrue(valveOutput.hasNoOutputWatermarks());
assertTrue(valveOutput.hasNoOutputStreamStatuses());
assertEquals(null, valveOutput.popLastSeenOutput());

// input some monotonously increasing watermarks while ACTIVE;
// the exact same watermarks should be emitted right after the inputs
valve.inputWatermark(new Watermark(0), 0);
assertEquals(new Watermark(0), valveOutput.popLastOutputWatermark());
assertTrue(valveOutput.hasNoOutputWatermarks());
assertTrue(valveOutput.hasNoOutputStreamStatuses());
assertEquals(new Watermark(0), valveOutput.popLastSeenOutput());

valve.inputWatermark(new Watermark(25), 0);
assertEquals(new Watermark(25), valveOutput.popLastOutputWatermark());
assertTrue(valveOutput.hasNoOutputWatermarks());
assertTrue(valveOutput.hasNoOutputStreamStatuses());
assertEquals(new Watermark(25), valveOutput.popLastSeenOutput());

// decreasing watermarks should not result in any output
valve.inputWatermark(new Watermark(18), 0);
assertTrue(valveOutput.hasNoOutputWatermarks());
assertTrue(valveOutput.hasNoOutputStreamStatuses());
assertEquals(null, valveOutput.popLastSeenOutput());

valve.inputWatermark(new Watermark(42), 0);
assertEquals(new Watermark(42), valveOutput.popLastOutputWatermark());
assertTrue(valveOutput.hasNoOutputWatermarks());
assertTrue(valveOutput.hasNoOutputStreamStatuses());
assertEquals(new Watermark(42), valveOutput.popLastSeenOutput());

// toggling ACTIVE to IDLE should result in an IDLE stream status output
valve.inputStreamStatus(StreamStatus.IDLE, 0);
assertEquals(StreamStatus.IDLE, valveOutput.popLastOutputStreamStatus());
assertTrue(valveOutput.hasNoOutputWatermarks());
assertTrue(valveOutput.hasNoOutputStreamStatuses());
assertEquals(StreamStatus.IDLE, valveOutput.popLastSeenOutput());

// watermark inputs should be ignored while all input channels (only 1 in this case) are IDLE
valve.inputWatermark(new Watermark(52), 0);
assertTrue(valveOutput.hasNoOutputWatermarks());
assertTrue(valveOutput.hasNoOutputStreamStatuses());
assertEquals(null, valveOutput.popLastSeenOutput());

valve.inputWatermark(new Watermark(60), 0);
assertTrue(valveOutput.hasNoOutputWatermarks());
assertTrue(valveOutput.hasNoOutputStreamStatuses());
assertEquals(null, valveOutput.popLastSeenOutput());


// no status change toggle while IDLE should result in stream status outputs
valve.inputStreamStatus(StreamStatus.IDLE, 0);
assertTrue(valveOutput.hasNoOutputWatermarks());
assertTrue(valveOutput.hasNoOutputStreamStatuses());
assertEquals(null, valveOutput.popLastSeenOutput());


// toggling IDLE to ACTIVE should result in an ACTIVE stream status output
valve.inputStreamStatus(StreamStatus.ACTIVE, 0);
assertEquals(StreamStatus.ACTIVE, valveOutput.popLastOutputStreamStatus());
assertTrue(valveOutput.hasNoOutputWatermarks());
assertTrue(valveOutput.hasNoOutputStreamStatuses());
assertEquals(StreamStatus.ACTIVE, valveOutput.popLastSeenOutput());


// the valve should remember the last watermark input channels received while they were ACTIVE (which was 42);
// decreasing watermarks should therefore still be ignored, even after a status toggle
valve.inputWatermark(new Watermark(40), 0);
assertTrue(valveOutput.hasNoOutputWatermarks());
assertTrue(valveOutput.hasNoOutputStreamStatuses());
assertEquals(null, valveOutput.popLastSeenOutput());

// monotonously increasing watermarks after resuming to be ACTIVE should be output normally
valve.inputWatermark(new Watermark(68), 0);
assertEquals(new Watermark(68), valveOutput.popLastOutputWatermark());
assertTrue(valveOutput.hasNoOutputWatermarks());
assertTrue(valveOutput.hasNoOutputStreamStatuses());
assertEquals(new Watermark(68), valveOutput.popLastSeenOutput());

valve.inputWatermark(new Watermark(72), 0);
assertEquals(new Watermark(72), valveOutput.popLastOutputWatermark());
assertTrue(valveOutput.hasNoOutputWatermarks());
assertTrue(valveOutput.hasNoOutputStreamStatuses());
assertEquals(new Watermark(72), valveOutput.popLastSeenOutput());
assertEquals(null, valveOutput.popLastSeenOutput());
}

/**
Expand All @@ -170,48 +147,38 @@ public void testMultipleInputValve() {
// ------------------------------------------------------------------------

valve.inputWatermark(new Watermark(0), 0);
assertTrue(valveOutput.hasNoOutputWatermarks());
assertTrue(valveOutput.hasNoOutputStreamStatuses());
assertEquals(null, valveOutput.popLastSeenOutput());

valve.inputWatermark(new Watermark(0), 1);
assertTrue(valveOutput.hasNoOutputWatermarks());
assertTrue(valveOutput.hasNoOutputStreamStatuses());
assertEquals(null, valveOutput.popLastSeenOutput());

valve.inputWatermark(new Watermark(0), 2);
assertEquals(new Watermark(0), valveOutput.popLastOutputWatermark());
assertTrue(valveOutput.hasNoOutputWatermarks());
assertTrue(valveOutput.hasNoOutputStreamStatuses());
assertEquals(new Watermark(0), valveOutput.popLastSeenOutput());

// ------------------------------------------------------------------------
// Ensure that watermarks are output as soon as the overall min
// watermark across all channels have advanced.
// ------------------------------------------------------------------------

valve.inputWatermark(new Watermark(12), 0);
assertTrue(valveOutput.hasNoOutputWatermarks());
assertTrue(valveOutput.hasNoOutputStreamStatuses());
assertEquals(null, valveOutput.popLastSeenOutput());

valve.inputWatermark(new Watermark(8), 2);
assertTrue(valveOutput.hasNoOutputWatermarks());
assertTrue(valveOutput.hasNoOutputStreamStatuses());
assertEquals(null, valveOutput.popLastSeenOutput());

valve.inputWatermark(new Watermark(10), 2);
assertTrue(valveOutput.hasNoOutputWatermarks());
assertTrue(valveOutput.hasNoOutputStreamStatuses());
assertEquals(null, valveOutput.popLastSeenOutput());

valve.inputWatermark(new Watermark(15), 1);
// lowest watermark across all channels is now channel 2, with watermark @ 10
assertEquals(new Watermark(10), valveOutput.popLastOutputWatermark());
assertTrue(valveOutput.hasNoOutputWatermarks());
assertTrue(valveOutput.hasNoOutputStreamStatuses());
assertEquals(new Watermark(10), valveOutput.popLastSeenOutput());

// ------------------------------------------------------------------------
// Ensure that decreasing watermarks are ignored
// ------------------------------------------------------------------------

valve.inputWatermark(new Watermark(6), 0);
assertTrue(valveOutput.hasNoOutputWatermarks());
assertTrue(valveOutput.hasNoOutputStreamStatuses());
assertEquals(null, valveOutput.popLastSeenOutput());

// ------------------------------------------------------------------------
// Ensure that when some input channel becomes idle, that channel will
Expand All @@ -224,25 +191,18 @@ public void testMultipleInputValve() {
// also, now that channel 2 is IDLE, the overall min watermark is 12 (from channel 0),
// so the valve should output that
valve.inputStreamStatus(StreamStatus.IDLE, 2);
assertEquals(new Watermark(12), valveOutput.popLastOutputWatermark());
assertTrue(valveOutput.hasNoOutputWatermarks());
assertTrue(valveOutput.hasNoOutputStreamStatuses());
assertEquals(new Watermark(12), valveOutput.popLastSeenOutput());

// from now on, since channel 2 is IDLE, the valve should use watermarks only from
// channel 0 and 1 to find the min watermark, even if channel 2 has the lowest watermark (10)
valve.inputWatermark(new Watermark(17), 0);
assertEquals(new Watermark(15), valveOutput.popLastOutputWatermark());
assertTrue(valveOutput.hasNoOutputWatermarks());
assertTrue(valveOutput.hasNoOutputStreamStatuses());
assertEquals(new Watermark(15), valveOutput.popLastSeenOutput());

valve.inputWatermark(new Watermark(25), 0);
assertTrue(valveOutput.hasNoOutputWatermarks());
assertTrue(valveOutput.hasNoOutputStreamStatuses());
assertEquals(null, valveOutput.popLastSeenOutput());

valve.inputWatermark(new Watermark(20), 1);
assertEquals(new Watermark(20), valveOutput.popLastOutputWatermark());
assertTrue(valveOutput.hasNoOutputWatermarks());
assertTrue(valveOutput.hasNoOutputStreamStatuses());
assertEquals(new Watermark(20), valveOutput.popLastSeenOutput());

// ------------------------------------------------------------------------
// Ensure that after some channel resumes to be ACTIVE, it needs to
Expand All @@ -255,45 +215,34 @@ public void testMultipleInputValve() {
// resuming channel 2 to be ACTIVE shouldn't result in overall status toggle for the valve,
// because the valve wasn't overall IDLE, so there should not be any stream status outputs;
valve.inputStreamStatus(StreamStatus.ACTIVE, 2);
assertTrue(valveOutput.hasNoOutputWatermarks());
assertTrue(valveOutput.hasNoOutputStreamStatuses());
assertEquals(null, valveOutput.popLastSeenOutput());

// although watermarks for channel 2 will now be accepted, it still
// hasn't caught up with the overall min watermark (20)
valve.inputWatermark(new Watermark(18), 2);
assertTrue(valveOutput.hasNoOutputWatermarks());
assertTrue(valveOutput.hasNoOutputStreamStatuses());
assertEquals(null, valveOutput.popLastSeenOutput());

// since channel 2 hasn't caught up yet, it is still ignored when advancing new min watermarks
valve.inputWatermark(new Watermark(22), 1);
assertEquals(new Watermark(22), valveOutput.popLastOutputWatermark());
assertTrue(valveOutput.hasNoOutputWatermarks());
assertTrue(valveOutput.hasNoOutputStreamStatuses());
assertEquals(new Watermark(22), valveOutput.popLastSeenOutput());

valve.inputWatermark(new Watermark(28), 0);
assertTrue(valveOutput.hasNoOutputWatermarks());
assertTrue(valveOutput.hasNoOutputStreamStatuses());
assertEquals(null, valveOutput.popLastSeenOutput());

valve.inputWatermark(new Watermark(33), 1);
assertEquals(new Watermark(28), valveOutput.popLastOutputWatermark());
assertTrue(valveOutput.hasNoOutputWatermarks());
assertTrue(valveOutput.hasNoOutputStreamStatuses());
assertEquals(new Watermark(28), valveOutput.popLastSeenOutput());

// now, channel 2 has caught up with the overall min watermark
valve.inputWatermark(new Watermark(30), 2);
assertTrue(valveOutput.hasNoOutputWatermarks());
assertTrue(valveOutput.hasNoOutputStreamStatuses());
assertEquals(null, valveOutput.popLastSeenOutput());

valve.inputWatermark(new Watermark(31), 0);
// this acknowledges that channel 2's watermark is being accounted for again
assertEquals(new Watermark(30), valveOutput.popLastOutputWatermark());
assertTrue(valveOutput.hasNoOutputWatermarks());
assertTrue(valveOutput.hasNoOutputStreamStatuses());
assertEquals(new Watermark(30), valveOutput.popLastSeenOutput());
assertEquals(null, valveOutput.popLastSeenOutput());

valve.inputWatermark(new Watermark(34), 2);
assertEquals(new Watermark(31), valveOutput.popLastOutputWatermark());
assertTrue(valveOutput.hasNoOutputWatermarks());
assertTrue(valveOutput.hasNoOutputStreamStatuses());
assertEquals(new Watermark(31), valveOutput.popLastSeenOutput());

// ------------------------------------------------------------------------
// Ensure that once all channels are IDLE, the valve should also
Expand All @@ -303,19 +252,14 @@ public void testMultipleInputValve() {
valve.inputStreamStatus(StreamStatus.IDLE, 0);
// this is because once channel 0 becomes IDLE,
// the new min watermark will be 33 (channel 1)
assertEquals(new Watermark(33), valveOutput.popLastOutputWatermark());
assertTrue(valveOutput.hasNoOutputWatermarks());
assertTrue(valveOutput.hasNoOutputStreamStatuses());
assertEquals(new Watermark(33), valveOutput.popLastSeenOutput());

valve.inputStreamStatus(StreamStatus.IDLE, 2);
assertTrue(valveOutput.hasNoOutputWatermarks());
assertTrue(valveOutput.hasNoOutputStreamStatuses());
assertEquals(null, valveOutput.popLastSeenOutput());

// now let all channels become idle; we should only see the idle marker be emitted, and nothing else
valve.inputStreamStatus(StreamStatus.IDLE, 1);
assertEquals(StreamStatus.IDLE, valveOutput.popLastOutputStreamStatus());
assertTrue(valveOutput.hasNoOutputWatermarks());
assertTrue(valveOutput.hasNoOutputStreamStatuses());
assertEquals(StreamStatus.IDLE, valveOutput.popLastSeenOutput());

// ------------------------------------------------------------------------
// Ensure that as channels gradually become ACTIVE again, the above behaviours
Expand All @@ -325,86 +269,60 @@ public void testMultipleInputValve() {

// let channel 0 resume to be ACTIVE
valve.inputStreamStatus(StreamStatus.ACTIVE, 0);
assertEquals(StreamStatus.ACTIVE, valveOutput.popLastOutputStreamStatus());
assertTrue(valveOutput.hasNoOutputWatermarks());
assertTrue(valveOutput.hasNoOutputStreamStatuses());
assertEquals(StreamStatus.ACTIVE, valveOutput.popLastSeenOutput());

// channel 0 is the only ACTIVE channel now, and is the only channel
// accounted for when advancing min watermark
valve.inputWatermark(new Watermark(36), 0);
assertEquals(new Watermark(36), valveOutput.popLastOutputWatermark());
assertTrue(valveOutput.hasNoOutputWatermarks());
assertTrue(valveOutput.hasNoOutputStreamStatuses());
assertEquals(new Watermark(36), valveOutput.popLastSeenOutput());

// new also let channel 1 become ACTIVE
valve.inputStreamStatus(StreamStatus.ACTIVE, 1);
assertTrue(valveOutput.hasNoOutputWatermarks());
assertTrue(valveOutput.hasNoOutputStreamStatuses());
assertEquals(null, valveOutput.popLastSeenOutput());

// channel 1 is still behind overall min watermark
valve.inputWatermark(new Watermark(35), 1);
assertTrue(valveOutput.hasNoOutputWatermarks());
assertTrue(valveOutput.hasNoOutputStreamStatuses());
assertEquals(null, valveOutput.popLastSeenOutput());

// since channel 1 is still behind, channel 0 remains to be the only
// channel used to advance min watermark
valve.inputWatermark(new Watermark(37), 0);
assertEquals(new Watermark(37), valveOutput.popLastOutputWatermark());
assertTrue(valveOutput.hasNoOutputWatermarks());
assertTrue(valveOutput.hasNoOutputStreamStatuses());
assertEquals(new Watermark(37), valveOutput.popLastSeenOutput());

// temporarily let channel 0 (the only active and aligned input) become idle;
// this should not result in any watermark or stream status output,
// because channel 1 is still active (therefore no stream status toggle) and
// at the same time not aligned (therefore should not produce any new min watermarks)
valve.inputStreamStatus(StreamStatus.IDLE, 0);
assertTrue(valveOutput.hasNoOutputWatermarks());
assertTrue(valveOutput.hasNoOutputStreamStatuses());
assertEquals(null, valveOutput.popLastSeenOutput());

valve.inputStreamStatus(StreamStatus.ACTIVE, 0);
assertTrue(valveOutput.hasNoOutputWatermarks());
assertTrue(valveOutput.hasNoOutputStreamStatuses());
assertEquals(null, valveOutput.popLastSeenOutput());

// now, let channel 1 catch up with the overall min watermark
valve.inputWatermark(new Watermark(38), 1);
assertTrue(valveOutput.hasNoOutputWatermarks());
assertTrue(valveOutput.hasNoOutputStreamStatuses());
assertEquals(null, valveOutput.popLastSeenOutput());

valve.inputWatermark(new Watermark(40), 0);
// this acknowledges that channel 1's watermark is being accounted for again
assertEquals(new Watermark(38), valveOutput.popLastOutputWatermark());
assertTrue(valveOutput.hasNoOutputWatermarks());
assertTrue(valveOutput.hasNoOutputStreamStatuses());
assertEquals(new Watermark(38), valveOutput.popLastSeenOutput());
}

private class BufferedValveOutputHandler implements StatusWatermarkValve.ValveOutputHandler {
private BlockingQueue<Watermark> outputWatermarks = new LinkedBlockingQueue<>();
private BlockingQueue<StreamStatus> outputStreamStatuses = new LinkedBlockingQueue<>();
private BlockingQueue<StreamElement> allOutputs = new LinkedBlockingQueue<>();

@Override
public void handleWatermark(Watermark watermark) {
outputWatermarks.add(watermark);
allOutputs.add(watermark);
}

@Override
public void handleStreamStatus(StreamStatus streamStatus) {
outputStreamStatuses.add(streamStatus);
}

public Watermark popLastOutputWatermark() {
return outputWatermarks.poll();
}

public StreamStatus popLastOutputStreamStatus() {
return outputStreamStatuses.poll();
}

public boolean hasNoOutputWatermarks() {
return outputWatermarks.size() == 0;
allOutputs.add(streamStatus);
}

public boolean hasNoOutputStreamStatuses() {
return outputStreamStatuses.size() == 0;
public StreamElement popLastSeenOutput() {
return allOutputs.poll();
}
}

Expand Down

0 comments on commit 6481564

Please sign in to comment.