Skip to content

Commit

Permalink
fix fix-13004 Race condition in ResourceLockImpl#revalidate (apache#1…
Browse files Browse the repository at this point in the history
  • Loading branch information
Jason918 authored Nov 29, 2021
1 parent ba339fb commit bb2c934
Showing 1 changed file with 28 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ public class ResourceLockImpl<T> implements ResourceLock<T> {
private long version;
private final CompletableFuture<Void> expiredFuture;
private boolean revalidateAfterReconnection = false;
private CompletableFuture<Void> revalidateFuture;

private enum State {
Init,
Expand Down Expand Up @@ -144,6 +145,9 @@ synchronized CompletableFuture<Void> acquire(T newValue) {

// Simple operation of acquiring the lock with no retries, or checking for the lock content
private CompletableFuture<Void> acquireWithNoRevalidation(T newValue) {
if (log.isDebugEnabled()) {
log.debug("acquireWithNoRevalidation,newValue={},version={}", newValue, version);
}
byte[] payload;
try {
payload = serde.serialize(path, newValue);
Expand Down Expand Up @@ -212,6 +216,30 @@ synchronized CompletableFuture<Void> revalidateIfNeededAfterReconnection() {
}

synchronized CompletableFuture<Void> revalidate(T newValue) {
if (revalidateFuture == null || revalidateFuture.isDone()) {
revalidateFuture = doRevalidate(newValue);
} else {
if (log.isDebugEnabled()) {
log.debug("Previous revalidating is not finished while revalidate newValue={}, value={}, version={}",
newValue, value, version);
}
CompletableFuture<Void> newFuture = new CompletableFuture<>();
revalidateFuture.whenComplete((unused, throwable) -> {
doRevalidate(newValue).thenRun(() -> newFuture.complete(null))
.exceptionally(throwable1 -> {
newFuture.completeExceptionally(throwable1);
return null;
});
});
revalidateFuture = newFuture;
}
return revalidateFuture;
}

private synchronized CompletableFuture<Void> doRevalidate(T newValue) {
if (log.isDebugEnabled()) {
log.debug("doRevalidate with newValue={}, version={}", newValue, version);
}
return store.get(path)
.thenCompose(optGetResult -> {
if (!optGetResult.isPresent()) {
Expand Down

0 comments on commit bb2c934

Please sign in to comment.