Skip to content

Commit

Permalink
[FLINK-20647][python] Use yield to generate output data in ProcessFun…
Browse files Browse the repository at this point in the history
…ction of Python DataStream API

This closes apache#14414.
  • Loading branch information
shuiqiangchen authored and dianfu committed Dec 18, 2020
1 parent 9b5d50c commit 1317cab
Show file tree
Hide file tree
Showing 6 changed files with 31 additions and 71 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
from pyflink.common.watermark_strategy import TimestampAssigner, WatermarkStrategy
from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
from pyflink.datastream.connectors import FlinkKafkaProducer, FlinkKafkaConsumer
from pyflink.datastream.functions import Collector, KeyedProcessFunction
from pyflink.datastream.functions import KeyedProcessFunction

from functions import MyKeySelector

Expand Down Expand Up @@ -59,15 +59,15 @@ def python_data_stream_example():

class MyProcessFunction(KeyedProcessFunction):

def process_element(self, value, ctx: 'KeyedProcessFunction.Context', out: Collector):
def process_element(self, value, ctx: 'KeyedProcessFunction.Context'):
result = "Current key: {}, orderId: {}, payAmount: {}, timestamp: {}".format(
str(ctx.get_current_key()), str(value[1]), str(value[2]), str(ctx.timestamp()))
out.collect(result)
yield result
current_watermark = ctx.timer_service().current_watermark()
ctx.timer_service().register_event_time_timer(current_watermark + 1500)

def on_timer(self, timestamp, ctx: 'KeyedProcessFunction.OnTimerContext', out: 'Collector'):
out.collect("On timer timestamp: " + str(timestamp))
def on_timer(self, timestamp, ctx: 'KeyedProcessFunction.OnTimerContext'):
yield "On timer timestamp: " + str(timestamp)


class KafkaRowTimestampAssigner(TimestampAssigner):
Expand Down
3 changes: 1 addition & 2 deletions flink-python/pyflink/datastream/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@
from pyflink.datastream.stream_execution_environment import StreamExecutionEnvironment
from pyflink.datastream.time_characteristic import TimeCharacteristic
from pyflink.datastream.time_domain import TimeDomain
from pyflink.datastream.functions import ProcessFunction, Collector, TimerService
from pyflink.datastream.functions import ProcessFunction, TimerService

__all__ = [
'StreamExecutionEnvironment',
Expand Down Expand Up @@ -107,6 +107,5 @@
'TimeCharacteristic',
'TimeDomain',
'ProcessFunction',
'Collector',
'TimerService'
]
24 changes: 3 additions & 21 deletions flink-python/pyflink/datastream/functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
'SourceFunction',
'SinkFunction',
'ProcessFunction',
'Collector',
'KeyedProcessFunction']


Expand Down Expand Up @@ -560,20 +559,6 @@ def __init__(self, sink_func: Union[str, JavaObject]):
super(SinkFunction, self).__init__(sink_func)


class Collector(abc.ABC):
"""
Collects a record and forwards it.
"""
@abc.abstractmethod
def collect(self, value):
"""
Emits a record.
:param value: The record to collect.
"""
pass


class TimerService(abc.ABC):
"""
Interface for working with time and timers.
Expand Down Expand Up @@ -681,7 +666,7 @@ def timestamp(self) -> int:
pass

@abc.abstractmethod
def process_element(self, value, ctx: 'ProcessFunction.Context', out: Collector):
def process_element(self, value, ctx: 'ProcessFunction.Context'):
"""
Process one element from the input stream.
Expand All @@ -692,7 +677,6 @@ def process_element(self, value, ctx: 'ProcessFunction.Context', out: Collector)
:param ctx: A Context that allows querying the timestamp of the element and getting a
TimerService for registering timers and querying the time. The context is only
valid during the invocation of this method, do not store it.
:param out: The collector for returning result values.
"""
pass

Expand Down Expand Up @@ -756,7 +740,7 @@ def time_domain(self) -> TimeDomain:
pass

@abc.abstractmethod
def process_element(self, value, ctx: 'KeyedProcessFunction.Context', out: Collector):
def process_element(self, value, ctx: 'KeyedProcessFunction.Context'):
"""
Process one element from the input stream.
Expand All @@ -767,11 +751,10 @@ def process_element(self, value, ctx: 'KeyedProcessFunction.Context', out: Colle
:param ctx: A Context that allows querying the timestamp of the element and getting a
TimerService for registering timers and querying the time. The context is only
valid during the invocation of this method, do not store it.
:param out: The collector for returning result values.
"""
pass

def on_timer(self, timestamp: int, ctx: 'KeyedProcessFunction.OnTimerContext', out: Collector):
def on_timer(self, timestamp: int, ctx: 'KeyedProcessFunction.OnTimerContext'):
"""
Called when a timer set using TimerService fires.
Expand All @@ -780,6 +763,5 @@ def on_timer(self, timestamp: int, ctx: 'KeyedProcessFunction.OnTimerContext', o
querying the TimeDomain of the firing timer and getting a TimerService for
registering timers and querying the time. The context is only valid during the
invocation of this method, do not store it.
:param out: The collector for returning result values.
"""
pass
16 changes: 8 additions & 8 deletions flink-python/pyflink/datastream/tests/test_data_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -666,15 +666,15 @@ def extract_timestamp(self, value, record_timestamp) -> int:

class MyProcessFunction(KeyedProcessFunction):

def process_element(self, value, ctx, out):
def process_element(self, value, ctx):
current_timestamp = ctx.timestamp()
current_watermark = ctx.timer_service().current_watermark()
current_key = ctx.get_current_key()
out.collect("current key: {}, current timestamp: {}, current watermark: {}, "
"current_value: {}".format(str(current_key), str(current_timestamp),
str(current_watermark), str(value)))
yield "current key: {}, current timestamp: {}, current watermark: {}, " \
"current_value: {}".format(str(current_key), str(current_timestamp),
str(current_watermark), str(value))

def on_timer(self, timestamp, ctx, out):
def on_timer(self, timestamp, ctx):
pass

watermark_strategy = WatermarkStrategy.for_monotonous_timestamps()\
Expand Down Expand Up @@ -713,11 +713,11 @@ def extract_timestamp(self, value, record_timestamp) -> int:

class MyProcessFunction(ProcessFunction):

def process_element(self, value, ctx, out):
def process_element(self, value, ctx):
current_timestamp = ctx.timestamp()
current_watermark = ctx.timer_service().current_watermark()
out.collect("current timestamp: {}, current watermark: {}, current_value: {}"
.format(str(current_timestamp), str(current_watermark), str(value)))
yield "current timestamp: {}, current watermark: {}, current_value: {}"\
.format(str(current_timestamp), str(current_watermark), str(value))

def on_timer(self, timestamp, ctx, out):
pass
Expand Down
23 changes: 11 additions & 12 deletions flink-python/pyflink/fn_execution/operation_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -262,19 +262,16 @@ def wrapped_func(value):
return func, user_defined_func


def extract_process_function(user_defined_function_proto, ctx, collector):
def extract_process_function(user_defined_function_proto, ctx):
process_function = pickle.loads(user_defined_function_proto.payload)
process_element = process_function.process_element

def wrapped_process_function(value):
# VALUE[CURRENT_TIMESTAMP, CURRENT_WATERMARK, NORMAL_DATA]
ctx.set_timestamp(value[0])
ctx.timer_service().set_current_watermark(value[1])
process_element(value[2], ctx, collector)

for a in collector.buf:
yield a[1]
collector.clear()
output_result = process_element(value[2], ctx)
return output_result

return wrapped_process_function, process_function

Expand All @@ -300,7 +297,7 @@ def wrapped_keyed_process_function(value):
on_timer_ctx.set_time_domain(TimeDomain.PROCESSING_TIME)
else:
raise TypeError("TimeCharacteristic[%s] is not supported." % str(value[0]))
on_timer(value[1], on_timer_ctx, collector)
output_result = on_timer(value[1], on_timer_ctx)
else:
# it is normal data
# VALUE: TIMER_FLAG, CURRENT_TIMESTAMP, CURRENT_WATERMARK, None, NORMAL_DATA
Expand All @@ -311,17 +308,19 @@ def wrapped_keyed_process_function(value):
ctx.set_current_key(current_key)
keyed_state_backend.set_current_key(Row(current_key))

process_element(value[4][1], ctx, collector)
output_result = process_element(value[4][1], ctx)

if output_result:
for result in output_result:
yield Row(None, None, None, result)

for result in collector.buf:
# 0: proc time timer data
# 1: event time timer data
# 2: normal data
# result_row: [TIMER_FLAG, TIMER TYPE, TIMER_KEY, RESULT_DATA]
if result[0] == KeyedProcessFunctionOutputFlag.NORMAL_DATA.value:
yield Row(None, None, None, result[1])
else:
yield Row(result[0], result[1], result[2], None)
yield Row(result[0], result[1], result[2], None)

collector.clear()

return wrapped_keyed_process_function, process_function
Expand Down
26 changes: 3 additions & 23 deletions flink-python/pyflink/fn_execution/operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
from typing import Tuple, Any

from pyflink.datastream import TimeDomain
from pyflink.datastream.functions import RuntimeContext, TimerService, Collector, ProcessFunction, \
from pyflink.datastream.functions import RuntimeContext, TimerService, ProcessFunction, \
KeyedProcessFunction
from pyflink.fn_execution import flink_fn_execution_pb2, operation_utils
from pyflink.fn_execution.beam.beam_coders import DataViewFilterCoder
Expand Down Expand Up @@ -422,36 +422,16 @@ def generate_func(self, serialized_fn):
class ProcessFunctionOperation(DataStreamStatelessFunctionOperation):

def __init__(self, spec):
self.collector = ProcessFunctionOperation.InternalCollector()
self.timer_service = ProcessFunctionOperation.InternalTimerService()
self.function_context = ProcessFunctionOperation.InternalProcessFunctionContext(
self.timer_service)
super(ProcessFunctionOperation, self).__init__(spec)

def generate_func(self, serialized_fn) -> tuple:
func, proc_func = operation_utils.extract_process_function(
serialized_fn, self.function_context, self.collector)
serialized_fn, self.function_context)
return func, [proc_func]

class InternalCollector(Collector):
"""
Internal implementation of the Collector. It uses a buffer list to store data to be emitted.
There will be a header flag for each data type. 0 means it is a proc time timer registering
request, while 1 means it is an event time timer and 2 means it is a normal data. When
registering a timer, it must take along with the corresponding key for it.
For a ProcessFunction, it will only collect normal data.
"""

def __init__(self):
self.buf = []

def collect(self, a: Any):
self.buf.append((2, a))

def clear(self):
self.buf.clear()

class InternalProcessFunctionContext(ProcessFunction.Context):
"""
Internal implementation of ProcessFunction.Context.
Expand Down Expand Up @@ -511,7 +491,7 @@ def generate_func(self, serialized_fn) -> Tuple:
self.keyed_state_backend)
return func, [proc_func]

class InternalCollector(Collector):
class InternalCollector(object):
"""
Internal implementation of the Collector. It uses a buffer list to store data to be emitted.
There will be a header flag for each data type. 0 means it is a proc time timer registering
Expand Down

0 comments on commit 1317cab

Please sign in to comment.