Skip to content

Commit

Permalink
[improve] Retry re-validating ResourceLock with backoff after errors (a…
Browse files Browse the repository at this point in the history
  • Loading branch information
merlimat authored May 7, 2024
1 parent 788b5ae commit 83b86ab
Show file tree
Hide file tree
Showing 27 changed files with 105 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,10 @@
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.Backoff;
import org.apache.pulsar.client.impl.ProducerImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.Backoff;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.StringInterner;
import org.slf4j.Logger;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.Backoff;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.util.Backoff;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.metadata.api.MetadataEvent;
import org.apache.pulsar.metadata.api.MetadataEventSynchronizer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,13 @@
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
import org.apache.pulsar.broker.service.BrokerServiceException.TopicPoliciesCacheNotInitException;
import org.apache.pulsar.client.impl.Backoff;
import org.apache.pulsar.client.impl.BackoffBuilder;
import org.apache.pulsar.client.util.RetryUtil;
import org.apache.pulsar.common.classification.InterfaceStability;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.TopicPolicies;
import org.apache.pulsar.common.util.Backoff;
import org.apache.pulsar.common.util.BackoffBuilder;
import org.apache.pulsar.common.util.FutureUtil;
import org.jetbrains.annotations.NotNull;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,11 @@
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter.Type;
import org.apache.pulsar.broker.transaction.exception.buffer.TransactionBufferException;
import org.apache.pulsar.client.impl.Backoff;
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.policies.data.stats.TopicMetricBean;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.util.Backoff;
import org.apache.pulsar.common.util.Codec;
import org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@
import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter.Type;
import org.apache.pulsar.broker.transaction.exception.buffer.TransactionBufferException;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.impl.Backoff;
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
import org.apache.pulsar.common.util.Backoff;
import org.apache.pulsar.common.util.Codec;
import org.apache.pulsar.compaction.CompactedTopicUtils;
import org.apache.pulsar.compaction.Compactor;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.Backoff;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.client.impl.ProducerImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
Expand All @@ -65,6 +64,7 @@
import org.apache.pulsar.common.policies.data.stats.ReplicatorStatsImpl;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.stats.Rate;
import org.apache.pulsar.common.util.Backoff;
import org.apache.pulsar.common.util.Codec;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,11 @@
import org.apache.pulsar.broker.transaction.pendingack.TransactionPendingAckStoreProvider;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.client.impl.Backoff;
import org.apache.pulsar.common.api.proto.CommandAck.AckType;
import org.apache.pulsar.common.policies.data.TransactionInPendingAckStats;
import org.apache.pulsar.common.policies.data.TransactionPendingAckStats;
import org.apache.pulsar.common.stats.PositionInPendingAckStats;
import org.apache.pulsar.common.util.Backoff;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.RecoverTimeRecord;
import org.apache.pulsar.common.util.collections.BitSetRecyclable;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,10 @@
import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerWrapper;
import org.apache.pulsar.broker.resources.LocalPoliciesResources;
import org.apache.pulsar.broker.resources.PulsarResources;
import org.apache.pulsar.client.impl.Backoff;
import org.apache.pulsar.common.policies.data.BundlesData;
import org.apache.pulsar.common.policies.data.LocalPolicies;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.util.Backoff;
import org.apache.pulsar.metadata.api.Notification;
import org.apache.pulsar.policies.data.loadbalancer.BundleData;
import org.apache.pulsar.stats.CacheMetricsCollector;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@
import org.apache.pulsar.broker.systopic.SystemTopicClient;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.Backoff;
import org.apache.pulsar.client.impl.BackoffBuilder;
import org.apache.pulsar.common.util.Backoff;
import org.apache.pulsar.common.util.BackoffBuilder;
import org.apache.pulsar.common.events.PulsarEvent;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.common.util.Backoff;
import org.apache.pulsar.common.util.BackoffBuilder;
import org.apache.pulsar.common.util.FutureUtil;
import org.awaitility.Awaitility;
import org.awaitility.core.ConditionTimeoutException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

import lombok.Cleanup;
import org.apache.pulsar.client.util.RetryUtil;
import org.apache.pulsar.common.util.Backoff;
import org.apache.pulsar.common.util.BackoffBuilder;
import org.apache.pulsar.common.util.FutureUtil;
import org.testng.annotations.Test;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.schema.BytesSchemaVersion;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.util.Backoff;
import org.apache.pulsar.common.util.BackoffBuilder;
import org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.HandlerState.State;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.util.Backoff;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,8 @@
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.common.util.Backoff;
import org.apache.pulsar.common.util.BackoffBuilder;
import org.apache.pulsar.common.util.CompletableFutureCancellationHandler;
import org.apache.pulsar.common.util.ExceptionHandler;
import org.apache.pulsar.common.util.FutureUtil;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.topics.TopicList;
import org.apache.pulsar.common.util.Backoff;
import org.apache.pulsar.common.util.BackoffBuilder;
import org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@
import org.apache.pulsar.common.protocol.schema.SchemaVersion;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.common.util.BackoffBuilder;
import org.apache.pulsar.common.util.DateFormatter;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.RelativeTimeUtil;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.common.topics.TopicList;
import org.apache.pulsar.common.util.Backoff;
import org.apache.pulsar.common.util.BackoffBuilder;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.netty.EventLoopUtil;
import org.slf4j.Logger;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.pulsar.common.api.proto.CommandWatchTopicUpdate;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.util.BackoffBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@
import org.apache.pulsar.common.api.proto.Subscription;
import org.apache.pulsar.common.api.proto.TxnAction;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.util.Backoff;
import org.apache.pulsar.common.util.BackoffBuilder;
import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.apache.pulsar.client.impl.Backoff;
import org.apache.pulsar.common.util.Backoff;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.apache.pulsar.client.impl.conf.TopicConsumerConfigurationData;
import org.apache.pulsar.client.util.ExecutorProvider;
import org.apache.pulsar.client.util.ScheduledExecutorProvider;
import org.apache.pulsar.common.util.Backoff;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.impl;
package org.apache.pulsar.common.util;

import com.google.common.annotations.VisibleForTesting;
import java.time.Clock;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.impl;
package org.apache.pulsar.common.util;

import java.time.Clock;
import java.util.concurrent.TimeUnit;
Expand All @@ -32,8 +32,11 @@ public class BackoffBuilder {

public BackoffBuilder() {
this.initial = 0;
this.unitInitial = TimeUnit.MILLISECONDS;
this.max = 0;
this.unitMax = TimeUnit.MILLISECONDS;
this.mandatoryStop = 0;
this.unitMandatoryStop = TimeUnit.MILLISECONDS;
this.clock = Clock.systemDefaultZone();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.impl;
package org.apache.pulsar.common.util;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.common.util.FutureUtil;
Expand All @@ -52,21 +52,21 @@ class LockManagerImpl<T> implements LockManager<T> {
private final MetadataCache<T> cache;
private final MetadataSerde<T> serde;
private final FutureUtil.Sequencer<Void> sequencer;
private final ExecutorService executor;
private final ScheduledExecutorService executor;

private enum State {
Ready, Closed
}

private State state = State.Ready;

LockManagerImpl(MetadataStoreExtended store, Class<T> clazz, ExecutorService executor) {
LockManagerImpl(MetadataStoreExtended store, Class<T> clazz, ScheduledExecutorService executor) {
this(store, new JSONMetadataSerdeSimpleType<>(
TypeFactory.defaultInstance().constructSimpleType(clazz, null)),
executor);
}

LockManagerImpl(MetadataStoreExtended store, MetadataSerde<T> serde, ExecutorService executor) {
LockManagerImpl(MetadataStoreExtended store, MetadataSerde<T> serde, ScheduledExecutorService executor) {
this.store = store;
this.cache = store.getMetadataCache(serde);
this.serde = serde;
Expand All @@ -83,7 +83,7 @@ public CompletableFuture<Optional<T>> readLock(String path) {

@Override
public CompletableFuture<ResourceLock<T>> acquireLock(String path, T value) {
ResourceLockImpl<T> lock = new ResourceLockImpl<>(store, serde, path);
ResourceLockImpl<T> lock = new ResourceLockImpl<>(store, serde, path, executor);

CompletableFuture<ResourceLock<T>> result = new CompletableFuture<>();
lock.acquire(value).thenRun(() -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,13 @@
import java.util.EnumSet;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.pulsar.common.util.Backoff;
import org.apache.pulsar.common.util.BackoffBuilder;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.metadata.api.GetResult;
import org.apache.pulsar.metadata.api.MetadataSerde;
Expand All @@ -44,7 +49,10 @@ public class ResourceLockImpl<T> implements ResourceLock<T> {
private long version;
private final CompletableFuture<Void> expiredFuture;
private boolean revalidateAfterReconnection = false;
private final Backoff backoff;
private final FutureUtil.Sequencer<Void> sequencer;
private final ScheduledExecutorService executor;
private ScheduledFuture<?> revalidateTask;

private enum State {
Init,
Expand All @@ -55,14 +63,20 @@ private enum State {

private State state;

public ResourceLockImpl(MetadataStoreExtended store, MetadataSerde<T> serde, String path) {
ResourceLockImpl(MetadataStoreExtended store, MetadataSerde<T> serde, String path,
ScheduledExecutorService executor) {
this.store = store;
this.serde = serde;
this.path = path;
this.version = -1;
this.expiredFuture = new CompletableFuture<>();
this.sequencer = FutureUtil.Sequencer.create();
this.state = State.Init;
this.executor = executor;
this.backoff = new BackoffBuilder()
.setInitialTime(100, TimeUnit.MILLISECONDS)
.setMax(60, TimeUnit.SECONDS)
.create();
}

@Override
Expand Down Expand Up @@ -93,6 +107,10 @@ public synchronized CompletableFuture<Void> release() {
}

state = State.Releasing;
if (revalidateTask != null) {
revalidateTask.cancel(true);
}

CompletableFuture<Void> result = new CompletableFuture<>();

store.delete(path, Optional.of(version))
Expand Down Expand Up @@ -210,8 +228,15 @@ synchronized CompletableFuture<Void> revalidateIfNeededAfterReconnection() {
* This method is thread-safe and it will perform multiple re-validation operations in turn.
*/
synchronized CompletableFuture<Void> silentRevalidateOnce() {
if (state != State.Valid) {
return CompletableFuture.completedFuture(null);
}

return sequencer.sequential(() -> revalidate(value))
.thenRun(() -> log.info("Successfully revalidated the lock on {}", path))
.thenRun(() -> {
log.info("Successfully revalidated the lock on {}", path);
backoff.reset();
})
.exceptionally(ex -> {
synchronized (ResourceLockImpl.this) {
Throwable realCause = FutureUtil.unwrapCompletionException(ex);
Expand All @@ -225,8 +250,12 @@ synchronized CompletableFuture<Void> silentRevalidateOnce() {
// Continue assuming we hold the lock, until we can revalidate it, either
// on Reconnected or SessionReestablished events.
revalidateAfterReconnection = true;
log.warn("Failed to revalidate the lock at {}. Retrying later on reconnection {}", path,
realCause.getMessage());

long delayMillis = backoff.next();
log.warn("Failed to revalidate the lock at {}: {} - Retrying in {} seconds", path,
realCause.getMessage(), delayMillis / 1000.0);
revalidateTask =
executor.schedule(this::silentRevalidateOnce, delayMillis, TimeUnit.MILLISECONDS);
}
}
return null;
Expand Down
Loading

0 comments on commit 83b86ab

Please sign in to comment.