Skip to content

Commit

Permalink
[FLINK-27213][python] Introduce PurgingTrigger
Browse files Browse the repository at this point in the history
This closes apache#19480.
  • Loading branch information
cun8cun8 authored and dianfu committed Apr 19, 2022
1 parent a3a6bd9 commit 45f11c7
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 2 deletions.
23 changes: 21 additions & 2 deletions flink-python/pyflink/datastream/tests/test_window.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
from pyflink.datastream.window import (TumblingEventTimeWindows,
SlidingEventTimeWindows, EventTimeSessionWindows,
CountSlidingWindowAssigner, SessionWindowTimeGapExtractor,
CountWindow)
CountWindow, PurgingTrigger, EventTimeTrigger)
from pyflink.datastream.tests.test_util import DataStreamTestSinkFunction
from pyflink.testing.test_case_utils import PyFlinkStreamingTestCase

Expand Down Expand Up @@ -346,6 +346,25 @@ def test_session_window_late_merge(self):
expected = ['(hi,3)']
self.assert_equals_sorted(expected, results)

def test_event_time_session_window_with_purging_trigger(self):
    """
    Runs a keyed event-time session window (3 ms gap) whose EventTimeTrigger is wrapped in a
    PurgingTrigger, counts the elements of every fired window and compares the collected
    per-key counts with the expected ones.
    """
    records = [
        ('hi', 1), ('hi', 2), ('hi', 3), ('hi', 4), ('hi', 8), ('hi', 9), ('hi', 15)]
    data_stream = self.env.from_collection(
        records,
        type_info=Types.TUPLE([Types.STRING(), Types.INT()]))  # type: DataStream

    # Timestamps are supplied by SecondColumnTimestampAssigner; watermarks are monotonous.
    monotonous = WatermarkStrategy.for_monotonous_timestamps()
    watermark_strategy = monotonous.with_timestamp_assigner(SecondColumnTimestampAssigner())

    keyed_stream = data_stream \
        .assign_timestamps_and_watermarks(watermark_strategy) \
        .key_by(lambda x: x[0], key_type=Types.STRING())

    # Wrap the event-time trigger so that a firing is reported as FIRE_AND_PURGE.
    windowed_stream = keyed_stream \
        .window(EventTimeSessionWindows.with_gap(Time.milliseconds(3))) \
        .trigger(PurgingTrigger.of(EventTimeTrigger.create()))

    counted = windowed_stream.process(
        CountWindowProcessFunction(), Types.TUPLE([Types.STRING(), Types.INT()]))
    counted.add_sink(self.test_sink)

    self.env.execute('test_event_time_session_window_with_purging_trigger')

    expected = ['(hi,1)', '(hi,2)', '(hi,4)']
    self.assert_equals_sorted(expected, self.test_sink.get_results())


class SecondColumnTimestampAssigner(TimestampAssigner):

Expand All @@ -372,7 +391,7 @@ class CountWindowProcessFunction(ProcessWindowFunction[tuple, tuple, str, CountW

def process(self,
            key: str,
            context: ProcessWindowFunction.Context[CountWindow],
            elements: Iterable[tuple]) -> Iterable[tuple]:
    """
    Emits a single ``(key, count)`` tuple for the window being evaluated.

    :param key: The key of the window that is being evaluated.
    :param context: The window context; it is not used by this function.
    :param elements: The elements of the window; they are only counted.
    :return: A one-element list holding the key and the number of elements.
    """
    # Count by iterating once instead of materialising a throwaway copy of the elements.
    return [(key, sum(1 for _ in elements))]

Expand Down
71 changes: 71 additions & 0 deletions flink-python/pyflink/datastream/window.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
'ProcessingTimeTrigger',
'ContinuousEventTimeTrigger',
'ContinuousProcessingTimeTrigger',
'PurgingTrigger',
'CountTrigger',
'TimeWindowSerializer',
'CountWindowSerializer',
Expand Down Expand Up @@ -636,6 +637,10 @@ def clear(self,
ctx: 'Trigger.TriggerContext') -> None:
ctx.delete_event_time_timer(window.max_timestamp())

@staticmethod
def create() -> 'EventTimeTrigger':
    """
    Factory method that builds a new EventTimeTrigger.

    :return: A freshly constructed EventTimeTrigger instance.
    """
    trigger = EventTimeTrigger()
    return trigger


class ContinuousEventTimeTrigger(Trigger[T, TimeWindow]):
"""
Expand Down Expand Up @@ -764,6 +769,10 @@ def clear(self,
ctx: 'Trigger.TriggerContext') -> None:
ctx.delete_processing_time_timer(window.max_timestamp())

@staticmethod
def create() -> 'ProcessingTimeTrigger':
    """
    Factory method that builds a new ProcessingTimeTrigger.

    :return: A freshly constructed ProcessingTimeTrigger instance.
    """
    trigger = ProcessingTimeTrigger()
    return trigger


class ContinuousProcessingTimeTrigger(Trigger[T, TimeWindow]):
"""
Expand Down Expand Up @@ -840,6 +849,64 @@ def register_next_fire_timestamp(self,
ctx.register_processing_time_timer(next_fire_timestamp)


class PurgingTrigger(Trigger[T, Window]):
    """
    A trigger that can turn any Trigger into a purging Trigger.

    Every callback is delegated to the nested trigger. When the result returned by the nested
    trigger reports ``is_fire()``, FIRE_AND_PURGE is returned instead; any other result is
    passed through unchanged.
    """

    def __init__(self, nested_trigger: Trigger[T, Window]):
        """
        :param nested_trigger: The trigger whose callbacks are wrapped by this trigger.
        """
        self.nested_trigger = nested_trigger

    @staticmethod
    def of(nested_trigger: Trigger[T, Window]) -> 'PurgingTrigger':
        """
        Creates a new purging trigger from the given trigger.

        :param nested_trigger: The trigger that is wrapped by the newly created trigger.
        :return: A PurgingTrigger delegating to ``nested_trigger``.
        """
        return PurgingTrigger(nested_trigger)

    @staticmethod
    def _purge_if_fired(trigger_result: TriggerResult) -> TriggerResult:
        """
        Maps a result of the nested trigger to the result reported by this trigger.

        :param trigger_result: The result returned by the nested trigger.
        :return: FIRE_AND_PURGE if ``trigger_result.is_fire()`` holds, otherwise
                 ``trigger_result`` itself.
        """
        return TriggerResult.FIRE_AND_PURGE if trigger_result.is_fire() else trigger_result

    def on_element(self,
                   element: T,
                   timestamp: int,
                   window: Window,
                   ctx: 'Trigger.TriggerContext') -> TriggerResult:
        """
        Forwards the element to the nested trigger and upgrades a firing result to
        FIRE_AND_PURGE.
        """
        return self._purge_if_fired(
            self.nested_trigger.on_element(element, timestamp, window, ctx))

    def on_event_time(self,
                      time: int,
                      window: Window,
                      ctx: 'Trigger.TriggerContext') -> TriggerResult:
        """
        Forwards the event-time callback to the nested trigger and upgrades a firing result to
        FIRE_AND_PURGE.
        """
        return self._purge_if_fired(self.nested_trigger.on_event_time(time, window, ctx))

    def on_processing_time(self,
                           time: int,
                           window: Window,
                           ctx: 'Trigger.TriggerContext') -> TriggerResult:
        """
        Forwards the processing-time callback to the nested trigger and upgrades a firing
        result to FIRE_AND_PURGE.
        """
        return self._purge_if_fired(self.nested_trigger.on_processing_time(time, window, ctx))

    def clear(self,
              window: Window,
              ctx: 'Trigger.TriggerContext') -> None:
        """
        Delegates the clean-up for the given window to the nested trigger.
        """
        self.nested_trigger.clear(window, ctx)

    def can_merge(self) -> bool:
        """
        :return: Whatever the nested trigger reports from ``can_merge()``.
        """
        return self.nested_trigger.can_merge()

    def on_merge(self,
                 window: Window,
                 ctx: 'Trigger.OnMergeContext') -> None:
        """
        Delegates the merge callback for the given window to the nested trigger.
        """
        self.nested_trigger.on_merge(window, ctx)


class CountTrigger(Trigger[T, CountWindow]):
"""
A Trigger that fires once the count of elements in a pane reaches the given count.
Expand All @@ -850,6 +917,10 @@ def __init__(self, window_size: int):
self._count_state_descriptor = ReducingStateDescriptor(
"count", lambda a, b: a + b, Types.LONG())

@staticmethod
def of(window_size: int) -> 'CountTrigger':
    """
    Factory method that builds a CountTrigger for the given window size.

    :param window_size: The value handed to the CountTrigger constructor.
    :return: A freshly constructed CountTrigger instance.
    """
    trigger = CountTrigger(window_size)
    return trigger

def on_element(self,
element: T,
timestamp: int,
Expand Down

0 comments on commit 45f11c7

Please sign in to comment.