Skip to content

Commit

Permalink
Only throw Python exception when the current thread is holding the GIL (
Browse files Browse the repository at this point in the history
  • Loading branch information
merlimat authored Jun 21, 2017
1 parent 2ca1a7a commit afd64df
Show file tree
Hide file tree
Showing 4 changed files with 80 additions and 16 deletions.
24 changes: 21 additions & 3 deletions pulsar-client-cpp/python/src/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,37 @@

Producer Client_createProducer(Client& client, const std::string& topic, const ProducerConfiguration& conf) {
Producer producer;
CHECK_RESULT(client.createProducer(topic, conf, producer));
Result res;

Py_BEGIN_ALLOW_THREADS
res = client.createProducer(topic, conf, producer);
Py_END_ALLOW_THREADS

CHECK_RESULT(res);
return producer;
}

Consumer Client_subscribe(Client& client, const std::string& topic, const std::string& subscriptionName,
const ConsumerConfiguration& conf) {
Consumer consumer;
CHECK_RESULT(client.subscribe(topic, subscriptionName, conf, consumer));
Result res;

Py_BEGIN_ALLOW_THREADS
res = client.subscribe(topic, subscriptionName, conf, consumer);
Py_END_ALLOW_THREADS

CHECK_RESULT(res);
return consumer;
}

void Client_close(Client& client) {
CHECK_RESULT(client.close());
Result res;

Py_BEGIN_ALLOW_THREADS
res = client.close();
Py_END_ALLOW_THREADS

CHECK_RESULT(res);
}

void export_client() {
Expand Down
52 changes: 44 additions & 8 deletions pulsar-client-cpp/python/src/consumer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,43 +17,79 @@
#include "utils.h"

void Consumer_unsubscribe(Consumer& consumer) {
CHECK_RESULT(consumer.unsubscribe());
Result res;
Py_BEGIN_ALLOW_THREADS
res = consumer.unsubscribe();
Py_END_ALLOW_THREADS

CHECK_RESULT(res);
}

Message Consumer_receive(Consumer& consumer) {
Message msg;
Result res;
Py_BEGIN_ALLOW_THREADS
CHECK_RESULT(consumer.receive(msg));
res = consumer.receive(msg);
Py_END_ALLOW_THREADS

CHECK_RESULT(res);
return msg;
}

Message Consumer_receive_timeout(Consumer& consumer, int timeoutMs) {
Message msg;
Result res;
Py_BEGIN_ALLOW_THREADS
CHECK_RESULT(consumer.receive(msg, timeoutMs));
res = consumer.receive(msg, timeoutMs);
Py_END_ALLOW_THREADS

CHECK_RESULT(res);
return msg;
}

void Consumer_acknowledge(Consumer& consumer, const Message& msg) {
CHECK_RESULT(consumer.acknowledge(msg));
Result res;
Py_BEGIN_ALLOW_THREADS
res = consumer.acknowledge(msg);
Py_END_ALLOW_THREADS

CHECK_RESULT(res);
}

void Consumer_acknowledge_message_id(Consumer& consumer, const MessageId& msgId) {
CHECK_RESULT(consumer.acknowledge(msgId));
Result res;
Py_BEGIN_ALLOW_THREADS
res = consumer.acknowledge(msgId);
Py_END_ALLOW_THREADS

CHECK_RESULT(res);
}

void Consumer_acknowledge_cumulative(Consumer& consumer, const Message& msg) {
CHECK_RESULT(consumer.acknowledgeCumulative(msg));
Result res;
Py_BEGIN_ALLOW_THREADS
res = consumer.acknowledgeCumulative(msg);
Py_END_ALLOW_THREADS

CHECK_RESULT(res);
}

void Consumer_acknowledge_cumulative_message_id(Consumer& consumer, const MessageId& msgId) {
CHECK_RESULT(consumer.acknowledgeCumulative(msgId));
Result res;
Py_BEGIN_ALLOW_THREADS
res = consumer.acknowledgeCumulative(msgId);
Py_END_ALLOW_THREADS

CHECK_RESULT(res);
}

void Consumer_close(Consumer& consumer) {
CHECK_RESULT(consumer.close());
Result res;
Py_BEGIN_ALLOW_THREADS
res = consumer.close();
Py_END_ALLOW_THREADS

CHECK_RESULT(res);
}

void Consumer_pauseMessageListener(Consumer& consumer) {
Expand Down
17 changes: 13 additions & 4 deletions pulsar-client-cpp/python/src/producer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,12 @@
#include "utils.h"

void Producer_send(Producer& producer, const Message& message) {
CHECK_RESULT(producer.send(message));
Result res;
Py_BEGIN_ALLOW_THREADS
res = producer.send(message);
Py_END_ALLOW_THREADS

CHECK_RESULT(res);
}

void Producer_sendAsyncCallback(PyObject* callback, Result res, const Message& msg) {
Expand Down Expand Up @@ -47,7 +52,12 @@ void Producer_sendAsync(Producer& producer, const Message& message, py::object c
}

void Producer_close(Producer& producer) {
CHECK_RESULT(producer.close());
Result res;
Py_BEGIN_ALLOW_THREADS
res = producer.close();
Py_END_ALLOW_THREADS

CHECK_RESULT(res);
}

void export_producer() {
Expand All @@ -72,5 +82,4 @@ void export_producer() {
.def("send_async", &Producer_sendAsync)
.def("close", &Producer_close)
;

}
}
3 changes: 2 additions & 1 deletion pulsar-client-cpp/python/src/pulsar.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,10 @@ void export_enums();
void export_authentication();


inline void translateException(const PulsarException& ex) {
static void translateException(const PulsarException& ex) {
std::string err = "Pulsar error: ";
err += strResult(ex._result);

PyErr_SetString(PyExc_Exception, err.c_str());
}

Expand Down

0 comments on commit afd64df

Please sign in to comment.