Skip to content

Commit

Permalink
Revert "Fix: LockManagerTest.updateValue is flaky (apache#13911)" (ap…
Browse files Browse the repository at this point in the history
  • Loading branch information
shibd authored May 7, 2022
1 parent b5f8647 commit 306a0a1
Showing 1 changed file with 10 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -161,24 +161,21 @@ public CompletableFuture<Stat> storePut(String path, byte[] data, Optional<Long>

long now = System.currentTimeMillis();

CompletableFuture<Stat> future = new CompletableFuture<>();
if (hasVersion && expectedVersion == -1) {
Value newValue = new Value(0, data, now, now, options.contains(CreateOption.Ephemeral));
Value existingValue = map.putIfAbsent(path, newValue);
if (existingValue != null) {
execute(() -> future.completeExceptionally(new BadVersionException("")), future);
return FutureUtils.exception(new BadVersionException(""));
} else {
receivedNotification(new Notification(NotificationType.Created, path));
notifyParentChildrenChanged(path);
String finalPath = path;
execute(() -> future.complete(new Stat(finalPath, 0, now, now, newValue.isEphemeral(),
true)), future);
return FutureUtils.value(new Stat(path, 0, now, now, newValue.isEphemeral(), true));
}
} else {
Value existingValue = map.get(path);
long existingVersion = existingValue != null ? existingValue.version : -1;
if (hasVersion && expectedVersion != existingVersion) {
execute(() -> future.completeExceptionally(new BadVersionException("")), future);
return FutureUtils.exception(new BadVersionException(""));
} else {
long newVersion = existingValue != null ? existingValue.version + 1 : 0;
long createdTimestamp = existingValue != null ? existingValue.createdTimestamp : now;
Expand All @@ -192,13 +189,12 @@ public CompletableFuture<Stat> storePut(String path, byte[] data, Optional<Long>
if (type == NotificationType.Created) {
notifyParentChildrenChanged(path);
}
String finalPath = path;
execute(() -> future.complete(new Stat(finalPath, newValue.version, newValue.createdTimestamp,
newValue.modifiedTimestamp,
false, true)), future);
return FutureUtils
.value(new Stat(path, newValue.version, newValue.createdTimestamp,
newValue.modifiedTimestamp,
false, true));
}
}
return future;
}
}

Expand All @@ -208,20 +204,18 @@ public CompletableFuture<Void> storeDelete(String path, Optional<Long> optExpect
return FutureUtil.failedFuture(new MetadataStoreException.InvalidPathException(path));
}
synchronized (map) {
CompletableFuture<Void> future = new CompletableFuture<>();
Value value = map.get(path);
if (value == null) {
execute(() -> future.completeExceptionally(new NotFoundException("")), future);
return FutureUtils.exception(new NotFoundException(""));
} else if (optExpectedVersion.isPresent() && optExpectedVersion.get() != value.version) {
execute(() -> future.completeExceptionally(new BadVersionException("")), future);
return FutureUtils.exception(new BadVersionException(""));
} else {
map.remove(path);
receivedNotification(new Notification(NotificationType.Deleted, path));

notifyParentChildrenChanged(path);
execute(() -> future.complete(null), future);
return FutureUtils.value(null);
}
return future;
}
}
}

0 comments on commit 306a0a1

Please sign in to comment.