Skip to content

Commit

Permalink
[pulsar-broker] MockZK: Handle zk-children watch notification (apache…
Browse files Browse the repository at this point in the history
…#9473)

Co-authored-by: Matteo Merli <[email protected]>
  • Loading branch information
rdhabalia and merlimat authored Feb 6, 2021
1 parent 683ee5f commit 19dec2c
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,25 @@
*/
package org.apache.bookkeeper.mledger.impl;

import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.bookkeeper.mledger.ManagedLedgerException.MetaStoreException;
import org.apache.bookkeeper.mledger.impl.MetaStore.MetaStoreCallback;
import org.apache.bookkeeper.mledger.proto.MLDataFormats;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo;
import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.pulsar.metadata.api.MetadataCache;
import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.metadata.api.Stat;
import org.apache.pulsar.metadata.impl.ZKMetadataStore;
import org.apache.zookeeper.CreateMode;
Expand All @@ -41,6 +47,14 @@

public class MetaStoreImplTest extends MockedBookKeeperTestCase {

@Data
@AllArgsConstructor
@NoArgsConstructor
static class MyClass {
String a;
int b;
}

@Test
void getMLList() throws Exception {
MetaStore store = new MetaStoreImpl(new ZKMetadataStore(zkc), executor);
Expand Down Expand Up @@ -226,4 +240,30 @@ public void operationComplete(Void result, Stat version) {

promise.get();
}

@Test
public void testGetChildrenWatch() throws Exception {
MetadataStore store = new ZKMetadataStore(zkc);
MetadataCache<MyClass> objCache1 = store.getMetadataCache(MyClass.class);

String path = "/managed-ledgers/prop-xyz/ns1/persistent";
assertTrue(objCache1.getChildren(path).get().isEmpty());

CountDownLatch latch = new CountDownLatch(1);
ZkUtils.asyncCreateFullPathOptimistic(zkc, "/managed-ledgers/prop-xyz/ns1/persistent/t1", "".getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, (rc, path2, ctx, name) -> {
latch.countDown();
}, null);
latch.await();

ManagedLedgerTest.retryStrategically((test) -> {
try {
return !objCache1.getChildren(path).get().isEmpty();
} catch (Exception e) {
// Ok
}
return false;
}, 5, 1000);
assertFalse(objCache1.getChildren(path).get().isEmpty());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,8 @@ public void create(final String path, final byte[] data, final List<ACL> acl, Cr
cb.processResult(KeeperException.Code.NODEEXISTS.intValue(), path, ctx, null);
} else if (!parent.isEmpty() && !tree.containsKey(parent)) {
mutex.unlock();
toNotifyParent.forEach(watcher -> watcher
.process(new WatchedEvent(EventType.NodeChildrenChanged, KeeperState.SyncConnected, parent)));
cb.processResult(KeeperException.Code.NONODE.intValue(), path, ctx, null);
} else {
tree.put(name, Pair.of(data, 0));
Expand Down

0 comments on commit 19dec2c

Please sign in to comment.