Skip to content

Commit

Permalink
[FLINK-17903][core] WatermarkOutputMultiplexer supports String IDs an…
Browse files Browse the repository at this point in the history
…d de-registration of outputs
  • Loading branch information
StephanEwen committed May 27, 2020
1 parent c6b5a7a commit 8e01f52
Show file tree
Hide file tree
Showing 3 changed files with 119 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -394,15 +394,18 @@ private List<KafkaTopicPartitionState<T, KPH>> createPartitionStateHolders(

case WITH_WATERMARK_GENERATOR: {
for (Map.Entry<KafkaTopicPartition, Long> partitionEntry : partitionsToInitialOffsets.entrySet()) {
KPH kafkaHandle = createKafkaPartitionHandle(partitionEntry.getKey());
final KafkaTopicPartition kafkaTopicPartition = partitionEntry.getKey();
KPH kafkaHandle = createKafkaPartitionHandle(kafkaTopicPartition);
WatermarkStrategy<T> deserializedWatermarkStrategy = watermarkStrategy.deserializeValue(
userCodeClassLoader);

int outputId = watermarkOutputMultiplexer.registerNewOutput();
// the format of the ID does not matter, as long as it is unique
final String partitionId = kafkaTopicPartition.getTopic() + '-' + kafkaTopicPartition.getPartition();
watermarkOutputMultiplexer.registerNewOutput(partitionId);
WatermarkOutput immediateOutput =
watermarkOutputMultiplexer.getImmediateOutput(outputId);
watermarkOutputMultiplexer.getImmediateOutput(partitionId);
WatermarkOutput deferredOutput =
watermarkOutputMultiplexer.getDeferredOutput(outputId);
watermarkOutputMultiplexer.getDeferredOutput(partitionId);

KafkaTopicPartitionStateWithWatermarkGenerator<T, KPH> partitionState =
new KafkaTopicPartitionStateWithWatermarkGenerator<>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@
* #onPeriodicEmit()} is called will the deferred updates be combined and forwarded to the
* underlying output.
*
* <p>For registering a new multiplexed output, you must first call {@link #registerNewOutput()}
* and then call {@link #getImmediateOutput(int)} or {@link #getDeferredOutput(int)} with the output
* <p>For registering a new multiplexed output, you must first call {@link #registerNewOutput(String)}
* and then call {@link #getImmediateOutput(String)} or {@link #getDeferredOutput(String)} with the output
* ID you get from that. You can get both an immediate and deferred output for a given output ID,
* you can also call the getters multiple times.
*
Expand All @@ -57,16 +57,13 @@ public class WatermarkOutputMultiplexer {
*/
private final WatermarkOutput underlyingOutput;

/** The id to use for the next registered output. */
private int nextOutputId = 0;

/** The combined watermark over the per-output watermarks. */
private long combinedWatermark = Long.MIN_VALUE;

/**
* Map view, to allow finding them when requesting the {@link WatermarkOutput} for a given id.
*/
private final Map<Integer, OutputState> watermarkPerOutputId;
private final Map<String, OutputState> watermarkPerOutputId;

/**
* List of all watermark outputs, for efficient access.
Expand All @@ -88,13 +85,23 @@ public WatermarkOutputMultiplexer(WatermarkOutput underlyingOutput) {
* an output ID that can be used to get a deferred or immediate {@link WatermarkOutput} for that
* output.
*/
public int registerNewOutput() {
int newOutputId = nextOutputId;
nextOutputId++;
OutputState outputState = new OutputState();
watermarkPerOutputId.put(newOutputId, outputState);
public void registerNewOutput(String id) {
final OutputState outputState = new OutputState();

final OutputState previouslyRegistered = watermarkPerOutputId.putIfAbsent(id, outputState);
checkState(previouslyRegistered == null, "Already contains an output for ID %s", id);

watermarkOutputs.add(outputState);
return newOutputId;
}

public boolean unregisterOutput(String id) {
final OutputState output = watermarkPerOutputId.remove(id);
if (output != null) {
watermarkOutputs.remove(output);
return true;
} else {
return false;
}
}

/**
Expand All @@ -103,7 +110,7 @@ public int registerNewOutput() {
* <p>>See {@link WatermarkOutputMultiplexer} for a description of immediate and deferred
* outputs.
*/
public WatermarkOutput getImmediateOutput(int outputId) {
public WatermarkOutput getImmediateOutput(String outputId) {
Preconditions.checkArgument(
watermarkPerOutputId.containsKey(outputId),
"no output registered under id " + outputId);
Expand All @@ -118,7 +125,7 @@ public WatermarkOutput getImmediateOutput(int outputId) {
* <p>>See {@link WatermarkOutputMultiplexer} for a description of immediate and deferred
* outputs.
*/
public WatermarkOutput getDeferredOutput(int outputId) {
public WatermarkOutput getDeferredOutput(String outputId) {
Preconditions.checkArgument(
watermarkPerOutputId.containsKey(outputId),
"no output registered under id " + outputId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,15 @@

import org.junit.Test;

import java.util.UUID;

import static org.apache.flink.api.common.eventtime.WatermarkMatchers.watermark;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;

/**
* Tests for the {@link WatermarkOutputMultiplexer}.
Expand Down Expand Up @@ -261,9 +266,10 @@ public void immediateUpdateOnSameOutputAsDeferredUpdateDoesNotRegress() {
WatermarkOutputMultiplexer multiplexer =
new WatermarkOutputMultiplexer(underlyingWatermarkOutput);

int outputId = multiplexer.registerNewOutput();
WatermarkOutput immediateOutput = multiplexer.getImmediateOutput(outputId);
WatermarkOutput deferredOutput = multiplexer.getDeferredOutput(outputId);
final String id = "test-id";
multiplexer.registerNewOutput(id);
WatermarkOutput immediateOutput = multiplexer.getImmediateOutput(id);
WatermarkOutput deferredOutput = multiplexer.getDeferredOutput(id);

deferredOutput.emitWatermark(new Watermark(5));
multiplexer.onPeriodicEmit();
Expand All @@ -284,32 +290,107 @@ public void lowerImmediateUpdateOnSameOutputDoesNotEmitCombinedUpdate() {
WatermarkOutputMultiplexer multiplexer =
new WatermarkOutputMultiplexer(underlyingWatermarkOutput);

int outputId = multiplexer.registerNewOutput();
WatermarkOutput immediateOutput = multiplexer.getImmediateOutput(outputId);
WatermarkOutput deferredOutput = multiplexer.getDeferredOutput(outputId);
final String id = "1234-test";
multiplexer.registerNewOutput(id);
WatermarkOutput immediateOutput = multiplexer.getImmediateOutput(id);
WatermarkOutput deferredOutput = multiplexer.getDeferredOutput(id);

deferredOutput.emitWatermark(new Watermark(5));
immediateOutput.emitWatermark(new Watermark(2));

assertThat(underlyingWatermarkOutput.lastWatermark(), is(nullValue()));
}

@Test
public void testRemoveUnblocksWatermarks() {
final TestingWatermarkOutput underlyingWatermarkOutput = createTestingWatermarkOutput();
final WatermarkOutputMultiplexer multiplexer = new WatermarkOutputMultiplexer(underlyingWatermarkOutput);
final long lowTimestamp = 156765L;
final long highTimestamp = lowTimestamp + 10;

multiplexer.registerNewOutput("lower");
multiplexer.registerNewOutput("higher");
multiplexer.getImmediateOutput("lower").emitWatermark(new Watermark(lowTimestamp));

multiplexer.unregisterOutput("lower");
multiplexer.getImmediateOutput("higher").emitWatermark(new Watermark(highTimestamp));

assertEquals(highTimestamp, underlyingWatermarkOutput.lastWatermark().getTimestamp());
}

@Test
public void testRemoveOfLowestDoesNotImmediatelyAdvanceWatermark() {
final TestingWatermarkOutput underlyingWatermarkOutput = createTestingWatermarkOutput();
final WatermarkOutputMultiplexer multiplexer = new WatermarkOutputMultiplexer(underlyingWatermarkOutput);
final long lowTimestamp = -4343L;
final long highTimestamp = lowTimestamp + 10;

multiplexer.registerNewOutput("lower");
multiplexer.registerNewOutput("higher");
multiplexer.getImmediateOutput("lower").emitWatermark(new Watermark(lowTimestamp));
multiplexer.getImmediateOutput("higher").emitWatermark(new Watermark(highTimestamp));

multiplexer.unregisterOutput("lower");

assertEquals(lowTimestamp, underlyingWatermarkOutput.lastWatermark().getTimestamp());
}

@Test
public void testRemoveOfHighestDoesNotRetractWatermark() {
final TestingWatermarkOutput underlyingWatermarkOutput = createTestingWatermarkOutput();
final WatermarkOutputMultiplexer multiplexer = new WatermarkOutputMultiplexer(underlyingWatermarkOutput);
final long lowTimestamp = 1L;
final long highTimestamp = 2L;

multiplexer.registerNewOutput("higher");
multiplexer.getImmediateOutput("higher").emitWatermark(new Watermark(highTimestamp));
multiplexer.unregisterOutput("higher");

multiplexer.registerNewOutput("lower");
multiplexer.getImmediateOutput("lower").emitWatermark(new Watermark(lowTimestamp));

assertEquals(highTimestamp, underlyingWatermarkOutput.lastWatermark().getTimestamp());
}

@Test
public void testRemoveRegisteredReturnValue() {
final TestingWatermarkOutput underlyingWatermarkOutput = createTestingWatermarkOutput();
final WatermarkOutputMultiplexer multiplexer = new WatermarkOutputMultiplexer(underlyingWatermarkOutput);
multiplexer.registerNewOutput("does-exist");

final boolean unregistered = multiplexer.unregisterOutput("does-exist");

assertTrue(unregistered);
}

@Test
public void testRemoveNotRegisteredReturnValue() {
final TestingWatermarkOutput underlyingWatermarkOutput = createTestingWatermarkOutput();
final WatermarkOutputMultiplexer multiplexer = new WatermarkOutputMultiplexer(underlyingWatermarkOutput);

final boolean unregistered = multiplexer.unregisterOutput("does-not-exist");

assertFalse(unregistered);
}

/**
* Convenience method so we don't have to go through the output ID dance when we only want an
* immediate output for a given output ID.
*/
private static WatermarkOutput createImmediateOutput(WatermarkOutputMultiplexer multiplexer) {
int outputId = multiplexer.registerNewOutput();
return multiplexer.getImmediateOutput(outputId);
final String id = UUID.randomUUID().toString();
multiplexer.registerNewOutput(id);
return multiplexer.getImmediateOutput(id);
}

/**
* Convenience method so we don't have to go through the output ID dance when we only want an
* deferred output for a given output ID.
*/
private static WatermarkOutput createDeferredOutput(WatermarkOutputMultiplexer multiplexer) {
int outputId = multiplexer.registerNewOutput();
return multiplexer.getDeferredOutput(outputId);
final String id = UUID.randomUUID().toString();
multiplexer.registerNewOutput(id);
return multiplexer.getDeferredOutput(id);
}

private static TestingWatermarkOutput createTestingWatermarkOutput() {
Expand Down

0 comments on commit 8e01f52

Please sign in to comment.