Skip to content

Commit

Permalink
[Issue 6000][pulsar-client] C++ client ACK grouping feature. (apache#…
Browse files Browse the repository at this point in the history
…6534)

Fixes apache#6000 

### Motivation

In our production environment, we created a sinker: It combines messages read from Pulsar, and writes the combined data into ClickHouse, which is not good at processing highly concurrent inserting (max. 100 QPS).

In the sinker, we used failover subscription with cumulative ACK. The reason for not using shared subscription is that, individual ACK in C++ client siginificantly reduces throughput. Our analysis shows that huge amount of individual ACK requests results in highly concurrent accessing, which finally results in terrible throughput. BTW, the machines, where we deploy brokers and bookies, are not equipped with SSD. There could be some configuration for brokers or bookies helping improving the situation, but we don't find them.

Therefore, the idea is that combining individual ACK requests within one request. After reading the protocol (PulsarApi.proto:CommandAck), we find that this feature is supported. According to this issue apache#6000 , Pulsar C++ client SDK does not implement ACK grouping feature, but it's already implemented by Java client.

After discussing with @jiazhai and @sijie , we are authorized to implement this feature and contribute this feature back to community later. It should follow the Java client's behavior and provide similar interfaces that Java client provides.

### Modifications

#### Interfaces

Similar to Java client, C++ client implements three trackers to *cache* ACK requests within a configured time window:

1. `AckGroupingTracker`: this is the base class of the other two trackers, it defines interfaces and provides empty implementation which does not send ACK requests to broker. This tracker is used for non-persistent topic, which actually does not require ACK.
2. `AckGroupingTrackerDisabled`: child class of `AckGroupingTracker`. It does not provide ACK grouping ability, and acts just like the previous individual ACK.
3. `AckGroupingTrackerEnabled`: child class of `AckGroupingTracker`. This is the real implementation of ACK grouping.

The trackers provides following public interfaces:

1. `isDuplicate`: checking if the given message ID exists in the TO-BE-ACKED group.
2. `addAcknowledge` and `addAcknowledgeCumulative`: unlike Java client, which combines these two interfaces into one (`addAcknowledge`), C++ clients provides them for individual and cumulative ACK requests. Such design can provide slightly better performance than if-else implementation.
3. `close`: closing the tracker.
4. `flush` and `flushAndClean`: flushing all pending ACK requests, the later one also resets internal status.

#### Consumer's Configuration

Two new configuration options are added:

1. `ackGroupingTimeMs`: time window in milliseconds for grouping ACK requests. If setting to positive value, ACK requests are grouped and sent to broker every `ackGroupingTimeMs` milliseconds (`AckGroupingTrackerEnabled`). For non-positive values, ACK requests are sent one by one to brokers as before (`AckGroupingTrackerDisabled`). Default is 100.
2. `ackGroupingMaxSize`: maximum number of grouped message IDs. Java client hard-coded it to 1000 for now. However, 1000 is too small in our scenario. Once the grouped message number reaches this limit, the ACK requests will be packed into one and sent to broker, even the ACK grouping deadline (`ackGroupingTimeMs`) is not reached. Non-positive values remove such limit.

**In addition**, these configurations are added into C API as well.

#### Commands for this feature.

A few new command factory interfaces are implemented, just like in Java, incl.

* `newMultiMessageAck`: command object for multi-message ACK requests.
* `peerSupports*`: interfaces for checking versions.

#### Topic Domain

Used to help defining and distinguish non-persistent and persistent topics.

### Verifying this change

- [x] Make sure that the change passes the CI checks.

This PR added 7 unit tests in `BasicEndToEndTest`, all of them start with `testAckGroupingTracker`. They cover the ACK grouping tracker's logic.

### Does this pull request potentially affect one of the following parts:

NO

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API: (no)
  - The schema: (no)
  - The default values of configurations: (no)
  - The wire protocol: (no)
  - The rest endpoints: (no)
  - The admin cli options: (no)
  - Anything that affects deployment: (no)

### Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
  - If a feature is not applicable for documentation, explain why?
  - If a feature is not documented yet in this PR, please create a followup issue for adding the documentation
  • Loading branch information
tongsucn authored Jun 5, 2020
1 parent 6826040 commit 9e4f854
Show file tree
Hide file tree
Showing 20 changed files with 1,168 additions and 53 deletions.
32 changes: 32 additions & 0 deletions pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,38 @@ class PULSAR_PUBLIC ConsumerConfiguration {
*/
long getNegativeAckRedeliveryDelayMs() const;

/**
* Set time window in milliseconds for grouping message ACK requests. An ACK request is not sent
* to broker until the time window reaches its end, or the number of grouped messages reaches
* limit. Default is 100 milliseconds. If it's set to a non-positive value, ACK requests will be
* directly sent to broker without grouping.
*
* @param ackGroupMillis time of ACK grouping window in milliseconds.
*/
void setAckGroupingTimeMs(long ackGroupingMillis);

/**
* Get grouping time window in milliseconds.
*
* @return grouping time window in milliseconds.
*/
long getAckGroupingTimeMs() const;

/**
* Set max number of grouped messages within one grouping time window. If it's set to a
* non-positive value, number of grouped messages is not limited. Default is 1000.
*
* @param maxGroupingSize max number of grouped messages with in one grouping time window.
*/
void setAckGroupingMaxSize(long maxGroupingSize);

/**
* Get max number of grouped messages within one grouping time window.
*
* @return max number of grouped messages within one grouping time window.
*/
long getAckGroupingMaxSize() const;

/**
* Set the time duration for which the broker side consumer stats will be cached in the client.
* @param cacheTimeInMs in milliseconds
Expand Down
40 changes: 40 additions & 0 deletions pulsar-client-cpp/include/pulsar/c/consumer_configuration.h
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,46 @@ PULSAR_PUBLIC void pulsar_configure_set_negative_ack_redelivery_delay_ms(
PULSAR_PUBLIC long pulsar_configure_get_negative_ack_redelivery_delay_ms(
pulsar_consumer_configuration_t *consumer_configuration);

/**
* Set time window in milliseconds for grouping message ACK requests. An ACK request is not sent
* to broker until the time window reaches its end, or the number of grouped messages reaches
* limit. Default is 100 milliseconds. If it's set to a non-positive value, ACK requests will be
* directly sent to broker without grouping.
*
* @param consumer_configuration the consumer conf object
* @param ackGroupMillis time of ACK grouping window in milliseconds.
*/
PULSAR_PUBLIC void pulsar_configure_set_ack_grouping_time_ms(
pulsar_consumer_configuration_t *consumer_configuration, long ackGroupingMillis);

/**
* Get grouping time window in milliseconds.
*
* @param consumer_configuration the consumer conf object
* @return grouping time window in milliseconds.
*/
PULSAR_PUBLIC long pulsar_configure_get_ack_grouping_time_ms(
pulsar_consumer_configuration_t *consumer_configuration);

/**
* Set max number of grouped messages within one grouping time window. If it's set to a
* non-positive value, number of grouped messages is not limited. Default is 1000.
*
* @param consumer_configuration the consumer conf object
* @param maxGroupingSize max number of grouped messages with in one grouping time window.
*/
PULSAR_PUBLIC void pulsar_configure_set_ack_grouping_max_size(
pulsar_consumer_configuration_t *consumer_configuration, long maxGroupingSize);

/**
* Get max number of grouped messages within one grouping time window.
*
* @param consumer_configuration the consumer conf object
* @return max number of grouped messages within one grouping time window.
*/
PULSAR_PUBLIC long pulsar_configure_get_ack_grouping_max_size(
pulsar_consumer_configuration_t *consumer_configuration);

PULSAR_PUBLIC int pulsar_consumer_is_encryption_enabled(
pulsar_consumer_configuration_t *consumer_configuration);

Expand Down
73 changes: 73 additions & 0 deletions pulsar-client-cpp/lib/AckGroupingTracker.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#include "AckGroupingTracker.h"

#include <cstdint>

#include <set>

#include "Commands.h"
#include "LogUtils.h"
#include "PulsarApi.pb.h"
#include "ClientConnection.h"
#include <pulsar/MessageId.h>

namespace pulsar {

DECLARE_LOG_OBJECT();

inline void sendAck(ClientConnectionPtr cnx, uint64_t consumerId, const MessageId& msgId,
proto::CommandAck_AckType ackType) {
proto::MessageIdData msgIdData;
msgIdData.set_ledgerid(msgId.ledgerId());
msgIdData.set_entryid(msgId.entryId());
auto cmd = Commands::newAck(consumerId, msgIdData, ackType, -1);
cnx->sendCommand(cmd);
LOG_DEBUG("ACK request is sent for message - [" << msgIdData.ledgerid() << ", " << msgIdData.entryid()
<< "]");
}

bool AckGroupingTracker::doImmediateAck(ClientConnectionWeakPtr connWeakPtr, uint64_t consumerId,
const MessageId& msgId, proto::CommandAck_AckType ackType) {
auto cnx = connWeakPtr.lock();
if (cnx == nullptr) {
LOG_DEBUG("Connection is not ready, ACK failed for message - [" << msgId.ledgerId() << ", "
<< msgId.entryId() << "]");
return false;
}
sendAck(cnx, consumerId, msgId, ackType);
return true;
}

bool AckGroupingTracker::doImmediateAck(ClientConnectionWeakPtr connWeakPtr, uint64_t consumerId,
const std::set<MessageId>& msgIds) {
auto cnx = connWeakPtr.lock();
if (cnx == nullptr) {
LOG_DEBUG("Connection is not ready, ACK failed.");
return false;
}

for (const auto& msgId : msgIds) {
sendAck(cnx, consumerId, msgId, proto::CommandAck::Individual);
}
return true;
}

} // namespace pulsar
108 changes: 108 additions & 0 deletions pulsar-client-cpp/lib/AckGroupingTracker.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#ifndef LIB_ACKGROUPINGTRACKER_H_
#define LIB_ACKGROUPINGTRACKER_H_

#include <cstdint>

#include <set>
#include <memory>

#include "PulsarApi.pb.h"
#include "ClientConnection.h"
#include <pulsar/MessageId.h>

namespace pulsar {

/**
* @class AckGroupingTracker
* Default ACK grouping tracker, it actually neither tracks ACK requests nor sends them to brokers.
* It can be directly used by consumers for non-persistent topics.
*/
class AckGroupingTracker {
public:
AckGroupingTracker() = default;
virtual ~AckGroupingTracker() = default;

/**
* Since ACK requests are grouped and delayed, we need to do some best-effort duplicate check to
* discard messages that are being resent after a disconnection and for which the user has
* already sent an acknowledgement.
* @param[in] msgId message ID to be checked.
* @return true if given message ID is grouped, otherwise false. If using cumulative ACK and the
* given message ID has been ACKed in previous cumulative ACK, it also returns true;
*/
virtual bool isDuplicate(const MessageId& msgId) { return false; }

/**
* Adding message ID into ACK group for individual ACK.
* @param[in] msgId ID of the message to be ACKed.
*/
virtual void addAcknowledge(const MessageId& msgId) {}

/**
* Adding message ID into ACK group for cumulative ACK.
* @param[in] msgId ID of the message to be ACKed.
*/
virtual void addAcknowledgeCumulative(const MessageId& msgId) {}

/**
* Flush all the pending grouped ACKs (as flush() does), and stop period ACKs sending.
*/
virtual void close() {}

/**
* Flush all the pending grouped ACKs and send them to the broker.
*/
virtual void flush() {}

/**
* Flush all the pending grouped ACKs (as flush() does), and clean all records about ACKed
* messages, such as last cumulative ACKed message ID.
*/
virtual void flushAndClean() {}

protected:
/**
* Immediately send ACK request to broker.
* @param[in] connWeakPtr weak pointer of the client connection.
* @param[in] consumerId ID of the consumer that performs this ACK.
* @param[in] msgId message ID to be ACKed.
* @param[in] ackType ACK type, e.g. cumulative.
* @return true if the ACK is sent successfully, otherwise false.
*/
bool doImmediateAck(ClientConnectionWeakPtr connWeakPtr, uint64_t consumerId, const MessageId& msgId,
proto::CommandAck_AckType ackType);

/**
* Immediately send a set of ACK requests one by one to the broker, it only supports individual
* ACK.
* @param[in] connWeakPtr weak pointer of the client connection.
* @param[in] consumerId ID of the consumer that performs this ACK.
* @param[in] msgIds message IDs to be ACKed.
* @return true if the ACK is sent successfully, otherwise false.
*/
bool doImmediateAck(ClientConnectionWeakPtr connWeakPtr, uint64_t consumerId,
const std::set<MessageId>& msgIds);
}; // class AckGroupingTracker

using AckGroupingTrackerScopedPtr = std::unique_ptr<AckGroupingTracker>;

} // namespace pulsar
#endif /* LIB_ACKGROUPINGTRACKER_H_ */
43 changes: 43 additions & 0 deletions pulsar-client-cpp/lib/AckGroupingTrackerDisabled.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#include "AckGroupingTrackerDisabled.h"

#include "HandlerBase.h"
#include "PulsarApi.pb.h"
#include <pulsar/MessageId.h>

namespace pulsar {

DECLARE_LOG_OBJECT();

AckGroupingTrackerDisabled::AckGroupingTrackerDisabled(HandlerBase& handler, uint64_t consumerId)
: AckGroupingTracker(), handler_(handler), consumerId_(consumerId) {
LOG_INFO("ACK grouping is disabled.");
}

void AckGroupingTrackerDisabled::addAcknowledge(const MessageId& msgId) {
this->doImmediateAck(this->handler_.getCnx(), this->consumerId_, msgId, proto::CommandAck::Individual);
}

void AckGroupingTrackerDisabled::addAcknowledgeCumulative(const MessageId& msgId) {
this->doImmediateAck(this->handler_.getCnx(), this->consumerId_, msgId, proto::CommandAck::Cumulative);
}

} // namespace pulsar
58 changes: 58 additions & 0 deletions pulsar-client-cpp/lib/AckGroupingTrackerDisabled.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#ifndef LIB_ACKGROUPINGTRACKERDISABLED_H_
#define LIB_ACKGROUPINGTRACKERDISABLED_H_

#include <cstdint>

#include "HandlerBase.h"
#include <pulsar/MessageId.h>
#include "AckGroupingTracker.h"

namespace pulsar {

/**
* @class AckGroupingTrackerDisabled
* ACK grouping tracker that does not tracker or group ACK requests. The ACK requests are diretly
* sent to broker.
*/
class AckGroupingTrackerDisabled : public AckGroupingTracker {
public:
virtual ~AckGroupingTrackerDisabled() = default;

/**
* Constructing ACK grouping tracker for peresistent topics that disabled ACK grouping.
* @param[in] handler the connection handler.
* @param[in] consumerId consumer ID that this tracker belongs to.
*/
AckGroupingTrackerDisabled(HandlerBase& handler, uint64_t consumerId);

void addAcknowledge(const MessageId& msgId) override;
void addAcknowledgeCumulative(const MessageId& msgId) override;

private:
//! The connection handler.
HandlerBase& handler_;

//! ID of the consumer that this tracker belongs to.
uint64_t consumerId_;
}; // class AckGroupingTrackerDisabled

} // namespace pulsar
#endif /* LIB_ACKGROUPINGTRACKERDISABLED_H_ */
Loading

0 comments on commit 9e4f854

Please sign in to comment.