From 0acfe574e101d50faf87447296945d52e4672319 Mon Sep 17 00:00:00 2001 From: jai1 Date: Wed, 13 Sep 2017 15:30:00 -0700 Subject: [PATCH] Pulsar Clients: Added a mandatory stop to the Backoff logic (#747) --- .../broker/service/AbstractReplicator.java | 2 +- ...PersistentDispatcherMultipleConsumers.java | 2 +- ...sistentDispatcherSingleActiveConsumer.java | 2 +- .../persistent/PersistentReplicator.java | 2 +- pulsar-client-cpp/.gitignore | 1 + pulsar-client-cpp/lib/Backoff.cc | 28 +++- pulsar-client-cpp/lib/Backoff.h | 14 +- pulsar-client-cpp/lib/ConsumerImpl.cc | 3 +- pulsar-client-cpp/lib/HandlerBase.cc | 4 +- pulsar-client-cpp/lib/HandlerBase.h | 2 +- pulsar-client-cpp/lib/ProducerImpl.cc | 11 +- pulsar-client-cpp/tests/BackoffTest.cc | 127 +++++++++++++++-- pulsar-client-cpp/tests/PulsarFriend.h | 4 + .../apache/pulsar/client/impl/Backoff.java | 36 ++++- .../pulsar/client/impl/ConsumerBase.java | 2 +- .../pulsar/client/impl/HandlerBase.java | 4 +- .../pulsar/client/impl/ProducerBase.java | 3 +- .../pulsar/client/impl/BackoffTest.java | 128 +++++++++++++++++- 18 files changed, 333 insertions(+), 42 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java index 67c30216fd953..76ce20b5c9088 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java @@ -45,7 +45,7 @@ public abstract class AbstractReplicator { protected static final ProducerConfiguration producerConfiguration = new ProducerConfiguration() .setSendTimeout(0, TimeUnit.SECONDS).setBlockIfQueueFull(true); - protected final Backoff backOff = new Backoff(100, TimeUnit.MILLISECONDS, 1, TimeUnit.MINUTES); + protected final Backoff backOff = new Backoff(100, TimeUnit.MILLISECONDS, 1, TimeUnit.MINUTES, 0 ,TimeUnit.MILLISECONDS); protected final String replicatorPrefix; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java index 5e70036f1546c..a595971c68192 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java @@ -72,7 +72,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMu private int totalAvailablePermits = 0; private int readBatchSize; - private final Backoff readFailureBackoff = new Backoff(15, TimeUnit.SECONDS, 1, TimeUnit.MINUTES); + private final Backoff readFailureBackoff = new Backoff(15, TimeUnit.SECONDS, 1, TimeUnit.MINUTES, 0, TimeUnit.MILLISECONDS); private static final AtomicIntegerFieldUpdater TOTAL_UNACKED_MESSAGES_UPDATER = AtomicIntegerFieldUpdater.newUpdater(PersistentDispatcherMultipleConsumers.class, "totalUnackedMessages"); private volatile int totalUnackedMessages = 0; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java index 1a6b65af7c78c..84a677e5f7503 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java @@ -52,7 +52,7 @@ public final class PersistentDispatcherSingleActiveConsumer extends AbstractDisp private static final int MaxReadBatchSize = 100; private int readBatchSize; - private final Backoff readFailureBackoff = new Backoff(15, TimeUnit.SECONDS, 1, TimeUnit.MINUTES); + private final Backoff readFailureBackoff = new Backoff(15, TimeUnit.SECONDS, 1, TimeUnit.MINUTES, 0, TimeUnit.MILLISECONDS); private final ServiceConfiguration serviceConfig; public PersistentDispatcherSingleActiveConsumer(ManagedCursor cursor, SubType subscriptionType, int partitionIndex, diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java index 9b7821cbd0762..48bbf0975b7c3 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java @@ -80,7 +80,7 @@ public class PersistentReplicator extends AbstractReplicator implements Replicat private int messageTTLInSeconds = 0; - private final Backoff readFailureBackoff = new Backoff(1, TimeUnit.SECONDS, 1, TimeUnit.MINUTES); + private final Backoff readFailureBackoff = new Backoff(1, TimeUnit.SECONDS, 1, TimeUnit.MINUTES, 0, TimeUnit.MILLISECONDS); private PersistentMessageExpiryMonitor expiryMonitor; // for connected subscriptions, message expiry will be checked if the backlog is greater than this threshold diff --git a/pulsar-client-cpp/.gitignore b/pulsar-client-cpp/.gitignore index 50b904c4749b6..c660cd10d7b82 100644 --- a/pulsar-client-cpp/.gitignore +++ b/pulsar-client-cpp/.gitignore @@ -44,6 +44,7 @@ lib*.so* /system-test/SystemTest # IDE generated files +.csettings .cproject .project .settings/ diff --git a/pulsar-client-cpp/lib/Backoff.cc b/pulsar-client-cpp/lib/Backoff.cc index 746f9d54f6c5a..aba88f5fb553c 100644 --- a/pulsar-client-cpp/lib/Backoff.cc +++ b/pulsar-client-cpp/lib/Backoff.cc @@ -17,24 +17,44 @@ * under the License. */ #include "Backoff.h" -#include namespace pulsar { -Backoff::Backoff(const TimeDuration& initial, const TimeDuration& max) +Backoff::Backoff(const TimeDuration& initial, const TimeDuration& max, const TimeDuration& mandatoryStop) : initial_(initial), max_(max), - next_(initial) { + next_(initial), + mandatoryStopMade_(false), + mandatoryStop_(mandatoryStop), + randomSeed_(time(NULL)) { } TimeDuration Backoff::next() { TimeDuration current = next_; next_ = std::min(next_ * 2, max_); - return current; + + // Check for mandatory stop + if (!mandatoryStopMade_) { + const boost::posix_time::ptime& now = boost::posix_time::microsec_clock::universal_time(); + TimeDuration timeElapsedSinceFirstBackoff = boost::posix_time::milliseconds(0); + if (initial_ == current) { + firstBackoffTime_ = now; + } else { + timeElapsedSinceFirstBackoff = now - firstBackoffTime_; + } + if (timeElapsedSinceFirstBackoff + current > mandatoryStop_) { + current = std::max(initial_, mandatoryStop_ - timeElapsedSinceFirstBackoff); + mandatoryStopMade_ = true; + } + } + // Add Randomness + current = current - (current * (rand_r(&randomSeed_) % 10) / 100); + return std::max(initial_, current); } void Backoff::reset() { next_ = initial_; + mandatoryStopMade_ = false; } } //pulsar - namespace end diff --git a/pulsar-client-cpp/lib/Backoff.h b/pulsar-client-cpp/lib/Backoff.h index 670d4df51f261..d06b84d88a4c3 100644 --- a/pulsar-client-cpp/lib/Backoff.h +++ b/pulsar-client-cpp/lib/Backoff.h @@ -19,6 +19,9 @@ #ifndef _PULSAR_BACKOFF_HEADER_ #define _PULSAR_BACKOFF_HEADER_ #include +#include /* srand, rand */ +#include +#include /* time */ #pragma GCC visibility push(default) @@ -28,13 +31,18 @@ typedef boost::posix_time::time_duration TimeDuration; class Backoff { public: - Backoff(const TimeDuration& intial, const TimeDuration& max); + Backoff(const TimeDuration&, const TimeDuration&, const TimeDuration&); TimeDuration next(); void reset(); private: - TimeDuration initial_; - TimeDuration max_; + const TimeDuration initial_; + const TimeDuration max_; TimeDuration next_; + TimeDuration mandatoryStop_; + boost::posix_time::ptime firstBackoffTime_; + bool mandatoryStopMade_; + unsigned int randomSeed_; + friend class PulsarFriend; }; } diff --git a/pulsar-client-cpp/lib/ConsumerImpl.cc b/pulsar-client-cpp/lib/ConsumerImpl.cc index 57febfb8b6f59..38a236311bff8 100644 --- a/pulsar-client-cpp/lib/ConsumerImpl.cc +++ b/pulsar-client-cpp/lib/ConsumerImpl.cc @@ -28,7 +28,6 @@ #include "DestinationName.h" #include -using namespace pulsar; namespace pulsar { DECLARE_LOG_OBJECT() @@ -39,7 +38,7 @@ ConsumerImpl::ConsumerImpl(const ClientImplPtr client, const std::string& topic, const ConsumerTopicType consumerTopicType /* = NonPartitioned by default */, Commands::SubscriptionMode subscriptionMode, Optional startMessageId) - : HandlerBase(client, topic), + : HandlerBase(client, topic, Backoff(milliseconds(100), seconds(60), milliseconds(0))), waitingForZeroQueueSizeMessage(false), config_(conf), subscription_(subscription), diff --git a/pulsar-client-cpp/lib/HandlerBase.cc b/pulsar-client-cpp/lib/HandlerBase.cc index 62347eb90e4f6..a0a0686478bc9 100644 --- a/pulsar-client-cpp/lib/HandlerBase.cc +++ b/pulsar-client-cpp/lib/HandlerBase.cc @@ -27,7 +27,7 @@ DECLARE_LOG_OBJECT() namespace pulsar { -HandlerBase::HandlerBase(const ClientImplPtr& client, const std::string& topic) +HandlerBase::HandlerBase(const ClientImplPtr& client, const std::string& topic, const Backoff& backoff) : client_(client), topic_(topic), connection_(), @@ -35,7 +35,7 @@ HandlerBase::HandlerBase(const ClientImplPtr& client, const std::string& topic) creationTimestamp_(now()), operationTimeut_(seconds(client->conf().getOperationTimeoutSeconds())), state_(Pending), - backoff_(milliseconds(100), seconds(60)), + backoff_(backoff), timer_(client->getIOExecutorProvider()->get()->createDeadlineTimer()) { } diff --git a/pulsar-client-cpp/lib/HandlerBase.h b/pulsar-client-cpp/lib/HandlerBase.h index 4e2846571957d..70e2c6262659a 100644 --- a/pulsar-client-cpp/lib/HandlerBase.h +++ b/pulsar-client-cpp/lib/HandlerBase.h @@ -42,7 +42,7 @@ typedef boost::shared_ptr HandlerBasePtr; class HandlerBase { public: - HandlerBase(const ClientImplPtr& client, const std::string& topic); + HandlerBase(const ClientImplPtr&, const std::string&, const Backoff&); virtual ~HandlerBase(); diff --git a/pulsar-client-cpp/lib/ProducerImpl.cc b/pulsar-client-cpp/lib/ProducerImpl.cc index ae077319aaeff..eb15e648108d5 100644 --- a/pulsar-client-cpp/lib/ProducerImpl.cc +++ b/pulsar-client-cpp/lib/ProducerImpl.cc @@ -26,7 +26,6 @@ #include #include -using namespace pulsar; namespace pulsar { DECLARE_LOG_OBJECT() @@ -48,9 +47,13 @@ OpSendMsg::OpSendMsg(uint64_t producerId, uint64_t sequenceId, const Message& ms } ProducerImpl::ProducerImpl(ClientImplPtr client, const std::string& topic, - const ProducerConfiguration& producerConfiguration) - : HandlerBase(client, topic), - conf_(producerConfiguration), + const ProducerConfiguration& conf) + : HandlerBase( + client, + topic, + Backoff(milliseconds(100), seconds(60), + milliseconds(std::max(100, conf.getSendTimeout() - 100)))), + conf_(conf), executor_(client->getIOExecutorProvider()->get()), pendingMessagesQueue_(conf_.getMaxPendingMessages()), producerStr_("[" + topic_ + ", " + producerName_ + "] "), diff --git a/pulsar-client-cpp/tests/BackoffTest.cc b/pulsar-client-cpp/tests/BackoffTest.cc index 003dcd50bb4ae..a0b89603fa205 100644 --- a/pulsar-client-cpp/tests/BackoffTest.cc +++ b/pulsar-client-cpp/tests/BackoffTest.cc @@ -18,22 +18,131 @@ */ #include #include "Backoff.h" +#include "PulsarFriend.h" using namespace pulsar; using boost::posix_time::milliseconds; using boost::posix_time::seconds; + +static bool checkExactAndDecrementTimer(Backoff& backoff, const unsigned int& t2) { + const unsigned int& t1 = backoff.next().total_milliseconds(); + boost::posix_time::ptime& firstBackOffTime = PulsarFriend::getFirstBackoffTime(backoff); + firstBackOffTime -= milliseconds(t2); + return t1 == t2; +} + +static bool withinTenPercentAndDecrementTimer(Backoff& backoff, const unsigned int& t2) { + const unsigned int& t1 = backoff.next().total_milliseconds(); + boost::posix_time::ptime& firstBackOffTime = PulsarFriend::getFirstBackoffTime(backoff); + firstBackOffTime -= milliseconds(t2); + return (t1 >= t2 * 0.9 && t1 <= t2); +} + +TEST(BackoffTest, mandatoryStopTestNegativeTest) { + Backoff backoff(milliseconds(100), seconds(60), milliseconds(1900)); + ASSERT_EQ(backoff.next().total_milliseconds(), 100); + backoff.next().total_milliseconds(); // 200 + backoff.next().total_milliseconds(); // 400 + backoff.next().total_milliseconds(); // 800 + ASSERT_FALSE(withinTenPercentAndDecrementTimer(backoff, 400)); +} + +TEST(BackoffTest, firstBackoffTimerTest) { + Backoff backoff(milliseconds(100), seconds(60), milliseconds(1900)); + ASSERT_EQ(backoff.next().total_milliseconds(), 100); + boost::posix_time::ptime firstBackOffTime = PulsarFriend::getFirstBackoffTime(backoff); + usleep(300 * 1000); + TimeDuration diffBackOffTime = PulsarFriend::getFirstBackoffTime(backoff) - firstBackOffTime; + ASSERT_EQ(diffBackOffTime, milliseconds(0)); // no change since reset not called + + backoff.reset(); + ASSERT_EQ(backoff.next().total_milliseconds(), 100); + diffBackOffTime = PulsarFriend::getFirstBackoffTime(backoff) - firstBackOffTime; + ASSERT_TRUE(diffBackOffTime >= milliseconds(300) && diffBackOffTime < milliseconds(310)); +} + + TEST(BackoffTest, basicTest) { - Backoff backoff(milliseconds(5), seconds(60)); - ASSERT_EQ(backoff.next().total_milliseconds(), 5); - ASSERT_EQ(backoff.next().total_milliseconds(), 10); + Backoff backoff(milliseconds(5), seconds(60), seconds(60)); + ASSERT_TRUE(checkExactAndDecrementTimer(backoff, 5)); + ASSERT_TRUE(withinTenPercentAndDecrementTimer(backoff, 10)); + backoff.reset(); - ASSERT_EQ(backoff.next().total_milliseconds(), 5); + ASSERT_TRUE(checkExactAndDecrementTimer(backoff, 5)); } TEST(BackoffTest, maxTest) { - Backoff backoff(milliseconds(5), milliseconds(20)); - ASSERT_EQ(backoff.next().total_milliseconds(), 5); - ASSERT_EQ(backoff.next().total_milliseconds(), 10); - ASSERT_EQ(backoff.next().total_milliseconds(), 20); - ASSERT_EQ(backoff.next().total_milliseconds(), 20); + Backoff backoff(milliseconds(5), milliseconds(20), milliseconds(20)); + ASSERT_TRUE(checkExactAndDecrementTimer(backoff, 5)); + ASSERT_TRUE(withinTenPercentAndDecrementTimer(backoff, 10)); + ASSERT_TRUE(withinTenPercentAndDecrementTimer(backoff, 5)); + ASSERT_TRUE(withinTenPercentAndDecrementTimer(backoff, 20)); } + +TEST(BackoffTest, mandatoryStopTest) { + Backoff backoff(milliseconds(100), seconds(60), milliseconds(1900)); + ASSERT_TRUE(checkExactAndDecrementTimer(backoff, 100)); + ASSERT_TRUE(withinTenPercentAndDecrementTimer(backoff, 200)); + ASSERT_TRUE(withinTenPercentAndDecrementTimer(backoff, 400)); + ASSERT_TRUE(withinTenPercentAndDecrementTimer(backoff, 800)); + // would have been 1600 w/o the mandatory stop + ASSERT_TRUE(withinTenPercentAndDecrementTimer(backoff, 400)); + ASSERT_TRUE(withinTenPercentAndDecrementTimer(backoff, 3200)); + ASSERT_TRUE(withinTenPercentAndDecrementTimer(backoff, 6400)); + ASSERT_TRUE(withinTenPercentAndDecrementTimer(backoff, 12800)); + ASSERT_TRUE(withinTenPercentAndDecrementTimer(backoff, 25600)); + ASSERT_TRUE(withinTenPercentAndDecrementTimer(backoff, 51200)); + ASSERT_TRUE(withinTenPercentAndDecrementTimer(backoff, 60000)); + ASSERT_TRUE(withinTenPercentAndDecrementTimer(backoff, 60000)); + + backoff.reset(); + ASSERT_TRUE(checkExactAndDecrementTimer(backoff, 100)); + ASSERT_TRUE(withinTenPercentAndDecrementTimer(backoff, 200)); + ASSERT_TRUE(withinTenPercentAndDecrementTimer(backoff, 400)); + ASSERT_TRUE(withinTenPercentAndDecrementTimer(backoff, 800)); + // would have been 1600 w/o the mandatory stop + ASSERT_TRUE(withinTenPercentAndDecrementTimer(backoff, 400)); + + backoff.reset(); + ASSERT_TRUE(checkExactAndDecrementTimer(backoff, 100)); + ASSERT_TRUE(withinTenPercentAndDecrementTimer(backoff, 200)); + ASSERT_TRUE(withinTenPercentAndDecrementTimer(backoff, 400)); + ASSERT_TRUE(withinTenPercentAndDecrementTimer(backoff, 800)); + + backoff.reset(); + ASSERT_TRUE(checkExactAndDecrementTimer(backoff, 100)); + ASSERT_TRUE(withinTenPercentAndDecrementTimer(backoff, 200)); + ASSERT_TRUE(withinTenPercentAndDecrementTimer(backoff, 400)); + ASSERT_TRUE(withinTenPercentAndDecrementTimer(backoff, 800)); +} + +TEST(BackoffTest, ignoringMandatoryStopTest) { + Backoff backoff(milliseconds(100), seconds(60), milliseconds(0)); + ASSERT_TRUE(checkExactAndDecrementTimer(backoff, 100)); + ASSERT_TRUE(withinTenPercentAndDecrementTimer(backoff, 200)); + ASSERT_TRUE(withinTenPercentAndDecrementTimer(backoff, 400)); + ASSERT_TRUE(withinTenPercentAndDecrementTimer(backoff, 800)); + ASSERT_TRUE(withinTenPercentAndDecrementTimer(backoff, 1600)); + ASSERT_TRUE(withinTenPercentAndDecrementTimer(backoff, 3200)); + ASSERT_TRUE(withinTenPercentAndDecrementTimer(backoff, 6400)); + ASSERT_TRUE(withinTenPercentAndDecrementTimer(backoff, 12800)); + ASSERT_TRUE(withinTenPercentAndDecrementTimer(backoff, 25600)); + ASSERT_TRUE(withinTenPercentAndDecrementTimer(backoff, 51200)); + ASSERT_TRUE(withinTenPercentAndDecrementTimer(backoff, 60000)); + ASSERT_TRUE(withinTenPercentAndDecrementTimer(backoff, 60000)); + + backoff.reset(); + ASSERT_TRUE(checkExactAndDecrementTimer(backoff, 100)); + ASSERT_TRUE(withinTenPercentAndDecrementTimer(backoff, 200)); + ASSERT_TRUE(withinTenPercentAndDecrementTimer(backoff, 400)); + ASSERT_TRUE(withinTenPercentAndDecrementTimer(backoff, 800)); + ASSERT_TRUE(withinTenPercentAndDecrementTimer(backoff, 1600)); + ASSERT_TRUE(withinTenPercentAndDecrementTimer(backoff, 3200)); + ASSERT_TRUE(withinTenPercentAndDecrementTimer(backoff, 6400)); + ASSERT_TRUE(withinTenPercentAndDecrementTimer(backoff, 12800)); + ASSERT_TRUE(withinTenPercentAndDecrementTimer(backoff, 25600)); + ASSERT_TRUE(withinTenPercentAndDecrementTimer(backoff, 51200)); + ASSERT_TRUE(withinTenPercentAndDecrementTimer(backoff, 60000)); + ASSERT_TRUE(withinTenPercentAndDecrementTimer(backoff, 60000)); +} + diff --git a/pulsar-client-cpp/tests/PulsarFriend.h b/pulsar-client-cpp/tests/PulsarFriend.h index 749aeae497b93..2247126f09a5a 100644 --- a/pulsar-client-cpp/tests/PulsarFriend.h +++ b/pulsar-client-cpp/tests/PulsarFriend.h @@ -62,5 +62,9 @@ class PulsarFriend { static ClientConnectionWeakPtr getClientConnection(HandlerBase& handler) { return handler.connection_; } + + static boost::posix_time::ptime& getFirstBackoffTime(Backoff& backoff) { + return backoff.firstBackoffTime_; + } }; } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/Backoff.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/Backoff.java index 119c62b12893f..9f796800a639e 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/Backoff.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/Backoff.java @@ -21,19 +21,25 @@ import java.util.Random; import java.util.concurrent.TimeUnit; +// All variables are in TimeUnit millis by default public class Backoff { private static final long DEFAULT_INTERVAL_IN_NANOSECONDS = TimeUnit.MILLISECONDS.toNanos(100); private static final long MAX_BACKOFF_INTERVAL_NANOSECONDS = TimeUnit.SECONDS.toNanos(30); private final long initial; private final long max; private long next; + private long mandatoryStop; + long firstBackoffTimeInMillis; + private boolean mandatoryStopMade = false; private static final Random random = new Random(); - public Backoff(long initial, TimeUnit unitInitial, long max, TimeUnit unitMax) { + public Backoff(long initial, TimeUnit unitInitial, long max, TimeUnit unitMax, long mandatoryStop, + TimeUnit unitMandatoryStop) { this.initial = unitInitial.toMillis(initial); this.max = unitMax.toMillis(max); this.next = this.initial; + this.mandatoryStop = unitMandatoryStop.toMillis(mandatoryStop); } public long next() { @@ -41,10 +47,29 @@ public long next() { if (current < max) { this.next = Math.min(this.next * 2, this.max); } - - // Randomly increase the timeout up to 25% to avoid simultaneous retries - current += random.nextInt((int) current / 4); - return current; + + // Check for mandatory stop + if (!mandatoryStopMade) { + long now = System.currentTimeMillis(); + long timeElapsedSinceFirstBackoff = 0; + if (initial == current) { + firstBackoffTimeInMillis = now; + } else { + timeElapsedSinceFirstBackoff = now - firstBackoffTimeInMillis; + } + + if (timeElapsedSinceFirstBackoff + current > mandatoryStop) { + current = Math.max(initial, mandatoryStop - timeElapsedSinceFirstBackoff); + mandatoryStopMade = true; + } + } + + // Randomly decrease the timeout up to 10% to avoid simultaneous retries + // If current < 10 then current/10 < 1 and we get an exception from Random saying "Bound must be positive" + if (current > 10) { + current -= random.nextInt((int) current / 10); + } + return Math.max(initial, current); } public void reduceToHalf() { @@ -55,6 +80,7 @@ public void reduceToHalf() { public void reset() { this.next = this.initial; + this.mandatoryStopMade = false; } public static boolean shouldBackoff(long initialTimestamp, TimeUnit unitInitial, int failedAttempts) { diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java index 0e09d4f368a56..c736ac632f937 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java @@ -55,7 +55,7 @@ enum ConsumerType { protected ConsumerBase(PulsarClientImpl client, String topic, String subscription, ConsumerConfiguration conf, int receiverQueueSize, ExecutorService listenerExecutor, CompletableFuture subscribeFuture) { - super(client, topic); + super(client, topic, new Backoff(100, TimeUnit.MILLISECONDS, 60, TimeUnit.SECONDS, 0 , TimeUnit.MILLISECONDS)); this.maxReceiverQueueSize = receiverQueueSize; this.subscription = subscription; this.conf = conf; diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HandlerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HandlerBase.java index e238922f6ca72..5cff0f4546dd7 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HandlerBase.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HandlerBase.java @@ -51,10 +51,10 @@ enum State { Failed // Handler is failed }; - public HandlerBase(PulsarClientImpl client, String topic) { + public HandlerBase(PulsarClientImpl client, String topic, Backoff backoff) { this.client = client; this.topic = topic; - this.backoff = new Backoff(100, TimeUnit.MILLISECONDS, 60, TimeUnit.SECONDS); + this.backoff = backoff; STATE_UPDATER.set(this, State.Uninitialized); CLIENT_CNX_UPDATER.set(this, null); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBase.java index 28c29d0e279b6..bc2ea5af8af81 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBase.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBase.java @@ -20,6 +20,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageBuilder; @@ -35,7 +36,7 @@ public abstract class ProducerBase extends HandlerBase implements Producer { protected ProducerBase(PulsarClientImpl client, String topic, ProducerConfiguration conf, CompletableFuture producerCreatedFuture) { - super(client, topic); + super(client, topic, new Backoff(100, TimeUnit.MILLISECONDS, 60, TimeUnit.SECONDS, Math.max(100, conf.getSendTimeoutMs() - 100), TimeUnit.MILLISECONDS)); this.producerCreatedFuture = producerCreatedFuture; this.conf = conf; } diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BackoffTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BackoffTest.java index 0fa244cca44f5..0463954b6a32a 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BackoffTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BackoffTest.java @@ -18,24 +18,144 @@ */ package org.apache.pulsar.client.impl; +import static org.testng.Assert.*; + import java.util.concurrent.TimeUnit; import org.apache.pulsar.client.impl.Backoff; -import org.testng.Assert; import org.testng.annotations.Test; public class BackoffTest { + boolean withinTenPercentAndDecrementTimer(Backoff backoff, long t2) { + long t1 = backoff.next(); + backoff.firstBackoffTimeInMillis -= t2; + return (t1 >= t2 * 0.9 && t1 <= t2); + } + boolean checkExactAndDecrementTimer(Backoff backoff, long t2) { + long t1 = backoff.next(); + backoff.firstBackoffTimeInMillis -= t2; + return t1 == t2; + } @Test public void shouldBackoffTest() { long currentTimestamp = System.nanoTime(); - Backoff testBackoff = new Backoff(currentTimestamp, TimeUnit.NANOSECONDS, 100, TimeUnit.MICROSECONDS); + Backoff testBackoff = new Backoff(currentTimestamp, TimeUnit.NANOSECONDS, 100, TimeUnit.MICROSECONDS, 0, + TimeUnit.NANOSECONDS); // gives false - Assert.assertTrue(!testBackoff.shouldBackoff(0L, TimeUnit.NANOSECONDS, 0)); + assertTrue(!testBackoff.shouldBackoff(0L, TimeUnit.NANOSECONDS, 0)); currentTimestamp = System.nanoTime(); // gives true - Assert.assertTrue(testBackoff.shouldBackoff(currentTimestamp, TimeUnit.NANOSECONDS, 100)); + assertTrue(testBackoff.shouldBackoff(currentTimestamp, TimeUnit.NANOSECONDS, 100)); + } + @Test + public void mandatoryStopTestNegativeTest() { + Backoff backoff = new Backoff(100, TimeUnit.MILLISECONDS, 60, TimeUnit.SECONDS, 1900, TimeUnit.MILLISECONDS); + assertEquals(backoff.next(), 100); + backoff.next(); // 200 + backoff.next(); // 400 + backoff.next(); // 800 + assertFalse(withinTenPercentAndDecrementTimer(backoff, 400)); + } + + @Test + public void firstBackoffTimerTest() throws InterruptedException { + Backoff backoff = new Backoff(100, TimeUnit.MILLISECONDS, 60, TimeUnit.SECONDS, 1900, TimeUnit.MILLISECONDS); + assertEquals(backoff.next(), 100); + long firstBackOffTime = backoff.firstBackoffTimeInMillis; + Thread.sleep(300); + long diffBackOffTime = backoff.firstBackoffTimeInMillis - firstBackOffTime; + assertEquals(diffBackOffTime, 0); + + backoff.reset(); + assertEquals(backoff.next(), 100); + diffBackOffTime = backoff.firstBackoffTimeInMillis - firstBackOffTime; + assertTrue(diffBackOffTime >= 300 && diffBackOffTime < 310); + } + + @Test + public void basicTest() { + Backoff backoff = new Backoff(5, TimeUnit.MILLISECONDS, 60, TimeUnit.SECONDS, 60, TimeUnit.SECONDS); + assertTrue(checkExactAndDecrementTimer(backoff, 5)); + assertTrue(withinTenPercentAndDecrementTimer(backoff, 10)); + backoff.reset(); + assertTrue(checkExactAndDecrementTimer(backoff, 5)); } + @Test + public void maxTest() { + Backoff backoff = new Backoff(5, TimeUnit.MILLISECONDS, 20, TimeUnit.MILLISECONDS, 20, TimeUnit.MILLISECONDS); + assertTrue(checkExactAndDecrementTimer(backoff, 5)); + assertTrue(withinTenPercentAndDecrementTimer(backoff, 10)); + assertTrue(withinTenPercentAndDecrementTimer(backoff, 5)); + assertTrue(withinTenPercentAndDecrementTimer(backoff, 20)); + } + + @Test + public void mandatoryStopTest() { + Backoff backoff = new Backoff(100, TimeUnit.MILLISECONDS, 60, TimeUnit.SECONDS, 1900, TimeUnit.MILLISECONDS); + assertTrue(checkExactAndDecrementTimer(backoff, 100)); + assertTrue(withinTenPercentAndDecrementTimer(backoff, 200)); + assertTrue(withinTenPercentAndDecrementTimer(backoff, 400)); + assertTrue(withinTenPercentAndDecrementTimer(backoff, 800)); + // would have been 1600 w/o the mandatory stop + assertTrue(withinTenPercentAndDecrementTimer(backoff, 400)); + assertTrue(withinTenPercentAndDecrementTimer(backoff, 3200)); + assertTrue(withinTenPercentAndDecrementTimer(backoff, 6400)); + assertTrue(withinTenPercentAndDecrementTimer(backoff, 12800)); + assertTrue(withinTenPercentAndDecrementTimer(backoff, 25600)); + assertTrue(withinTenPercentAndDecrementTimer(backoff, 51200)); + assertTrue(withinTenPercentAndDecrementTimer(backoff, 60000)); + assertTrue(withinTenPercentAndDecrementTimer(backoff, 60000)); + backoff.reset(); + assertTrue(checkExactAndDecrementTimer(backoff, 100)); + assertTrue(withinTenPercentAndDecrementTimer(backoff, 200)); + assertTrue(withinTenPercentAndDecrementTimer(backoff, 400)); + assertTrue(withinTenPercentAndDecrementTimer(backoff, 800)); + // would have been 1600 w/o the mandatory stop + assertTrue(withinTenPercentAndDecrementTimer(backoff, 400)); + + backoff.reset(); + assertTrue(checkExactAndDecrementTimer(backoff, 100)); + assertTrue(withinTenPercentAndDecrementTimer(backoff, 200)); + assertTrue(withinTenPercentAndDecrementTimer(backoff, 400)); + assertTrue(withinTenPercentAndDecrementTimer(backoff, 800)); + + backoff.reset(); + assertTrue(checkExactAndDecrementTimer(backoff, 100)); + assertTrue(withinTenPercentAndDecrementTimer(backoff, 200)); + assertTrue(withinTenPercentAndDecrementTimer(backoff, 400)); + assertTrue(withinTenPercentAndDecrementTimer(backoff, 800)); + } + + public void ignoringMandatoryStopTest() { + Backoff backoff = new Backoff(100, TimeUnit.MILLISECONDS, 60, TimeUnit.SECONDS, 0, TimeUnit.MILLISECONDS); + assertTrue(checkExactAndDecrementTimer(backoff, 100)); + assertTrue(withinTenPercentAndDecrementTimer(backoff, 200)); + assertTrue(withinTenPercentAndDecrementTimer(backoff, 400)); + assertTrue(withinTenPercentAndDecrementTimer(backoff, 800)); + assertTrue(withinTenPercentAndDecrementTimer(backoff, 1600)); + assertTrue(withinTenPercentAndDecrementTimer(backoff, 3200)); + assertTrue(withinTenPercentAndDecrementTimer(backoff, 6400)); + assertTrue(withinTenPercentAndDecrementTimer(backoff, 12800)); + assertTrue(withinTenPercentAndDecrementTimer(backoff, 25600)); + assertTrue(withinTenPercentAndDecrementTimer(backoff, 51200)); + assertTrue(withinTenPercentAndDecrementTimer(backoff, 60000)); + assertTrue(withinTenPercentAndDecrementTimer(backoff, 60000)); + + backoff.reset(); + assertTrue(checkExactAndDecrementTimer(backoff, 100)); + assertTrue(withinTenPercentAndDecrementTimer(backoff, 200)); + assertTrue(withinTenPercentAndDecrementTimer(backoff, 400)); + assertTrue(withinTenPercentAndDecrementTimer(backoff, 800)); + assertTrue(withinTenPercentAndDecrementTimer(backoff, 1600)); + assertTrue(withinTenPercentAndDecrementTimer(backoff, 3200)); + assertTrue(withinTenPercentAndDecrementTimer(backoff, 6400)); + assertTrue(withinTenPercentAndDecrementTimer(backoff, 12800)); + assertTrue(withinTenPercentAndDecrementTimer(backoff, 25600)); + assertTrue(withinTenPercentAndDecrementTimer(backoff, 51200)); + assertTrue(withinTenPercentAndDecrementTimer(backoff, 60000)); + assertTrue(withinTenPercentAndDecrementTimer(backoff, 60000)); + } }