Skip to content

Commit

Permalink
Adding Python samples to the Timely (and stateful) Processing post. (a…
Browse files Browse the repository at this point in the history
…pache#8448)

* Adding Python snippets for Timers
  • Loading branch information
pabloem authored May 8, 2019
1 parent c26bcc1 commit 6d9214f
Showing 1 changed file with 92 additions and 10 deletions.
102 changes: 92 additions & 10 deletions website/src/_posts/2017-08-28-timely-processing.md
Original file line number Diff line number Diff line change
Expand Up @@ -198,8 +198,13 @@ new DoFn<Event, EnrichedEvent>() {
```

```py
# State and timers are not yet supported in Beam's Python SDK.
# Follow https://issues.apache.org/jira/browse/BEAM-2687 for updates.
class StatefulBufferingFn(beam.DoFn):
    """DoFn that declares the state cells used to buffer events."""

    # Bag state registered under the name 'buffer', encoded with EventCoder.
    BUFFER_STATE = BagStateSpec('buffer', EventCoder())

    # Combining state registered under the name 'count'; values are encoded
    # with VarIntCoder and merged with SumCombineFn.
    COUNT_STATE = CombiningValueStateSpec(
        'count', VarIntCoder(), combiners.SumCombineFn())
```

Walking through the code, we have:
Expand Down Expand Up @@ -234,7 +239,7 @@ new DoFn<Event, EnrichedEvent>() {
countState.write(count);
bufferState.add(context.element());

if (count > MAX_BUFFER_SIZE) {
if (count >= MAX_BUFFER_SIZE) {
for (EnrichedEvent enrichedEvent : enrichEvents(bufferState.read())) {
context.output(enrichedEvent);
}
Expand All @@ -248,8 +253,30 @@ new DoFn<Event, EnrichedEvent>() {
```

```py
# State and timers are not yet supported in Beam's Python SDK.
# Follow https://issues.apache.org/jira/browse/BEAM-2687 for updates.
class StatefulBufferingFn(beam.DoFn):
    """Buffer incoming elements in state and emit them in batches.

    Every element is appended to ``BUFFER_STATE`` and counted in
    ``COUNT_STATE``. Once the count reaches ``MAX_BUFFER_SIZE`` the buffered
    elements are yielded and both state cells are cleared.
    """

    # Number of buffered elements that triggers a flush.
    MAX_BUFFER_SIZE = 500

    # Bag of elements waiting to be emitted.
    BUFFER_STATE = BagStateSpec('buffer', EventCoder())

    # Running count of the elements added since the last flush.
    COUNT_STATE = CombiningValueStateSpec('count',
                                          VarIntCoder(),
                                          combiners.SumCombineFn())

    def process(self, element,
                buffer_state=beam.DoFn.StateParam(BUFFER_STATE),
                count_state=beam.DoFn.StateParam(COUNT_STATE)):
        """Add ``element`` to the buffer; yield the buffer once it is full.

        Args:
          element: the input element to buffer.
          buffer_state: state handle for ``BUFFER_STATE``.
          count_state: state handle for ``COUNT_STATE``.

        Yields:
          Every buffered element, once the count reaches ``MAX_BUFFER_SIZE``.
        """
        buffer_state.add(element)

        count_state.add(1)
        count = count_state.read()

        # Class attributes are not in scope inside a method body, so the
        # limit has to be qualified with the class name.
        if count >= StatefulBufferingFn.MAX_BUFFER_SIZE:
            for event in buffer_state.read():
                yield event
            # Reset both cells so the next batch starts from an empty buffer.
            count_state.clear()
            buffer_state.clear()
```

Here is an illustration to accompany the code:
Expand Down Expand Up @@ -320,14 +347,39 @@ new DoFn<Event, EnrichedEvent>() {
for (EnrichedEvent enrichedEvent : enrichEvents(bufferState.read())) {
context.output(enrichedEvent);
}
bufferState.clear();
}
}
}
```

```py
# State and timers are not yet supported in Beam's Python SDK.
# Follow https://issues.apache.org/jira/browse/BEAM-2687 for updates.
class StatefulBufferingFn(beam.DoFn):
    """Buffering DoFn extended with a watermark timer that flushes the buffer.

    ``process`` sets ``EXPIRY_TIMER`` to the end of the element's window plus
    ``ALLOWED_LATENESS``; when the timer fires, ``expiry`` yields everything
    still buffered and clears the state.
    """

    # Timer in the WATERMARK time domain, registered under the name 'expiry'.
    EXPIRY_TIMER = TimerSpec('expiry', TimeDomain.WATERMARK)

    def process(self, element,
                w=beam.DoFn.WindowParam,
                buffer_state=beam.DoFn.StateParam(BUFFER_STATE),
                count_state=beam.DoFn.StateParam(COUNT_STATE),
                expiry_timer=beam.DoFn.TimerParam(EXPIRY_TIMER)):
        """Set the expiry timer, then buffer ``element``.

        Args:
          element: the input element to buffer.
          w: the window of the element.
          buffer_state: state handle for ``BUFFER_STATE``.
          count_state: state handle for ``COUNT_STATE``.
          expiry_timer: timer handle for ``EXPIRY_TIMER``.
        """
        # NOTE(review): ALLOWED_LATENESS is not defined in this snippet;
        # presumably a module-level constant - confirm.
        expiry_timer.set(w.end + ALLOWED_LATENESS)

        # … same logic as above …

    @on_timer(EXPIRY_TIMER)
    def expiry(self,
               buffer_state=beam.DoFn.StateParam(BUFFER_STATE),
               count_state=beam.DoFn.StateParam(COUNT_STATE)):
        """Yield every buffered event when ``EXPIRY_TIMER`` fires, then reset.

        Args:
          buffer_state: state handle for ``BUFFER_STATE``.
          count_state: state handle for ``COUNT_STATE``.

        Yields:
          Each event read from ``buffer_state``.
        """
        events = buffer_state.read()

        for event in events:
            yield event

        buffer_state.clear()
        count_state.clear()
```

Let's unpack the pieces of this snippet:
Expand Down Expand Up @@ -403,7 +455,7 @@ new DoFn<Event, EnrichedEvent>() {

boolean staleTimerSet = firstNonNull(staleSetState.read(), false);
if (firstNonNull(countState.read(), 0) == 0) {
staleTimer.offset(MAX_BUFFER_DURATION).setRelative());
staleTimer.offset(MAX_BUFFER_DURATION).setRelative();
}

… same processing logic as above …
Expand All @@ -428,8 +480,38 @@ new DoFn<Event, EnrichedEvent>() {
```

```py
# State and timers are not yet supported in Beam's Python SDK.
# Follow https://issues.apache.org/jira/browse/BEAM-2687 for updates.
class StatefulBufferingFn(beam.DoFn):
    """Buffering DoFn extended with a real-time timer that flushes the buffer.

    When ``process`` sees a count of zero it sets ``STALE_TIMER`` to the
    current time plus ``MAX_BUFFER_DURATION``; when the timer fires, ``stale``
    yields everything still buffered and clears the state.
    """

    # Timer in the REAL_TIME time domain, registered under the name 'stale'.
    STALE_TIMER = TimerSpec('stale', TimeDomain.REAL_TIME)

    # Offset added to time.time() when arming STALE_TIMER.
    MAX_BUFFER_DURATION = 1

    def process(self, element,
                w=beam.DoFn.WindowParam,
                buffer_state=beam.DoFn.StateParam(BUFFER_STATE),
                count_state=beam.DoFn.StateParam(COUNT_STATE),
                expiry_timer=beam.DoFn.TimerParam(EXPIRY_TIMER),
                stale_timer=beam.DoFn.TimerParam(STALE_TIMER)):
        """Arm the stale timer if the count is zero, then buffer ``element``.

        Args:
          element: the input element to buffer.
          w: the window of the element.
          buffer_state: state handle for ``BUFFER_STATE``.
          count_state: state handle for ``COUNT_STATE``.
          expiry_timer: timer handle for ``EXPIRY_TIMER``.
          stale_timer: timer handle for ``STALE_TIMER``.
        """
        if count_state.read() == 0:
            # We set an absolute timestamp here (not an offset like in the
            # Java SDK)
            stale_timer.set(
                time.time() + StatefulBufferingFn.MAX_BUFFER_DURATION)

        # … same logic as above …

    @on_timer(STALE_TIMER)
    def stale(self,
              buffer_state=beam.DoFn.StateParam(BUFFER_STATE),
              count_state=beam.DoFn.StateParam(COUNT_STATE)):
        """Yield every buffered event when ``STALE_TIMER`` fires, then reset.

        Args:
          buffer_state: state handle for ``BUFFER_STATE``.
          count_state: state handle for ``COUNT_STATE``.

        Yields:
          Each event read from ``buffer_state``.
        """
        events = buffer_state.read()

        for event in events:
            yield event

        buffer_state.clear()
        count_state.clear()

```

Here is an illustration of the final code:
Expand Down

0 comments on commit 6d9214f

Please sign in to comment.