From 4df6a398bbe2a9de7c23977176789e54cc0848fa Mon Sep 17 00:00:00 2001 From: huangxingbo Date: Mon, 12 Dec 2022 19:33:21 +0800 Subject: [PATCH] [FLINK-29461][python] Make the test_process_function more stable This closes #21491. --- .../datastream/tests/test_data_stream.py | 21 +++++++++---------- 1 file changed, 10 insertions(+), 11 deletions(-) diff --git a/flink-python/pyflink/datastream/tests/test_data_stream.py b/flink-python/pyflink/datastream/tests/test_data_stream.py index 009c8414ee101..6013c9cf0d82c 100644 --- a/flink-python/pyflink/datastream/tests/test_data_stream.py +++ b/flink-python/pyflink/datastream/tests/test_data_stream.py @@ -980,9 +980,8 @@ class MyProcessFunction(ProcessFunction): def process_element(self, value, ctx): current_timestamp = ctx.timestamp() - current_watermark = ctx.timer_service().current_watermark() - yield "current timestamp: {}, current watermark: {}, current_value: {}"\ - .format(str(current_timestamp), str(current_watermark), str(value)) + yield "current timestamp: {}, current_value: {}"\ + .format(str(current_timestamp), str(value)) watermark_strategy = WatermarkStrategy.for_monotonous_timestamps()\ .with_timestamp_assigner(SecondColumnTimestampAssigner()) @@ -990,14 +989,14 @@ def process_element(self, value, ctx): .process(MyProcessFunction(), output_type=Types.STRING()).add_sink(self.test_sink) self.env.execute('test process function') results = self.test_sink.get_results() - expected = ["current timestamp: 1603708211000, current watermark: " - "-9223372036854775808, current_value: Row(f0=1, f1='1603708211000')", - "current timestamp: 1603708224000, current watermark: " - "-9223372036854775808, current_value: Row(f0=2, f1='1603708224000')", - "current timestamp: 1603708226000, current watermark: " - "-9223372036854775808, current_value: Row(f0=3, f1='1603708226000')", - "current timestamp: 1603708289000, current watermark: " - "-9223372036854775808, current_value: Row(f0=4, f1='1603708289000')"] + expected = ["current timestamp: 1603708211000, " + "current_value: Row(f0=1, f1='1603708211000')", + "current timestamp: 1603708224000, " + "current_value: Row(f0=2, f1='1603708224000')", + "current timestamp: 1603708226000, " + "current_value: Row(f0=3, f1='1603708226000')", + "current timestamp: 1603708289000, " + "current_value: Row(f0=4, f1='1603708289000')"] self.assert_equals_sorted(expected, results) def test_process_side_output(self):