Skip to content

Commit

Permalink
[FLINK-27168][python] Introduce ContinuousProcessingTimeTrigger and C…
Browse files Browse the repository at this point in the history
…ontinuousEventTimeTrigger

This closes apache#19421.
  • Loading branch information
cun8cun8 authored and dianfu committed Apr 14, 2022
1 parent eee8804 commit eed14ef
Showing 1 changed file with 164 additions and 1 deletion.
165 changes: 164 additions & 1 deletion flink-python/pyflink/datastream/window.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
from pyflink.common import Time, Types
from pyflink.common.constants import MAX_LONG_VALUE, MIN_LONG_VALUE
from pyflink.common.serializer import TypeSerializer
from pyflink.datastream.functions import RuntimeContext, InternalWindowFunction
from pyflink.datastream.functions import RuntimeContext, InternalWindowFunction, ReduceFunction
from pyflink.datastream.state import StateDescriptor, ReducingStateDescriptor, \
ValueStateDescriptor, State, ReducingState
from pyflink.metrics import MetricGroup
Expand All @@ -48,6 +48,8 @@
'Trigger',
'EventTimeTrigger',
'ProcessingTimeTrigger',
'ContinuousEventTimeTrigger',
'ContinuousProcessingTimeTrigger',
'CountTrigger',
'TimeWindowSerializer',
'CountWindowSerializer',
Expand Down Expand Up @@ -635,6 +637,92 @@ def clear(self,
ctx.delete_event_time_timer(window.max_timestamp())


class ContinuousEventTimeTrigger(Trigger[T, TimeWindow]):
"""
A Trigger that continuously fires based on a given time interval. This fires based Watermarks.
"""

def __init__(self, interval: int):
self.interval = interval
self.state_desc = ReducingStateDescriptor("fire-time", Min, Types.LONG())
self.fire_timestamp_state = None

@staticmethod
def of(interval: Time) -> 'ContinuousEventTimeTrigger':
return ContinuousEventTimeTrigger(interval.to_milliseconds())

def on_element(self,
element: T,
timestamp: int,
window: TimeWindow,
ctx: 'Trigger.TriggerContext') -> TriggerResult:
if window.max_timestamp() <= ctx.get_current_watermark():
# if the watermark is already past the window fire immediately
return TriggerResult.FIRE
else:
ctx.register_event_time_timer(window.max_timestamp())

fire_timestamp_state = cast(ReducingState, ctx.get_partitioned_state(self.state_desc))
if fire_timestamp_state.get() is None:
self.register_next_fire_timestamp(timestamp - (timestamp % self.interval), window, ctx,
fire_timestamp_state)

return TriggerResult.CONTINUE

def on_processing_time(self,
time: int,
window: TimeWindow,
ctx: 'Trigger.TriggerContext') -> TriggerResult:
return TriggerResult.CONTINUE

def on_event_time(self,
time: int,
window: TimeWindow,
ctx: 'Trigger.TriggerContext') -> TriggerResult:
if time == window.max_timestamp():
return TriggerResult.FIRE

fire_timestamp_state = cast(ReducingState, ctx.get_partitioned_state(self.state_desc))
fire_timestamp = fire_timestamp_state.get()
if fire_timestamp is not None and fire_timestamp == time:
fire_timestamp_state.clear()
self.register_next_fire_timestamp(time, window, ctx, fire_timestamp_state)
return TriggerResult.FIRE

return TriggerResult.CONTINUE

def on_merge(self, window: TimeWindow, ctx: 'Trigger.OnMergeContext') -> None:
ctx.merge_partitioned_state(self.state_desc)
next_fire_timestamp = cast(ReducingState, ctx.get_partitioned_state(self.state_desc)).get()
if next_fire_timestamp is not None:
ctx.register_event_time_timer(next_fire_timestamp)

def clear(self, window: TimeWindow, ctx: 'Trigger.TriggerContext') -> None:
fire_timestamp = cast(ReducingState, ctx.get_partitioned_state(self.state_desc))
timestamp = fire_timestamp.get()
if timestamp is not None:
ctx.delete_event_time_timer(timestamp)
fire_timestamp.clear()

def can_merge(self) -> bool:
return True

def register_next_fire_timestamp(self,
time: int,
window: TimeWindow,
ctx: 'Trigger.TriggerContext',
fire_timestamp_state: ReducingState):
next_fire_timestamp = min(time + self.interval, window.max_timestamp())
fire_timestamp_state.add(next_fire_timestamp)
ctx.register_event_time_timer(next_fire_timestamp)


class Min(ReduceFunction):

def reduce(self, value1, value2):
return min(value1, value2)


class ProcessingTimeTrigger(Trigger[T, TimeWindow]):
"""
A Trigger that fires once the current system time passes the end of the window to
Expand Down Expand Up @@ -677,6 +765,81 @@ def clear(self,
ctx.delete_processing_time_timer(window.max_timestamp())


class ContinuousProcessingTimeTrigger(Trigger[T, TimeWindow]):
"""
A Trigger that continuously fires based on a given time interval as measured by the clock of the
machine on which the job is running.
"""

def __init__(self, interval: int):
self.interval = interval
self.state_desc = ReducingStateDescriptor("fire-time", Min, Types.LONG())
self.fire_timestamp_state = None

@staticmethod
def of(interval: Time) -> 'ContinuousProcessingTimeTrigger':
return ContinuousProcessingTimeTrigger(interval.to_milliseconds())

def on_element(self,
element: T,
timestamp: int,
window: TimeWindow,
ctx: 'Trigger.TriggerContext') -> TriggerResult:
fire_timestamp_state = cast(ReducingState, ctx.get_partitioned_state(self.state_desc))
timestamp = ctx.get_current_processing_time()
if fire_timestamp_state.get() is None:
self.register_next_fire_timestamp(timestamp - (timestamp % self.interval), window, ctx,
fire_timestamp_state)

return TriggerResult.CONTINUE

def on_processing_time(self,
time: int,
window: TimeWindow,
ctx: 'Trigger.TriggerContext') -> TriggerResult:
fire_timestamp_state = cast(ReducingState, ctx.get_partitioned_state(self.state_desc))
if fire_timestamp_state.get() == time:
fire_timestamp_state.clear()
self.register_next_fire_timestamp(time, window, ctx, fire_timestamp_state)
return TriggerResult.FIRE

return TriggerResult.CONTINUE

def on_event_time(self,
time: int,
window: TimeWindow,
ctx: 'Trigger.TriggerContext') -> TriggerResult:
return TriggerResult.CONTINUE

def on_merge(self, window: TimeWindow, ctx: 'Trigger.OnMergeContext') -> None:
# States for old windows will lose after the call.
ctx.merge_partitioned_state(self.state_desc)

# Register timer for this new window.
next_fire_timestamp = cast(ReducingState, ctx.get_partitioned_state(self.state_desc)).get()
if next_fire_timestamp is not None:
ctx.register_processing_time_timer(next_fire_timestamp)

def clear(self, window: TimeWindow, ctx: 'Trigger.TriggerContext') -> None:
fire_timestamp_state = cast(ReducingState, ctx.get_partitioned_state(self.state_desc))
timestamp = fire_timestamp_state.get()
if timestamp is not None:
ctx.delete_processing_time_timer(timestamp)
fire_timestamp_state.clear()

def can_merge(self) -> bool:
return True

def register_next_fire_timestamp(self,
time: int,
window: TimeWindow,
ctx: 'Trigger.TriggerContext',
fire_timestamp_state: ReducingState):
next_fire_timestamp = min(time + self.interval, window.max_timestamp())
fire_timestamp_state.add(next_fire_timestamp)
ctx.register_processing_time_timer(next_fire_timestamp)


class CountTrigger(Trigger[T, CountWindow]):
"""
A Trigger that fires once the count of elements in a pane reaches the given count.
Expand Down

0 comments on commit eed14ef

Please sign in to comment.