Skip to content

Commit

Permalink
[FLINK-3428] Adds a fixed time trailing watermark extractor.
Browse files Browse the repository at this point in the history
This closes apache#1764
  • Loading branch information
kl0u authored and rmetzger committed Apr 28, 2016
1 parent a1968f0 commit 6225aa6
Show file tree
Hide file tree
Showing 12 changed files with 471 additions and 161 deletions.
108 changes: 108 additions & 0 deletions docs/apis/streaming/event_timestamp_extractors.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
---
title: "Pre-defined Timestamp Extractors / Watermark Emitters"

sub-nav-group: streaming
sub-nav-pos: 1
sub-nav-parent: eventtime
---
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->

* toc
{:toc}

As described in the [timestamps and watermark handling]({{ site.baseurl }}/apis/streaming/event_timestamps_watermarks.html) page,
Flink provides abstractions that allow the programmer to assign her own timestamps and emit her own watermarks. More specifically,
she can do so by implementing one of the `AssignerWithPeriodicWatermarks` and `AssignerWithPunctuatedWatermarks` interfaces, depending
on her use-case. In a nutshell, the first will emit watermarks periodically, while the second does so based on some property of
the incoming records, e.g. whenever a special element is encountered in the stream.

In order to further ease the programming effort for such tasks, Flink comes with some pre-implemented timestamp assigners.
This section provides a list of them. Apart from their out-of-the-box functionality, their implementation can serve as an example
for custom assigner implementations.

#### **Assigner with Ascending Timestamps**

The simplest special case for *periodic* watermark generation is the case where timestamps seen by a given source task
occur in ascending order. In that case, the current timestamp can always act as a watermark, because no lower timestamps will
occur any more.

Note that it is only necessary that timestamps are ascending *per parallel data source task*. For example, if
in a specific setup one Kafka partition is read by one parallel data source instance, then it is only necessary that
timestamps are ascending within each Kafka partition. Flink's Watermark merging mechanism will generate correct
watermarks whenever parallel streams are shuffled, unioned, connected, or merged.

<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
{% highlight java %}
DataStream<MyEvent> stream = ...

DataStream<MyEvent> withTimestampsAndWatermarks =
stream.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<MyEvent>() {

@Override
public long extractAscendingTimestamp(MyEvent element) {
return element.getCreationTime();
}
});
{% endhighlight %}
</div>
<div data-lang="scala" markdown="1">
{% highlight scala %}
val stream: DataStream[MyEvent] = ...

val withTimestampsAndWatermarks = stream.assignAscendingTimestamps( _.getCreationTime )
{% endhighlight %}
</div>
</div>

#### **Assigner which allows a fixed amount of record lateness**

Another example of periodic watermark generation is the one where the watermark lags behind the maximum (event-time) timestamp
seen in the stream, by a fixed amount of time. This case covers scenarios where the maximum lateness that can be encountered in a
stream is known in advance, e.g. when creating a custom source containing elements with timestamps spread within a fixed period of
time for testing. For these cases, Flink provides the `BoundedOutOfOrdernessTimestampExtractor` which takes as argument
the `maxOutOfOrderness`, i.e. the maximum amount of time an element is allowed to be late, before being ignored when computing the
final result for the given window. Lateness corresponds to the result of `t - t_w`, where `t` is the (event-time) timestamp of an
element, and `t_w` that of the previous watermark. If `lateness > 0` then the element is considered late and is ignored when computing
the result of the job for its corresponding window.

<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
{% highlight java %}
DataStream<MyEvent> stream = ...

DataStream<MyEvent> withTimestampsAndWatermarks =
stream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<MyEvent>(Time.seconds(10)) {

@Override
public long extractAscendingTimestamp(MyEvent element) {
return element.getCreationTime();
}
});
{% endhighlight %}
</div>
<div data-lang="scala" markdown="1">
{% highlight scala %}
val stream: DataStream[MyEvent] = ...

val withTimestampsAndWatermarks = stream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[MyEvent](Time.seconds(10))( _.getCreationTime ))
{% endhighlight %}
</div>
</div>
51 changes: 8 additions & 43 deletions docs/apis/streaming/event_timestamps_watermarks.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ the progress in event time.
There are two ways to assign timestamps and generate Watermarks:

1. Directly in the data stream source
2. Via a *TimestampAssigner / WatermarkGenerator*
2. Via a timestamp assigner / watermark generator: in Flink timestamp assigners also define the watermarks to be emitted


### Source Functions with Timestamps and Watermarks
Expand Down Expand Up @@ -116,10 +116,15 @@ those timestamps will be overwritten by the TimestampAssigner. Similarly, Waterm
Timestamp Assigners take a stream and produce a new stream with timestamped elements and watermarks. If the
original stream had timestamps or watermarks already, the timestamp assigner overwrites those.

The timestamp assigners occur usually direct after the data source, but it is not strictly required to. A
The timestamp assigners occur usually immediately after the data source, but it is not strictly required to. A
common pattern is for example to parse (*MapFunction*) and filter (*FilterFunction*) before the timestamp assigner.
In any case, the timestamp assigner needs to occur before the first operation on event time
(such as the first window operation).
(such as the first window operation).

**NOTE:** The remainder of this section presents the main interfaces a programmer has
to implement in order to create her own timestamp extractors/watermark emitters.
To see the pre-implemented extractors that ship with Flink, please refer to the
[Pre-defined Timestamp Extractors / Watermark Emitters]({{ site.baseurl }}/apis/streaming/event_timestamp_extractors.html) page.

<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
Expand Down Expand Up @@ -161,45 +166,6 @@ withTimestampsAndWatermarks
</div>


#### **With Ascending timestamps**

The simplest case for generating watermarks is the case where timestamps within one source occur in ascending order.
In that case, the current timestamp can always act as a watermark, because no lower timestamps will occur any more.

Note that it is only necessary that timestamps are ascending *per parallel data source instance*. For example, if
in a specific setup one Kafka partition is read by one parallel data source instance, then it is only necessary that
timestamps are ascending within each Kafka partition. Flink's Watermark merging mechanism will generate correct
whenever parallel streams are shuffled, unioned, connected, or merged.

<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
{% highlight java %}
DataStream<MyEvent> stream = ...

DataStream<MyEvent> withTimestampsAndWatermarks =
stream.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<MyEvent>() {

@Override
public long extractAscendingTimestamp(MyEvent element) {
return element.getCreationTime();
}
});
{% endhighlight %}
</div>
<div data-lang="scala" markdown="1">
{% highlight scala %}
val stream: DataStream[MyEvent] = ...

val withTimestampsAndWatermarks = stream.assignAscendingTimestamps( _.getCreationTime )
{% endhighlight %}
</div>
</div>


*Note:* The generation of watermarks on ascending timestamps is a special case of the periodic watermark
generation described in the next section.


#### **With Periodic Watermarks**

The `AssignerWithPeriodicWatermarks` assigns timestamps and generate watermarks periodically (possibly depending
Expand Down Expand Up @@ -307,7 +273,6 @@ class TimeLagWatermarkGenerator extends AssignerWithPeriodicWatermarks[MyEvent]
</div>
</div>


#### **With Punctuated Watermarks**

To generate Watermarks whenever a certain event indicates that a new watermark can be generated, use the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.delta.DeltaFunction;
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.functions.TimestampExtractor;
import org.apache.flink.streaming.api.functions.sink.OutputFormatSinkFunction;
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
Expand Down Expand Up @@ -704,15 +706,15 @@ public <W extends Window> AllWindowedStream<T, W> windowAll(WindowAssigner<? sup
*
* <p>
* If you know that the timestamps are strictly increasing you can use an
* {@link org.apache.flink.streaming.api.functions.AscendingTimestampExtractor}. Otherwise,
* {@link AscendingTimestampExtractor}. Otherwise,
* you should provide a {@link TimestampExtractor} that also implements
* {@link TimestampExtractor#getCurrentWatermark()} to keep track of watermarks.
*
* @param extractor The TimestampExtractor that is called for each element of the DataStream.
*
* @deprecated Please use {@link #assignTimestampsAndWatermarks(AssignerWithPeriodicWatermarks)}
* of {@link #assignTimestampsAndWatermarks(AssignerWithPunctuatedWatermarks)}
* instread.
* instead.
* @see #assignTimestampsAndWatermarks(AssignerWithPeriodicWatermarks)
* @see #assignTimestampsAndWatermarks(AssignerWithPunctuatedWatermarks)
*/
Expand Down Expand Up @@ -740,6 +742,11 @@ public SingleOutputStreamOperator<T> assignTimestamps(TimestampExtractor<T> extr
* <p>Use this method for the common cases, where some characteristic over all elements
* should generate the watermarks, or where watermarks are simply trailing behind the
* wall clock time by a certain amount.
*
* <p>For the second case and when the watermarks are required to lag behind the maximum
* timestamp seen so far in the elements of the stream by a fixed amount of time, and this
* amount is known in advance, use the
* {@link BoundedOutOfOrdernessTimestampExtractor}.
*
* <p>For cases where watermarks should be created in an irregular fashion, for example
* based on certain markers that some element carry, use the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,123 +20,18 @@

import org.apache.flink.annotation.PublicEvolving;

import org.apache.flink.streaming.api.watermark.Watermark;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static java.util.Objects.requireNonNull;

/**
* A timestamp assigner and watermark generator for streams where timestamps are monotonously
* ascending. In this case, the local watermarks for the streams are easy to generate, because
* they strictly follow the timestamps.
*
* <b>Note:</b> This is just a stub class. The actual code for this has moved to
* {@link org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor}.
*
* @param <T> The type of the elements that this function can extract timestamps from
*/
@PublicEvolving
public abstract class AscendingTimestampExtractor<T> implements AssignerWithPeriodicWatermarks<T> {

private static final long serialVersionUID = 1L;

/** The current timestamp. */
private long currentTimestamp = Long.MIN_VALUE;

/** Handler that is called when timestamp monotony is violated */
private MonotonyViolationHandler violationHandler = new LoggingHandler();


/**
* Extracts the timestamp from the given element. The timestamp must be monotonically increasing.
*
* @param element The element that the timestamp is extracted from.
* @return The new timestamp.
*/
public abstract long extractAscendingTimestamp(T element);

/**
* Sets the handler for violations to the ascending timestamp order.
*
* @param handler The violation handler to use.
* @return This extractor.
*/
public AscendingTimestampExtractor<T> withViolationHandler(MonotonyViolationHandler handler) {
this.violationHandler = requireNonNull(handler);
return this;
}

// ------------------------------------------------------------------------

@Override
public final long extractTimestamp(T element, long elementPrevTimestamp) {
final long newTimestamp = extractAscendingTimestamp(element);
if (newTimestamp >= this.currentTimestamp) {
this.currentTimestamp = newTimestamp;
return newTimestamp;
} else {
violationHandler.handleViolation(newTimestamp, this.currentTimestamp);
return newTimestamp;
}
}

@Override
public final Watermark getCurrentWatermark() {
return new Watermark(currentTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : currentTimestamp - 1);
}

// ------------------------------------------------------------------------
// Handling violations of monotonous timestamps
// ------------------------------------------------------------------------

/**
* Interface for handlers that handle violations of the monotonous ascending timestamps
* property.
*/
public interface MonotonyViolationHandler extends java.io.Serializable {

/**
* Called when the property of monotonously ascending timestamps is violated, i.e.,
* when {@code elementTimestamp < lastTimestamp}.
*
* @param elementTimestamp The timestamp of the current element.
* @param lastTimestamp The last timestamp.
*/
void handleViolation(long elementTimestamp, long lastTimestamp);
}

/**
* Handler that does nothing when timestamp monotony is violated.
*/
public static final class IgnoringHandler implements MonotonyViolationHandler {
private static final long serialVersionUID = 1L;

@Override
public void handleViolation(long elementTimestamp, long lastTimestamp) {}
}

/**
* Handler that fails the program when timestamp monotony is violated.
*/
public static final class FailingHandler implements MonotonyViolationHandler {
private static final long serialVersionUID = 1L;

@Override
public void handleViolation(long elementTimestamp, long lastTimestamp) {
throw new RuntimeException("Ascending timestamps condition violated. Element timestamp "
+ elementTimestamp + " is smaller than last timestamp " + lastTimestamp);
}
}

/**
* Handler that only logs violations of timestamp monotony, on WARN log level.
*/
public static final class LoggingHandler implements MonotonyViolationHandler {
private static final long serialVersionUID = 1L;
public abstract class AscendingTimestampExtractor<T>
extends org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor<T> {

private static final Logger LOG = LoggerFactory.getLogger(AscendingTimestampExtractor.class);

@Override
public void handleViolation(long elementTimestamp, long lastTimestamp) {
LOG.warn("Timestamp monotony violated: {} < {}", elementTimestamp, lastTimestamp);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.flink.streaming.api.functions;

import org.apache.flink.api.common.functions.Function;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;

/**
* Interface for user functions that extract timestamps from elements.
Expand All @@ -29,7 +30,7 @@
*
* <p>
* Note: If you know that timestamps are monotonically increasing you can use
* {@link org.apache.flink.streaming.api.functions.AscendingTimestampExtractor}. This will
* {@link AscendingTimestampExtractor}. This will
* keep track of watermarks.
*
* @param <T> The type of the elements that this function can extract timestamps from
Expand Down
Loading

0 comments on commit 6225aa6

Please sign in to comment.