Skip to content

Commit

Permalink
[FLINK-27699][python][connector/pulsar] Support StopCursor.at_publish…
Browse files Browse the repository at this point in the history
…_time

This closes apache#19771.
  • Loading branch information
a49a authored and dianfu committed May 21, 2022
1 parent e7d004d commit 18a967f
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 7 deletions.
19 changes: 15 additions & 4 deletions flink-python/pyflink/datastream/connectors/pulsar.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,10 +172,21 @@ def latest() -> 'StopCursor':

@staticmethod
def at_event_time(timestamp: int) -> 'StopCursor':
warnings.warn(
"at_event_time is deprecated. Use at_publish_time instead.", DeprecationWarning)
JStopCursor = get_gateway().jvm \
.org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor
return StopCursor(JStopCursor.atEventTime(timestamp))

@staticmethod
def at_publish_time(timestamp: int) -> 'StopCursor':
"""
Stop when message publishTime is greater than the specified timestamp.
"""
JStopCursor = get_gateway().jvm \
.org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor
return StopCursor(JStopCursor.atPublishTime(timestamp))


class PulsarSource(Source):
"""
Expand All @@ -188,7 +199,7 @@ class PulsarSource(Source):
>>> source = PulsarSource() \\
... .builder() \\
... .set_topics(TOPIC1, TOPIC2) \\
... .set_topics([TOPIC1, TOPIC2]) \\
... .set_service_url(get_service_url()) \\
... .set_admin_url(get_admin_url()) \\
... .set_subscription_name("test") \\
Expand Down Expand Up @@ -255,7 +266,7 @@ class PulsarSourceBuilder(object):
... .set_topics([TOPIC1, TOPIC2]) \\
... .set_deserialization_schema(
... PulsarDeserializationSchema.flink_schema(SimpleStringSchema())) \\
... .set_bounded_stop_cursor(StopCursor.at_event_time(int(time.time() * 1000)))
... .set_bounded_stop_cursor(StopCursor.at_publish_time(int(time.time() * 1000)))
... .build()
"""

Expand Down Expand Up @@ -310,7 +321,7 @@ def set_topics(self, topics: Union[str, List[str]]) -> 'PulsarSourceBuilder':
def set_topics_pattern(self, topics_pattern: str) -> 'PulsarSourceBuilder':
"""
Set a topic pattern to consume from the java regex str. You can set topics once either with
setTopics or setTopicPattern in this builder.
set_topics or set_topic_pattern in this builder.
"""
warnings.warn("set_topics_pattern is deprecated. Use set_topic_pattern instead.",
DeprecationWarning, stacklevel=2)
Expand All @@ -320,7 +331,7 @@ def set_topics_pattern(self, topics_pattern: str) -> 'PulsarSourceBuilder':
def set_topic_pattern(self, topic_pattern: str) -> 'PulsarSourceBuilder':
"""
Set a topic pattern to consume from the java regex str. You can set topics once either with
setTopics or setTopicPattern in this builder.
set_topics or set_topic_pattern in this builder.
"""
self._j_pulsar_source_builder.setTopicPattern(topic_pattern)
return self
Expand Down
7 changes: 4 additions & 3 deletions flink-python/pyflink/datastream/tests/test_connectors.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ def test_pulsar_source(self):
.set_topics('ada') \
.set_start_cursor(StartCursor.earliest()) \
.set_unbounded_stop_cursor(StopCursor.never()) \
.set_bounded_stop_cursor(StopCursor.at_event_time(22)) \
.set_bounded_stop_cursor(StopCursor.at_publish_time(22)) \
.set_subscription_name('ff') \
.set_subscription_type(SubscriptionType.Exclusive) \
.set_deserialization_schema(
Expand Down Expand Up @@ -254,12 +254,13 @@ def test_source_deprecated_method(self):
pulsar_source = PulsarSource.builder() \
.set_service_url('pulsar://localhost:6650') \
.set_admin_url('http://localhost:8080') \
.set_topics_pattern('ada.*') \
.set_topic_pattern('ada.*') \
.set_deserialization_schema(
PulsarDeserializationSchema.flink_type_info(Types.STRING())) \
.set_unbounded_stop_cursor(StopCursor.at_publish_time(4444)) \
.set_subscription_name('ff') \
.set_config(test_option, True) \
.set_config_with_dict({'pulsar.source.autoCommitCursorInterval': '1000'}) \
.set_properties({'pulsar.source.autoCommitCursorInterval': '1000'}) \
.build()
configuration = get_field_value(pulsar_source.get_java_function(), "sourceConfiguration")
self.assertEqual(
Expand Down

0 comments on commit 18a967f

Please sign in to comment.