Skip to content

Commit

Permalink
[Issue 9204][Python/C++]Expose replicateSubscriptionState setting for…
Browse files Browse the repository at this point in the history
… python/c++ consumer (apache#10243)

Fixes apache#9204

### Motivation

Currently, in the python and C++ client, the "replicateSubscriptionState" feature can no be enabled in the consumer creation.

### Modifications

Expose the "replicateSubscriptionState" setting in the consumer's construct function.
  • Loading branch information
RobertIndie authored May 22, 2021
1 parent 2bd614a commit 67adf0a
Show file tree
Hide file tree
Showing 9 changed files with 42 additions and 7 deletions.
13 changes: 13 additions & 0 deletions pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,19 @@ class PULSAR_PUBLIC ConsumerConfiguration {
*/
InitialPosition getSubscriptionInitialPosition() const;

/**
* Set whether the subscription status should be replicated.
* The default value is `false`.
*
* @param replicateSubscriptionState whether the subscription status should be replicated
*/
void setReplicateSubscriptionStateEnabled(bool enabled);

/**
* @return whether the subscription status should be replicated
*/
bool isReplicateSubscriptionStateEnabled() const;

/**
* Check whether the message has a specific property attached.
*
Expand Down
3 changes: 2 additions & 1 deletion pulsar-client-cpp/lib/Commands.cc
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ SharedBuffer Commands::newSubscribe(const std::string& topic, const std::string&
const std::map<std::string, std::string>& metadata,
const SchemaInfo& schemaInfo,
CommandSubscribe_InitialPosition subscriptionInitialPosition,
KeySharedPolicy keySharedPolicy) {
bool replicateSubscriptionState, KeySharedPolicy keySharedPolicy) {
BaseCommand cmd;
cmd.set_type(BaseCommand::SUBSCRIBE);
CommandSubscribe* subscribe = cmd.mutable_subscribe();
Expand All @@ -269,6 +269,7 @@ SharedBuffer Commands::newSubscribe(const std::string& topic, const std::string&
subscribe->set_durable(subscriptionMode == SubscriptionModeDurable);
subscribe->set_read_compacted(readCompacted);
subscribe->set_initialposition(subscriptionInitialPosition);
subscribe->set_replicate_subscription_state(replicateSubscriptionState);

if (isBuiltInSchema(schemaInfo.getSchemaType())) {
subscribe->set_allocated_schema(getSchema(schemaInfo));
Expand Down
2 changes: 1 addition & 1 deletion pulsar-client-cpp/lib/Commands.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ class Commands {
bool readCompacted, const std::map<std::string, std::string>& metadata,
const SchemaInfo& schemaInfo,
proto::CommandSubscribe_InitialPosition subscriptionInitialPosition,
KeySharedPolicy keySharedPolicy);
bool replicateSubscriptionState, KeySharedPolicy keySharedPolicy);

static SharedBuffer newUnsubscribe(uint64_t consumerId, uint64_t requestId);

Expand Down
8 changes: 8 additions & 0 deletions pulsar-client-cpp/lib/ConsumerConfiguration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,14 @@ void ConsumerConfiguration::setPatternAutoDiscoveryPeriod(int periodInSeconds) {

int ConsumerConfiguration::getPatternAutoDiscoveryPeriod() const { return impl_->patternAutoDiscoveryPeriod; }

void ConsumerConfiguration::setReplicateSubscriptionStateEnabled(bool enabled) {
impl_->replicateSubscriptionStateEnabled = enabled;
}

bool ConsumerConfiguration::isReplicateSubscriptionStateEnabled() const {
return impl_->replicateSubscriptionStateEnabled;
}

bool ConsumerConfiguration::hasProperty(const std::string& name) const {
const std::map<std::string, std::string>& m = impl_->properties;
return m.find(name) != m.end();
Expand Down
1 change: 1 addition & 0 deletions pulsar-client-cpp/lib/ConsumerConfigurationImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ struct ConsumerConfigurationImpl {
bool readCompacted;
InitialPosition subscriptionInitialPosition;
int patternAutoDiscoveryPeriod;
bool replicateSubscriptionStateEnabled;
std::map<std::string, std::string> properties;
KeySharedPolicy keySharedPolicy;

Expand Down
8 changes: 4 additions & 4 deletions pulsar-client-cpp/lib/ConsumerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -184,10 +184,10 @@ void ConsumerImpl::connectionOpened(const ClientConnectionPtr& cnx) {

ClientImplPtr client = client_.lock();
uint64_t requestId = client->newRequestId();
SharedBuffer cmd =
Commands::newSubscribe(topic_, subscription_, consumerId_, requestId, getSubType(), consumerName_,
subscriptionMode_, startMessageId_, readCompacted_, config_.getProperties(),
config_.getSchema(), getInitialPosition(), config_.getKeySharedPolicy());
SharedBuffer cmd = Commands::newSubscribe(
topic_, subscription_, consumerId_, requestId, getSubType(), consumerName_, subscriptionMode_,
startMessageId_, readCompacted_, config_.getProperties(), config_.getSchema(), getInitialPosition(),
config_.isReplicateSubscriptionStateEnabled(), config_.getKeySharedPolicy());
cnx->sendRequestWithId(cmd, requestId)
.addListener(
std::bind(&ConsumerImpl::handleCreateConsumer, shared_from_this(), cnx, std::placeholders::_1));
Expand Down
8 changes: 7 additions & 1 deletion pulsar-client-cpp/python/pulsar/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -599,7 +599,8 @@ def subscribe(self, topic, subscription_name,
properties=None,
pattern_auto_discovery_period=60,
initial_position=InitialPosition.Latest,
crypto_key_reader=None
crypto_key_reader=None,
replicate_subscription_state_enabled=False
):
"""
Subscribe to the given topic and subscription combination.
Expand Down Expand Up @@ -675,6 +676,9 @@ def my_listener(consumer, message):
* crypto_key_reader:
Symmetric encryption class implementation, configuring public key encryption messages for the producer
and private key decryption messages for the consumer
* replicate_subscription_state_enabled:
Set whether the subscription status should be replicated.
Default: `False`.
"""
_check_type(str, subscription_name, 'subscription_name')
_check_type(ConsumerType, consumer_type, 'consumer_type')
Expand Down Expand Up @@ -716,6 +720,8 @@ def my_listener(consumer, message):
if crypto_key_reader:
conf.crypto_key_reader(crypto_key_reader.cryptoKeyReader)

conf.replicate_subscription_state_enabled(replicate_subscription_state_enabled)

c = Consumer()
if isinstance(topic, str):
# Single topic
Expand Down
4 changes: 4 additions & 0 deletions pulsar-client-cpp/python/pulsar_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,10 @@ def test_consumer_config(self):
conf.consumer_name("my-name")
self.assertEqual(conf.consumer_name(), "my-name")

self.assertEqual(conf.replicate_subscription_state_enabled(), False)
conf.replicate_subscription_state_enabled(True)
self.assertEqual(conf.replicate_subscription_state_enabled(), True)

def test_client_logger(self):
logger = logging.getLogger("pulsar")
Client(self.serviceUrl, logger=logger)
Expand Down
2 changes: 2 additions & 0 deletions pulsar-client-cpp/python/src/config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,8 @@ void export_config() {
.def("subscription_initial_position", &ConsumerConfiguration::getSubscriptionInitialPosition)
.def("subscription_initial_position", &ConsumerConfiguration::setSubscriptionInitialPosition)
.def("crypto_key_reader", &ConsumerConfiguration_setCryptoKeyReader, return_self<>())
.def("replicate_subscription_state_enabled", &ConsumerConfiguration::setReplicateSubscriptionStateEnabled)
.def("replicate_subscription_state_enabled", &ConsumerConfiguration::isReplicateSubscriptionStateEnabled)
;

class_<ReaderConfiguration>("ReaderConfiguration")
Expand Down

0 comments on commit 67adf0a

Please sign in to comment.