Skip to content

Commit

Permalink
[fix][meta] Fixed race condition between ResourceLock update and inva…
Browse files Browse the repository at this point in the history
…lidation (apache#19817)
  • Loading branch information
merlimat authored Mar 16, 2023
1 parent 8cff123 commit 9adec1b
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ private void handleSessionEvent(SessionEvent se) {
if (se == SessionEvent.SessionReestablished) {
log.info("Metadata store session has been re-established. Revalidating all the existing locks.");
for (ResourceLockImpl<T> lock : locks.values()) {
futures.add(lock.revalidate(lock.getValue(), true));
futures.add(lock.revalidate(lock.getValue(), true, true));
}

} else if (se == SessionEvent.Reconnected) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public class ResourceLockImpl<T> implements ResourceLock<T> {
private long version;
private final CompletableFuture<Void> expiredFuture;
private boolean revalidateAfterReconnection = false;
private CompletableFuture<Void> revalidateFuture;
private CompletableFuture<Void> pendingOperationFuture;

private enum State {
Init,
Expand All @@ -61,6 +61,7 @@ public ResourceLockImpl(MetadataStoreExtended store, MetadataSerde<T> serde, Str
this.path = path;
this.version = -1;
this.expiredFuture = new CompletableFuture<>();
this.pendingOperationFuture = CompletableFuture.completedFuture(null);
this.state = State.Init;
}

Expand All @@ -71,7 +72,24 @@ public synchronized T getValue() {

@Override
public synchronized CompletableFuture<Void> updateValue(T newValue) {
return acquire(newValue);
// If there is an operation in progress, we're going to let it complete before attempting to
// update the value
if (pendingOperationFuture.isDone()) {
pendingOperationFuture = CompletableFuture.completedFuture(null);
}

pendingOperationFuture = pendingOperationFuture.thenCompose(v -> {
synchronized (ResourceLockImpl.this) {
if (state != State.Valid) {
return CompletableFuture.failedFuture(
new IllegalStateException("Lock was not in valid state: " + state));
}

return acquire(newValue);
}
});

return pendingOperationFuture;
}

@Override
Expand Down Expand Up @@ -128,7 +146,7 @@ synchronized CompletableFuture<Void> acquire(T newValue) {
.thenRun(() -> result.complete(null))
.exceptionally(ex -> {
if (ex.getCause() instanceof LockBusyException) {
revalidate(newValue, false)
revalidate(newValue, false, false)
.thenAccept(__ -> result.complete(null))
.exceptionally(ex1 -> {
result.completeExceptionally(ex1);
Expand Down Expand Up @@ -185,39 +203,47 @@ synchronized void lockWasInvalidated() {
}

log.info("Lock on resource {} was invalidated", path);
revalidate(value, true)
revalidate(value, true, true)
.thenRun(() -> log.info("Successfully revalidated the lock on {}", path));
}

synchronized CompletableFuture<Void> revalidateIfNeededAfterReconnection() {
if (revalidateAfterReconnection) {
revalidateAfterReconnection = false;
log.warn("Revalidate lock at {} after reconnection", path);
return revalidate(value, true);
return revalidate(value, true, true);
} else {
return CompletableFuture.completedFuture(null);
}
}

synchronized CompletableFuture<Void> revalidate(T newValue, boolean revalidateAfterReconnection) {
if (revalidateFuture == null || revalidateFuture.isDone()) {
revalidateFuture = doRevalidate(newValue);
synchronized CompletableFuture<Void> revalidate(T newValue, boolean trackPendingOperation,
boolean revalidateAfterReconnection) {

final CompletableFuture<Void> trackFuture;

if (!trackPendingOperation) {
trackFuture = doRevalidate(newValue);
} else if (pendingOperationFuture.isDone()) {
pendingOperationFuture = doRevalidate(newValue);
trackFuture = pendingOperationFuture;
} else {
if (log.isDebugEnabled()) {
log.debug("Previous revalidating is not finished while revalidate newValue={}, value={}, version={}",
newValue, value, version);
}
CompletableFuture<Void> newFuture = new CompletableFuture<>();
revalidateFuture.whenComplete((unused, throwable) -> {
doRevalidate(newValue).thenRun(() -> newFuture.complete(null))
trackFuture = new CompletableFuture<>();
trackFuture.whenComplete((unused, throwable) -> {
doRevalidate(newValue).thenRun(() -> trackFuture.complete(null))
.exceptionally(throwable1 -> {
newFuture.completeExceptionally(throwable1);
trackFuture.completeExceptionally(throwable1);
return null;
});
});
revalidateFuture = newFuture;
pendingOperationFuture = trackFuture;
}
revalidateFuture.exceptionally(ex -> {

trackFuture.exceptionally(ex -> {
synchronized (ResourceLockImpl.this) {
Throwable realCause = FutureUtil.unwrapCompletionException(ex);
if (!revalidateAfterReconnection || realCause instanceof BadVersionException
Expand All @@ -237,7 +263,7 @@ synchronized CompletableFuture<Void> revalidate(T newValue, boolean revalidateAf
}
return null;
});
return revalidateFuture;
return trackFuture;
}

private synchronized CompletableFuture<Void> doRevalidate(T newValue) {
Expand Down

0 comments on commit 9adec1b

Please sign in to comment.