Skip to content

Commit

Permalink
add consumer.seek in python client (apache#2008)
Browse files Browse the repository at this point in the history
  • Loading branch information
zhaijack authored and merlimat committed Jun 21, 2018
1 parent b7e3fdd commit a545c2e
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 0 deletions.
14 changes: 14 additions & 0 deletions pulsar-client-cpp/python/pulsar/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -801,6 +801,20 @@ def redeliver_unacknowledged_messages(self):
"""
self._consumer.redeliver_unacknowledged_messages()

def seek(self, messageid):
"""
Reset the subscription associated with this consumer to a specific message id.
The message id can either be a specific message or represent the first or last messages in the topic.
Note: this operation can only be done on non-partitioned topics. For these, one can rather perform the
seek() on the individual partitions.
**Args**
* `message`:
The message id for seek.
"""
self._consumer.seek(messageid)

def close(self):
"""
Close the consumer.
Expand Down
22 changes: 22 additions & 0 deletions pulsar-client-cpp/python/pulsar_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -488,6 +488,28 @@ def test_publish_compact_and_consume(self):
consumer2.close()
client.close()

def test_seek(self):
client = Client(self.serviceUrl)
consumer = client.subscribe('persistent://sample/standalone/ns/my-python-topic-seek',
'my-sub',
consumer_type=ConsumerType.Shared)
producer = client.create_producer('persistent://sample/standalone/ns/my-python-topic-seek')

for i in range(100):
producer.send('hello-%d' % i)

for i in range(100):
msg = consumer.receive()
self.assertEqual(msg.data(), b'hello-%d' % i)
consumer.acknowledge(msg)

# seek, and after reconnect, expected receive first message.
consumer.seek(MessageId.earliest)
time.sleep(0.5)
msg = consumer.receive()
self.assertEqual(msg.data(), b'hello-0')
client.close()

def _check_value_error(self, fun):
try:
fun()
Expand Down
10 changes: 10 additions & 0 deletions pulsar-client-cpp/python/src/consumer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,15 @@ void Consumer_resumeMessageListener(Consumer& consumer) {
CHECK_RESULT(consumer.resumeMessageListener());
}

void Consumer_seek(Consumer& consumer, const MessageId& msgId) {
Result res;
Py_BEGIN_ALLOW_THREADS
res = consumer.seek(msgId);
Py_END_ALLOW_THREADS

CHECK_RESULT(res);
}

void export_consumer() {
using namespace boost::python;

Expand All @@ -137,5 +146,6 @@ void export_consumer() {
.def("pause_message_listener", &Consumer_pauseMessageListener)
.def("resume_message_listener", &Consumer_resumeMessageListener)
.def("redeliver_unacknowledged_messages", &Consumer::redeliverUnacknowledgedMessages)
.def("seek", &Consumer_seek)
;
}

0 comments on commit a545c2e

Please sign in to comment.