diff --git a/pulsar-client-cpp/python/pulsar/__init__.py b/pulsar-client-cpp/python/pulsar/__init__.py index 74d301117ff9b..76b08cd7b7c9e 100644 --- a/pulsar-client-cpp/python/pulsar/__init__.py +++ b/pulsar-client-cpp/python/pulsar/__init__.py @@ -801,6 +801,20 @@ def redeliver_unacknowledged_messages(self): """ self._consumer.redeliver_unacknowledged_messages() + def seek(self, messageid): + """ + Reset the subscription associated with this consumer to a specific message id. + The message id can either be a specific message or represent the first or last messages in the topic. + Note: this operation can only be done on non-partitioned topics. For these, one can rather perform the + seek() on the individual partitions. + + **Args** + + * `message`: + The message id for seek. + """ + self._consumer.seek(messageid) + def close(self): """ Close the consumer. diff --git a/pulsar-client-cpp/python/pulsar_test.py b/pulsar-client-cpp/python/pulsar_test.py index 8f80f4494f10b..2572557070efc 100755 --- a/pulsar-client-cpp/python/pulsar_test.py +++ b/pulsar-client-cpp/python/pulsar_test.py @@ -488,6 +488,28 @@ def test_publish_compact_and_consume(self): consumer2.close() client.close() + def test_seek(self): + client = Client(self.serviceUrl) + consumer = client.subscribe('persistent://sample/standalone/ns/my-python-topic-seek', + 'my-sub', + consumer_type=ConsumerType.Shared) + producer = client.create_producer('persistent://sample/standalone/ns/my-python-topic-seek') + + for i in range(100): + producer.send('hello-%d' % i) + + for i in range(100): + msg = consumer.receive() + self.assertEqual(msg.data(), b'hello-%d' % i) + consumer.acknowledge(msg) + + # seek, and after reconnect, expected receive first message. + consumer.seek(MessageId.earliest) + time.sleep(0.5) + msg = consumer.receive() + self.assertEqual(msg.data(), b'hello-0') + client.close() + def _check_value_error(self, fun): try: fun() diff --git a/pulsar-client-cpp/python/src/consumer.cc b/pulsar-client-cpp/python/src/consumer.cc index 557536aabb456..f4379f789a323 100644 --- a/pulsar-client-cpp/python/src/consumer.cc +++ b/pulsar-client-cpp/python/src/consumer.cc @@ -119,6 +119,15 @@ void Consumer_resumeMessageListener(Consumer& consumer) { CHECK_RESULT(consumer.resumeMessageListener()); } +void Consumer_seek(Consumer& consumer, const MessageId& msgId) { + Result res; + Py_BEGIN_ALLOW_THREADS + res = consumer.seek(msgId); + Py_END_ALLOW_THREADS + + CHECK_RESULT(res); +} + void export_consumer() { using namespace boost::python; @@ -137,5 +146,6 @@ void export_consumer() { .def("pause_message_listener", &Consumer_pauseMessageListener) .def("resume_message_listener", &Consumer_resumeMessageListener) .def("redeliver_unacknowledged_messages", &Consumer::redeliverUnacknowledgedMessages) + .def("seek", &Consumer_seek) ; }