Skip to content

Commit

Permalink
Unit test to verify metadata cache consistency across brokers. (apach…
Browse files Browse the repository at this point in the history
…e#11202)

Co-authored-by: Bharani Chadalavada <[email protected]>
  • Loading branch information
bharanic-dev and Bharani Chadalavada authored Jul 26, 2021
1 parent b6d02e4 commit 38bba54
Showing 1 changed file with 52 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,16 @@
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicReference;

import lombok.AllArgsConstructor;
import lombok.Cleanup;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.metadata.api.CacheGetResult;
Expand All @@ -51,11 +54,14 @@
import org.apache.pulsar.metadata.api.MetadataStoreException.ContentDeserializationException;
import org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException;
import org.apache.pulsar.metadata.api.MetadataStoreFactory;
import org.apache.pulsar.metadata.api.NotificationType;
import org.apache.pulsar.metadata.api.Stat;
import org.apache.pulsar.metadata.cache.impl.MetadataCacheImpl;
import org.awaitility.Awaitility;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

@Slf4j
public class MetadataCacheTest extends BaseMetadataStoreTest {

@Data
Expand Down Expand Up @@ -102,7 +108,7 @@ public Object[][] zkimplementations() {
}

@Test(dataProvider = "zk")
public void crossStoreUpdates(String provider, String url) throws Exception {
public void crossStoreAddDelete(String provider, String url) throws Exception {
@Cleanup
MetadataStore store1 = MetadataStoreFactory.create(url, MetadataStoreConfig.builder().build());

Expand Down Expand Up @@ -165,6 +171,51 @@ private void multiStoreAddDelete(List<MetadataCache<MyClass>> caches, int addOn,
});
}

@Test(dataProvider = "zk")
public void crossStoreUpdates(String provider, String url) throws Exception {
String testName = "cross store updates";
@Cleanup
MetadataStore store1 = MetadataStoreFactory.create(url, MetadataStoreConfig.builder().build());

@Cleanup
MetadataStore store2 = MetadataStoreFactory.create(url, MetadataStoreConfig.builder().build());

MetadataCacheImpl<MyClass> objCache1 = (MetadataCacheImpl<MyClass>) store1.getMetadataCache(MyClass.class);

MetadataCacheImpl<MyClass> objCache2 = (MetadataCacheImpl<MyClass>) store2.getMetadataCache(MyClass.class);
AtomicReference<MyClass> storeObj = new AtomicReference<MyClass>();
store2.registerListener(n -> {
if (n.getType() == NotificationType.Modified) {
CompletableFuture.runAsync(() -> {
try {
MyClass obj = objCache2.get(n.getPath()).get().get();
storeObj.set(obj);
} catch (Exception e) {
log.error("Got exception {}", e.getMessage());
}
});
};
});

String key1 = "/test-key1";
assertEquals(objCache1.getIfCached(key1), Optional.empty());
assertEquals(objCache2.getIfCached(key1), Optional.empty());

MyClass value1 = new MyClass(testName, 1);
objCache1.create(key1, value1).join();

Awaitility.await().ignoreNoExceptions().untilAsserted(() -> {
assertEquals(objCache1.getIfCached(key1), Optional.of(value1));
assertEquals(objCache2.get(key1).join(), Optional.of(value1));
assertEquals(objCache2.getIfCached(key1), Optional.of(value1));
});

MyClass value2 = new MyClass(testName, 2);
objCache1.readModifyUpdate(key1, (oldData) -> {return value2;}).join();

Awaitility.await().ignoreNoExceptions().untilAsserted(() ->assertEquals(storeObj.get(), value2));
}

@Test(dataProvider = "impl")
public void insertionDeletionWitGenericType(String provider, String url) throws Exception {
@Cleanup
Expand Down

0 comments on commit 38bba54

Please sign in to comment.