Skip to content

Commit

Permalink
[Issue: 3161][Go client] Return Message ID for send (apache#4811)
Browse files Browse the repository at this point in the history
### Motivation

The `Send(context.Context, ProducerMessage) error` should return `MessageID`.

```
 SendAsync(context.Context, ProducerMessage, func(ProducerMessage, error))
```
  • Loading branch information
wolfstudy authored and jiazhai committed Oct 21, 2019
1 parent 8ddd0a8 commit be6a102
Show file tree
Hide file tree
Showing 24 changed files with 265 additions and 65 deletions.
4 changes: 2 additions & 2 deletions pulsar-client-cpp/examples/SampleAsyncProducer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ DECLARE_LOG_OBJECT()

using namespace pulsar;

void callback(Result code, const Message& msg) {
LOG_INFO("Received code: " << code << " -- Msg: " << msg);
void callback(Result code, const MessageId& msgId) {
LOG_INFO("Received code: " << code << " -- MsgID: " << msgId);
}

int main() {
Expand Down
2 changes: 1 addition & 1 deletion pulsar-client-cpp/include/pulsar/ProducerConfiguration.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@

namespace pulsar {

typedef std::function<void(Result, const Message& msg)> SendCallback;
typedef std::function<void(Result, const MessageId& messageId)> SendCallback;
typedef std::function<void(Result)> CloseCallback;

struct ProducerConfigurationImpl;
Expand Down
2 changes: 1 addition & 1 deletion pulsar-client-cpp/include/pulsar/c/producer.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ extern "C" {

typedef struct _pulsar_producer pulsar_producer_t;

typedef void (*pulsar_send_callback)(pulsar_result, pulsar_message_t *msg, void *ctx);
typedef void (*pulsar_send_callback)(pulsar_result, pulsar_message_id_t *msgId, void *ctx);
typedef void (*pulsar_close_callback)(pulsar_result, void *ctx);
typedef void (*pulsar_flush_callback)(pulsar_result, void *ctx);

Expand Down
2 changes: 1 addition & 1 deletion pulsar-client-cpp/lib/BatchMessageContainer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ void BatchMessageContainer::add(const Message& msg, SendCallback sendCallback, b
maxAllowedMessageBatchSizeInBytes_);
LOG_DEBUG(*this << " After serialization payload size in bytes = " << impl_->payload.readableBytes());

messagesContainerListPtr_->push_back(MessageContainer(msg, sendCallback));
messagesContainerListPtr_->push_back(MessageContainer(msg, sendCallback, msg.getMessageId()));

LOG_DEBUG(*this << " Number of messages in Batch = " << messagesContainerListPtr_->size());
LOG_DEBUG(*this << " Batch Payload Size In Bytes = " << batchSizeInBytes_);
Expand Down
7 changes: 4 additions & 3 deletions pulsar-client-cpp/lib/BatchMessageContainer.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,12 @@ namespace pulsar {
class BatchMessageContainer {
public:
struct MessageContainer {
MessageContainer(Message message, SendCallback sendCallback)
: message_(message), sendCallback_(sendCallback) {}
MessageContainer(Message message, SendCallback sendCallback, MessageId messageId)
: message_(message), sendCallback_(sendCallback), messageId_(messageId) {}
Message message_;
SendCallback sendCallback_;
void callBack(const pulsar::Result& r) { sendCallback_(r, message_); }
MessageId messageId_;
void callBack(const pulsar::Result& r) { sendCallback_(r, messageId_); }
};
typedef std::vector<MessageContainer> MessageContainerList;
typedef std::shared_ptr<MessageContainerList> MessageContainerListPtr;
Expand Down
7 changes: 5 additions & 2 deletions pulsar-client-cpp/lib/ClientConnection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -669,9 +669,12 @@ void ClientConnection::handleIncomingCommand() {
const CommandSendReceipt& sendReceipt = incomingCmd_.send_receipt();
int producerId = sendReceipt.producer_id();
uint64_t sequenceId = sendReceipt.sequence_id();
const proto::MessageIdData& messageIdData = sendReceipt.message_id();
MessageId messageId = MessageId(messageIdData.partition(), messageIdData.ledgerid(),
messageIdData.entryid(), messageIdData.batch_index());

LOG_DEBUG(cnxString_ << "Got receipt for producer: " << producerId
<< " -- msg: " << sequenceId);
<< " -- msg: " << sequenceId << "-- message id: " << messageId);

Lock lock(mutex_);
ProducersMap::iterator it = producers_.find(producerId);
Expand All @@ -680,7 +683,7 @@ void ClientConnection::handleIncomingCommand() {
lock.unlock();

if (producer) {
if (!producer->ackReceived(sequenceId)) {
if (!producer->ackReceived(sequenceId, messageId)) {
// If the producer fails to process the ack, we need to close the connection
// to give it a chance to recover from there
close();
Expand Down
2 changes: 1 addition & 1 deletion pulsar-client-cpp/lib/PartitionedProducerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ void PartitionedProducerImpl::sendAsync(const Message& msg, SendCallback callbac
LOG_ERROR("Got Invalid Partition for message from Router Policy, Partition - " << partition);
// change me: abort or notify failure in callback?
// change to appropriate error if callback
callback(ResultUnknownError, msg);
callback(ResultUnknownError, msg.getMessageId());
return;
}
// find a producer for that partition, index should start from 0
Expand Down
10 changes: 5 additions & 5 deletions pulsar-client-cpp/lib/Producer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,21 +35,21 @@ Producer::Producer(ProducerImplBasePtr impl) : impl_(impl) {}
const std::string& Producer::getTopic() const { return impl_ != NULL ? impl_->getTopic() : EMPTY_STRING; }

Result Producer::send(const Message& msg) {
Promise<Result, Message> promise;
sendAsync(msg, WaitForCallbackValue<Message>(promise));
Promise<Result, MessageId> promise;
sendAsync(msg, WaitForCallbackValue<MessageId>(promise));

if (!promise.isComplete()) {
impl_->triggerFlush();
}

Message m;
Result result = promise.getFuture().get(m);
MessageId mi;
Result result = promise.getFuture().get(mi);
return result;
}

void Producer::sendAsync(const Message& msg, SendCallback callback) {
if (!impl_) {
callback(ResultProducerNotInitialized, msg);
callback(ResultProducerNotInitialized, msg.getMessageId());
return;
}

Expand Down
26 changes: 14 additions & 12 deletions pulsar-client-cpp/lib/ProducerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ void ProducerImpl::failPendingMessages(Result result) {
lock.unlock();
for (std::vector<OpSendMsg>::const_iterator it = messagesToFail.begin(); it != messagesToFail.end();
it++) {
it->sendCallback_(result, it->msg_);
it->sendCallback_(result, it->msg_.getMessageId());
}

// this function can handle null pointer
Expand Down Expand Up @@ -290,11 +290,11 @@ void ProducerImpl::setMessageMetadata(const Message& msg, const uint64_t& sequen
}
}

void ProducerImpl::statsCallBackHandler(Result res, const Message& msg, SendCallback callback,
void ProducerImpl::statsCallBackHandler(Result res, const MessageId& msgId, SendCallback callback,
boost::posix_time::ptime publishTime) {
producerStatsBasePtr_->messageReceived(res, publishTime);
if (callback) {
callback(res, msg);
callback(res, msgId);
}
}

Expand Down Expand Up @@ -358,15 +358,15 @@ void ProducerImpl::sendAsync(const Message& msg, SendCallback callback) {
// Encrypt the payload if enabled
SharedBuffer encryptedPayload;
if (!encryptMessage(msg.impl_->metadata, payload, encryptedPayload)) {
cb(ResultCryptoError, msg);
cb(ResultCryptoError, msg.getMessageId());
return;
}
payload = encryptedPayload;

if (payloadSize > keepMaxMessageSize_) {
LOG_DEBUG(getName() << " - compressed Message payload size" << payloadSize << "cannot exceed "
<< keepMaxMessageSize_ << " bytes");
cb(ResultMessageTooBig, msg);
cb(ResultMessageTooBig, msg.getMessageId());
return;
}
}
Expand All @@ -384,7 +384,7 @@ void ProducerImpl::sendAsync(const Message& msg, SendCallback callback) {
if (conf_.getBlockIfQueueFull()) {
pendingMessagesQueue_.release(1);
}
cb(ResultAlreadyClosed, msg);
cb(ResultAlreadyClosed, msg.getMessageId());
return;
}

Expand All @@ -394,7 +394,7 @@ void ProducerImpl::sendAsync(const Message& msg, SendCallback callback) {
if (conf_.getBlockIfQueueFull()) {
pendingMessagesQueue_.release(1);
}
cb(ResultInvalidMessage, msg);
cb(ResultInvalidMessage, msg.getMessageId());
return;
}

Expand All @@ -415,7 +415,7 @@ void ProducerImpl::sendAsync(const Message& msg, SendCallback callback) {
batchMessageContainer->sendMessage(NULL);
}
lock.unlock();
cb(ResultProducerQueueIsFull, msg);
cb(ResultProducerQueueIsFull, msg.getMessageId());
return;
}

Expand Down Expand Up @@ -611,7 +611,7 @@ bool ProducerImpl::removeCorruptMessage(uint64_t sequenceId) {
if (op.sendCallback_) {
// to protect from client callback exception
try {
op.sendCallback_(ResultChecksumError, op.msg_);
op.sendCallback_(ResultChecksumError, op.msg_.getMessageId());
} catch (const std::exception& e) {
LOG_ERROR(getName() << "Exception thrown from callback " << e.what());
}
Expand All @@ -620,12 +620,13 @@ bool ProducerImpl::removeCorruptMessage(uint64_t sequenceId) {
}
}

bool ProducerImpl::ackReceived(uint64_t sequenceId) {
bool ProducerImpl::ackReceived(uint64_t sequenceId, MessageId& messageId) {
OpSendMsg op;
Lock lock(mutex_);
bool havePendingAck = pendingMessagesQueue_.peek(op);
if (!havePendingAck) {
LOG_DEBUG(getName() << " -- SequenceId - " << sequenceId << "]" //
<< " -- MessageId - " << messageId << "]"
<< "Got an SEND_ACK for expired message, ignoring it.");
return true;
}
Expand All @@ -638,7 +639,8 @@ bool ProducerImpl::ackReceived(uint64_t sequenceId) {
} else if (sequenceId < expectedSequenceId) {
// Ignoring the ack since it's referring to a message that has already timed out.
LOG_DEBUG(getName() << "Got ack for timed out msg " << sequenceId //
<< " last-seq: " << expectedSequenceId << " producer: " << producerId_);
<< " -- MessageId - " << messageId << " last-seq: " << expectedSequenceId
<< " producer: " << producerId_);
return true;
} else {
// Message was persisted correctly
Expand All @@ -655,7 +657,7 @@ bool ProducerImpl::ackReceived(uint64_t sequenceId) {
lock.unlock();
if (op.sendCallback_) {
try {
op.sendCallback_(ResultOk, op.msg_);
op.sendCallback_(ResultOk, messageId);
} catch (const std::exception& e) {
LOG_ERROR(getName() << "Exception thrown from callback " << e.what());
}
Expand Down
5 changes: 3 additions & 2 deletions pulsar-client-cpp/lib/ProducerImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include "MessageCrypto.h"
#include "stats/ProducerStatsDisabled.h"
#include "stats/ProducerStatsImpl.h"
#include "PulsarApi.pb.h"

using namespace pulsar;

Expand Down Expand Up @@ -77,7 +78,7 @@ class ProducerImpl : public HandlerBase,

bool removeCorruptMessage(uint64_t sequenceId);

bool ackReceived(uint64_t sequenceId);
bool ackReceived(uint64_t sequenceId, MessageId& messageId);

virtual void disconnectProducer();

Expand Down Expand Up @@ -129,7 +130,7 @@ class ProducerImpl : public HandlerBase,
void handleCreateProducer(const ClientConnectionPtr& cnx, Result result,
const ResponseData& responseData);

void statsCallBackHandler(Result, const Message&, SendCallback, boost::posix_time::ptime);
void statsCallBackHandler(Result, const MessageId&, SendCallback, boost::posix_time::ptime);

void handleClose(Result result, ResultCallback callback, ProducerImplPtr producer);

Expand Down
16 changes: 11 additions & 5 deletions pulsar-client-cpp/lib/c/c_Producer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -36,16 +36,22 @@ pulsar_result pulsar_producer_send(pulsar_producer_t *producer, pulsar_message_t
return (pulsar_result)producer->producer.send(msg->message);
}

static void handle_producer_send(pulsar::Result result, pulsar_message_t *msg, pulsar_send_callback callback,
void *ctx) {
callback((pulsar_result)result, msg, ctx);
static void handle_producer_send(pulsar::Result result, pulsar::MessageId messageId,
pulsar_send_callback callback, void *ctx) {
if (result == pulsar::ResultOk) {
pulsar_message_id_t *c_message_id = new pulsar_message_id_t;
c_message_id->messageId = messageId;
callback(pulsar_result_Ok, c_message_id, ctx);
} else {
callback((pulsar_result)result, NULL, ctx);
}
}

void pulsar_producer_send_async(pulsar_producer_t *producer, pulsar_message_t *msg,
pulsar_send_callback callback, void *ctx) {
msg->message = msg->builder.build();
producer->producer.sendAsync(msg->message,
std::bind(&handle_producer_send, std::placeholders::_1, msg, callback, ctx));
producer->producer.sendAsync(msg->message, std::bind(&handle_producer_send, std::placeholders::_1,
std::placeholders::_2, callback, ctx));
}

int64_t pulsar_producer_get_last_sequence_id(pulsar_producer_t *producer) {
Expand Down
5 changes: 3 additions & 2 deletions pulsar-client-cpp/perf/PerfProducer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -124,13 +124,14 @@ typedef std::unique_lock<std::mutex> Lock;

typedef std::chrono::high_resolution_clock Clock;

void sendCallback(pulsar::Result result, const pulsar::Message& msg, Clock::time_point& publishTime) {
void sendCallback(pulsar::Result result, const pulsar::MessageId& msgId, Clock::time_point& publishTime) {
LOG_DEBUG("result = " << result);
assert(result == pulsar::ResultOk);
uint64_t latencyUsec = std::chrono::duration_cast<std::chrono::microseconds>(Clock::now() - publishTime).count();
Lock lock(mutex);
++messagesProduced;
bytesProduced += msg.getLength();
// FIXME: Please fix me in here.
// bytesProduced += msg.getLength();
e2eLatencyAccumulator(latencyUsec);
}

Expand Down
4 changes: 2 additions & 2 deletions pulsar-client-cpp/python/src/producer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,15 @@ void Producer_send(Producer& producer, const Message& message) {
CHECK_RESULT(res);
}

void Producer_sendAsyncCallback(PyObject* callback, Result res, const Message& msg) {
void Producer_sendAsyncCallback(PyObject* callback, Result res, const MessageId& msgId) {
if (callback == Py_None) {
return;
}

PyGILState_STATE state = PyGILState_Ensure();

try {
py::call<void>(callback, res, py::object(&msg));
py::call<void>(callback, res, py::object(&msgId));
} catch (py::error_already_set e) {
PyErr_Print();
}
Expand Down
7 changes: 2 additions & 5 deletions pulsar-client-cpp/tests/AuthPluginTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,9 @@ static const std::string clientPublicKeyPath =
static const std::string clientPrivateKeyPath =
"../../pulsar-broker/src/test/resources/authentication/tls/client-key.pem";

static void sendCallBackTls(Result r, const Message& msg) {
static void sendCallBackTls(Result r, const MessageId& msgId) {
ASSERT_EQ(r, ResultOk);
std::string prefix = "test-tls-message-";
std::string messageContent = prefix + std::to_string(globalTestTlsMessagesCounter++);
ASSERT_EQ(messageContent, msg.getDataAsString());
LOG_DEBUG("Received publish acknowledgement for " << msg.getDataAsString());
globalTestTlsMessagesCounter++;
}

TEST(AuthPluginTest, testTls) {
Expand Down
15 changes: 6 additions & 9 deletions pulsar-client-cpp/tests/BasicEndToEndTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -61,13 +61,10 @@ static void messageListenerFunctionWithoutAck(Consumer consumer, const Message &
latch.countdown();
}

static void sendCallBack(Result r, const Message &msg, std::string prefix, int *count) {
static void sendCallBack(Result r, const MessageId &msgId, std::string prefix, int *count) {
static std::mutex sendMutex_;
sendMutex_.lock();
ASSERT_EQ(r, ResultOk);
std::string messageContent = prefix + std::to_string(*count);
ASSERT_EQ(messageContent, msg.getDataAsString());
LOG_DEBUG("Received publish acknowledgement for " << msg.getDataAsString());
*count += 1;
sendMutex_.unlock();
}
Expand All @@ -90,12 +87,12 @@ static void receiveCallBack(Result r, const Message &msg, std::string &messageCo
receiveMutex_.unlock();
}

static void sendCallBackWithDelay(Result r, const Message &msg, std::string prefix, double percentage,
static void sendCallBackWithDelay(Result r, const MessageId &msgId, std::string prefix, double percentage,
uint64_t delayInMicros, int *count) {
if ((rand() % 100) <= percentage) {
std::this_thread::sleep_for(std::chrono::microseconds(delayInMicros));
}
sendCallBack(r, msg, prefix, count);
sendCallBack(r, msgId, prefix, count);
}

class EncKeyReader : public CryptoKeyReader {
Expand Down Expand Up @@ -209,7 +206,7 @@ TEST(BasicEndToEndTest, testBatchMessages) {
ASSERT_EQ(i, numOfMessages);
}

void resendMessage(Result r, const Message msg, Producer producer) {
void resendMessage(Result r, const MessageId msgId, Producer producer) {
Lock lock(mutex_);
if (r != ResultOk) {
LOG_DEBUG("globalResendMessageCount" << globalResendMessageCount);
Expand Down Expand Up @@ -2290,8 +2287,8 @@ TEST(BasicEndToEndTest, testSyncFlushBatchMessages) {
}

// for partitioned reason, it may hard to verify message id.
static void simpleCallback(Result code, const Message &msg) {
LOG_INFO("Received code: " << code << " -- Msg: " << msg);
static void simpleCallback(Result code, const MessageId &msgId) {
LOG_INFO("Received code: " << code << " -- MsgID: " << msgId);
}

TEST(BasicEndToEndTest, testSyncFlushBatchMessagesPartitionedTopic) {
Expand Down
Loading

0 comments on commit be6a102

Please sign in to comment.