Skip to content

Commit

Permalink
[FLINK-27271][python] Introduce GlobalWindows window assigner
Browse files Browse the repository at this point in the history
This closes apache#19696.
  • Loading branch information
cun8cun8 authored and dianfu committed May 13, 2022
1 parent 13ad359 commit ce6ee9c
Show file tree
Hide file tree
Showing 8 changed files with 216 additions and 4 deletions.
10 changes: 10 additions & 0 deletions docs/content.zh/docs/dev/datastream/operators/windows.md
Original file line number Diff line number Diff line change
Expand Up @@ -470,6 +470,16 @@ input
.<windowed transformation>(<window function>)
```
{{< /tab >}}
{{< tab "Python" >}}
```python
input = ... # type: DataStream

input \
.key_by(<key selector>) \
.window(GlobalWindows.create()) \
.<windowed transformation>(<window function>)
```
{{< /tab >}}
{{< /tabs >}}

## 窗口函数(Window Functions)
Expand Down
10 changes: 10 additions & 0 deletions docs/content/docs/dev/datastream/operators/windows.md
Original file line number Diff line number Diff line change
Expand Up @@ -490,6 +490,16 @@ input
.<windowed transformation>(<window function>)
```
{{< /tab >}}
{{< tab "Python" >}}
```python
input = ... # type: DataStream

input \
.key_by(<key selector>) \
.window(GlobalWindows.create()) \
.<windowed transformation>(<window function>)
```
{{< /tab >}}
{{< /tabs >}}

## Window Functions
Expand Down
7 changes: 6 additions & 1 deletion flink-python/pyflink/datastream/data_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@
from pyflink.datastream.utils import convert_to_python_obj
from pyflink.datastream.window import (CountTumblingWindowAssigner, CountSlidingWindowAssigner,
CountWindowSerializer, TimeWindowSerializer, Trigger,
WindowAssigner, WindowOperationDescriptor)
WindowAssigner, WindowOperationDescriptor,
GlobalWindowSerializer)
from pyflink.java_gateway import get_gateway

__all__ = ['CloseableIterator', 'DataStream', 'KeyedStream', 'ConnectedStreams', 'WindowedStream',
Expand Down Expand Up @@ -2138,6 +2139,10 @@ def _get_one_input_stream_operator(data_stream: DataStream,
elif isinstance(window_serializer, CountWindowSerializer):
j_namespace_serializer = \
gateway.jvm.org.apache.flink.table.runtime.operators.window.CountWindow.Serializer()
elif isinstance(window_serializer, GlobalWindowSerializer):
j_namespace_serializer = \
gateway.jvm.org.apache.flink.streaming.api.windowing.windows.GlobalWindow \
.Serializer()
else:
j_namespace_serializer = \
gateway.jvm.org.apache.flink.streaming.api.utils.ByteArrayWrapperSerializer()
Expand Down
33 changes: 32 additions & 1 deletion flink-python/pyflink/datastream/tests/test_window.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@
from pyflink.datastream.window import (TumblingEventTimeWindows,
SlidingEventTimeWindows, EventTimeSessionWindows,
CountSlidingWindowAssigner, SessionWindowTimeGapExtractor,
CountWindow, PurgingTrigger, EventTimeTrigger, TimeWindow)
CountWindow, PurgingTrigger, EventTimeTrigger, TimeWindow,
GlobalWindows, CountTrigger)
from pyflink.datastream.tests.test_util import DataStreamTestSinkFunction
from pyflink.java_gateway import get_gateway
from pyflink.testing.test_case_utils import PyFlinkStreamingTestCase
Expand Down Expand Up @@ -410,6 +411,36 @@ def test_side_output_late_data(self):
side_expected = ['+I[a, 4]']
self.assert_equals_sorted(side_expected, side_sink.get_results())

def test_global_window_with_purging_trigger(self):
    """
    Runs seven identical ('hi', 1) records through a GlobalWindows assigner whose trigger is a
    CountTrigger of 2 wrapped in a PurgingTrigger, and checks that exactly three
    ('hi', 2) results reach the sink.
    """
    self.env.set_parallelism(1)
    records = [('hi', 1)] * 7
    source = self.env.from_collection(
        records,
        type_info=Types.TUPLE([Types.STRING(), Types.INT()]))  # type: DataStream

    timestamp_assigner = SecondColumnTimestampAssigner()
    watermark_strategy = \
        WatermarkStrategy.for_monotonous_timestamps().with_timestamp_assigner(timestamp_assigner)

    class MyProcessFunction(ProcessWindowFunction):

        def process(self, key, context: ProcessWindowFunction.Context,
                    elements: Iterable[Tuple[str, int]]) -> Iterable[tuple]:
            # Emit one (key, number of elements in this firing) record.
            buffered = list(elements)
            return [(key, len(buffered))]

        def clear(self, context: ProcessWindowFunction.Context) -> None:
            pass

    keyed_stream = source.assign_timestamps_and_watermarks(watermark_strategy) \
        .key_by(lambda x: x[0], key_type=Types.STRING())
    windowed_stream = keyed_stream.window(GlobalWindows.create()) \
        .trigger(PurgingTrigger.of(CountTrigger.of(2)))
    windowed_stream \
        .process(MyProcessFunction(), Types.TUPLE([Types.STRING(), Types.INT()])) \
        .add_sink(self.test_sink)

    self.env.execute('test_global_window_with_purging_trigger')

    # Seven inputs with a count of two give three firings; presumably the seventh element
    # never completes a pair and is therefore not emitted.
    expected = ['(hi,2)', '(hi,2)', '(hi,2)']
    actual = self.test_sink.get_results()
    self.assert_equals_sorted(expected, actual)


class SecondColumnTimestampAssigner(TimestampAssigner):

Expand Down
124 changes: 123 additions & 1 deletion flink-python/pyflink/datastream/window.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
__all__ = ['Window',
'TimeWindow',
'CountWindow',
'GlobalWindow',
'TumblingProcessingTimeWindows',
'TumblingEventTimeWindows',
'SlidingProcessingTimeWindows',
Expand All @@ -51,11 +52,13 @@
'ProcessingTimeTrigger',
'ContinuousEventTimeTrigger',
'ContinuousProcessingTimeTrigger',
'NeverTrigger',
'PurgingTrigger',
'CountTrigger',
'TimeWindowSerializer',
'CountWindowSerializer',
'SessionWindowTimeGapExtractor']
'SessionWindowTimeGapExtractor',
'GlobalWindows']


def long_to_int_with_bit_mixing(x: int) -> int:
Expand Down Expand Up @@ -197,6 +200,30 @@ def __repr__(self):
return "CountWindow(id={})".format(self.id)


class GlobalWindow(Window):
    """
    The window into which every element is placed when the GlobalWindows assigner is used.

    Instances carry no data of their own: any two GlobalWindow objects compare equal and
    share the same hash value.
    """

    def __init__(self):
        super().__init__()

    @staticmethod
    def get() -> 'GlobalWindow':
        """
        Returns a GlobalWindow instance.
        """
        window = GlobalWindow()
        return window

    def max_timestamp(self) -> int:
        """
        Always reports MAX_LONG_VALUE.
        """
        return MAX_LONG_VALUE

    def __eq__(self, other):
        # Equality is decided purely by comparing the classes of both operands.
        own_class = self.__class__
        return own_class == other.__class__

    def __hash__(self):
        # Constant hash, consistent with the class-based equality above.
        return 0

    def __repr__(self):
        return "GlobalWindow"


class TimeWindowSerializer(TypeSerializer[TimeWindow]):
"""
The serializer used to write the TimeWindow type.
Expand Down Expand Up @@ -244,6 +271,31 @@ def _get_coder(self):
return coders.CountWindowCoder()


class GlobalWindowSerializer(TypeSerializer[GlobalWindow]):
    """
    A TypeSerializer for GlobalWindow.

    Encoding and decoding are delegated to the implementation obtained from
    ``coders.GlobalWindowCoder``; that implementation is created lazily on first use.
    """

    def __init__(self):
        # Coder implementation, created on the first serialize/deserialize call.
        self._underlying_coder = None

    def serialize(self, element: GlobalWindow, stream: BytesIO) -> None:
        """
        Encodes the given window and appends the resulting bytes to the stream.

        :param element: The GlobalWindow to write.
        :param stream: The stream the encoded bytes are written to.
        """
        bytes_data = self._get_coder_impl().encode(element)
        stream.write(bytes_data)

    def deserialize(self, stream: BytesIO) -> GlobalWindow:
        """
        Reads one encoded GlobalWindow from the stream.

        :param stream: The stream positioned at the start of an encoded GlobalWindow.
        :return: The decoded GlobalWindow.
        """
        # The GlobalWindow coder writes exactly one byte per window, so only one byte may be
        # consumed here; reading more would swallow data that follows in the stream.
        bytes_data = stream.read(1)
        return self._get_coder_impl().decode(bytes_data)

    def _get_coder(self):
        # Imported lazily inside the method rather than at module import time.
        from pyflink.fn_execution import coders
        return coders.GlobalWindowCoder()

    def _get_coder_impl(self):
        # Creates the coder implementation once and reuses it afterwards.
        if self._underlying_coder is None:
            self._underlying_coder = self._get_coder().get_impl()
        return self._underlying_coder


T = TypeVar('T')
W = TypeVar('W')
W2 = TypeVar('W2')
Expand Down Expand Up @@ -965,6 +1017,41 @@ def clear(self, window: CountWindow, ctx: Trigger.TriggerContext) -> None:
count_state.clear()


class NeverTrigger(Trigger[T, GlobalWindow]):
    """
    The default Trigger of GlobalWindows. Every callback answers with
    TriggerResult.CONTINUE, so this trigger never fires.
    """

    def on_element(self, element: T, timestamp: int, window: GlobalWindow,
                   ctx: 'Trigger.TriggerContext') -> TriggerResult:
        """
        Ignores the element and keeps the window as it is.
        """
        return TriggerResult.CONTINUE

    def on_processing_time(self, time: int, window: GlobalWindow,
                           ctx: 'Trigger.TriggerContext') -> TriggerResult:
        """
        Ignores the processing-time callback and keeps the window as it is.
        """
        return TriggerResult.CONTINUE

    def on_event_time(self, time: int, window: GlobalWindow,
                      ctx: 'Trigger.TriggerContext') -> TriggerResult:
        """
        Ignores the event-time callback and keeps the window as it is.
        """
        return TriggerResult.CONTINUE

    def on_merge(self, window: GlobalWindow, ctx: 'Trigger.OnMergeContext') -> None:
        """
        Does nothing.
        """
        return None

    def clear(self, window: GlobalWindow, ctx: 'Trigger.TriggerContext') -> None:
        """
        Does nothing.
        """
        return None


class CountTumblingWindowAssigner(WindowAssigner[T, CountWindow]):
"""
A WindowAssigner that windows elements into fixed-size windows based on the count number
Expand Down Expand Up @@ -1589,3 +1676,38 @@ def is_event_time(self) -> bool:

def __repr__(self):
return "DynamicEventTimeSessionWindows(%s)" % self._session_gap


class GlobalWindows(WindowAssigner[T, GlobalWindow]):
    """
    A WindowAssigner that puts every element into the same GlobalWindow. Its default trigger
    is NeverTrigger.
    """

    def __init__(self) -> None:
        super(GlobalWindows, self).__init__()

    @staticmethod
    def create() -> 'GlobalWindows':
        """
        Creates a new GlobalWindows WindowAssigner that assigns all elements to the
        same GlobalWindow.
        """
        assigner = GlobalWindows()
        return assigner

    def assign_windows(self,
                       element: T,
                       timestamp: int,
                       context: 'WindowAssigner.WindowAssignerContext'
                       ) -> Collection[GlobalWindow]:
        """
        Returns a one-element list holding a GlobalWindow, independent of the element and
        its timestamp.
        """
        window = GlobalWindow.get()
        return [window]

    def get_default_trigger(self, env) -> Trigger[T, GlobalWindow]:
        """
        Returns a NeverTrigger; the given environment is not consulted.
        """
        trigger = NeverTrigger()
        return trigger

    def get_window_serializer(self) -> TypeSerializer[GlobalWindow]:
        """
        Returns a new GlobalWindowSerializer.
        """
        serializer = GlobalWindowSerializer()
        return serializer

    def is_event_time(self) -> bool:
        """
        Always False.
        """
        return False

    def __repr__(self) -> str:
        return "GlobalWindows()"
14 changes: 13 additions & 1 deletion flink-python/pyflink/fn_execution/coder_impl_fast.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import cloudpickle

from pyflink.common import Row, RowKind
from pyflink.common.time import Instant
from pyflink.datastream.window import CountWindow, TimeWindow
from pyflink.datastream.window import CountWindow, TimeWindow, GlobalWindow
from pyflink.fn_execution.ResettableIO import ResettableIO
from pyflink.table.utils import pandas_to_arrow, arrow_to_pandas

Expand Down Expand Up @@ -909,6 +909,18 @@ cdef class CountWindowCoderImpl(FieldCoderImpl):
cpdef decode_from_stream(self, InputStream in_stream, size_t size):
return CountWindow(in_stream.read_int64())

cdef class GlobalWindowCoderImpl(FieldCoderImpl):
    """
    A coder for GlobalWindow.

    A window is written as a single byte with value 0; decoding consumes that one byte and
    returns a new GlobalWindow.
    """

    cpdef encode_to_stream(self, value, OutputStream out_stream):
        # The passed value is not inspected: the encoding is always the single byte 0.
        out_stream.write_byte(0)

    cpdef decode_from_stream(self, InputStream in_stream, size_t size):
        # Consume the one byte written by encode_to_stream; its value is discarded and
        # the size argument is not used.
        in_stream.read_byte()
        return GlobalWindow()

cdef class DataViewFilterCoderImpl(FieldCoderImpl):
"""
A coder for CountWindow.
Expand Down
13 changes: 13 additions & 0 deletions flink-python/pyflink/fn_execution/coder_impl_slow.py
Original file line number Diff line number Diff line change
Expand Up @@ -800,6 +800,19 @@ def decode_from_stream(self, in_stream: InputStream, length=0):
return CountWindow(in_stream.read_int64())


class GlobalWindowCoderImpl(FieldCoderImpl):
    """
    A coder for GlobalWindow.

    A window is written as a single byte with value 0; decoding consumes that one byte and
    returns a new GlobalWindow.
    """

    def encode_to_stream(self, value, out_stream: OutputStream):
        """
        Writes the single byte 0 to the stream; the given value is not inspected.
        """
        out_stream.write_byte(0)

    def decode_from_stream(self, in_stream: InputStream, length=0):
        """
        Consumes the single byte of an encoded GlobalWindow and returns a GlobalWindow.

        The length argument is not used.
        """
        # Function-scope import so that GlobalWindow is in scope independently of the
        # module-level imports.
        from pyflink.datastream.window import GlobalWindow

        in_stream.read_byte()
        # Decoding must yield the window itself, not a coder instance.
        return GlobalWindow()


class DataViewFilterCoderImpl(FieldCoderImpl):
"""
A coder for data view filter.
Expand Down
9 changes: 9 additions & 0 deletions flink-python/pyflink/fn_execution/coders.py
Original file line number Diff line number Diff line change
Expand Up @@ -585,6 +585,15 @@ def get_impl(self):
return coder_impl.CountWindowCoderImpl()


class GlobalWindowCoder(FieldCoder):
    """
    FieldCoder whose implementation handles GlobalWindow values.
    """

    def get_impl(self):
        """
        Creates the GlobalWindowCoderImpl provided by coder_impl.
        """
        impl = coder_impl.GlobalWindowCoderImpl()
        return impl


class DataViewFilterCoder(FieldCoder):
"""
Coder for data view filter.
Expand Down

0 comments on commit ce6ee9c

Please sign in to comment.