Skip to content

Commit

Permalink
Support max message size for cpp and go client (apache#4348)
Browse files Browse the repository at this point in the history
* support max message size for cpp and go client

Signed-off-by: xiaolong.ran <[email protected]>

* format cpp code

Signed-off-by: xiaolong.ran <[email protected]>

* fix maxMessageSize logic

Signed-off-by: xiaolong.ran <[email protected]>

* fix comments

Signed-off-by: xiaolong.ran <[email protected]>

* fix comments

Signed-off-by: xiaolong.ran <[email protected]>

* fix comments

Signed-off-by: xiaolong.ran <[email protected]>

* fix comments

Signed-off-by: xiaolong.ran <[email protected]>

* fix ci error

Signed-off-by: xiaolong.ran <[email protected]>

* fix comments

Signed-off-by: xiaolong.ran <[email protected]>

* fix comments

Signed-off-by: xiaolong.ran <[email protected]>

* fix comments

Signed-off-by: xiaolong.ran <[email protected]>
  • Loading branch information
wolfstudy authored and merlimat committed May 31, 2019
1 parent 9db06a2 commit 0ff23b4
Show file tree
Hide file tree
Showing 10 changed files with 35 additions and 17 deletions.
2 changes: 1 addition & 1 deletion pulsar-client-cpp/lib/BatchMessageContainer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ void BatchMessageContainer::sendMessage(FlushCallback flushCallback) {
producer_.encryptMessage(impl_->metadata, impl_->payload, encryptedPayload);
impl_->payload = encryptedPayload;

if (impl_->payload.readableBytes() > Commands::MaxMessageSize) {
if (impl_->payload.readableBytes() > producer_.keepMaxMessageSize_) {
// At this point the compressed batch is above the overall MaxMessageSize. There
// can only 1 single message in the batch at this point.
batchMessageCallBack(ResultMessageTooBig, messagesContainerListPtr_, nullptr);
Expand Down
9 changes: 9 additions & 0 deletions pulsar-client-cpp/lib/ClientConnection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ ClientConnection::ClientConnection(const std::string& logicalAddress, const std:
operationsTimeout_(seconds(clientConfiguration.getOperationTimeoutSeconds())),
authentication_(authentication),
serverProtocolVersion_(ProtocolVersion_MIN),
maxMessageSize_(Commands::DefaultMaxMessageSize),
executor_(executor),
resolver_(executor->createTcpResolver()),
socket_(executor->createSocket()),
Expand Down Expand Up @@ -224,6 +225,12 @@ void ClientConnection::handlePulsarConnected(const CommandConnected& cmdConnecte
return;
}

if (cmdConnected.has_max_message_size()) {
LOG_DEBUG("Connection has max message size setting: " << cmdConnected.max_message_size());
maxMessageSize_ = cmdConnected.max_message_size();
LOG_DEBUG("Current max message size is: " << maxMessageSize_);
}

state_ = Ready;
serverProtocolVersion_ = cmdConnected.protocol_version();
connectPromise_.setValue(shared_from_this());
Expand Down Expand Up @@ -1366,6 +1373,8 @@ const std::string& ClientConnection::cnxString() const { return cnxString_; }

int ClientConnection::getServerProtocolVersion() const { return serverProtocolVersion_; }

int ClientConnection::getMaxMessageSize() const { return maxMessageSize_; }

Commands::ChecksumType ClientConnection::getChecksumType() const {
return getServerProtocolVersion() >= proto::v6 ? Commands::Crc32c : Commands::None;
}
Expand Down
3 changes: 3 additions & 0 deletions pulsar-client-cpp/lib/ClientConnection.h
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,8 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien

int getServerProtocolVersion() const;

int getMaxMessageSize() const;

Commands::ChecksumType getChecksumType() const;

Future<Result, BrokerConsumerStatsImpl> newConsumerStats(uint64_t consumerId, uint64_t requestId);
Expand Down Expand Up @@ -237,6 +239,7 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
TimeDuration operationsTimeout_;
AuthenticationPtr authentication_;
int serverProtocolVersion_;
int maxMessageSize_;

ExecutorServicePtr executor_;

Expand Down
2 changes: 1 addition & 1 deletion pulsar-client-cpp/lib/Commands.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ class Commands {
};
enum WireFormatConstant
{
MaxMessageSize = (5 * 1024 * 1024 - (10 * 1024)),
DefaultMaxMessageSize = (5 * 1024 * 1024 - (10 * 1024)),
MaxFrameSize = (5 * 1024 * 1024)
};

Expand Down
16 changes: 11 additions & 5 deletions pulsar-client-cpp/lib/ConsumerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -459,11 +459,17 @@ bool ConsumerImpl::uncompressMessageIfNeeded(const ClientConnectionPtr& cnx, con

uint32_t uncompressedSize = metadata.uncompressed_size();
uint32_t payloadSize = payload.readableBytes();
if (payloadSize > Commands::MaxMessageSize) {
// Uncompressed size is itself corrupted since it cannot be bigger than the MaxMessageSize
LOG_ERROR(getName() << "Got corrupted payload message size " << payloadSize //
<< " at " << msg.message_id().ledgerid() << ":" << msg.message_id().entryid());
discardCorruptedMessage(cnx, msg.message_id(), proto::CommandAck::UncompressedSizeCorruption);
if (cnx) {
if (payloadSize > cnx->getMaxMessageSize()) {
// Uncompressed size is itself corrupted since it cannot be bigger than the MaxMessageSize
LOG_ERROR(getName() << "Got corrupted payload message size " << payloadSize //
<< " at " << msg.message_id().ledgerid() << ":"
<< msg.message_id().entryid());
discardCorruptedMessage(cnx, msg.message_id(), proto::CommandAck::UncompressedSizeCorruption);
return false;
}
} else {
LOG_ERROR("Connection not ready for Consumer - " << getConsumerId());
return false;
}

Expand Down
7 changes: 4 additions & 3 deletions pulsar-client-cpp/lib/ProducerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ void ProducerImpl::handleCreateProducer(const ClientConnectionPtr& cnx, Result r
LOG_INFO(getName() << "Created producer on broker " << cnx->cnxString());

Lock lock(mutex_);
keepMaxMessageSize_ = cnx->getMaxMessageSize();
cnx->registerProducer(producerId_, shared_from_this());
producerName_ = responseData.producerName;
schemaVersion_ = responseData.schemaVersion;
Expand Down Expand Up @@ -338,7 +339,7 @@ void ProducerImpl::sendAsync(const Message& msg, SendCallback callback) {

uint32_t uncompressedSize = payload.readableBytes();
uint32_t payloadSize = uncompressedSize;

ClientConnectionPtr cnx = getCnx().lock();
if (!batchMessageContainer) {
// If batching is enabled we compress all the payloads together before sending the batch
payload = CompressionCodecProvider::getCodec(conf_.getCompressionType()).encode(payload);
Expand All @@ -352,9 +353,9 @@ void ProducerImpl::sendAsync(const Message& msg, SendCallback callback) {
}
payload = encryptedPayload;

if (payloadSize > Commands::MaxMessageSize) {
if (payloadSize > keepMaxMessageSize_) {
LOG_DEBUG(getName() << " - compressed Message payload size" << payloadSize << "cannot exceed "
<< Commands::MaxMessageSize << " bytes");
<< keepMaxMessageSize_ << " bytes");
cb(ResultMessageTooBig, msg);
return;
}
Expand Down
2 changes: 2 additions & 0 deletions pulsar-client-cpp/lib/ProducerImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ class ProducerImpl : public HandlerBase,
const ProducerConfiguration& producerConfiguration);
~ProducerImpl();

int keepMaxMessageSize_;

virtual const std::string& getTopic() const;

virtual void sendAsync(const Message& msg, SendCallback callback);
Expand Down
8 changes: 4 additions & 4 deletions pulsar-client-cpp/tests/BasicEndToEndTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -561,14 +561,14 @@ TEST(BasicEndToEndTest, testMessageTooBig) {
Result result = client.createProducer(topicName, conf, producer);
ASSERT_EQ(ResultOk, result);

int size = Commands::MaxMessageSize + 1;
int size = Commands::DefaultMaxMessageSize + 1000 * 100;
char *content = new char[size];
Message msg = MessageBuilder().setAllocatedContent(content, size).build();
result = producer.send(msg);
ASSERT_EQ(ResultMessageTooBig, result);

// Anything up to MaxMessageSize should be allowed
size = Commands::MaxMessageSize;
size = Commands::DefaultMaxMessageSize;
msg = MessageBuilder().setAllocatedContent(content, size).build();
result = producer.send(msg);
ASSERT_EQ(ResultOk, result);
Expand Down Expand Up @@ -1114,7 +1114,7 @@ TEST(BasicEndToEndTest, testProduceMessageSize) {
result = producerFuture.get(producer2);
ASSERT_EQ(ResultOk, result);

int size = Commands::MaxMessageSize + 1;
int size = Commands::DefaultMaxMessageSize + 1000 * 100;
char *content = new char[size];
Message msg = MessageBuilder().setAllocatedContent(content, size).build();
result = producer1.send(msg);
Expand Down Expand Up @@ -1165,7 +1165,7 @@ TEST(BasicEndToEndTest, testBigMessageSizeBatching) {
result = client.createProducer(topicName, conf2, producer2);
ASSERT_EQ(ResultOk, result);

int size = Commands::MaxMessageSize + 1;
int size = Commands::DefaultMaxMessageSize + 1000 * 100;
char *content = new char[size];
Message msg = MessageBuilder().setAllocatedContent(content, size).build();
result = producer1.send(msg);
Expand Down
1 change: 0 additions & 1 deletion pulsar-client-go/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ module github.com/apache/pulsar/pulsar-client-go

require (
github.com/BurntSushi/toml v0.3.1 // indirect
github.com/alecthomas/jsonschema v0.0.0-20190122210438-a6952de1bbe6
github.com/davecgh/go-spew v1.1.1
github.com/gogo/protobuf v1.2.1
github.com/golang/protobuf v1.3.1
Expand Down
2 changes: 0 additions & 2 deletions pulsar-client-go/go.sum
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/alecthomas/jsonschema v0.0.0-20190122210438-a6952de1bbe6 h1:xadBCbc8D9mmkaNfCsEBHbIoCjbayJXJNsY1JjPjNio=
github.com/alecthomas/jsonschema v0.0.0-20190122210438-a6952de1bbe6/go.mod h1:qpebaTNSsyUn5rPSJMsfqEtDw71TTggXM6stUDI16HA=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
Expand Down

0 comments on commit 0ff23b4

Please sign in to comment.