Skip to content

Commit

Permalink
[FLINK-22700] [api] Propagate watermarks to sink API (FLIP-167)
Browse files Browse the repository at this point in the history
This closes apache#15950.
  • Loading branch information
EronWright authored and tzulitai committed Jul 6, 2021
1 parent 869f6b0 commit c0244d3
Show file tree
Hide file tree
Showing 8 changed files with 84 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.apache.flink.api.connector.sink;

import org.apache.flink.annotation.Experimental;
import org.apache.flink.api.common.eventtime.Watermark;

import java.io.IOException;
import java.util.List;
Expand All @@ -46,6 +47,16 @@ public interface SinkWriter<InputT, CommT, WriterStateT> extends AutoCloseable {
*/
void write(InputT element, Context context) throws IOException;

/**
* Add a watermark to the writer.
*
* <p>This method is intended for advanced sinks that propagate watermarks.
*
* @param watermark The watermark.
* @throws IOException if fail to add a watermark.
*/
default void writeWatermark(Watermark watermark) throws IOException {}

/**
* Prepare for a commit.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.flink.streaming.api.functions.sink;

import org.apache.flink.annotation.Public;
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.functions.Function;

import java.io.Serializable;
Expand Down Expand Up @@ -49,6 +50,17 @@ default void invoke(IN value, Context context) throws Exception {
invoke(value);
}

/**
* Writes the given watermark to the sink. This function is called for every watermark.
*
* <p>This method is intended for advanced sinks that propagate watermarks.
*
* @param watermark The watermark.
* @throws Exception This method may throw exceptions. Throwing an exception will cause the
* operation to fail and may trigger recovery.
*/
default void writeWatermark(Watermark watermark) throws Exception {}

/**
* Context that {@link SinkFunction SinkFunctions } can use for getting additional data about an
* input record.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ protected void reportOrForwardLatencyMarker(LatencyMarker marker) {
public void processWatermark(Watermark mark) throws Exception {
super.processWatermark(mark);
this.currentWatermark = mark.getTimestamp();
userFunction.writeWatermark(
new org.apache.flink.api.common.eventtime.Watermark(mark.getTimestamp()));
}

private class SimpleContext<IN> implements SinkFunction.Context {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@ public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
public void processWatermark(Watermark mark) throws Exception {
super.processWatermark(mark);
this.currentWatermark = mark.getTimestamp();
sinkWriter.writeWatermark(
new org.apache.flink.api.common.eventtime.Watermark(mark.getTimestamp()));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,15 @@ public void testTimeQuerying() throws Exception {
new Tuple4<>(42L, 15L, 13L, "Ciao"),
new Tuple4<>(42L, 15L, null, "Ciao")));

assertThat(bufferingSink.watermarks.size(), is(3));

assertThat(
bufferingSink.watermarks,
contains(
new org.apache.flink.api.common.eventtime.Watermark(17L),
new org.apache.flink.api.common.eventtime.Watermark(42L),
new org.apache.flink.api.common.eventtime.Watermark(42L)));

testHarness.close();
}

Expand All @@ -87,8 +96,11 @@ private static class BufferingQueryingSink<T> implements SinkFunction<T> {
// watermark, processing-time, timestamp, event
private final List<Tuple4<Long, Long, Long, T>> data;

private final List<org.apache.flink.api.common.eventtime.Watermark> watermarks;

public BufferingQueryingSink() {
data = new ArrayList<>();
watermarks = new ArrayList<>();
}

@Override
Expand All @@ -110,5 +122,11 @@ public void invoke(T value, Context context) throws Exception {
value));
}
}

@Override
public void writeWatermark(org.apache.flink.api.common.eventtime.Watermark watermark)
throws Exception {
watermarks.add(watermark);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,29 @@ public void timeBasedBufferingSinkWriter() throws Exception {
Tuple3.of(2, initialTime + 2, Long.MIN_VALUE).toString())));
}

@Test
public void watermarkPropagatedToSinkWriter() throws Exception {
final long initialTime = 0;

final TestSink.DefaultSinkWriter writer = new TestSink.DefaultSinkWriter();
final OneInputStreamOperatorTestHarness<Integer, String> testHarness =
createTestHarness(
TestSink.newBuilder().setWriter(writer).withWriterState().build());
testHarness.open();

testHarness.processWatermark(initialTime);
testHarness.processWatermark(initialTime + 1);

assertThat(
testHarness.getOutput(),
contains(new Watermark(initialTime), new Watermark(initialTime + 1)));
assertThat(
writer.watermarks,
contains(
new org.apache.flink.api.common.eventtime.Watermark(initialTime),
new org.apache.flink.api.common.eventtime.Watermark(initialTime + 1)));
}

/**
* A {@link SinkWriter} that only returns committables from {@link #prepareCommit(boolean)} when
* {@code flush} is {@code true}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.streaming.runtime.operators.sink;

import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.connector.sink.Committer;
import org.apache.flink.api.connector.sink.GlobalCommitter;
import org.apache.flink.api.connector.sink.Sink;
Expand Down Expand Up @@ -199,10 +200,13 @@ static class DefaultSinkWriter implements SinkWriter<Integer, String, String>, S

protected List<String> elements;

protected List<Watermark> watermarks;

protected ProcessingTimeService processingTimerService;

DefaultSinkWriter() {
this.elements = new ArrayList<>();
this.watermarks = new ArrayList<>();
}

@Override
Expand All @@ -211,6 +215,11 @@ public void write(Integer element, Context context) {
Tuple3.of(element, context.timestamp(), context.currentWatermark()).toString());
}

@Override
public void writeWatermark(Watermark watermark) throws IOException {
watermarks.add(watermark);
}

@Override
public List<String> prepareCommit(boolean flush) {
List<String> result = elements;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.accumulators.IntCounter;
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
Expand Down Expand Up @@ -104,9 +105,12 @@ public void close() throws Exception {
}

@Override
public void invoke(String value, SinkFunction.Context context) {
if (context.currentWatermark() != lastWatermark) {
lastWatermark = context.currentWatermark();
public void invoke(String value, SinkFunction.Context context) {}

@Override
public void writeWatermark(Watermark watermark) throws Exception {
if (watermark.getTimestamp() != lastWatermark) {
lastWatermark = watermark.getTimestamp();
numWatermarks.add(1);
}
}
Expand Down

0 comments on commit c0244d3

Please sign in to comment.