Skip to content

Commit

Permalink
Cpp client: add PatternMultiTopicsConsumerImpl to support regex subsc…
Browse files Browse the repository at this point in the history
…ribe (apache#2219)

In PR apache#1279 and apache#1298 we added regex based subscription. This is a catch up work to add `PatternMultiTopicsConsumerImpl` in cpp client.
  • Loading branch information
zhaijack authored and jiazhai committed Aug 4, 2018
1 parent 5c2e88c commit 93e192f
Show file tree
Hide file tree
Showing 21 changed files with 1,032 additions and 32 deletions.
16 changes: 16 additions & 0 deletions pulsar-client-cpp/include/pulsar/Client.h
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,9 @@ class Client {
void subscribeAsync(const std::string& topic, const std::string& consumerName,
const ConsumerConfiguration& conf, SubscribeCallback callback);

/**
* subscribe for multiple topics under the same namespace.
*/
Result subscribe(const std::vector<std::string>& topics, const std::string& subscriptionName,
Consumer& consumer);
Result subscribe(const std::vector<std::string>& topics, const std::string& subscriptionName,
Expand All @@ -108,6 +111,19 @@ class Client {
void subscribeAsync(const std::vector<std::string>& topics, const std::string& subscriptionName,
const ConsumerConfiguration& conf, SubscribeCallback callback);

/**
* subscribe for multiple topics, which match given regexPattern, under the same namespace.
*/
Result subscribeWithRegex(const std::string& regexPattern, const std::string& consumerName,
Consumer& consumer);
Result subscribeWithRegex(const std::string& regexPattern, const std::string& consumerName,
const ConsumerConfiguration& conf, Consumer& consumer);

void subscribeWithRegexAsync(const std::string& regexPattern, const std::string& consumerName,
SubscribeCallback callback);
void subscribeWithRegexAsync(const std::string& regexPattern, const std::string& consumerName,
const ConsumerConfiguration& conf, SubscribeCallback callback);

/**
* Create a topic reader with given {@code ReaderConfiguration} for reading messages from the specified
* topic.
Expand Down
10 changes: 10 additions & 0 deletions pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,16 @@ class ConsumerConfiguration {
bool isReadCompacted() const;
void setReadCompacted(bool compacted);

/**
* Set the time duration in minutes, for which the PatternMultiTopicsConsumer will do a pattern auto
* discovery.
* The default value is 60 seconds. less than 0 will disable auto discovery.
*
* @param periodInSeconds period in seconds to do an auto discovery
*/
void setPatternAutoDiscoveryPeriod(int periodInSeconds);
int getPatternAutoDiscoveryPeriod() const;

friend class PulsarWrapper;

private:
Expand Down
42 changes: 42 additions & 0 deletions pulsar-client-cpp/lib/BinaryProtoLookupService.cc
Original file line number Diff line number Diff line change
Expand Up @@ -152,4 +152,46 @@ uint64_t BinaryProtoLookupService::newRequestId() {
Lock lock(mutex_);
return ++requestIdGenerator_;
}

Future<Result, NamespaceTopicsPtr> BinaryProtoLookupService::getTopicsOfNamespaceAsync(
const NamespaceNamePtr& nsName) {
NamespaceTopicsPromisePtr promise = boost::make_shared<Promise<Result, NamespaceTopicsPtr>>();
if (!nsName) {
promise->setFailed(ResultInvalidTopicName);
return promise->getFuture();
}
std::string namespaceName = nsName->toString();
Future<Result, ClientConnectionWeakPtr> future = cnxPool_.getConnectionAsync(serviceUrl_, serviceUrl_);
future.addListener(boost::bind(&BinaryProtoLookupService::sendGetTopicsOfNamespaceRequest, this,
namespaceName, _1, _2, promise));
return promise->getFuture();
}

void BinaryProtoLookupService::sendGetTopicsOfNamespaceRequest(const std::string& nsName, Result result,
const ClientConnectionWeakPtr& clientCnx,
NamespaceTopicsPromisePtr promise) {
if (result != ResultOk) {
promise->setFailed(ResultConnectError);
return;
}

ClientConnectionPtr conn = clientCnx.lock();
uint64_t requestId = newRequestId();
LOG_DEBUG("sendGetTopicsOfNamespaceRequest. requestId: " << requestId << " nsName: " << nsName);

conn->newGetTopicsOfNamespace(nsName, requestId)
.addListener(
boost::bind(&BinaryProtoLookupService::getTopicsOfNamespaceListener, this, _1, _2, promise));
}

void BinaryProtoLookupService::getTopicsOfNamespaceListener(Result result, NamespaceTopicsPtr topicsPtr,
NamespaceTopicsPromisePtr promise) {
if (result != ResultOk) {
promise->setFailed(ResultLookupError);
return;
}

promise->setValue(topicsPtr);
}

} // namespace pulsar
9 changes: 9 additions & 0 deletions pulsar-client-cpp/lib/BinaryProtoLookupService.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ class BinaryProtoLookupService : public LookupService {

Future<Result, LookupDataResultPtr> getPartitionMetadataAsync(const TopicNamePtr& topicName);

Future<Result, NamespaceTopicsPtr> getTopicsOfNamespaceAsync(const NamespaceNamePtr& nsName);

private:
boost::mutex mutex_;
uint64_t requestIdGenerator_;
Expand All @@ -61,6 +63,13 @@ class BinaryProtoLookupService : public LookupService {
const ClientConnectionWeakPtr& clientCnx,
LookupDataResultPromisePtr promise);

void sendGetTopicsOfNamespaceRequest(const std::string& nsName, Result result,
const ClientConnectionWeakPtr& clientCnx,
NamespaceTopicsPromisePtr promise);

void getTopicsOfNamespaceListener(Result result, NamespaceTopicsPtr topicsPtr,
NamespaceTopicsPromisePtr promise);

uint64_t newRequestId();
};
typedef boost::shared_ptr<BinaryProtoLookupService> BinaryProtoLookupServicePtr;
Expand Down
24 changes: 24 additions & 0 deletions pulsar-client-cpp/lib/Client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,30 @@ void Client::subscribeAsync(const std::vector<std::string>& topics, const std::s
impl_->subscribeAsync(topics, subscriptionName, conf, callback);
}

Result Client::subscribeWithRegex(const std::string& regexPattern, const std::string& subscriptionName,
Consumer& consumer) {
return subscribeWithRegex(regexPattern, subscriptionName, ConsumerConfiguration(), consumer);
}

Result Client::subscribeWithRegex(const std::string& regexPattern, const std::string& subscriptionName,
const ConsumerConfiguration& conf, Consumer& consumer) {
Promise<Result, Consumer> promise;
subscribeWithRegexAsync(regexPattern, subscriptionName, conf, WaitForCallbackValue<Consumer>(promise));
Future<Result, Consumer> future = promise.getFuture();

return future.get(consumer);
}

void Client::subscribeWithRegexAsync(const std::string& regexPattern, const std::string& subscriptionName,
SubscribeCallback callback) {
subscribeWithRegexAsync(regexPattern, subscriptionName, ConsumerConfiguration(), callback);
}

void Client::subscribeWithRegexAsync(const std::string& regexPattern, const std::string& subscriptionName,
const ConsumerConfiguration& conf, SubscribeCallback callback) {
impl_->subscribeWithRegexAsync(regexPattern, subscriptionName, conf, callback);
}

Result Client::createReader(const std::string& topic, const MessageId& startMessageId,
const ReaderConfiguration& conf, Reader& reader) {
Promise<Result, Reader> promise;
Expand Down
86 changes: 79 additions & 7 deletions pulsar-client-cpp/lib/ClientConnection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ void ClientConnection::handlePulsarConnected(const CommandConnected& cmdConnecte
}

void ClientConnection::startConsumerStatsTimer(std::vector<uint64_t> consumerStatsRequests) {
std::vector<Promise<Result, BrokerConsumerStatsImpl> > consumerStatsPromises;
std::vector<Promise<Result, BrokerConsumerStatsImpl>> consumerStatsPromises;
Lock lock(mutex_);

for (int i = 0; i < consumerStatsRequests.size(); i++) {
Expand Down Expand Up @@ -856,6 +856,7 @@ void ClientConnection::handleIncomingCommand() {
<< " -- req_id: " << error.request_id());

Lock lock(mutex_);

PendingRequestsMap::iterator it = pendingRequests_.find(error.request_id());
if (it != pendingRequests_.end()) {
PendingRequestData requestData = it->second;
Expand All @@ -865,19 +866,28 @@ void ClientConnection::handleIncomingCommand() {
requestData.promise.setFailed(getResult(error.error()));
requestData.timer->cancel();
} else {
PendingGetLastMessageIdRequestsMap::iterator it2 =
PendingGetLastMessageIdRequestsMap::iterator it =
pendingGetLastMessageIdRequests_.find(error.request_id());
if (it2 != pendingGetLastMessageIdRequests_.end()) {
Promise<Result, MessageId> getLastMessageIdPromise = it2->second;
pendingGetLastMessageIdRequests_.erase(it2);
if (it != pendingGetLastMessageIdRequests_.end()) {
Promise<Result, MessageId> getLastMessageIdPromise = it->second;
pendingGetLastMessageIdRequests_.erase(it);
lock.unlock();

getLastMessageIdPromise.setFailed(getResult(error.error()));
} else {
lock.unlock();
PendingGetNamespaceTopicsMap::iterator it =
pendingGetNamespaceTopicsRequests_.find(error.request_id());
if (it != pendingGetNamespaceTopicsRequests_.end()) {
Promise<Result, NamespaceTopicsPtr> getNamespaceTopicsPromise = it->second;
pendingGetNamespaceTopicsRequests_.erase(it);
lock.unlock();

getNamespaceTopicsPromise.setFailed(getResult(error.error()));
} else {
lock.unlock();
}
}
}

break;
}

Expand Down Expand Up @@ -978,6 +988,51 @@ void ClientConnection::handleIncomingCommand() {
break;
}

case BaseCommand::GET_TOPICS_OF_NAMESPACE_RESPONSE: {
const CommandGetTopicsOfNamespaceResponse& response =
incomingCmd_.gettopicsofnamespaceresponse();

LOG_DEBUG(cnxString_ << "Received GetTopicsOfNamespaceResponse from server. req_id: "
<< response.request_id() << " topicsSize" << response.topics_size());

Lock lock(mutex_);
PendingGetNamespaceTopicsMap::iterator it =
pendingGetNamespaceTopicsRequests_.find(response.request_id());

if (it != pendingGetNamespaceTopicsRequests_.end()) {
Promise<Result, NamespaceTopicsPtr> getTopicsPromise = it->second;
pendingGetNamespaceTopicsRequests_.erase(it);
lock.unlock();

int numTopics = response.topics_size();
std::set<std::string> topicSet;
// get all topics
for (int i = 0; i < numTopics; i++) {
// remove partition part
const std::string& topicName = response.topics(i);
int pos = topicName.find("-partition-");
std::string filteredName = topicName.substr(0, pos);

// filter duped topic name
if (topicSet.find(filteredName) == topicSet.end()) {
topicSet.insert(filteredName);
}
}

NamespaceTopicsPtr topicsPtr =
boost::make_shared<std::vector<std::string>>(topicSet.begin(), topicSet.end());

getTopicsPromise.setValue(topicsPtr);
} else {
lock.unlock();
LOG_WARN(
"GetTopicsOfNamespaceResponse command - Received unknown request id from "
"server: "
<< response.request_id());
}
break;
}

default: {
LOG_WARN(cnxString_ << "Received invalid message from server");
close();
Expand Down Expand Up @@ -1281,4 +1336,21 @@ Future<Result, MessageId> ClientConnection::newGetLastMessageId(uint64_t consume
return promise.getFuture();
}

Future<Result, NamespaceTopicsPtr> ClientConnection::newGetTopicsOfNamespace(const std::string& nsName,
uint64_t requestId) {
Lock lock(mutex_);
Promise<Result, NamespaceTopicsPtr> promise;
if (isClosed()) {
lock.unlock();
LOG_ERROR(cnxString_ << "Client is not connected to the broker");
promise.setFailed(ResultNotConnected);
return promise.getFuture();
}

pendingGetNamespaceTopicsRequests_.insert(std::make_pair(requestId, promise));
lock.unlock();
sendCommand(Commands::newGetTopicsOfNamespace(nsName, requestId));
return promise.getFuture();
}

} // namespace pulsar
13 changes: 10 additions & 3 deletions pulsar-client-cpp/lib/ClientConnection.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ struct OpSendMsg;

typedef std::pair<std::string, int64_t> ResponseData;

typedef boost::shared_ptr<std::vector<std::string>> NamespaceTopicsPtr;

class ClientConnection : public boost::enable_shared_from_this<ClientConnection> {
enum State
{
Expand All @@ -81,7 +83,7 @@ class ClientConnection : public boost::enable_shared_from_this<ClientConnection>

public:
typedef boost::shared_ptr<boost::asio::ip::tcp::socket> SocketPtr;
typedef boost::shared_ptr<boost::asio::ssl::stream<boost::asio::ip::tcp::socket&> > TlsSocketPtr;
typedef boost::shared_ptr<boost::asio::ssl::stream<boost::asio::ip::tcp::socket&>> TlsSocketPtr;
typedef boost::shared_ptr<ClientConnection> ConnectionPtr;
typedef boost::function<void(const boost::system::error_code&, ConnectionPtr)> ConnectionListener;
typedef std::vector<ConnectionListener>::iterator ListenerIterator;
Expand Down Expand Up @@ -144,6 +146,8 @@ class ClientConnection : public boost::enable_shared_from_this<ClientConnection>

Future<Result, MessageId> newGetLastMessageId(uint64_t consumerId, uint64_t requestId);

Future<Result, NamespaceTopicsPtr> newGetTopicsOfNamespace(const std::string& nsName, uint64_t requestId);

private:
struct PendingRequestData {
Promise<Result, ResponseData> promise;
Expand Down Expand Up @@ -264,12 +268,15 @@ class ClientConnection : public boost::enable_shared_from_this<ClientConnection>
typedef std::map<long, ConsumerImplWeakPtr> ConsumersMap;
ConsumersMap consumers_;

typedef std::map<uint64_t, Promise<Result, BrokerConsumerStatsImpl> > PendingConsumerStatsMap;
typedef std::map<uint64_t, Promise<Result, BrokerConsumerStatsImpl>> PendingConsumerStatsMap;
PendingConsumerStatsMap pendingConsumerStatsMap_;

typedef std::map<long, Promise<Result, MessageId> > PendingGetLastMessageIdRequestsMap;
typedef std::map<long, Promise<Result, MessageId>> PendingGetLastMessageIdRequestsMap;
PendingGetLastMessageIdRequestsMap pendingGetLastMessageIdRequests_;

typedef std::map<long, Promise<Result, NamespaceTopicsPtr>> PendingGetNamespaceTopicsMap;
PendingGetNamespaceTopicsMap pendingGetNamespaceTopicsRequests_;

boost::mutex mutex_;
typedef boost::unique_lock<boost::mutex> Lock;

Expand Down
Loading

0 comments on commit 93e192f

Please sign in to comment.