Skip to content

Commit

Permalink
[FLINK-29681][python] Fix Python side-output operator not generated i…
Browse files Browse the repository at this point in the history
…n some cases

This closes apache#21104.
  • Loading branch information
vancior98 authored and HuangXingBo committed Oct 19, 2022
1 parent 426c391 commit 93a9c50
Show file tree
Hide file tree
Showing 6 changed files with 46 additions and 6 deletions.
2 changes: 1 addition & 1 deletion flink-python/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ under the License.
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>pemja</artifactId>
<version>0.2.4</version>
<version>0.2.6</version>
</dependency>

<!-- Protobuf dependencies -->
Expand Down
3 changes: 2 additions & 1 deletion flink-python/pyflink/datastream/data_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -893,7 +893,8 @@ def get_side_output(self, output_tag: OutputTag) -> 'DataStream':
.. versionadded:: 1.16.0
"""
return DataStream(self._j_data_stream.getSideOutput(output_tag.get_java_output_tag()))
ds = DataStream(self._j_data_stream.getSideOutput(output_tag.get_java_output_tag()))
return ds.map(lambda i: i, output_type=output_tag.type_info)

def cache(self) -> 'CachedDataStream':
"""
Expand Down
15 changes: 15 additions & 0 deletions flink-python/pyflink/datastream/tests/test_data_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -644,6 +644,21 @@ def process_element2(self, value, ctx: 'KeyedCoProcessFunction.Context'):
side_expected = ['1', '1', '2', '2', '3', '3', '4', '4']
self.assert_equals_sorted(side_expected, side_sink.get_results())

def test_side_output_stream_execute_and_collect(self):
tag = OutputTag("side", Types.INT())

class MyProcessFunction(ProcessFunction):

def process_element(self, value, ctx):
yield value
yield tag, value * 2

ds = self.env.from_collection([1, 2, 3], Types.INT()).process(MyProcessFunction())
ds_side = ds.get_side_output(tag)
result = [i for i in ds_side.execute_and_collect()]
expected = [2, 4, 6]
self.assert_equals_sorted(expected, result)


class DataStreamStreamingTests(DataStreamTests):

Expand Down
28 changes: 26 additions & 2 deletions flink-python/pyflink/table/tests/test_table_environment_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@
from pyflink.common.serializer import TypeSerializer
from pyflink.common.typeinfo import Types
from pyflink.common.watermark_strategy import TimestampAssigner
from pyflink.datastream import MergingWindowAssigner, TimeWindow, Trigger, TriggerResult
from pyflink.datastream.functions import WindowFunction
from pyflink.datastream import MergingWindowAssigner, TimeWindow, Trigger, TriggerResult, OutputTag
from pyflink.datastream.functions import WindowFunction, ProcessFunction
from pyflink.datastream.tests.test_util import DataStreamTestSinkFunction
from pyflink.datastream.window import TimeWindowSerializer
from pyflink.java_gateway import get_gateway
Expand Down Expand Up @@ -560,6 +560,30 @@ def test_to_retract_stream(self):
"(True, Row(f0=2, f1='Hello'))"]
self.assertEqual(result, expected)

def test_side_output_stream_to_table(self):
tag = OutputTag("side", Types.ROW([Types.INT()]))

class MyProcessFunction(ProcessFunction):

def process_element(self, value, ctx):
yield Row(value)
yield tag, Row(value * 2)

ds = self.env.from_collection([1, 2, 3], Types.INT()).process(MyProcessFunction())
ds_side = ds.get_side_output(tag)
expected = ['<Row(2)>', '<Row(4)>', '<Row(6)>']

t = self.t_env.from_data_stream(ds_side)
result = [str(i) for i in t.execute().collect()]
result.sort()
self.assertEqual(expected, result)

self.t_env.create_temporary_view("side_table", ds_side)
table_result = self.t_env.execute_sql("SELECT * FROM side_table")
result = [str(i) for i in table_result.collect()]
result.sort()
self.assertEqual(expected, result)


class StreamTableEnvironmentTests(PyFlinkStreamTableTestCase):

Expand Down
2 changes: 1 addition & 1 deletion flink-python/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ def extracted_output_files(base_dir, file_path, output_directory):
'cloudpickle==2.1.0', 'avro-python3>=1.8.1,!=1.9.2,<1.10.0',
'pytz>=2018.3', 'fastavro>=1.1.0,<1.4.8', 'requests>=2.26.0',
'protobuf<3.18',
'pemja==0.2.4;'
'pemja==0.2.6;'
'python_full_version >= "3.7" and platform_system != "Windows"',
'httplib2>=0.19.0,<=0.20.4', apache_flink_libraries_dependency]

Expand Down
2 changes: 1 addition & 1 deletion flink-python/src/main/resources/META-INF/NOTICE
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ This project bundles the following dependencies under the Apache Software Licens
- org.apache.beam:beam-vendor-bytebuddy-1_11_0:0.1
- org.apache.beam:beam-vendor-guava-26_0-jre:0.1
- org.apache.beam:beam-vendor-grpc-1_43_2:0.1
- com.alibaba:pemja:0.2.4
- com.alibaba:pemja:0.2.6

This project bundles the following dependencies under the BSD license.
See bundled license files for details
Expand Down

0 comments on commit 93a9c50

Please sign in to comment.