Skip to content

Commit

Permalink
[FLINK-18997][python] Rename parameter name type_info to result_type …
Browse files Browse the repository at this point in the history
…for DataStream.flat_map()
  • Loading branch information
hequn8128 committed Aug 19, 2020
1 parent 0522827 commit 183f4c1
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 9 deletions.
14 changes: 7 additions & 7 deletions flink-python/pyflink/datastream/data_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -246,16 +246,16 @@ def map(self, func: Union[Callable, MapFunction], output_type: TypeInformation =
j_python_data_stream_scalar_function_operator
))

def flat_map(self, func: Union[Callable, FlatMapFunction], type_info: TypeInformation = None) \
-> 'DataStream':
def flat_map(self, func: Union[Callable, FlatMapFunction],
result_type: TypeInformation = None) -> 'DataStream':
"""
Applies a FlatMap transformation on a DataStream. The transformation calls a FlatMapFunction
for each element of the DataStream. Each FlatMapFunction call can return any number of
elements including none. The user can also extend RichFlatMapFunction to gain access to
other features provided by the RichFUnction.
:param func: The FlatMapFunction that is called for each element of the DataStream.
:param type_info: The type information of output data.
:param result_type: The type information of output data.
:return: The transformed DataStream.
"""
if not isinstance(func, FlatMapFunction):
Expand All @@ -267,7 +267,7 @@ def flat_map(self, func: Union[Callable, FlatMapFunction], type_info: TypeInform
from pyflink.fn_execution import flink_fn_execution_pb2
j_python_data_stream_scalar_function_operator, j_output_type_info = \
self._get_java_python_function_operator(func,
type_info,
result_type,
func_name,
flink_fn_execution_pb2
.UserDefinedDataStreamFunction.FLAT_MAP)
Expand Down Expand Up @@ -337,7 +337,7 @@ def flat_map(self, value):

j_input_type = self._j_data_stream.getTransformation().getOutputType()
type_info = typeinfo._from_java_type(j_input_type)
j_data_stream = self.flat_map(FilterFlatMap(func), type_info=type_info)._j_data_stream
j_data_stream = self.flat_map(FilterFlatMap(func), result_type=type_info)._j_data_stream
filtered_stream = DataStream(j_data_stream)
filtered_stream.name("Filter")
return filtered_stream
Expand Down Expand Up @@ -769,9 +769,9 @@ def map(self, func: Union[Callable, MapFunction], output_type: TypeInformation =
-> 'DataStream':
return self._values().map(func, output_type)

def flat_map(self, func: Union[Callable, FlatMapFunction], type_info: TypeInformation = None)\
def flat_map(self, func: Union[Callable, FlatMapFunction], result_type: TypeInformation = None)\
-> 'DataStream':
return self._values().flat_map(func, type_info)
return self._values().flat_map(func, result_type)

def reduce(self, func: Union[Callable, ReduceFunction]) -> 'DataStream':
"""
Expand Down
4 changes: 2 additions & 2 deletions flink-python/pyflink/datastream/tests/test_data_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ def test_map_function_with_data_types_and_function_object(self):
def test_flat_map_function(self):
ds = self.env.from_collection([('a', 0), ('ab', 1), ('bdc', 2), ('cfgs', 3), ('deeefg', 4)],
type_info=Types.ROW([Types.STRING(), Types.INT()]))
ds.flat_map(MyFlatMapFunction(), type_info=Types.ROW([Types.STRING(), Types.INT()]))\
ds.flat_map(MyFlatMapFunction(), result_type=Types.ROW([Types.STRING(), Types.INT()]))\
.add_sink(self.test_sink)

self.env.execute('flat_map_test')
Expand All @@ -212,7 +212,7 @@ def flat_map(value):
if value[1] % 2 == 0:
yield value

ds.flat_map(flat_map, type_info=Types.ROW([Types.STRING(), Types.INT()]))\
ds.flat_map(flat_map, result_type=Types.ROW([Types.STRING(), Types.INT()]))\
.add_sink(self.test_sink)
self.env.execute('flat_map_test')
results = self.test_sink.get_results(False)
Expand Down

0 comments on commit 183f4c1

Please sign in to comment.