Skip to content

Commit

Permalink
[Issue-2122] [pulsar-client] Adding configuration for backoff strategy (
Browse files Browse the repository at this point in the history
apache#3848)


Fixes apache#2122 

### Motivation

The current backoff strategy uses fixed default intervals and is too aggressive. This change makes the backoff intervals configurable by the user.

### Documentation

  - Does this pull request introduce a new feature? (yes)
  - If yes, how is the feature documented? (not sure)
  • Loading branch information
ConcurrencyPractitioner authored and sijie committed Apr 2, 2019
1 parent 8adff85 commit bdfc098
Show file tree
Hide file tree
Showing 15 changed files with 257 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,9 @@ static class RawConsumerImpl extends ConsumerImpl<byte[]> {
consumerFuture,
SubscriptionMode.Durable,
MessageId.earliest,
Schema.BYTES, null);
Schema.BYTES, null,
client.getConfiguration().getDefaultBackoffIntervalNanos(),
client.getConfiguration().getMaxBackoffIntervalNanos());
incomingRawMessages = new GrowableArrayBlockingQueue<>();
pendingRawReceives = new ConcurrentLinkedQueue<>();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -354,4 +354,26 @@ ClientBuilder authentication(String authPluginClassName, Map<String, String> aut
* @return the client builder instance
*/
ClientBuilder connectionTimeout(int duration, TimeUnit unit);

/**
 * Set the starting backoff interval, i.e. the interval a backoff begins with
 * before it is increased on repeated failures.
 *
 * <p>The value is converted to nanoseconds with {@code unit.toNanos(duration)}
 * before it is stored in the client configuration.
 *
 * @param duration
 *            the length of the starting interval
 * @param unit
 *            the time unit in which the duration is defined
 * @return the client builder instance
 */
ClientBuilder startingBackoffInterval(long duration, TimeUnit unit);

/**
 * Set the maximum backoff interval, i.e. the upper bound a growing backoff
 * interval is capped at.
 *
 * <p>The value is converted to nanoseconds with {@code unit.toNanos(duration)}
 * before it is stored in the client configuration.
 *
 * @param duration
 *            the length of the maximum interval
 * @param unit
 *            the time unit in which the duration is defined
 * @return the client builder instance
 */
ClientBuilder maxBackoffInterval(long duration, TimeUnit unit);
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,10 @@

// Durations are held in milliseconds unless the field or constant name says nanoseconds
public class Backoff {
private static final long DEFAULT_INTERVAL_IN_NANOSECONDS = TimeUnit.MILLISECONDS.toNanos(100);
private static final long MAX_BACKOFF_INTERVAL_NANOSECONDS = TimeUnit.SECONDS.toNanos(30);
public static final long DEFAULT_INTERVAL_IN_NANOSECONDS = TimeUnit.MILLISECONDS.toNanos(100);
public static final long MAX_BACKOFF_INTERVAL_NANOSECONDS = TimeUnit.SECONDS.toNanos(30);
private final long backoffIntervalNanos;
private final long maxBackoffIntervalNanos;
private final long initial;
private final long max;
private final Clock clock;
Expand All @@ -40,19 +42,33 @@ public class Backoff {

@VisibleForTesting
Backoff(long initial, TimeUnit unitInitial, long max, TimeUnit unitMax, long mandatoryStop,
TimeUnit unitMandatoryStop, Clock clock) {
TimeUnit unitMandatoryStop, Clock clock, long backoffIntervalNanos, long maxBackoffIntervalNanos) {
this.initial = unitInitial.toMillis(initial);
this.max = unitMax.toMillis(max);
this.next = this.initial;
this.mandatoryStop = unitMandatoryStop.toMillis(mandatoryStop);
this.clock = clock;
this.backoffIntervalNanos = backoffIntervalNanos;
this.maxBackoffIntervalNanos = maxBackoffIntervalNanos;
}

@VisibleForTesting
Backoff(long initial, TimeUnit unitInitial, long max, TimeUnit unitMax, long mandatoryStop,
TimeUnit unitMandatoryStop, Clock clock) {
this(initial, unitInitial, max, unitMax, mandatoryStop, unitMandatoryStop, clock,
Backoff.DEFAULT_INTERVAL_IN_NANOSECONDS, Backoff.MAX_BACKOFF_INTERVAL_NANOSECONDS);
}
public Backoff(long initial, TimeUnit unitInitial, long max, TimeUnit unitMax, long mandatoryStop,
TimeUnit unitMandatoryStop) {
TimeUnit unitMandatoryStop) {
this(initial, unitInitial, max, unitMax, mandatoryStop, unitMandatoryStop, Clock.systemDefaultZone());
}

/**
 * Creates a backoff that uses the system clock and caller-supplied backoff intervals.
 *
 * <p>The last two arguments are expressed in <b>nanoseconds</b>: they are forwarded
 * unchanged to the fields read by {@code instanceShouldBackoff}. (They were previously
 * named {@code ...Ms}, which did not match how the values are used.)
 *
 * @param initial                 initial delay, expressed in {@code unitInitial}
 * @param unitInitial             unit of {@code initial}
 * @param max                     maximum delay, expressed in {@code unitMax}
 * @param unitMax                 unit of {@code max}
 * @param mandatoryStop           mandatory-stop time, expressed in {@code unitMandatoryStop}
 * @param unitMandatoryStop       unit of {@code mandatoryStop}
 * @param backoffIntervalNanos    starting backoff interval, in nanoseconds
 * @param maxBackoffIntervalNanos upper bound for the backoff interval, in nanoseconds
 */
public Backoff(long initial, TimeUnit unitInitial, long max, TimeUnit unitMax, long mandatoryStop,
        TimeUnit unitMandatoryStop, long backoffIntervalNanos, long maxBackoffIntervalNanos) {
    this(initial, unitInitial, max, unitMax, mandatoryStop, unitMandatoryStop, Clock.systemDefaultZone(),
            backoffIntervalNanos, maxBackoffIntervalNanos);
}

public long next() {
long current = this.next;
if (current < max) {
Expand Down Expand Up @@ -99,19 +115,39 @@ long getFirstBackoffTimeInMillis() {
return firstBackoffTimeInMillis;
}

public static boolean shouldBackoff(long initialTimestamp, TimeUnit unitInitial, int failedAttempts) {
long initialTimestampInNano = unitInitial.toNanos(initialTimestamp);
@VisibleForTesting
long backoffIntervalNanos() {
return backoffIntervalNanos;
}

@VisibleForTesting
long maxBackoffIntervalNanos() {
return maxBackoffIntervalNanos;
}

public static boolean shouldBackoff(long initialTimestamp, TimeUnit unitInitial, int failedAttempts,
long defaultInterval, long maxBackoffInterval) {
long initialTimestampInNano = unitInitial.toNanos(initialTimestamp);
long currentTime = System.nanoTime();
long interval = DEFAULT_INTERVAL_IN_NANOSECONDS;
long interval = defaultInterval;
for (int i = 1; i < failedAttempts; i++) {
interval = interval * 2;
if (interval > MAX_BACKOFF_INTERVAL_NANOSECONDS) {
interval = MAX_BACKOFF_INTERVAL_NANOSECONDS;
if (interval > maxBackoffInterval) {
interval = maxBackoffInterval;
break;
}
}

// if the current time is less than the time at which next retry should occur, we should backoff
return currentTime < (initialTimestampInNano + interval);
}

public static boolean shouldBackoff(long initialTimestamp, TimeUnit unitInitial, int failedAttempts) {
return Backoff.shouldBackoff(initialTimestamp, unitInitial, failedAttempts,
DEFAULT_INTERVAL_IN_NANOSECONDS, MAX_BACKOFF_INTERVAL_NANOSECONDS);
}

public boolean instanceShouldBackoff(long initialTimestamp, TimeUnit unitInitial, int failedAttempts) {
return Backoff.shouldBackoff(initialTimestamp, unitInitial, failedAttempts, backoffIntervalNanos, maxBackoffIntervalNanos);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.impl;

import java.time.Clock;
import java.util.concurrent.TimeUnit;

import com.google.common.annotations.VisibleForTesting;

/**
 * Fluent builder for {@link Backoff}.
 *
 * <p>Every setting has a usable default: all durations start at 0, every time unit
 * starts as {@link TimeUnit#MILLISECONDS}, and the clock is
 * {@link Clock#systemDefaultZone()}. Defaulting the units means {@link #create()} no
 * longer fails with a {@link NullPointerException} when one of the setters is skipped.
 *
 * <p>The two backoff intervals stay at 0 unless {@link #useDefaultBackoffIntervals()}
 * or {@link #useUserConfiguredIntervals(long, long)} is called.
 */
public class BackoffBuilder {
    private long backoffIntervalNanos;
    private long maxBackoffIntervalNanos;
    private long initial;
    private TimeUnit unitInitial;
    private long max;
    private TimeUnit unitMax;
    private Clock clock;
    private long mandatoryStop;
    private TimeUnit unitMandatoryStop;

    @VisibleForTesting
    BackoffBuilder() {
        this.initial = 0;
        this.max = 0;
        this.mandatoryStop = 0;
        // Backoff converts each duration with unit.toMillis(...), so a unit must never be null.
        this.unitInitial = TimeUnit.MILLISECONDS;
        this.unitMax = TimeUnit.MILLISECONDS;
        this.unitMandatoryStop = TimeUnit.MILLISECONDS;
        this.clock = Clock.systemDefaultZone();
        this.backoffIntervalNanos = 0;
        this.maxBackoffIntervalNanos = 0;
    }

    /**
     * Sets the initial delay of the backoff.
     *
     * @param initial     the initial delay
     * @param unitInitial the unit of {@code initial}
     * @return this builder
     */
    public BackoffBuilder setInitialTime(long initial, TimeUnit unitInitial) {
        this.unitInitial = unitInitial;
        this.initial = initial;
        return this;
    }

    /**
     * Sets the maximum delay of the backoff.
     *
     * @param max     the maximum delay
     * @param unitMax the unit of {@code max}
     * @return this builder
     */
    public BackoffBuilder setMax(long max, TimeUnit unitMax) {
        this.unitMax = unitMax;
        this.max = max;
        return this;
    }

    /**
     * Sets the mandatory-stop time of the backoff.
     *
     * @param mandatoryStop     the mandatory-stop time
     * @param unitMandatoryStop the unit of {@code mandatoryStop}
     * @return this builder
     */
    public BackoffBuilder setMandatoryStop(long mandatoryStop, TimeUnit unitMandatoryStop) {
        this.mandatoryStop = mandatoryStop;
        this.unitMandatoryStop = unitMandatoryStop;
        return this;
    }

    /**
     * Uses {@link Backoff#DEFAULT_INTERVAL_IN_NANOSECONDS} and
     * {@link Backoff#MAX_BACKOFF_INTERVAL_NANOSECONDS} as the backoff intervals.
     *
     * @return this builder
     */
    public BackoffBuilder useDefaultBackoffIntervals() {
        return useUserConfiguredIntervals(Backoff.DEFAULT_INTERVAL_IN_NANOSECONDS,
                Backoff.MAX_BACKOFF_INTERVAL_NANOSECONDS);
    }

    /**
     * Uses caller-supplied backoff intervals.
     *
     * @param backoffIntervalNanos    starting backoff interval, in nanoseconds
     * @param maxBackoffIntervalNanos upper bound for the backoff interval, in nanoseconds
     * @return this builder
     */
    public BackoffBuilder useUserConfiguredIntervals(long backoffIntervalNanos,
            long maxBackoffIntervalNanos) {
        this.backoffIntervalNanos = backoffIntervalNanos;
        this.maxBackoffIntervalNanos = maxBackoffIntervalNanos;
        return this;
    }

    /**
     * Builds a {@link Backoff} from the current settings.
     *
     * @return a new {@link Backoff} instance
     */
    public Backoff create() {
        return new Backoff(initial, unitInitial, max, unitMax, mandatoryStop, unitMandatoryStop, clock,
                backoffIntervalNanos, maxBackoffIntervalNanos);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -207,9 +207,13 @@ public CompletableFuture<List<String>> getTopicsUnderNamespace(NamespaceName nam
CompletableFuture<List<String>> topicsFuture = new CompletableFuture<List<String>>();

AtomicLong opTimeoutMs = new AtomicLong(client.getConfiguration().getOperationTimeoutMs());
Backoff backoff = new Backoff(100, TimeUnit.MILLISECONDS,
opTimeoutMs.get() * 2, TimeUnit.MILLISECONDS,
0 , TimeUnit.MILLISECONDS);
// Same values as the constructor call this replaces: the doubled operation timeout is the
// maximum delay and the mandatory stop is 0 (the two had been swapped in the builder call).
Backoff backoff = new BackoffBuilder()
        .setInitialTime(100, TimeUnit.MILLISECONDS)
        .setMax(opTimeoutMs.get() * 2, TimeUnit.MILLISECONDS)
        .setMandatoryStop(0, TimeUnit.MILLISECONDS)
        .useUserConfiguredIntervals(client.getConfiguration().getDefaultBackoffIntervalNanos(),
                client.getConfiguration().getMaxBackoffIntervalNanos())
        .create();
getTopicsUnderNamespace(serviceNameResolver.resolveHost(), namespace, backoff, opTimeoutMs, topicsFuture, mode);
return topicsFuture;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,18 @@ public ClientBuilder connectionTimeout(int duration, TimeUnit unit) {
return this;
}

@Override
public ClientBuilder startingBackoffInterval(long duration, TimeUnit unit) {
conf.setDefaultBackoffIntervalNanos(unit.toNanos(duration));
return this;
}

@Override
public ClientBuilder maxBackoffInterval(long duration, TimeUnit unit) {
conf.setMaxBackoffIntervalNanos(unit.toNanos(duration));
return this;
}

public ClientConfigurationData getClientConfigurationData() {
return conf;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,9 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle

private Producer<T> deadLetterProducer;

private final long backoffIntervalNanos;
private final long maxBackoffIntervalNanos;

protected volatile boolean paused;

enum SubscriptionMode {
Expand All @@ -152,18 +155,27 @@ enum SubscriptionMode {
static <T> ConsumerImpl<T> newConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurationData<T> conf,
ExecutorService listenerExecutor, int partitionIndex, CompletableFuture<Consumer<T>> subscribeFuture,
SubscriptionMode subscriptionMode, MessageId startMessageId, Schema<T> schema, ConsumerInterceptors<T> interceptors) {
if (conf.getReceiverQueueSize() == 0) {
return ConsumerImpl.newConsumerImpl(client, topic, conf, listenerExecutor, partitionIndex, subscribeFuture, subscriptionMode,
startMessageId, schema, interceptors, Backoff.DEFAULT_INTERVAL_IN_NANOSECONDS, Backoff.MAX_BACKOFF_INTERVAL_NANOSECONDS);
}

static <T> ConsumerImpl<T> newConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurationData<T> conf,
ExecutorService listenerExecutor, int partitionIndex, CompletableFuture<Consumer<T>> subscribeFuture,
SubscriptionMode subscriptionMode, MessageId startMessageId, Schema<T> schema, ConsumerInterceptors<T> interceptors,
long backoffIntervalNanos, long maxBackoffIntervalNanos) {
if (conf.getReceiverQueueSize() == 0) {
return new ZeroQueueConsumerImpl<>(client, topic, conf, listenerExecutor, partitionIndex, subscribeFuture,
subscriptionMode, startMessageId, schema, interceptors);
subscriptionMode, startMessageId, schema, interceptors, backoffIntervalNanos, maxBackoffIntervalNanos);
} else {
return new ConsumerImpl<>(client, topic, conf, listenerExecutor, partitionIndex, subscribeFuture,
subscriptionMode, startMessageId, schema, interceptors);
subscriptionMode, startMessageId, schema, interceptors, backoffIntervalNanos, maxBackoffIntervalNanos);
}
}

protected ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurationData<T> conf,
ExecutorService listenerExecutor, int partitionIndex, CompletableFuture<Consumer<T>> subscribeFuture,
SubscriptionMode subscriptionMode, MessageId startMessageId, Schema<T> schema, ConsumerInterceptors<T> interceptors) {
SubscriptionMode subscriptionMode, MessageId startMessageId, Schema<T> schema, ConsumerInterceptors<T> interceptors,
long backoffIntervalNanos, long maxBackoffIntervalNanos) {
super(client, topic, conf, conf.getReceiverQueueSize(), listenerExecutor, subscribeFuture, schema, interceptors);
this.consumerId = client.newConsumerId();
this.subscriptionMode = subscriptionMode;
Expand Down Expand Up @@ -208,8 +220,14 @@ protected ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurat
}

this.connectionHandler = new ConnectionHandler(this,
new Backoff(100, TimeUnit.MILLISECONDS, 60, TimeUnit.SECONDS, 0, TimeUnit.MILLISECONDS),
this);
// Same values as the constructor call this replaces: 60 s is the maximum delay and the
// mandatory stop is 0 (the two had been swapped in the builder call).
new BackoffBuilder()
        .setInitialTime(100, TimeUnit.MILLISECONDS)
        .setMax(60, TimeUnit.SECONDS)
        .setMandatoryStop(0, TimeUnit.MILLISECONDS)
        .useUserConfiguredIntervals(backoffIntervalNanos,
                maxBackoffIntervalNanos)
        .create(),
this);

this.topicName = TopicName.get(topic);
if (this.topicName.isPersistent()) {
Expand Down Expand Up @@ -238,6 +256,9 @@ protected ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurat
possibleSendToDeadLetterTopicMessages = null;
}

this.backoffIntervalNanos = backoffIntervalNanos;
this.maxBackoffIntervalNanos = maxBackoffIntervalNanos;

topicNameWithoutPartition = topicName.getPartitionedTopicName();

grabCnx();
Expand Down Expand Up @@ -1448,9 +1469,14 @@ CompletableFuture<MessageId> getLastMessageIdAsync() {
}

AtomicLong opTimeoutMs = new AtomicLong(client.getConfiguration().getOperationTimeoutMs());
Backoff backoff = new Backoff(100, TimeUnit.MILLISECONDS,
opTimeoutMs.get() * 2, TimeUnit.MILLISECONDS,
0 , TimeUnit.MILLISECONDS);
// Same values as the constructor call this replaces: the doubled operation timeout is the
// maximum delay and the mandatory stop is 0 (the two had been swapped in the builder call).
Backoff backoff = new BackoffBuilder()
        .setInitialTime(100, TimeUnit.MILLISECONDS)
        .setMax(opTimeoutMs.get() * 2, TimeUnit.MILLISECONDS)
        .setMandatoryStop(0, TimeUnit.MILLISECONDS)
        .useUserConfiguredIntervals(backoffIntervalNanos,
                maxBackoffIntervalNanos)
        .create();

CompletableFuture<MessageId> getLastMessageIdFuture = new CompletableFuture<>();

internalGetLastMessageIdAsync(backoff, opTimeoutMs, getLastMessageIdFuture);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -749,7 +749,9 @@ private void subscribeTopicPartitions(CompletableFuture<Void> subscribeResult, S
ConsumerImpl<T> newConsumer = ConsumerImpl.newConsumerImpl(client, partitionName,
configurationData, client.externalExecutorProvider().getExecutor(),
partitionIndex, subFuture,
SubscriptionMode.Durable, null, schema, interceptors);
SubscriptionMode.Durable, null, schema, interceptors,
client.getConfiguration().getDefaultBackoffIntervalNanos(),
client.getConfiguration().getMaxBackoffIntervalNanos());
consumers.putIfAbsent(newConsumer.getTopic(), newConsumer);
return subFuture;
})
Expand All @@ -761,7 +763,8 @@ private void subscribeTopicPartitions(CompletableFuture<Void> subscribeResult, S
CompletableFuture<Consumer<T>> subFuture = new CompletableFuture<>();
ConsumerImpl<T> newConsumer = ConsumerImpl.newConsumerImpl(client, topicName, internalConfig,
client.externalExecutorProvider().getExecutor(), 0, subFuture, SubscriptionMode.Durable, null,
schema, interceptors);
schema, interceptors, client.getConfiguration().getDefaultBackoffIntervalNanos(),
client.getConfiguration().getMaxBackoffIntervalNanos());
consumers.putIfAbsent(newConsumer.getTopic(), newConsumer);

futureList = Collections.singletonList(subFuture);
Expand Down Expand Up @@ -977,7 +980,9 @@ private CompletableFuture<Void> subscribeIncreasedTopicPartitions(String topicNa
ConsumerImpl<T> newConsumer = ConsumerImpl.newConsumerImpl(
client, partitionName, configurationData,
client.externalExecutorProvider().getExecutor(),
partitionIndex, subFuture, SubscriptionMode.Durable, null, schema, interceptors);
partitionIndex, subFuture, SubscriptionMode.Durable, null, schema, interceptors,
client.getConfiguration().getDefaultBackoffIntervalNanos(),
client.getConfiguration().getMaxBackoffIntervalNanos());
consumers.putIfAbsent(newConsumer.getTopic(), newConsumer);
if (log.isDebugEnabled()) {
log.debug("[{}] create consumer {} for partitionName: {}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,8 +183,15 @@ public ProducerImpl(PulsarClientImpl client, String topic, ProducerConfiguration
}

this.connectionHandler = new ConnectionHandler(this,
new Backoff(100, TimeUnit.MILLISECONDS, 60, TimeUnit.SECONDS, Math.max(100, conf.getSendTimeoutMs() - 100), TimeUnit.MILLISECONDS),
// Same values as the constructor call this replaces: 60 s is the maximum delay and the
// mandatory stop is derived from the send timeout (the two had been swapped in the builder call).
new BackoffBuilder()
        .setInitialTime(100, TimeUnit.MILLISECONDS)
        .setMax(60, TimeUnit.SECONDS)
        .setMandatoryStop(Math.max(100, conf.getSendTimeoutMs() - 100), TimeUnit.MILLISECONDS)
        .useUserConfiguredIntervals(client.getConfiguration().getDefaultBackoffIntervalNanos(),
                client.getConfiguration().getMaxBackoffIntervalNanos())
        .create(),
this);

grabCnx();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,8 @@ private <T> CompletableFuture<Consumer<T>> doSingleTopicSubscribeAsync(ConsumerC
listenerThread, consumerSubscribedFuture, metadata.partitions, schema, interceptors);
} else {
consumer = ConsumerImpl.newConsumerImpl(PulsarClientImpl.this, topic, conf, listenerThread, -1,
consumerSubscribedFuture, SubscriptionMode.Durable, null, schema, interceptors);
consumerSubscribedFuture, SubscriptionMode.Durable, null, schema, interceptors,
this.conf.getDefaultBackoffIntervalNanos(), this.conf.getMaxBackoffIntervalNanos());
}

synchronized (consumers) {
Expand Down
Loading

0 comments on commit bdfc098

Please sign in to comment.