Skip to content

Commit

Permalink
[pulsar-broker] Handle bad-version in metadata-store if broker update…
Browse files Browse the repository at this point in the history
…s zk-metadata from zk-client (apache#9412)

* [pulsar-broker] Handle bad-version in metadata-store if broker updates zk-metadata from zk-client

* address comments
  • Loading branch information
rdhabalia authored Feb 2, 2021
1 parent 8cfaf48 commit f711969
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

import lombok.Getter;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.pulsar.metadata.api.MetadataCache;
Expand All @@ -44,6 +46,7 @@
import org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException;
import org.apache.pulsar.metadata.api.Notification;
import org.apache.pulsar.metadata.api.Stat;
import org.checkerframework.checker.nullness.Opt;

public class MetadataCacheImpl<T> implements MetadataCache<T>, Consumer<Notification> {

Expand Down Expand Up @@ -118,7 +121,7 @@ public Optional<T> getIfCached(String path) {

@Override
public CompletableFuture<Void> readModifyUpdateOrCreate(String path, Function<Optional<T>, T> modifyFunction) {
return objCache.get(path)
return executeWithRetry(() -> objCache.get(path)
.thenCompose(optEntry -> {
Optional<T> currentValue;
long expectedVersion;
Expand All @@ -145,12 +148,12 @@ public CompletableFuture<Void> readModifyUpdateOrCreate(String path, Function<Op
objCache.put(path,
FutureUtils.value(Optional.of(new SimpleImmutableEntry<T, Stat>(newValueObj, stat))));
});
});
}), path);
}

@Override
public CompletableFuture<Void> readModifyUpdate(String path, Function<T, T> modifyFunction) {
return objCache.get(path)
return executeWithRetry(() -> objCache.get(path)
.thenCompose(optEntry -> {
if (!optEntry.isPresent()) {
return FutureUtils.exception(new NotFoundException(""));
Expand All @@ -174,7 +177,7 @@ public CompletableFuture<Void> readModifyUpdate(String path, Function<T, T> modi
objCache.put(path,
FutureUtils.value(Optional.of(new SimpleImmutableEntry<T, Stat>(newValueObj, stat))));
});
});
}), path);
}

@Override
Expand Down Expand Up @@ -253,4 +256,23 @@ public void accept(Notification t) {
break;
}
}

private CompletableFuture<Void> executeWithRetry(Supplier<CompletableFuture<Void>> op, String key) {
CompletableFuture<Void> result = new CompletableFuture<>();
op.get().thenAccept(r -> result.complete(null)).exceptionally((ex) -> {
if (ex.getCause() instanceof BadVersionException) {
// if resource is updated by other than metadata-cache then metadata-cache will get bad-version
// exception. so, try to invalidate the cache and try one more time.
objCache.synchronous().invalidate(key);
op.get().thenAccept((c) -> result.complete(null)).exceptionally((ex1) -> {
result.completeExceptionally(ex1.getCause());
return null;
});
return null;
}
result.completeExceptionally(ex.getCause());
return null;
});
return result;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,14 @@
import static org.testng.Assert.fail;

import com.fasterxml.jackson.core.type.TypeReference;

import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;
import java.util.concurrent.CompletionException;

import lombok.AllArgsConstructor;
import lombok.Cleanup;
import lombok.Data;
import lombok.NoArgsConstructor;

import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.metadata.api.MetadataCache;
import org.apache.pulsar.metadata.api.MetadataStore;
Expand Down Expand Up @@ -87,9 +84,8 @@ public void insertionDeletionWitGenericType(String provider, String url) throws
@Cleanup
MetadataStore store = MetadataStoreFactory.create(url, MetadataStoreConfig.builder().build());

MetadataCache<Map<String, String>> objCache = store
.getMetadataCache(new TypeReference<Map<String, String>>() {
});
MetadataCache<Map<String, String>> objCache = store.getMetadataCache(new TypeReference<Map<String, String>>() {
});

String key1 = newKey();

Expand Down Expand Up @@ -159,9 +155,8 @@ public void insertionOutsideCache(String provider, String url) throws Exception
public void insertionOutsideCacheWithGenericType(String provider, String url) throws Exception {
@Cleanup
MetadataStore store = MetadataStoreFactory.create(url, MetadataStoreConfig.builder().build());
MetadataCache<Map<String, String>> objCache = store
.getMetadataCache(new TypeReference<Map<String, String>>() {
});
MetadataCache<Map<String, String>> objCache = store.getMetadataCache(new TypeReference<Map<String, String>>() {
});

String key1 = newKey();

Expand Down Expand Up @@ -258,4 +253,34 @@ public void readModifyUpdateOrCreate(String provider, String url) throws Excepti
assertEquals(newValue1.get().b, 2);
}

/**
* This test validates that metadata-cache can handle BadVersion failure if other cache/metadata-source updates the
* data with different version.
*
* @throws Exception
*/
@Test
public void readModifyUpdateBadVersionRetry() throws Exception {
String url = zks.getConnectionString();
@Cleanup
MetadataStore sourceStore1 = MetadataStoreFactory.create(url, MetadataStoreConfig.builder().build());
MetadataStore sourceStore2 = MetadataStoreFactory.create(url, MetadataStoreConfig.builder().build());

MetadataCache<MyClass> objCache1 = sourceStore1.getMetadataCache(MyClass.class);
MetadataCache<MyClass> objCache2 = sourceStore2.getMetadataCache(MyClass.class);

String key1 = newKey();

MyClass value1 = new MyClass("a", 1);
objCache1.create(key1, value1).join();
objCache1.get(key1).join();

objCache2.readModifyUpdate(key1, v -> {
return new MyClass(v.a, v.b + 1);
}).join();

objCache1.readModifyUpdate(key1, v -> {
return new MyClass(v.a, v.b + 1);
}).join();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
@Slf4j
public class TestZKServer implements AutoCloseable {

private final ZooKeeperServer zks;
protected final ZooKeeperServer zks;
private final File zkDataDir;
private final ServerCnxnFactory serverFactory;

Expand Down

0 comments on commit f711969

Please sign in to comment.