Skip to content

Commit

Permalink
Pulsar Clients: Added a mandatory stop to the Backoff logic (apache#747)
Browse files Browse the repository at this point in the history
  • Loading branch information
jai1 authored and merlimat committed Sep 13, 2017
1 parent a8abd98 commit 0acfe57
Show file tree
Hide file tree
Showing 18 changed files with 333 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public abstract class AbstractReplicator {
protected static final ProducerConfiguration producerConfiguration = new ProducerConfiguration()
.setSendTimeout(0, TimeUnit.SECONDS).setBlockIfQueueFull(true);

protected final Backoff backOff = new Backoff(100, TimeUnit.MILLISECONDS, 1, TimeUnit.MINUTES);
protected final Backoff backOff = new Backoff(100, TimeUnit.MILLISECONDS, 1, TimeUnit.MINUTES, 0 ,TimeUnit.MILLISECONDS);

protected final String replicatorPrefix;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMu

private int totalAvailablePermits = 0;
private int readBatchSize;
private final Backoff readFailureBackoff = new Backoff(15, TimeUnit.SECONDS, 1, TimeUnit.MINUTES);
private final Backoff readFailureBackoff = new Backoff(15, TimeUnit.SECONDS, 1, TimeUnit.MINUTES, 0, TimeUnit.MILLISECONDS);
private static final AtomicIntegerFieldUpdater<PersistentDispatcherMultipleConsumers> TOTAL_UNACKED_MESSAGES_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(PersistentDispatcherMultipleConsumers.class, "totalUnackedMessages");
private volatile int totalUnackedMessages = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public final class PersistentDispatcherSingleActiveConsumer extends AbstractDisp

private static final int MaxReadBatchSize = 100;
private int readBatchSize;
private final Backoff readFailureBackoff = new Backoff(15, TimeUnit.SECONDS, 1, TimeUnit.MINUTES);
private final Backoff readFailureBackoff = new Backoff(15, TimeUnit.SECONDS, 1, TimeUnit.MINUTES, 0, TimeUnit.MILLISECONDS);
private final ServiceConfiguration serviceConfig;

public PersistentDispatcherSingleActiveConsumer(ManagedCursor cursor, SubType subscriptionType, int partitionIndex,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ public class PersistentReplicator extends AbstractReplicator implements Replicat

private int messageTTLInSeconds = 0;

private final Backoff readFailureBackoff = new Backoff(1, TimeUnit.SECONDS, 1, TimeUnit.MINUTES);
private final Backoff readFailureBackoff = new Backoff(1, TimeUnit.SECONDS, 1, TimeUnit.MINUTES, 0, TimeUnit.MILLISECONDS);

private PersistentMessageExpiryMonitor expiryMonitor;
// for connected subscriptions, message expiry will be checked if the backlog is greater than this threshold
Expand Down
1 change: 1 addition & 0 deletions pulsar-client-cpp/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ lib*.so*
/system-test/SystemTest

# IDE generated files
.csettings
.cproject
.project
.settings/
Expand Down
28 changes: 24 additions & 4 deletions pulsar-client-cpp/lib/Backoff.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,44 @@
* under the License.
*/
#include "Backoff.h"
#include <algorithm>

namespace pulsar {

Backoff::Backoff(const TimeDuration& initial, const TimeDuration& max)
Backoff::Backoff(const TimeDuration& initial, const TimeDuration& max, const TimeDuration& mandatoryStop)
: initial_(initial),
max_(max),
next_(initial) {
next_(initial),
mandatoryStopMade_(false),
mandatoryStop_(mandatoryStop),
randomSeed_(time(NULL)) {
}

TimeDuration Backoff::next() {
TimeDuration current = next_;
next_ = std::min(next_ * 2, max_);
return current;

// Check for mandatory stop
if (!mandatoryStopMade_) {
const boost::posix_time::ptime& now = boost::posix_time::microsec_clock::universal_time();
TimeDuration timeElapsedSinceFirstBackoff = boost::posix_time::milliseconds(0);
if (initial_ == current) {
firstBackoffTime_ = now;
} else {
timeElapsedSinceFirstBackoff = now - firstBackoffTime_;
}
if (timeElapsedSinceFirstBackoff + current > mandatoryStop_) {
current = std::max(initial_, mandatoryStop_ - timeElapsedSinceFirstBackoff);
mandatoryStopMade_ = true;
}
}
// Add Randomness
current = current - (current * (rand_r(&randomSeed_) % 10) / 100);
return std::max(initial_, current);
}

/**
 * Restart the backoff sequence from the beginning.
 *
 * Clears the "mandatory stop already made" flag and sets the next delay back
 * to the initial delay.
 */
void Backoff::reset() {
    mandatoryStopMade_ = false;
    next_ = initial_;
}

} //pulsar - namespace end
14 changes: 11 additions & 3 deletions pulsar-client-cpp/lib/Backoff.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@
#ifndef _PULSAR_BACKOFF_HEADER_
#define _PULSAR_BACKOFF_HEADER_
#include <boost/date_time/posix_time/posix_time.hpp>
#include <stdlib.h> /* srand, rand */
#include <algorithm>
#include <time.h> /* time */

#pragma GCC visibility push(default)

Expand All @@ -28,13 +31,18 @@ typedef boost::posix_time::time_duration TimeDuration;

class Backoff {
public:
Backoff(const TimeDuration& intial, const TimeDuration& max);
Backoff(const TimeDuration&, const TimeDuration&, const TimeDuration&);
TimeDuration next();
void reset();
private:
TimeDuration initial_;
TimeDuration max_;
const TimeDuration initial_;
const TimeDuration max_;
TimeDuration next_;
TimeDuration mandatoryStop_;
boost::posix_time::ptime firstBackoffTime_;
bool mandatoryStopMade_;
unsigned int randomSeed_;
friend class PulsarFriend;
};
}

Expand Down
3 changes: 1 addition & 2 deletions pulsar-client-cpp/lib/ConsumerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
#include "DestinationName.h"
#include <algorithm>

using namespace pulsar;
namespace pulsar {

DECLARE_LOG_OBJECT()
Expand All @@ -39,7 +38,7 @@ ConsumerImpl::ConsumerImpl(const ClientImplPtr client, const std::string& topic,
const ConsumerTopicType consumerTopicType /* = NonPartitioned by default */,
Commands::SubscriptionMode subscriptionMode,
Optional<BatchMessageId> startMessageId)
: HandlerBase(client, topic),
: HandlerBase(client, topic, Backoff(milliseconds(100), seconds(60), milliseconds(0))),
waitingForZeroQueueSizeMessage(false),
config_(conf),
subscription_(subscription),
Expand Down
4 changes: 2 additions & 2 deletions pulsar-client-cpp/lib/HandlerBase.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,15 @@ DECLARE_LOG_OBJECT()

namespace pulsar {

HandlerBase::HandlerBase(const ClientImplPtr& client, const std::string& topic)
HandlerBase::HandlerBase(const ClientImplPtr& client, const std::string& topic, const Backoff& backoff)
: client_(client),
topic_(topic),
connection_(),
mutex_(),
creationTimestamp_(now()),
operationTimeut_(seconds(client->conf().getOperationTimeoutSeconds())),
state_(Pending),
backoff_(milliseconds(100), seconds(60)),
backoff_(backoff),
timer_(client->getIOExecutorProvider()->get()->createDeadlineTimer()) {
}

Expand Down
2 changes: 1 addition & 1 deletion pulsar-client-cpp/lib/HandlerBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ typedef boost::shared_ptr<HandlerBase> HandlerBasePtr;
class HandlerBase {

public:
HandlerBase(const ClientImplPtr& client, const std::string& topic);
HandlerBase(const ClientImplPtr&, const std::string&, const Backoff&);

virtual ~HandlerBase();

Expand Down
11 changes: 7 additions & 4 deletions pulsar-client-cpp/lib/ProducerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
#include <boost/bind.hpp>
#include <boost/date_time/local_time/local_time.hpp>

using namespace pulsar;
namespace pulsar {
DECLARE_LOG_OBJECT()

Expand All @@ -48,9 +47,13 @@ OpSendMsg::OpSendMsg(uint64_t producerId, uint64_t sequenceId, const Message& ms
}

ProducerImpl::ProducerImpl(ClientImplPtr client, const std::string& topic,
const ProducerConfiguration& producerConfiguration)
: HandlerBase(client, topic),
conf_(producerConfiguration),
const ProducerConfiguration& conf)
: HandlerBase(
client,
topic,
Backoff(milliseconds(100), seconds(60),
milliseconds(std::max(100, conf.getSendTimeout() - 100)))),
conf_(conf),
executor_(client->getIOExecutorProvider()->get()),
pendingMessagesQueue_(conf_.getMaxPendingMessages()),
producerStr_("[" + topic_ + ", " + producerName_ + "] "),
Expand Down
127 changes: 118 additions & 9 deletions pulsar-client-cpp/tests/BackoffTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,131 @@
*/
#include <gtest/gtest.h>
#include "Backoff.h"
#include "PulsarFriend.h"

using namespace pulsar;
using boost::posix_time::milliseconds;
using boost::posix_time::seconds;

/**
 * Pulls the next delay from `backoff`, then moves the backoff's recorded
 * first-backoff timestamp `t2` milliseconds into the past so the following
 * call to next() sees that much extra elapsed time without the test sleeping.
 *
 * @param backoff backoff instance under test (its internal timestamp is modified)
 * @param t2      expected delay in milliseconds; also the amount the timestamp is rewound by
 * @return true when the delay returned by next() equals `t2` exactly
 */
static bool checkExactAndDecrementTimer(Backoff& backoff, const unsigned int& t2) {
    const unsigned int actualMs = backoff.next().total_milliseconds();
    PulsarFriend::getFirstBackoffTime(backoff) -= milliseconds(t2);
    return actualMs == t2;
}

/**
 * Pulls the next delay from `backoff`, then moves the backoff's recorded
 * first-backoff timestamp `t2` milliseconds into the past so the following
 * call to next() sees that much extra elapsed time without the test sleeping.
 *
 * @param backoff backoff instance under test (its internal timestamp is modified)
 * @param t2      nominal delay in milliseconds; also the amount the timestamp is rewound by
 * @return true when the delay returned by next() lies in [0.9 * t2, t2]
 */
static bool withinTenPercentAndDecrementTimer(Backoff& backoff, const unsigned int& t2) {
    const unsigned int actualMs = backoff.next().total_milliseconds();
    PulsarFriend::getFirstBackoffTime(backoff) -= milliseconds(t2);
    if (actualMs < t2 * 0.9) {
        return false;
    }
    return actualMs <= t2;
}

// Negative case for the mandatory stop: the first four calls use next() directly,
// so the first-backoff timestamp is never rewound and only real wall-clock time
// counts as elapsed. The fifth delay is therefore not shortened to ~400 ms.
TEST(BackoffTest, mandatoryStopTestNegativeTest) {
Backoff backoff(milliseconds(100), seconds(60), milliseconds(1900));
ASSERT_EQ(backoff.next().total_milliseconds(), 100);
backoff.next().total_milliseconds(); // nominally 200 (before jitter)
backoff.next().total_milliseconds(); // nominally 400 (before jitter)
backoff.next().total_milliseconds(); // nominally 800 (before jitter)
// Fifth call is nominally 1600; elapsed + 1600 does not exceed 1900 provided the
// test itself has run for well under 300 ms, so the result is not within 10% of 400.
ASSERT_FALSE(withinTenPercentAndDecrementTimer(backoff, 400));
}

// Checks that the first-backoff timestamp stays put while time passes and is
// stamped again by the first next() that follows a reset().
TEST(BackoffTest, firstBackoffTimerTest) {
Backoff backoff(milliseconds(100), seconds(60), milliseconds(1900));
ASSERT_EQ(backoff.next().total_milliseconds(), 100);
// Copy (not a reference) of the timestamp recorded by the first next() call.
boost::posix_time::ptime firstBackOffTime = PulsarFriend::getFirstBackoffTime(backoff);
usleep(300 * 1000); // sleep 300 ms of real wall-clock time
TimeDuration diffBackOffTime = PulsarFriend::getFirstBackoffTime(backoff) - firstBackOffTime;
ASSERT_EQ(diffBackOffTime, milliseconds(0)); // no change since reset not called

backoff.reset();
// After reset() the next delay equals the initial delay again, so next() re-stamps the timestamp.
ASSERT_EQ(backoff.next().total_milliseconds(), 100);
diffBackOffTime = PulsarFriend::getFirstBackoffTime(backoff) - firstBackOffTime;
// NOTE(review): the 310 ms upper bound leaves only ~10 ms of slack on top of the
// 300 ms sleep; this looks timing-sensitive on a loaded machine — confirm.
ASSERT_TRUE(diffBackOffTime >= milliseconds(300) && diffBackOffTime < milliseconds(310));
}


TEST(BackoffTest, basicTest) {
Backoff backoff(milliseconds(5), seconds(60));
ASSERT_EQ(backoff.next().total_milliseconds(), 5);
ASSERT_EQ(backoff.next().total_milliseconds(), 10);
Backoff backoff(milliseconds(5), seconds(60), seconds(60));
ASSERT_TRUE(checkExactAndDecrementTimer(backoff, 5));
ASSERT_TRUE(withinTenPercentAndDecrementTimer(backoff, 10));

backoff.reset();
ASSERT_EQ(backoff.next().total_milliseconds(), 5);
ASSERT_TRUE(checkExactAndDecrementTimer(backoff, 5));
}

TEST(BackoffTest, maxTest) {
Backoff backoff(milliseconds(5), milliseconds(20));
ASSERT_EQ(backoff.next().total_milliseconds(), 5);
ASSERT_EQ(backoff.next().total_milliseconds(), 10);
ASSERT_EQ(backoff.next().total_milliseconds(), 20);
ASSERT_EQ(backoff.next().total_milliseconds(), 20);
Backoff backoff(milliseconds(5), milliseconds(20), milliseconds(20));
ASSERT_TRUE(checkExactAndDecrementTimer(backoff, 5));
ASSERT_TRUE(withinTenPercentAndDecrementTimer(backoff, 10));
ASSERT_TRUE(withinTenPercentAndDecrementTimer(backoff, 5));
ASSERT_TRUE(withinTenPercentAndDecrementTimer(backoff, 20));
}

// Exercises the mandatory stop at 1900 ms. Each helper call rewinds the
// first-backoff timestamp by the nominal delay, so elapsed time accumulates as
// if the delays had actually been waited out.
TEST(BackoffTest, mandatoryStopTest) {
Backoff backoff(milliseconds(100), seconds(60), milliseconds(1900));
ASSERT_TRUE(checkExactAndDecrementTimer(backoff, 100));
ASSERT_TRUE(withinTenPercentAndDecrementTimer(backoff, 200));
ASSERT_TRUE(withinTenPercentAndDecrementTimer(backoff, 400));
ASSERT_TRUE(withinTenPercentAndDecrementTimer(backoff, 800));
// Simulated elapsed time is now 100 + 200 + 400 + 800 = 1500 ms. The nominal next
// delay of 1600 would overshoot 1900, so it is cut to 1900 - 1500 = 400 (then jittered).
ASSERT_TRUE(withinTenPercentAndDecrementTimer(backoff, 400));
// The mandatory stop has been made; doubling continues from where it was (3200)
// and is capped at the 60 s maximum.
ASSERT_TRUE(withinTenPercentAndDecrementTimer(backoff, 3200));
ASSERT_TRUE(withinTenPercentAndDecrementTimer(backoff, 6400));
ASSERT_TRUE(withinTenPercentAndDecrementTimer(backoff, 12800));
ASSERT_TRUE(withinTenPercentAndDecrementTimer(backoff, 25600));
ASSERT_TRUE(withinTenPercentAndDecrementTimer(backoff, 51200));
ASSERT_TRUE(withinTenPercentAndDecrementTimer(backoff, 60000));
ASSERT_TRUE(withinTenPercentAndDecrementTimer(backoff, 60000));

// reset() re-arms the mandatory stop, so the same 100/200/400/800/400 prefix repeats.
backoff.reset();
ASSERT_TRUE(checkExactAndDecrementTimer(backoff, 100));
ASSERT_TRUE(withinTenPercentAndDecrementTimer(backoff, 200));
ASSERT_TRUE(withinTenPercentAndDecrementTimer(backoff, 400));
ASSERT_TRUE(withinTenPercentAndDecrementTimer(backoff, 800));
// would have been 1600 without the mandatory stop
ASSERT_TRUE(withinTenPercentAndDecrementTimer(backoff, 400));

// Resetting before the mandatory stop is reached: the sequence restarts cleanly.
backoff.reset();
ASSERT_TRUE(checkExactAndDecrementTimer(backoff, 100));
ASSERT_TRUE(withinTenPercentAndDecrementTimer(backoff, 200));
ASSERT_TRUE(withinTenPercentAndDecrementTimer(backoff, 400));
ASSERT_TRUE(withinTenPercentAndDecrementTimer(backoff, 800));

// And once more, again resetting before the mandatory stop.
backoff.reset();
ASSERT_TRUE(checkExactAndDecrementTimer(backoff, 100));
ASSERT_TRUE(withinTenPercentAndDecrementTimer(backoff, 200));
ASSERT_TRUE(withinTenPercentAndDecrementTimer(backoff, 400));
ASSERT_TRUE(withinTenPercentAndDecrementTimer(backoff, 800));
}

// With a mandatory stop of 0 ms the stop condition is already met on the very
// first next() call (which still returns the initial delay), so every later
// delay follows plain doubling up to the 60 s maximum.
TEST(BackoffTest, ignoringMandatoryStopTest) {
Backoff backoff(milliseconds(100), seconds(60), milliseconds(0));
ASSERT_TRUE(checkExactAndDecrementTimer(backoff, 100));
ASSERT_TRUE(withinTenPercentAndDecrementTimer(backoff, 200));
ASSERT_TRUE(withinTenPercentAndDecrementTimer(backoff, 400));
ASSERT_TRUE(withinTenPercentAndDecrementTimer(backoff, 800));
// No shortened delay here, unlike the 1900 ms mandatory-stop case.
ASSERT_TRUE(withinTenPercentAndDecrementTimer(backoff, 1600));
ASSERT_TRUE(withinTenPercentAndDecrementTimer(backoff, 3200));
ASSERT_TRUE(withinTenPercentAndDecrementTimer(backoff, 6400));
ASSERT_TRUE(withinTenPercentAndDecrementTimer(backoff, 12800));
ASSERT_TRUE(withinTenPercentAndDecrementTimer(backoff, 25600));
ASSERT_TRUE(withinTenPercentAndDecrementTimer(backoff, 51200));
// 51200 * 2 exceeds the maximum, so the delay is capped at 60000.
ASSERT_TRUE(withinTenPercentAndDecrementTimer(backoff, 60000));
ASSERT_TRUE(withinTenPercentAndDecrementTimer(backoff, 60000));

// The same sequence must repeat after reset().
backoff.reset();
ASSERT_TRUE(checkExactAndDecrementTimer(backoff, 100));
ASSERT_TRUE(withinTenPercentAndDecrementTimer(backoff, 200));
ASSERT_TRUE(withinTenPercentAndDecrementTimer(backoff, 400));
ASSERT_TRUE(withinTenPercentAndDecrementTimer(backoff, 800));
ASSERT_TRUE(withinTenPercentAndDecrementTimer(backoff, 1600));
ASSERT_TRUE(withinTenPercentAndDecrementTimer(backoff, 3200));
ASSERT_TRUE(withinTenPercentAndDecrementTimer(backoff, 6400));
ASSERT_TRUE(withinTenPercentAndDecrementTimer(backoff, 12800));
ASSERT_TRUE(withinTenPercentAndDecrementTimer(backoff, 25600));
ASSERT_TRUE(withinTenPercentAndDecrementTimer(backoff, 51200));
ASSERT_TRUE(withinTenPercentAndDecrementTimer(backoff, 60000));
ASSERT_TRUE(withinTenPercentAndDecrementTimer(backoff, 60000));
}

4 changes: 4 additions & 0 deletions pulsar-client-cpp/tests/PulsarFriend.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,5 +62,9 @@ class PulsarFriend {
static ClientConnectionWeakPtr getClientConnection(HandlerBase& handler) {
return handler.connection_;
}

// Test-only accessor: hands out a mutable reference to the Backoff's private
// firstBackoffTime_ so tests can read or adjust the recorded timestamp.
static boost::posix_time::ptime& getFirstBackoffTime(Backoff& backoff) {
    boost::posix_time::ptime& firstBackoffTime = backoff.firstBackoffTime_;
    return firstBackoffTime;
}
};
}
Loading

0 comments on commit 0acfe57

Please sign in to comment.