Skip to content

Commit

Permalink
[C++] Ensure ExecutorServicePtr is not destroyed while the timer obje…
Browse files Browse the repository at this point in the history
…ct is alive (apache#6226)

* [C++] Ensure ExecutorServicePtr is not destroyed while the timer object is alive

* Fixed formatting

Co-authored-by: Sijie Guo <[email protected]>
  • Loading branch information
merlimat and sijie authored Feb 7, 2020
1 parent 54b39e6 commit d63b6c1
Show file tree
Hide file tree
Showing 6 changed files with 15 additions and 10 deletions.
3 changes: 1 addition & 2 deletions pulsar-client-cpp/lib/ConsumerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,7 @@ ConsumerImpl::ConsumerImpl(const ClientImplPtr client, const std::string& topic,
unsigned int statsIntervalInSeconds = client->getClientConfig().getStatsIntervalInSeconds();
if (statsIntervalInSeconds) {
consumerStatsBasePtr_ = std::make_shared<ConsumerStatsImpl>(
consumerStr_, client->getIOExecutorProvider()->get()->createDeadlineTimer(),
statsIntervalInSeconds);
consumerStr_, client->getIOExecutorProvider()->get(), statsIntervalInSeconds);
} else {
consumerStatsBasePtr_ = std::make_shared<ConsumerStatsDisabled>();
}
Expand Down
4 changes: 2 additions & 2 deletions pulsar-client-cpp/lib/ProducerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,8 @@ ProducerImpl::ProducerImpl(ClientImplPtr client, const std::string& topic, const

unsigned int statsIntervalInSeconds = client->getClientConfig().getStatsIntervalInSeconds();
if (statsIntervalInSeconds) {
producerStatsBasePtr_ = std::make_shared<ProducerStatsImpl>(
producerStr_, executor_->createDeadlineTimer(), statsIntervalInSeconds);
producerStatsBasePtr_ =
std::make_shared<ProducerStatsImpl>(producerStr_, executor_, statsIntervalInSeconds);
} else {
producerStatsBasePtr_ = std::make_shared<ProducerStatsDisabled>();
}
Expand Down
5 changes: 3 additions & 2 deletions pulsar-client-cpp/lib/stats/ConsumerStatsImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,11 @@
namespace pulsar {
DECLARE_LOG_OBJECT();

ConsumerStatsImpl::ConsumerStatsImpl(std::string consumerStr, DeadlineTimerPtr timer,
ConsumerStatsImpl::ConsumerStatsImpl(std::string consumerStr, ExecutorServicePtr executor,
unsigned int statsIntervalInSeconds)
: consumerStr_(consumerStr),
timer_(timer),
executor_(executor),
timer_(executor_->createDeadlineTimer()),
statsIntervalInSeconds_(statsIntervalInSeconds),
totalNumBytesRecieved_(0),
numBytesRecieved_(0) {
Expand Down
4 changes: 3 additions & 1 deletion pulsar-client-cpp/lib/stats/ConsumerStatsImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ class ConsumerStatsImpl : public ConsumerStatsBase {
std::map<std::pair<Result, proto::CommandAck_AckType>, unsigned long> totalAckedMsgMap_;

std::string consumerStr_;

ExecutorServicePtr executor_;
DeadlineTimerPtr timer_;
std::mutex mutex_;
unsigned int statsIntervalInSeconds_;
Expand All @@ -46,7 +48,7 @@ class ConsumerStatsImpl : public ConsumerStatsBase {
friend class PulsarFriend;

public:
ConsumerStatsImpl(std::string, DeadlineTimerPtr, unsigned int);
ConsumerStatsImpl(std::string, ExecutorServicePtr, unsigned int);
ConsumerStatsImpl(const ConsumerStatsImpl& stats);
void flushAndReset(const boost::system::error_code&);
virtual void receivedMessage(Message&, Result);
Expand Down
5 changes: 3 additions & 2 deletions pulsar-client-cpp/lib/stats/ProducerStatsImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,14 @@ std::string ProducerStatsImpl::latencyToString(const LatencyAccumulator& obj) {
return os.str();
}

ProducerStatsImpl::ProducerStatsImpl(std::string producerStr, DeadlineTimerPtr timer,
ProducerStatsImpl::ProducerStatsImpl(std::string producerStr, ExecutorServicePtr executor,
unsigned int statsIntervalInSeconds)
: numMsgsSent_(0),
numBytesSent_(0),
totalMsgsSent_(0),
totalBytesSent_(0),
timer_(timer),
executor_(executor),
timer_(executor->createDeadlineTimer()),
producerStr_(producerStr),
statsIntervalInSeconds_(statsIntervalInSeconds),
mutex_(),
Expand Down
4 changes: 3 additions & 1 deletion pulsar-client-cpp/lib/stats/ProducerStatsImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ class ProducerStatsImpl : public std::enable_shared_from_this<ProducerStatsImpl>
LatencyAccumulator totalLatencyAccumulator_;

std::string producerStr_;

ExecutorServicePtr executor_;
DeadlineTimerPtr timer_;
std::mutex mutex_;
unsigned int statsIntervalInSeconds_;
Expand All @@ -72,7 +74,7 @@ class ProducerStatsImpl : public std::enable_shared_from_this<ProducerStatsImpl>
static std::string latencyToString(const LatencyAccumulator&);

public:
ProducerStatsImpl(std::string, DeadlineTimerPtr, unsigned int);
ProducerStatsImpl(std::string, ExecutorServicePtr, unsigned int);

ProducerStatsImpl(const ProducerStatsImpl& stats);

Expand Down

0 comments on commit d63b6c1

Please sign in to comment.