Skip to content

Commit

Permalink
[Issue apache#3712][python-client] exposing InitialPosition managemen…
Browse files Browse the repository at this point in the history
…t in the ConsumerConfiguration. (apache#3714)

### Motivation
PR apache#3567 introduced the SubscriptionInitialPosition option in ConsumerConfiguration in the CPP client but did not exposed any methods to be able to do it with the Python client.

This PR aims to expose the code introduced in the previous PR to allows Python user to choose the initial position of the consumer in python. 

### Modifications

- Implemented a boost object to expose InitialPosition Enum.
- Added to boost ConsumerConfiguration object the getter/setter of the InitialPosition attribute.
- Added a initial_position parameter to Client.subscribe in order to modify the ConsumerConfiguration instance created.

### Verifying this change

This change is a trivial rework / code cleanup without any test coverage.
  • Loading branch information
lelabo-m authored and sijie committed Mar 19, 2019
1 parent 1a1c557 commit 614a98d
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 3 deletions.
11 changes: 9 additions & 2 deletions pulsar-client-cpp/python/pulsar/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ def send_callback(res, msg):

import _pulsar

from _pulsar import Result, CompressionType, ConsumerType, PartitionsRoutingMode # noqa: F401
from _pulsar import Result, CompressionType, ConsumerType, InitialPosition, PartitionsRoutingMode # noqa: F401

from pulsar.functions.function import Function
from pulsar.functions.context import Context
Expand Down Expand Up @@ -476,7 +476,8 @@ def subscribe(self, topic, subscription_name,
negative_ack_redelivery_delay_ms=60000,
is_read_compacted=False,
properties=None,
pattern_auto_discovery_period=60
pattern_auto_discovery_period=60,
initial_position=InitialPosition.Latest
):
"""
Subscribe to the given topic and subscription combination.
Expand Down Expand Up @@ -543,6 +544,10 @@ def my_listener(consumer, message):
can be used for identify a consumer at broker side.
* `pattern_auto_discovery_period`:
Periods of seconds for consumer to auto discover match topics.
* `initial_position`:
Set the initial position of a consumer when subscribing to the topic.
It could be either: `InitialPosition.Earliest` or `InitialPosition.Latest`.
Default: `Latest`.
"""
_check_type(str, subscription_name, 'subscription_name')
_check_type(ConsumerType, consumer_type, 'consumer_type')
Expand All @@ -557,6 +562,7 @@ def my_listener(consumer, message):
_check_type(int, pattern_auto_discovery_period, 'pattern_auto_discovery_period')
_check_type(bool, is_read_compacted, 'is_read_compacted')
_check_type_or_none(dict, properties, 'properties')
_check_type(InitialPosition, initial_position, 'initial_position')

conf = _pulsar.ConsumerConfiguration()
conf.consumer_type(consumer_type)
Expand All @@ -575,6 +581,7 @@ def my_listener(consumer, message):
if properties:
for k, v in properties.items():
conf.property(k, v)
conf.subscription_initial_position(initial_position)

conf.schema(schema.schema_info())

Expand Down
34 changes: 33 additions & 1 deletion pulsar-client-cpp/python/pulsar_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import os
from pulsar import Client, MessageId, \
CompressionType, ConsumerType, PartitionsRoutingMode, \
AuthenticationTLS, Authentication, AuthenticationToken
AuthenticationTLS, Authentication, AuthenticationToken, InitialPosition

from _pulsar import ProducerConfiguration, ConsumerConfiguration

Expand Down Expand Up @@ -140,6 +140,38 @@ def test_producer_consumer(self):
consumer.unsubscribe()
client.close()

def test_consumer_initial_position(self):
client = Client(self.serviceUrl)
producer = client.create_producer('my-python-topic-producer-consumer')

# Sending 5 messages before consumer creation.
# These should be received with initial_position set to Earliest but not with Latest.
for i in range(5):
producer.send(b'hello-%d' % i)

consumer = client.subscribe('my-python-topic-producer-consumer',
'my-sub',
consumer_type=ConsumerType.Shared,
initial_position=InitialPosition.Earliest)

# Sending 5 other messages that should be received regardless of the initial_position.
for i in range(5, 10):
producer.send(b'hello-%d' % i)

for i in range(10):
msg = consumer.receive(1000)
self.assertTrue(msg)
self.assertEqual(msg.data(), b'hello-%d' % i)

try:
msg = consumer.receive(100)
self.assertTrue(False) # Should not reach this point
except:
pass # Exception is expected

consumer.unsubscribe()
client.close()

def test_message_properties(self):
client = Client(self.serviceUrl)
topic = 'my-python-test-message-properties'
Expand Down
2 changes: 2 additions & 0 deletions pulsar-client-cpp/python/src/config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,8 @@ void export_config() {
.def("read_compacted", &ConsumerConfiguration::isReadCompacted)
.def("read_compacted", &ConsumerConfiguration::setReadCompacted)
.def("property", &ConsumerConfiguration::setProperty, return_self<>())
.def("subscription_initial_position", &ConsumerConfiguration::getSubscriptionInitialPosition)
.def("subscription_initial_position", &ConsumerConfiguration::setSubscriptionInitialPosition)
;

class_<ReaderConfiguration>("ReaderConfiguration")
Expand Down
4 changes: 4 additions & 0 deletions pulsar-client-cpp/python/src/enums.cc
Original file line number Diff line number Diff line change
Expand Up @@ -94,4 +94,8 @@ void export_enums() {
.value("KEY_VALUE", KEY_VALUE)
;

enum_<InitialPosition>("InitialPosition", "Supported initial position")
.value("Latest", InitialPositionLatest)
.value("Earliest", InitialPositionEarliest)
;
}

0 comments on commit 614a98d

Please sign in to comment.