Skip to content

Commit

Permalink
Added a flush method for python producer (apache#3685)
Browse files Browse the repository at this point in the history
  • Loading branch information
srkukarni authored and merlimat committed Feb 26, 2019
1 parent 397ef33 commit 0aa249a
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 0 deletions.
9 changes: 9 additions & 0 deletions pulsar-client-cpp/python/pulsar/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -808,6 +808,15 @@ def callback(res, msg):
replication_clusters, disable_replication, event_timestamp)
self._producer.send_async(msg, callback)


def flush(self):
"""
Flush all the messages buffered in the client and wait until all messages have been
successfully persisted
"""
self._producer.flush()


def close(self):
"""
Close the producer.
Expand Down
12 changes: 12 additions & 0 deletions pulsar-client-cpp/python/src/producer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,15 @@ void Producer_sendAsync(Producer& producer, const Message& message, py::object c
Py_END_ALLOW_THREADS
}

void Producer_flush(Producer& producer) {
Result res;
Py_BEGIN_ALLOW_THREADS
res = producer.flush();
Py_END_ALLOW_THREADS

CHECK_RESULT(res);
}

void Producer_close(Producer& producer) {
Result res;
Py_BEGIN_ALLOW_THREADS
Expand Down Expand Up @@ -89,6 +98,9 @@ void export_producer() {
"\n"
"@param msg message to publish\n")
.def("send_async", &Producer_sendAsync)
.def("flush", &Producer_flush,
"Flush all the messages buffered in the client and wait until all messages have been\n"
"successfully persisted\n")
.def("close", &Producer_close)
;
}
1 change: 1 addition & 0 deletions pulsar-client-cpp/python/test_producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,4 +40,5 @@
except Exception as e:
print("Failed to send message: %s", e)

producer.flush()
producer.close()

0 comments on commit 0aa249a

Please sign in to comment.