From 18a967f8ad7b22c2942e227fb84f08f552660b5a Mon Sep 17 00:00:00 2001 From: Ada Wang Date: Thu, 19 May 2022 20:02:34 +0800 Subject: [PATCH] [FLINK-27699][python][connector/pulsar] Support StopCursor.at_publish_time This closes #19771. --- .../pyflink/datastream/connectors/pulsar.py | 19 +++++++++++++++---- .../datastream/tests/test_connectors.py | 7 ++++--- 2 files changed, 19 insertions(+), 7 deletions(-) diff --git a/flink-python/pyflink/datastream/connectors/pulsar.py b/flink-python/pyflink/datastream/connectors/pulsar.py index 54ba513ecebf3..b6d58290830c9 100644 --- a/flink-python/pyflink/datastream/connectors/pulsar.py +++ b/flink-python/pyflink/datastream/connectors/pulsar.py @@ -172,10 +172,21 @@ def latest() -> 'StopCursor': @staticmethod def at_event_time(timestamp: int) -> 'StopCursor': + warnings.warn( + "at_event_time is deprecated. Use at_publish_time instead.", DeprecationWarning) JStopCursor = get_gateway().jvm \ .org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor return StopCursor(JStopCursor.atEventTime(timestamp)) + @staticmethod + def at_publish_time(timestamp: int) -> 'StopCursor': + """ + Stop when message publishTime is greater than the specified timestamp. + """ + JStopCursor = get_gateway().jvm \ + .org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor + return StopCursor(JStopCursor.atPublishTime(timestamp)) + class PulsarSource(Source): """ @@ -188,7 +199,7 @@ class PulsarSource(Source): >>> source = PulsarSource() \\ ... .builder() \\ - ... .set_topics(TOPIC1, TOPIC2) \\ + ... .set_topics([TOPIC1, TOPIC2]) \\ ... .set_service_url(get_service_url()) \\ ... .set_admin_url(get_admin_url()) \\ ... .set_subscription_name("test") \\ @@ -255,7 +266,7 @@ class PulsarSourceBuilder(object): ... .set_topics([TOPIC1, TOPIC2]) \\ ... .set_deserialization_schema( ... PulsarDeserializationSchema.flink_schema(SimpleStringSchema())) \\ - ... .set_bounded_stop_cursor(StopCursor.at_event_time(int(time.time() * 1000))) + ... .set_bounded_stop_cursor(StopCursor.at_publish_time(int(time.time() * 1000))) ... .build() """ @@ -310,7 +321,7 @@ def set_topics(self, topics: Union[str, List[str]]) -> 'PulsarSourceBuilder': def set_topics_pattern(self, topics_pattern: str) -> 'PulsarSourceBuilder': """ Set a topic pattern to consume from the java regex str. You can set topics once either with - setTopics or setTopicPattern in this builder. + set_topics or set_topic_pattern in this builder. """ warnings.warn("set_topics_pattern is deprecated. Use set_topic_pattern instead.", DeprecationWarning, stacklevel=2) @@ -320,7 +331,7 @@ def set_topics_pattern(self, topics_pattern: str) -> 'PulsarSourceBuilder': def set_topic_pattern(self, topic_pattern: str) -> 'PulsarSourceBuilder': """ Set a topic pattern to consume from the java regex str. You can set topics once either with - setTopics or setTopicPattern in this builder. + set_topics or set_topic_pattern in this builder. """ self._j_pulsar_source_builder.setTopicPattern(topic_pattern) return self diff --git a/flink-python/pyflink/datastream/tests/test_connectors.py b/flink-python/pyflink/datastream/tests/test_connectors.py index 05c37db24faa6..93cbf8492760c 100644 --- a/flink-python/pyflink/datastream/tests/test_connectors.py +++ b/flink-python/pyflink/datastream/tests/test_connectors.py @@ -179,7 +179,7 @@ def test_pulsar_source(self): .set_topics('ada') \ .set_start_cursor(StartCursor.earliest()) \ .set_unbounded_stop_cursor(StopCursor.never()) \ - .set_bounded_stop_cursor(StopCursor.at_event_time(22)) \ + .set_bounded_stop_cursor(StopCursor.at_publish_time(22)) \ .set_subscription_name('ff') \ .set_subscription_type(SubscriptionType.Exclusive) \ .set_deserialization_schema( @@ -254,12 +254,13 @@ def test_source_deprecated_method(self): pulsar_source = PulsarSource.builder() \ .set_service_url('pulsar://localhost:6650') \ .set_admin_url('http://localhost:8080') \ - .set_topics_pattern('ada.*') \ + .set_topic_pattern('ada.*') \ .set_deserialization_schema( PulsarDeserializationSchema.flink_type_info(Types.STRING())) \ + .set_unbounded_stop_cursor(StopCursor.at_publish_time(4444)) \ .set_subscription_name('ff') \ .set_config(test_option, True) \ - .set_config_with_dict({'pulsar.source.autoCommitCursorInterval': '1000'}) \ + .set_properties({'pulsar.source.autoCommitCursorInterval': '1000'}) \ .build() configuration = get_field_value(pulsar_source.get_java_function(), "sourceConfiguration") self.assertEqual(