Skip to content

Commit

Permalink
Ensure cache is refreshed (and not just invalidated) after a store wr…
Browse files Browse the repository at this point in the history
…ite (apache#12788)

### Motivation

When we're doing a write to the store from outside the `MetadataCache`, we are immediately invalidating the cache to ensure read-after-write consistency through the cache. 

The only issue is that the invalidation, will not trigger a reloading of the value. Instead it is relying on the next call to `cache.get()` which will see the cache miss and it will load the new value into the cache.

This means that calls `cache.getIfCached()`, which is not triggering a cache load, will keep seeing the key as missing.

### Modification

Ensure we're calling refresh on the cache to get the value automatically reloaded in background and make sure the `getIfCached()` will eventually return the new value, even if there are no calls to `cache.get()`.
  • Loading branch information
merlimat authored Nov 17, 2021
1 parent 1ade1ba commit 2bc4499
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -148,4 +148,11 @@ public interface MetadataCache<T> {
* @param path the path of the object in the metadata store
*/
void invalidate(String path);

/**
* Invalidate and reload an object in the metadata cache.
*
* @param path the path of the object in the metadata store
*/
void refresh(String path);
}
Original file line number Diff line number Diff line change
Expand Up @@ -168,8 +168,7 @@ public CompletableFuture<T> readModifyUpdateOrCreate(String path, Function<Optio
}

return store.put(path, newValue, Optional.of(expectedVersion)).thenAccept(__ -> {
objCache.synchronous().invalidate(path);
objCache.synchronous().refresh(path);
refresh(path);
}).thenApply(__ -> newValueObj);
}), path);
}
Expand Down Expand Up @@ -198,8 +197,7 @@ public CompletableFuture<T> readModifyUpdate(String path, Function<T, T> modifyF
}

return store.put(path, newValue, Optional.of(expectedVersion)).thenAccept(__ -> {
objCache.synchronous().invalidate(path);
objCache.synchronous().refresh(path);
refresh(path);
}).thenApply(__ -> newValueObj);
}), path);
}
Expand All @@ -220,7 +218,7 @@ public CompletableFuture<Void> create(String path, T value) {
// In addition to caching the value, we need to add a watch on the path,
// so when/if it changes on any other node, we are notified and we can
// update the cache
objCache.get(path).whenComplete( (stat2, ex) -> {
objCache.get(path).whenComplete((stat2, ex) -> {
if (ex == null) {
future.complete(null);
} else {
Expand Down Expand Up @@ -261,6 +259,12 @@ public void invalidate(String path) {
objCache.synchronous().invalidate(path);
}

@Override
public void refresh(String path) {
objCache.synchronous().invalidate(path);
objCache.synchronous().refresh(path);
}

@VisibleForTesting
public void invalidateAll() {
objCache.synchronous().invalidateAll();
Expand All @@ -275,8 +279,7 @@ public void accept(Notification t) {
if (objCache.synchronous().getIfPresent(path) != null) {
// Trigger background refresh of the cached item, but before make sure
// to invalidate the entry so that we won't serve a stale cached version
objCache.synchronous().invalidate(path);
objCache.synchronous().refresh(path);
refresh(path);
}
break;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ public final CompletableFuture<Stat> put(String path, byte[] data, Optional<Long
}
}

metadataCaches.forEach(c -> c.invalidate(path));
metadataCaches.forEach(c -> c.refresh(path));
return stat;
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,34 @@ public void insertionDeletion(String provider, Supplier<String> urlSupplier) thr
assertEquals(objCache.get(key1).join(), Optional.empty());
}

@Test(dataProvider = "impl")
public void insertionWithInvalidation(String provider, Supplier<String> urlSupplier) throws Exception {
@Cleanup
MetadataStore store = MetadataStoreFactory.create(urlSupplier.get(), MetadataStoreConfig.builder().build());
MetadataCache<MyClass> objCache = store.getMetadataCache(MyClass.class);

String key1 = newKey();

assertEquals(objCache.getIfCached(key1), Optional.empty());
assertEquals(objCache.get(key1).join(), Optional.empty());

MyClass value1 = new MyClass("a", 1);
store.put(key1, ObjectMapperFactory.getThreadLocal().writeValueAsBytes(value1), Optional.of(-1L)).join();

Awaitility.await().untilAsserted(() -> {
assertEquals(objCache.getIfCached(key1), Optional.of(value1));
assertEquals(objCache.get(key1).join(), Optional.of(value1));
});

MyClass value2 = new MyClass("a", 2);
store.put(key1, ObjectMapperFactory.getThreadLocal().writeValueAsBytes(value2), Optional.of(0L)).join();

Awaitility.await().untilAsserted(() -> {
assertEquals(objCache.getIfCached(key1), Optional.of(value2));
assertEquals(objCache.get(key1).join(), Optional.of(value2));
});
}

@Test(dataProvider = "impl")
public void insertionOutsideCache(String provider, Supplier<String> urlSupplier) throws Exception {
@Cleanup
Expand Down Expand Up @@ -309,8 +337,10 @@ public void insertionOutsideCacheWithGenericType(String provider, Supplier<Strin
v.put("b", "2");
store.put(key1, ObjectMapperFactory.getThreadLocal().writeValueAsBytes(v), Optional.of(-1L)).join();

assertEquals(objCache.getIfCached(key1), Optional.empty());
assertEquals(objCache.get(key1).join(), Optional.of(v));
Awaitility.await().untilAsserted(() -> {
assertEquals(objCache.getIfCached(key1), Optional.of(v));
assertEquals(objCache.get(key1).join(), Optional.of(v));
});
}

@Test(dataProvider = "impl")
Expand Down

0 comments on commit 2bc4499

Please sign in to comment.