diff --git a/website/src/_includes/section-menu/documentation.html b/website/src/_includes/section-menu/documentation.html
index f8753b6e342e..015441b24e60 100644
--- a/website/src/_includes/section-menu/documentation.html
+++ b/website/src/_includes/section-menu/documentation.html
@@ -121,6 +121,17 @@
[Section menu change: adds a "State and Timers" entry, with links to the new sub-sections below, immediately after the "Using metrics in pipeline" entry.]
diff --git a/website/src/documentation/programming-guide.md b/website/src/documentation/programming-guide.md
index ca12d67c79c3..13f7bc07d114 100644
--- a/website/src/documentation/programming-guide.md
+++ b/website/src/documentation/programming-guide.md
@@ -3121,3 +3121,545 @@ public class MyMetricsDoFn extends DoFn<Integer, Integer> {

## 10. State and Timers {#state-and-timers}

Beam's windowing and triggering facilities provide a powerful abstraction for grouping and aggregating unbounded input
data based on timestamps. However, there are aggregation use cases for which developers may require a higher degree of
control than provided by windows and triggers. Beam provides an API for manually managing per-key state, allowing for
fine-grained control over aggregations.

Beam's state API models state per key. To use the state API, you start out with a keyed `PCollection`, which in Java
is modeled as a `PCollection<KV<K, V>>`. A `ParDo` processing this `PCollection` can now declare state variables. Inside
the `ParDo` these state variables can be used to write or update state for the current key or to read previous state
written for that key. State is always fully scoped only to the current processing key.

Windowing can still be used together with stateful processing. All state for a key is scoped to the current window. This
means that the first time a key is seen for a given window any state reads will return empty, and that a runner can
garbage collect state when a window is completed. It's also often useful to use Beam's windowed aggregations prior to
the stateful operator - for example, using a combiner to preaggregate data and then storing the aggregated data inside
of state, as in the sketch below. Merging windows are not currently supported when using state and timers.
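The following is a minimal sketch of that pre-aggregation pattern; it is an illustration rather than part of the Beam
API walkthrough, and `Event`, `Event::getUserId`, and `MyStatefulDoFn` are hypothetical placeholders. Raw events are
keyed, windowed, and combined before the stateful `ParDo` sees them, so the state only has to hold one small aggregate
per key and window.

```java
// Sketch only: Event, getUserId(), and MyStatefulDoFn are hypothetical placeholders.
PCollection<Event> events = readEvents();
events
    // Key the raw events by user id.
    .apply(WithKeys.of(Event::getUserId).withKeyType(TypeDescriptors.strings()))
    // All state and timers downstream are scoped to these ten-minute windows.
    .apply(Window.into(FixedWindows.of(Duration.standardMinutes(10))))
    // Pre-aggregate with a combiner: one count per user per window.
    .apply(Count.perKey())
    // The stateful DoFn now stores only the pre-aggregated counts.
    .apply(ParDo.of(new MyStatefulDoFn()));
```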
Sometimes stateful processing is used to implement state-machine style processing inside a `DoFn`. When doing this,
care must be taken to remember that the elements in the input `PCollection` have no guaranteed order and to ensure that
the program logic is resilient to this. Unit tests written using the DirectRunner will shuffle the order of element
processing, and are recommended to test for correctness.

In Java, a DoFn declares states to be accessed by creating final `StateSpec` member variables representing each state.
Each state must be named using the `StateId` annotation; this name is unique to a ParDo in the graph and has no relation
to other nodes in the graph. A `DoFn` can declare multiple state variables.
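As an illustration of the testing recommendation above, here is a minimal sketch of a DirectRunner-based unit test. It
is not part of the original guide: `CountingDoFn` is a hypothetical stateful `DoFn` (using a `ValueState` counter like
the one introduced in the next section), and imports are omitted as in the other snippets in this section.

```java
// Sketch only: a hypothetical stateful DoFn that emits the running count of elements per key.
static class CountingDoFn extends DoFn<KV<String, String>, KV<String, Integer>> {
  @StateId("count") private final StateSpec<ValueState<Integer>> countSpec = StateSpecs.value();

  @ProcessElement public void process(
      @Element KV<String, String> element,
      @StateId("count") ValueState<Integer> count,
      OutputReceiver<KV<String, Integer>> output) {
    int current = MoreObjects.firstNonNull(count.read(), 0) + 1;
    count.write(current);
    output.output(KV.of(element.getKey(), current));
  }
}

// TestPipeline uses the DirectRunner by default, which randomizes bundling and the order in
// which elements are processed.
@Rule public final transient TestPipeline p = TestPipeline.create();

@Test public void outputsRunningCountsRegardlessOfElementOrder() {
  PCollection<KV<String, Integer>> counts =
      p.apply(Create.of(KV.of("user1", "a"), KV.of("user1", "b"), KV.of("user2", "c")))
       .apply(ParDo.of(new CountingDoFn()));

  // The set of emitted running counts is the same no matter which order the elements were
  // processed in, so the assertion holds across shuffled runs.
  PAssert.that(counts).containsInAnyOrder(
      KV.of("user1", 1), KV.of("user1", 2), KV.of("user2", 1));
  p.run().waitUntilFinish();
}
```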
### 10.1 Types of state {#types-of-state}

Beam provides several types of state:

#### ValueState

A `ValueState` is a scalar state value. For each key in the input, a `ValueState` will store a typed value that can be
read and modified inside the DoFn's `@ProcessElement` or `@OnTimer` methods. If the type of the `ValueState` has a coder
registered, then Beam will automatically infer the coder for the state value. Otherwise, a coder can be explicitly
specified when creating the `ValueState`. For example, the following ParDo creates a single state variable that
accumulates the number of elements seen.

```java
PCollection<KV<String, ValueT>> perUser = readPerUser();
perUser.apply(ParDo.of(new DoFn<KV<String, ValueT>, OutputT>() {
  @StateId("state") private final StateSpec<ValueState<Integer>> numElements = StateSpecs.value();

  @ProcessElement public void process(@StateId("state") ValueState<Integer> state) {
    // Read the number of elements seen so far for this user key.
    // state.read() returns null if it was never set. The below code allows us to have a default value of 0.
    int currentValue = MoreObjects.firstNonNull(state.read(), 0);
    // Update the state.
    state.write(currentValue + 1);
  }
}));
```

Beam also allows explicitly specifying a coder for `ValueState` values. For example:

```java
PCollection<KV<String, ValueT>> perUser = readPerUser();
perUser.apply(ParDo.of(new DoFn<KV<String, ValueT>, OutputT>() {
  @StateId("state") private final StateSpec<ValueState<MyType>> numElements = StateSpecs.value(new MyTypeCoder());
  ...
}));
```

#### CombiningState

`CombiningState` allows you to create a state object that is updated using a Beam combiner. For example, the previous
`ValueState` example could be rewritten to use `CombiningState`:

```java
PCollection<KV<String, ValueT>> perUser = readPerUser();
perUser.apply(ParDo.of(new DoFn<KV<String, ValueT>, OutputT>() {
  @StateId("state") private final StateSpec<CombiningState<Integer, int[], Integer>> numElements =
      StateSpecs.combining(Sum.ofIntegers());

  @ProcessElement public void process(@StateId("state") CombiningState<Integer, int[], Integer> state) {
    state.add(1);
  }
}));
```

#### BagState

A common use case for state is to accumulate multiple elements. `BagState` allows for accumulating an unordered set
of elements. This allows for addition of elements to the collection without requiring the reading of the entire
collection first, which is an efficiency gain. In addition, runners that support paged reads can allow individual
bags larger than available memory.

```java
PCollection<KV<String, ValueT>> perUser = readPerUser();
perUser.apply(ParDo.of(new DoFn<KV<String, ValueT>, OutputT>() {
  @StateId("state") private final StateSpec<BagState<ValueT>> numElements = StateSpecs.bag();

  @ProcessElement public void process(
      @Element KV<String, ValueT> element,
      @StateId("state") BagState<ValueT> state) {
    // Add the current element to the bag for this key.
    state.add(element.getValue());
    if (shouldFetch()) {
      // Occasionally we fetch and process the values.
      Iterable<ValueT> values = state.read();
      processValues(values);
      state.clear();  // Clear the state for this key.
    }
  }
}));
```

### 10.2 Deferred state reads {#deferred-state-reads}

When a `DoFn` contains multiple state specifications, reading each one in order can be slow. Calling the `read()` function
on a state can cause the runner to perform a blocking read. Performing multiple blocking reads in sequence adds latency
to element processing. If you know that a state will always be read, you can annotate it as `@AlwaysFetched`, and then the
runner can prefetch all of the states necessary. For example:

```java
PCollection<KV<String, ValueT>> perUser = readPerUser();
perUser.apply(ParDo.of(new DoFn<KV<String, ValueT>, OutputT>() {
  @StateId("state1") private final StateSpec<ValueState<Integer>> state1 = StateSpecs.value();
  @StateId("state2") private final StateSpec<ValueState<String>> state2 = StateSpecs.value();
  @StateId("state3") private final StateSpec<BagState<ValueT>> state3 = StateSpecs.bag();

  @ProcessElement public void process(
      @AlwaysFetched @StateId("state1") ValueState<Integer> state1,
      @AlwaysFetched @StateId("state2") ValueState<String> state2,
      @AlwaysFetched @StateId("state3") BagState<ValueT> state3) {
    state1.read();
    state2.read();
    state3.read();
  }
}));
```

If however there are code paths in which the states are not fetched, then annotating with `@AlwaysFetched` will add
unnecessary fetching for those paths. In this case, the `readLater` method allows the runner to know that the state will
be read in the future, allowing multiple state reads to be batched together.

```java
PCollection<KV<String, ValueT>> perUser = readPerUser();
perUser.apply(ParDo.of(new DoFn<KV<String, ValueT>, OutputT>() {
  @StateId("state1") private final StateSpec<ValueState<Integer>> state1 = StateSpecs.value();
  @StateId("state2") private final StateSpec<ValueState<String>> state2 = StateSpecs.value();
  @StateId("state3") private final StateSpec<BagState<ValueT>> state3 = StateSpecs.bag();

  @ProcessElement public void process(
      @StateId("state1") ValueState<Integer> state1,
      @StateId("state2") ValueState<String> state2,
      @StateId("state3") BagState<ValueT> state3) {
    if (/* should read state */) {
      state1.readLater();
      state2.readLater();
      state3.readLater();
    }

    // The runner can now batch all three states into a single read, reducing latency.
    processState1(state1.read());
    processState2(state2.read());
    processState3(state3.read());
  }
}));
```

### 10.3 Timers {#timers}

Beam provides a per-key timer callback API. This allows for delayed processing of data stored using the state API.
Timers can be set to call back at either an event-time or a processing-time timestamp. Every timer is identified with a
`TimerId`. A given timer for a key can only be set for a single timestamp. Calling set on a timer overwrites the previous
firing time for that key's timer.

#### 10.3.1 Event-time timers {#event-time-timers}

Event-time timers fire when the input watermark for the DoFn passes the time at which the timer is set, meaning that
the runner believes that there are no more elements to be processed with timestamps before the timer timestamp. This
allows for event-time aggregations.

```java
PCollection<KV<String, ValueT>> perUser = readPerUser();
perUser.apply(ParDo.of(new DoFn<KV<String, ValueT>, OutputT>() {
  @StateId("state") private final StateSpec<ValueState<Integer>> state = StateSpecs.value();
  @TimerId("timer") private final TimerSpec timer = TimerSpecs.timer(TimeDomain.EVENT_TIME);

  @ProcessElement public void process(
      @Element KV<String, ValueT> element,
      @Timestamp Instant elementTs,
      @StateId("state") ValueState<Integer> state,
      @TimerId("timer") Timer timer) {
    ...
    // Set an event-time timer to the element timestamp.
    timer.set(elementTs);
  }

  @OnTimer("timer") public void onTimer() {
    // Process timer.
  }
}));
```

#### 10.3.2 Processing-time timers {#processing-time-timers}

Processing-time timers fire when the real wall-clock time passes the timer's timestamp. This is often used to create
larger batches of data before processing. It can also be used to schedule events that should occur at a specific time.
Just like with event-time timers, processing-time timers are per key - each key has a separate copy of the timer.

While processing-time timers can be set to an absolute timestamp, it is very common to set them to an offset relative
to the current time. The `Timer.offset` and `Timer.setRelative` methods can be used to accomplish this.

```java
PCollection<KV<String, ValueT>> perUser = readPerUser();
perUser.apply(ParDo.of(new DoFn<KV<String, ValueT>, OutputT>() {
  @TimerId("timer") private final TimerSpec timer = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);

  @ProcessElement public void process(@TimerId("timer") Timer timer) {
    ...
    // Set a timer to go off 30 seconds in the future.
    timer.offset(Duration.standardSeconds(30)).setRelative();
  }

  @OnTimer("timer") public void onTimer() {
    // Process timer.
  }
}));
```
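For the absolute form mentioned above, the same processing-time timer might be set as in the following sketch. This is
not from the original guide, and `nextPublishTime()` is a hypothetical helper that returns an absolute wall-clock
`Instant`.

```java
@ProcessElement public void process(@TimerId("timer") Timer timer) {
  // Sketch only: nextPublishTime() is a hypothetical helper, e.g. returning the top of the
  // next hour in wall-clock time.
  Instant fireAt = nextPublishTime();
  // Set the processing-time timer to an absolute timestamp instead of a relative offset.
  timer.set(fireAt);
}
```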
#### 10.3.3 Dynamic timer tags {#dynamic-timer-tags}

Beam also supports dynamically setting a timer tag using `TimerMap`. This allows for setting multiple different timers
in a `DoFn` and allowing for the timer tags to be dynamically chosen - e.g. based on data in the input elements. A
timer with a specific tag can only be set to a single timestamp, so setting the timer again has the effect of
overwriting the previous expiration time for the timer with that tag. Each `TimerMap` is identified with a timer family
id, and timers in different timer families are independent.

```java
PCollection<KV<String, Event>> perUser = readPerUser();
perUser.apply(ParDo.of(new DoFn<KV<String, Event>, OutputT>() {
  @TimerFamily("actionTimers") private final TimerSpec timer =
      TimerSpecs.timerMap(TimeDomain.EVENT_TIME);

  @ProcessElement public void process(
      @Element KV<String, Event> element,
      @Timestamp Instant elementTs,
      @TimerFamily("actionTimers") TimerMap timers) {
    timers.set(element.getValue().getActionType(), elementTs);
  }

  @OnTimerFamily("actionTimers") public void onTimer(@TimerId String timerId) {
    LOG.info("Timer fired with id " + timerId);
  }
}));
```

#### 10.3.4 Timer output timestamps {#timer-output-timestamps}

By default, event-time timers will hold the output watermark of the `ParDo` to the timestamp of the timer. This means
that if a timer is set to 12pm, any windowed aggregations or event-time timers later in the pipeline graph that finish
after 12pm will not expire. The timestamp of the timer is also the default output timestamp for the timer callback. This
means that any elements output from the onTimer method will have a timestamp equal to the timestamp of the timer firing.
For processing-time timers, the default output timestamp and watermark hold is the value of the input watermark at the
time the timer was set.

In some cases, a DoFn needs to output timestamps earlier than the timer expiration time, and therefore also needs to
hold its output watermark to those timestamps. For example, consider the following pipeline that temporarily batches
records into state, and sets a timer to drain the state. This code may appear correct, but will not work properly.

```java
PCollection<KV<String, ValueT>> perUser = readPerUser();
perUser.apply(ParDo.of(new DoFn<KV<String, ValueT>, OutputT>() {
  @StateId("elementBag") private final StateSpec<BagState<ValueT>> elementBag = StateSpecs.bag();
  @StateId("timerSet") private final StateSpec<ValueState<Boolean>> timerSet = StateSpecs.value();
  @TimerId("outputState") private final TimerSpec timer = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);

  @ProcessElement public void process(
      @Element KV<String, ValueT> element,
      @StateId("elementBag") BagState<ValueT> elementBag,
      @StateId("timerSet") ValueState<Boolean> timerSet,
      @TimerId("outputState") Timer timer) {
    // Add the current element to the bag for this key.
    elementBag.add(element.getValue());
    if (!MoreObjects.firstNonNull(timerSet.read(), false)) {
      // If the timer is not currently set, then set it to go off in a minute.
      timer.offset(Duration.standardMinutes(1)).setRelative();
      timerSet.write(true);
    }
  }

  @OnTimer("outputState") public void onTimer(
      @StateId("elementBag") BagState<ValueT> elementBag,
      @StateId("timerSet") ValueState<Boolean> timerSet,
      OutputReceiver<ValueT> output) {
    for (ValueT bufferedElement : elementBag.read()) {
      // Output each element.
      output.outputWithTimestamp(bufferedElement, bufferedElement.timestamp());
    }
    elementBag.clear();
    // Note that the timer has now fired.
    timerSet.clear();
  }
}));
```

The problem with this code is that the ParDo is buffering elements, but nothing prevents the watermark from advancing
past the timestamps of those elements, so all of those elements might be dropped as late data. In order to prevent this
from happening, an output timestamp needs to be set on the timer to prevent the watermark from advancing past the
timestamp of the minimum element. The following code demonstrates this.

```java
PCollection<KV<String, ValueT>> perUser = readPerUser();
perUser.apply(ParDo.of(new DoFn<KV<String, ValueT>, OutputT>() {
  // The bag of elements accumulated.
  @StateId("elementBag") private final StateSpec<BagState<ValueT>> elementBag = StateSpecs.bag();
  // The timestamp of the timer set.
  @StateId("timerTimestamp") private final StateSpec<ValueState<Long>> timerTimestamp = StateSpecs.value();
  // The minimum timestamp stored in the bag.
  @StateId("minTimestampInBag") private final StateSpec<CombiningState<Long, long[], Long>>
      minTimestampInBag = StateSpecs.combining(Min.ofLongs());

  @TimerId("outputState") private final TimerSpec timer = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);

  @ProcessElement public void process(
      @Element KV<String, ValueT> element,
      @StateId("elementBag") BagState<ValueT> elementBag,
      @AlwaysFetched @StateId("timerTimestamp") ValueState<Long> timerTimestamp,
      @AlwaysFetched @StateId("minTimestampInBag") CombiningState<Long, long[], Long> minTimestamp,
      @TimerId("outputState") Timer timer) {
    // Add the current element to the bag for this key.
    elementBag.add(element.getValue());
    // Keep track of the minimum element timestamp currently stored in the bag.
    minTimestamp.add(element.getValue().timestamp().getMillis());

    // If the timer is already set, then reset it at the same time but with an updated output timestamp (otherwise
    // we would keep resetting the timer to the future). If there is no timer set, then set one to expire in a minute.
    Long timerTimestampMs = timerTimestamp.read();
    Instant timerToSet = (timerTimestampMs == null)
        ? Instant.now().plus(Duration.standardMinutes(1)) : new Instant(timerTimestampMs);
    // Setting the outputTimestamp to the minimum timestamp in the bag holds the watermark to that timestamp until the
    // timer fires. This allows outputting all the elements with their timestamp.
    timer.withOutputTimestamp(new Instant(minTimestamp.read())).set(timerToSet);
    timerTimestamp.write(timerToSet.getMillis());
  }

  @OnTimer("outputState") public void onTimer(
      @StateId("elementBag") BagState<ValueT> elementBag,
      @StateId("timerTimestamp") ValueState<Long> timerTimestamp,
      OutputReceiver<ValueT> output) {
    for (ValueT bufferedElement : elementBag.read()) {
      // Output each element.
      output.outputWithTimestamp(bufferedElement, bufferedElement.timestamp());
    }
    elementBag.clear();
    // Note that the timer has now fired.
    timerTimestamp.clear();
  }
}));
```
### 10.4 Garbage collecting state {#garbage-collecting-state}

Per-key state needs to be garbage collected, or eventually the increasing size of state may negatively impact
performance. There are two common strategies for garbage collecting state.

#### 10.4.1 Using windows for garbage collection {#using-windows-for-garbage-collection}

All state and timers for a key are scoped to the window it is in. This means that depending on the timestamp of the
input element the ParDo will see different values for the state depending on the window that element falls into. In
addition, once the input watermark passes the end of the window, the runner should garbage collect all state for that
window. (Note: if allowed lateness is set to a positive value for the window, the runner must wait for the watermark to
pass the end of the window plus the allowed lateness before garbage collecting state.) This can be used as a
garbage-collection strategy.

For example, given the following:

```java
PCollection<KV<String, ValueT>> perUser = readPerUser();
perUser.apply(Window.into(CalendarWindows.days(1)
        .withTimeZone(DateTimeZone.forID("America/Los_Angeles"))))
    .apply(ParDo.of(new DoFn<KV<String, ValueT>, OutputT>() {
      @StateId("state") private final StateSpec<ValueState<Integer>> state = StateSpecs.value();
      ...
      @ProcessElement public void process(
          @Timestamp Instant ts, @StateId("state") ValueState<Integer> state) {
        // The state is scoped to a calendar day window. That means that if the input timestamp ts is after
        // midnight PST, then a new copy of the state will be seen for the next day.
      }
    }));
```

This `ParDo` stores state per day. Once the pipeline is done processing data for a given day, all the state for that
day is garbage collected.

#### 10.4.2 Using timers for garbage collection {#using-timers-for-garbage-collection}

In some cases, it is difficult to find a windowing strategy that models the desired garbage-collection strategy. For
example, a common desire is to garbage collect state for a key once no activity has been seen on the key for some time.
This can be done by updating a timer that garbage collects state. For example:

```java
PCollection<KV<String, ValueT>> perUser = readPerUser();
perUser.apply(ParDo.of(new DoFn<KV<String, ValueT>, OutputT>() {
  // The state for the key.
  @StateId("state") private final StateSpec<ValueState<ValueT>> state = StateSpecs.value();

  // The maximum element timestamp seen so far.
  @StateId("maxTimestampSeen") private final StateSpec<CombiningState<Long, long[], Long>>
      maxTimestamp = StateSpecs.combining(Max.ofLongs());

  @TimerId("gcTimer") private final TimerSpec gcTimer = TimerSpecs.timer(TimeDomain.EVENT_TIME);

  @ProcessElement public void process(
      @Element KV<String, ValueT> element,
      @Timestamp Instant ts,
      @StateId("state") ValueState<ValueT> state,
      @StateId("maxTimestampSeen") CombiningState<Long, long[], Long> maxTimestamp,
      @TimerId("gcTimer") Timer gcTimer) {
    updateState(state, element);
    maxTimestamp.add(ts.getMillis());

    // Set the timer to be one hour after the maximum timestamp seen. This will keep overwriting the same timer, so
    // as long as there is activity on this key the state will stay active. Once the key goes inactive for one hour's
    // worth of event time (as measured by the watermark), then the gc timer will fire.
    Instant expirationTime = new Instant(maxTimestamp.read()).plus(Duration.standardHours(1));
    gcTimer.set(expirationTime);
  }

  @OnTimer("gcTimer") public void onTimer(
      @StateId("state") ValueState<ValueT> state,
      @StateId("maxTimestampSeen") CombiningState<Long, long[], Long> maxTimestamp) {
    // Clear all state for the key.
    state.clear();
    maxTimestamp.clear();
  }
}));
```
### 10.5 State and timers examples {#state-timers-examples}

Following are some example uses of state and timers.

#### 10.5.1 Joining clicks and views {#joining-clicks-and-views}

In this example, the pipeline is processing data from an e-commerce site's home page. There are two input streams:
a stream of views, representing suggested product links displayed to the user on the home page, and a stream of
clicks, representing actual user clicks on these links. The goal of the pipeline is to join click events with view
events, outputting a new joined event that contains information from both events. Each link has a unique identifier
that is present in both the view event and the click event.

Many view events will never be followed up with clicks. This pipeline will wait one hour for a click, after which it
will give up on this join. While every click event should have a view event, some small number of view events may be
lost and never make it to the Beam pipeline; the pipeline will similarly wait one hour after seeing a click event, and
give up if the view event does not arrive in that time. Input events are not ordered - it is possible to see the click
event before the view event. The one-hour join timeout should be based on event time, not on processing time.

```java
// Read the event stream and key it by the link id.
PCollection<KV<String, Event>> eventsPerLinkId =
    readEvents()
        .apply(WithKeys.of(Event::getLinkId).withKeyType(TypeDescriptors.strings()));

eventsPerLinkId.apply(ParDo.of(new DoFn<KV<String, Event>, JoinedEvent>() {
  // Store the view event.
  @StateId("view") private final StateSpec<ValueState<Event>> viewState = StateSpecs.value();
  // Store the click event.
  @StateId("click") private final StateSpec<ValueState<Event>> clickState = StateSpecs.value();

  // The maximum element timestamp seen so far.
  @StateId("maxTimestampSeen") private final StateSpec<CombiningState<Long, long[], Long>>
      maxTimestamp = StateSpecs.combining(Max.ofLongs());

  // Timer that fires when an hour goes by with an incomplete join.
  @TimerId("gcTimer") private final TimerSpec gcTimer = TimerSpecs.timer(TimeDomain.EVENT_TIME);

  @ProcessElement public void process(
      @Element KV<String, Event> element,
      @Timestamp Instant ts,
      @AlwaysFetched @StateId("view") ValueState<Event> viewState,
      @AlwaysFetched @StateId("click") ValueState<Event> clickState,
      @AlwaysFetched @StateId("maxTimestampSeen") CombiningState<Long, long[], Long> maxTimestampState,
      @TimerId("gcTimer") Timer gcTimer,
      OutputReceiver<JoinedEvent> output) {
    // Store the event into the correct state variable.
    Event event = element.getValue();
    ValueState<Event> valueState = event.getType().equals(VIEW) ? viewState : clickState;
    valueState.write(event);

    Event view = viewState.read();
    Event click = clickState.read();
    if (view != null && click != null) {
      // We've seen both a view and a click. Output a joined event and clear state.
      output.output(JoinedEvent.of(view, click));
      clearState(viewState, clickState, maxTimestampState);
    } else {
      // We've only seen one half of the join.
      // Set the timer to be one hour after the maximum timestamp seen. This will keep overwriting the same timer, so
      // as long as there is activity on this key the state will stay active. Once the key goes inactive for one hour's
      // worth of event time (as measured by the watermark), then the gc timer will fire.
      maxTimestampState.add(ts.getMillis());
      Instant expirationTime = new Instant(maxTimestampState.read()).plus(Duration.standardHours(1));
      gcTimer.set(expirationTime);
    }
  }

  @OnTimer("gcTimer") public void onTimer(
      @StateId("view") ValueState<Event> viewState,
      @StateId("click") ValueState<Event> clickState,
      @StateId("maxTimestampSeen") CombiningState<Long, long[], Long> maxTimestampState) {
    // An hour has gone by with an incomplete join. Give up and clear the state.
    clearState(viewState, clickState, maxTimestampState);
  }

  private void clearState(
      ValueState<Event> viewState,
      ValueState<Event> clickState,
      CombiningState<Long, long[], Long> maxTimestampState) {
    viewState.clear();
    clickState.clear();
    maxTimestampState.clear();
  }
}));
```
#### 10.5.2 Batching RPCs {#batching-rpcs}

In this example, input elements are being forwarded to an external RPC service. The RPC accepts batch requests -
multiple events for the same user can be batched in a single RPC call. Since this RPC service also imposes rate limits,
we want to batch ten seconds' worth of events together in order to reduce the number of calls.
```java
PCollection<KV<String, ValueT>> perUser = readPerUser();
perUser.apply(ParDo.of(new DoFn<KV<String, ValueT>, OutputT>() {
  // Store the elements buffered so far.
  @StateId("state") private final StateSpec<BagState<ValueT>> elements = StateSpecs.bag();
  // Keep track of whether a timer is currently set or not.
  @StateId("isTimerSet") private final StateSpec<ValueState<Boolean>> isTimerSet = StateSpecs.value();
  // The processing-time timer used to publish the RPC.
  @TimerId("outputState") private final TimerSpec timer = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);

  @ProcessElement public void process(
      @Element KV<String, ValueT> element,
      @StateId("state") BagState<ValueT> elementsState,
      @StateId("isTimerSet") ValueState<Boolean> isTimerSetState,
      @TimerId("outputState") Timer timer) {
    // Add the current element to the bag for this key.
    elementsState.add(element.getValue());
    if (!MoreObjects.firstNonNull(isTimerSetState.read(), false)) {
      // If there is no timer currently set, then set one to go off in 10 seconds.
      timer.offset(Duration.standardSeconds(10)).setRelative();
      isTimerSetState.write(true);
    }
  }

  @OnTimer("outputState") public void onTimer(
      @StateId("state") BagState<ValueT> elementsState,
      @StateId("isTimerSet") ValueState<Boolean> isTimerSetState) {
    // Send an RPC containing the batched elements and clear state.
    sendRPC(elementsState.read());
    elementsState.clear();
    isTimerSetState.clear();
  }
}));
```