Skip to content

Commit

Permalink
[FLINK-21842][python] Support user defined WindowAssigner, Trigger an…
Browse files Browse the repository at this point in the history
…d ProcessWindowFunction on Python DataStream API

This closes apache#15416
  • Loading branch information
WeiZhong94 committed Mar 31, 2021
1 parent 5dccd76 commit 62838c9
Show file tree
Hide file tree
Showing 61 changed files with 3,250 additions and 1,003 deletions.
86 changes: 86 additions & 0 deletions flink-python/pyflink/common/serializer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

from abc import abstractmethod, ABC
from io import BytesIO
from typing import TypeVar, Generic

T = TypeVar('T')


class TypeSerializer(ABC, Generic[T]):
"""
This interface describes the methods that are required for a data type to be handled by the
Flink runtime. Specifically, this interface contains the serialization and deserialization
methods.
"""

def __eq__(self, other):
return isinstance(other, self.__class__) and self.__dict__ == other.__dict__

def __ne__(self, other):
return not self.__eq__(other)

def __repr__(self):
return "%s()" % self.__class__.__name__

def __hash__(self):
return hash(str(self))

@abstractmethod
def serialize(self, element: T, stream: BytesIO) -> None:
"""
Serializes an element to the output stream.
"""
pass

@abstractmethod
def deserialize(self, stream: BytesIO) -> T:
"""
Returns a deserialized element from the input stream.
"""
pass

def _get_coder(self):
serialize_func = self.serialize
deserialize_func = self.deserialize

class CoderAdapter(object):

def encode_nested(self, element):
bytes_io = BytesIO()
serialize_func(element, bytes_io)
return bytes_io.getvalue()

def decode_nested(self, bytes_data):
bytes_io = BytesIO(bytes_data)
return deserialize_func(bytes_io)

return CoderAdapter()


void = b''


class VoidNamespaceSerializer(TypeSerializer[bytes]):

def serialize(self, element: bytes, stream: BytesIO) -> None:
pass

def deserialize(self, stream: BytesIO) -> bytes:
return void
11 changes: 10 additions & 1 deletion flink-python/pyflink/datastream/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@
from pyflink.datastream.time_domain import TimeDomain
from pyflink.datastream.functions import ProcessFunction
from pyflink.datastream.timerservice import TimerService
from pyflink.datastream.window import Window, TimeWindow, CountWindow, WindowAssigner, \
MergingWindowAssigner, TriggerResult, Trigger

__all__ = [
'StreamExecutionEnvironment',
Expand Down Expand Up @@ -108,5 +110,12 @@
'TimeCharacteristic',
'TimeDomain',
'ProcessFunction',
'TimerService'
'TimerService',
'Window',
'TimeWindow',
'CountWindow',
'WindowAssigner',
'MergingWindowAssigner',
'TriggerResult',
'Trigger'
]
157 changes: 153 additions & 4 deletions flink-python/pyflink/datastream/data_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,24 @@
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
import typing
import uuid
import warnings
from typing import Callable, Union, List, cast

from pyflink.common import typeinfo, ExecutionConfig, Row
from pyflink.datastream.window import TimeWindowSerializer, CountWindowSerializer, WindowAssigner, \
Trigger, WindowOperationDescriptor
from pyflink.common.typeinfo import RowTypeInfo, Types, TypeInformation, _from_java_type
from pyflink.common.watermark_strategy import WatermarkStrategy
from pyflink.datastream.functions import _get_python_env, FlatMapFunctionWrapper, FlatMapFunction, \
MapFunction, MapFunctionWrapper, Function, FunctionWrapper, SinkFunction, FilterFunction, \
FilterFunctionWrapper, KeySelectorFunctionWrapper, KeySelector, ReduceFunction, \
ReduceFunctionWrapper, CoMapFunction, CoFlatMapFunction, Partitioner, \
PartitionerFunctionWrapper, RuntimeContext, ProcessFunction, KeyedProcessFunction, \
KeyedCoProcessFunction
from pyflink.datastream.state import ValueStateDescriptor, ValueState
KeyedCoProcessFunction, WindowFunction, ProcessWindowFunction, InternalWindowFunction, \
InternalIterableWindowFunction, InternalIterableProcessWindowFunction
from pyflink.datastream.state import ValueStateDescriptor, ValueState, ListStateDescriptor
from pyflink.datastream.utils import convert_to_python_obj
from pyflink.java_gateway import get_gateway

Expand Down Expand Up @@ -890,7 +894,7 @@ def open(self, runtime_context: RuntimeContext):
self._reduce_value_state = runtime_context.get_state(
ValueStateDescriptor("_reduce_state" + str(uuid.uuid4()), output_type))
self._reduce_function.open(runtime_context)
from pyflink.fn_execution.operations import StreamingRuntimeContext
from pyflink.fn_execution.datastream.runtime_context import StreamingRuntimeContext
self._in_batch_execution_mode = \
cast(StreamingRuntimeContext, runtime_context)._in_batch_execution_mode

Expand Down Expand Up @@ -981,6 +985,20 @@ def process(self, func: KeyedProcessFunction, # type: ignore
j_output_type_info,
j_python_data_stream_function_operator))

def window(self, window_assigner: WindowAssigner) -> 'WindowedStream':
"""
Windows this data stream to a WindowedStream, which evaluates windows over a key
grouped stream. Elements are put into windows by a WindowAssigner. The grouping of
elements is done both by key and by window.
A Trigger can be defined to specify when windows are evaluated. However, WindowAssigners
have a default Trigger that is used if a Trigger is not specified.
:param window_assigner: The WindowAssigner that assigns elements to windows.
:return: The trigger windows data stream.
"""
return WindowedStream(self, window_assigner)

def union(self, *streams) -> 'DataStream':
return self._values().union(*streams)

Expand Down Expand Up @@ -1053,6 +1071,117 @@ def slot_sharing_group(self, slot_sharing_group: str) -> 'DataStream':
raise Exception("Setting slot sharing group for KeyedStream is not supported.")


class WindowedStream(object):
"""
A WindowedStream represents a data stream where elements are grouped by key, and for each
key, the stream of elements is split into windows based on a WindowAssigner. Window emission
is triggered based on a Trigger.
The windows are conceptually evaluated for each key individually, meaning windows can trigger
at different points for each key.
Note that the WindowedStream is purely an API construct, during runtime the WindowedStream will
be collapsed together with the KeyedStream and the operation over the window into one single
operation.
"""

def __init__(self, keyed_stream: KeyedStream, window_assigner: WindowAssigner):
self._keyed_stream = keyed_stream
self._window_assigner = window_assigner
self._allowed_lateness = 0
self._window_trigger = None # type: Trigger

def get_execution_environment(self):
return self._keyed_stream.get_execution_environment()

def get_input_type(self):
return self._keyed_stream.get_type()

def trigger(self, trigger: Trigger):
"""
Sets the Trigger that should be used to trigger window emission.
"""
self._window_trigger = trigger
return self

def allowed_lateness(self, time_ms: int):
"""
Sets the time by which elements are allowed to be late. Elements that arrive behind the
watermark by more than the specified time will be dropped. By default, the allowed lateness
is 0.
Setting an allowed lateness is only valid for event-time windows.
"""
self._allowed_lateness = time_ms
return self

def apply(self,
window_function: WindowFunction, result_type: TypeInformation = None) -> DataStream:
"""
Applies the given window function to each window. The window function is called for each
evaluation of the window for each key individually. The output of the window function is
interpreted as a regular non-windowed stream.
Note that this function requires that all data in the windows is buffered until the window
is evaluated, as the function provides no means of incremental aggregation.
:param window_function: The window function.
:param result_type: Type information for the result type of the window function.
:return: The data stream that is the result of applying the window function to the window.
"""
internal_window_function = InternalIterableWindowFunction(
window_function) # type: InternalWindowFunction
return self._get_result_data_stream(internal_window_function, result_type)

def process(self,
process_window_function: ProcessWindowFunction,
result_type: TypeInformation = None):
"""
Applies the given window function to each window. The window function is called for each
evaluation of the window for each key individually. The output of the window function is
interpreted as a regular non-windowed stream.
Note that this function requires that all data in the windows is buffered until the window
is evaluated, as the function provides no means of incremental aggregation.
:param process_window_function: The window function.
:param result_type: Type information for the result type of the window function.
:return: The data stream that is the result of applying the window function to the window.
"""
internal_window_function = InternalIterableProcessWindowFunction(
process_window_function) # type: InternalWindowFunction
return self._get_result_data_stream(internal_window_function, result_type)

def _get_result_data_stream(
self, internal_window_function: InternalWindowFunction, result_type):
if self._window_trigger is None:
self._window_trigger = self._window_assigner.get_default_trigger(
self.get_execution_environment())
window_serializer = self._window_assigner.get_window_serializer()
window_state_descriptor = ListStateDescriptor(
"window-contents", self.get_input_type())
window_operation_descriptor = WindowOperationDescriptor(
self._window_assigner,
self._window_trigger,
self._allowed_lateness,
window_state_descriptor,
window_serializer,
internal_window_function)

from pyflink.fn_execution import flink_fn_execution_pb2
j_python_data_stream_function_operator, j_output_type_info = \
_get_one_input_stream_operator(
self._keyed_stream,
window_operation_descriptor,
flink_fn_execution_pb2.UserDefinedDataStreamFunction.WINDOW, # type: ignore
result_type)

return DataStream(self._keyed_stream._j_data_stream.transform(
"WINDOW",
j_output_type_info,
j_python_data_stream_function_operator))


class ConnectedStreams(object):
"""
ConnectedStreams represent two connected streams of (possibly) different data types.
Expand Down Expand Up @@ -1221,7 +1350,9 @@ def _is_keyed_stream(self):


def _get_one_input_stream_operator(data_stream: DataStream,
func: Union[Function, FunctionWrapper],
func: Union[Function,
FunctionWrapper,
WindowOperationDescriptor],
func_type: int,
output_type: Union[TypeInformation, List] = None):
"""
Expand Down Expand Up @@ -1267,6 +1398,24 @@ def _get_one_input_stream_operator(data_stream: DataStream,
JDataStreamPythonFunctionOperator = gateway.jvm.PythonProcessOperator
elif func_type == UserDefinedDataStreamFunction.KEYED_PROCESS: # type: ignore
JDataStreamPythonFunctionOperator = gateway.jvm.PythonKeyedProcessOperator
elif func_type == UserDefinedDataStreamFunction.WINDOW: # type: ignore
window_serializer = typing.cast(WindowOperationDescriptor, func).window_serializer
if isinstance(window_serializer, TimeWindowSerializer):
j_namespace_serializer = \
gateway.jvm.org.apache.flink.table.runtime.operators.window.TimeWindow.Serializer()
elif isinstance(window_serializer, CountWindowSerializer):
j_namespace_serializer = \
gateway.jvm.org.apache.flink.table.runtime.operators.window.CountWindow.Serializer()
else:
j_namespace_serializer = \
gateway.jvm.org.apache.flink.streaming.api.utils.ByteArrayWrapperSerializer()
j_python_function_operator = gateway.jvm.PythonKeyedProcessOperator(
j_conf,
j_input_types,
j_output_type_info,
j_data_stream_python_function_info,
j_namespace_serializer)
return j_python_function_operator, j_output_type_info
else:
raise TypeError("Unsupported function type: %s" % func_type)

Expand Down
Loading

0 comments on commit 62838c9

Please sign in to comment.