Skip to content

Commit

Permalink
[FLINK-22893][zookeeper] Replace NodeCache with TreeCache
Browse files Browse the repository at this point in the history
  • Loading branch information
zentol committed Jul 26, 2021
1 parent 33c42ef commit 6d0e2cf
Show file tree
Hide file tree
Showing 4 changed files with 244 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,7 @@
import org.apache.flink.shaded.curator4.org.apache.curator.framework.CuratorFramework;
import org.apache.flink.shaded.curator4.org.apache.curator.framework.api.UnhandledErrorListener;
import org.apache.flink.shaded.curator4.org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.flink.shaded.curator4.org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.flink.shaded.curator4.org.apache.curator.framework.recipes.cache.NodeCacheListener;
import org.apache.flink.shaded.curator4.org.apache.curator.framework.recipes.cache.TreeCache;
import org.apache.flink.shaded.curator4.org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.apache.flink.shaded.curator4.org.apache.curator.framework.recipes.leader.LeaderLatchListener;
import org.apache.flink.shaded.curator4.org.apache.curator.framework.state.ConnectionState;
Expand All @@ -53,10 +52,7 @@
* ZooKeeper.
*/
public class ZooKeeperLeaderElectionDriver
implements LeaderElectionDriver,
LeaderLatchListener,
NodeCacheListener,
UnhandledErrorListener {
implements LeaderElectionDriver, LeaderLatchListener, UnhandledErrorListener {

private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperLeaderElectionDriver.class);

Expand All @@ -67,7 +63,7 @@ public class ZooKeeperLeaderElectionDriver
private final LeaderLatch leaderLatch;

/** Curator recipe to watch a given ZooKeeper node for changes. */
private final NodeCache cache;
private final TreeCache cache;

/** ZooKeeper path of the node which stores the current leader information. */
private final String connectionInformationPath;
Expand Down Expand Up @@ -107,7 +103,11 @@ public ZooKeeperLeaderElectionDriver(
this.leaderContenderDescription = checkNotNull(leaderContenderDescription);

leaderLatch = new LeaderLatch(client, ZooKeeperUtils.generateLeaderLatchPath(path));
cache = new NodeCache(client, connectionInformationPath);
this.cache =
ZooKeeperUtils.createTreeCache(
client,
connectionInformationPath,
this::retrieveLeaderInformationFromZooKeeper);

client.getUnhandledErrorListenable().addListener(this);

Expand All @@ -116,7 +116,6 @@ public ZooKeeperLeaderElectionDriver(
leaderLatch.addListener(this);
leaderLatch.start();

cache.getListenable().addListener(this);
cache.start();

client.getConnectionStateListenable().addListener(listener);
Expand Down Expand Up @@ -171,10 +170,9 @@ public void notLeader() {
leaderElectionEventHandler.onRevokeLeadership();
}

@Override
public void nodeChanged() throws Exception {
private void retrieveLeaderInformationFromZooKeeper() throws Exception {
if (leaderLatch.hasLeadership()) {
ChildData childData = cache.getCurrentData();
ChildData childData = cache.getCurrentData(connectionInformationPath);
if (childData != null) {
final byte[] data = childData.getData();
if (data != null && data.length > 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,14 @@
import org.apache.flink.shaded.curator4.org.apache.curator.framework.CuratorFramework;
import org.apache.flink.shaded.curator4.org.apache.curator.framework.api.UnhandledErrorListener;
import org.apache.flink.shaded.curator4.org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.flink.shaded.curator4.org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.flink.shaded.curator4.org.apache.curator.framework.recipes.cache.NodeCacheListener;
import org.apache.flink.shaded.curator4.org.apache.curator.framework.recipes.cache.TreeCache;
import org.apache.flink.shaded.curator4.org.apache.curator.framework.state.ConnectionState;
import org.apache.flink.shaded.curator4.org.apache.curator.framework.state.ConnectionStateListener;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.util.UUID;

Expand All @@ -50,14 +48,14 @@
* ID is retrieved from ZooKeeper.
*/
public class ZooKeeperLeaderRetrievalDriver
implements LeaderRetrievalDriver, NodeCacheListener, UnhandledErrorListener {
implements LeaderRetrievalDriver, UnhandledErrorListener {
private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperLeaderRetrievalDriver.class);

/** Connection to the used ZooKeeper quorum. */
private final CuratorFramework client;

/** Curator recipe to watch changes of a specific ZooKeeper node. */
private final NodeCache cache;
private final TreeCache cache;

private final String connectionInformationPath;

Expand Down Expand Up @@ -86,12 +84,16 @@ public ZooKeeperLeaderRetrievalDriver(
throws Exception {
this.client = checkNotNull(client, "CuratorFramework client");
this.connectionInformationPath = ZooKeeperUtils.generateConnectionInformationPath(path);
this.cache = new NodeCache(client, connectionInformationPath);
this.cache =
ZooKeeperUtils.createTreeCache(
client,
connectionInformationPath,
this::retrieveLeaderInformationFromZooKeeper);

this.leaderRetrievalEventHandler = checkNotNull(leaderRetrievalEventHandler);
this.fatalErrorHandler = checkNotNull(fatalErrorHandler);

client.getUnhandledErrorListenable().addListener(this);
cache.getListenable().addListener(this);
cache.start();

client.getConnectionStateListenable().addListener(connectionStateListener);
Expand All @@ -112,23 +114,14 @@ public void close() throws Exception {
client.getUnhandledErrorListenable().removeListener(this);
client.getConnectionStateListenable().removeListener(connectionStateListener);

try {
cache.close();
} catch (IOException e) {
throw new Exception("Could not properly stop the ZooKeeperLeaderRetrievalDriver.", e);
}
}

@Override
public void nodeChanged() {
retrieveLeaderInformationFromZooKeeper();
cache.close();
}

private void retrieveLeaderInformationFromZooKeeper() {
try {
LOG.debug("Leader node has changed.");

final ChildData childData = cache.getCurrentData();
final ChildData childData = cache.getCurrentData(connectionInformationPath);

if (childData != null) {
final byte[] data = childData.getData();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.runtime.util;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
Expand Down Expand Up @@ -47,12 +48,17 @@
import org.apache.flink.runtime.persistence.RetrievableStateStorageHelper;
import org.apache.flink.runtime.persistence.filesystem.FileSystemStateStorageHelper;
import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;
import org.apache.flink.util.concurrent.Executors;
import org.apache.flink.util.function.RunnableWithException;

import org.apache.flink.shaded.curator4.org.apache.curator.framework.CuratorFramework;
import org.apache.flink.shaded.curator4.org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.flink.shaded.curator4.org.apache.curator.framework.api.ACLProvider;
import org.apache.flink.shaded.curator4.org.apache.curator.framework.imps.DefaultACLProvider;
import org.apache.flink.shaded.curator4.org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.flink.shaded.curator4.org.apache.curator.framework.recipes.cache.TreeCache;
import org.apache.flink.shaded.curator4.org.apache.curator.framework.recipes.cache.TreeCacheListener;
import org.apache.flink.shaded.curator4.org.apache.curator.framework.recipes.cache.TreeCacheSelector;
import org.apache.flink.shaded.curator4.org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ZooDefs;
import org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.data.ACL;
Expand Down Expand Up @@ -534,6 +540,65 @@ public static CuratorFramework useNamespaceAndEnsurePath(
trimStartingSlash(newNamespace));
}

/**
* Creates a {@link TreeCache} that only observes a specific node.
*
* @param client ZK client
* @param pathToNode full path of the node to observe
* @param nodeChangeCallback callback to run if the node has changed
* @return tree cache
*/
public static TreeCache createTreeCache(
final CuratorFramework client,
final String pathToNode,
final RunnableWithException nodeChangeCallback) {
final TreeCache cache =
TreeCache.newBuilder(client, pathToNode)
.setCacheData(true)
.setCreateParentNodes(false)
.setSelector(ZooKeeperUtils.treeCacheSelectorForPath(pathToNode))
.setExecutor(Executors.newDirectExecutorService())
.build();

cache.getListenable().addListener(createTreeCacheListener(nodeChangeCallback));

return cache;
}

@VisibleForTesting
static TreeCacheListener createTreeCacheListener(RunnableWithException nodeChangeCallback) {
return (ignored, event) -> {
// only notify listener if nodes have changed
// connection issues are handled separately from the cache
switch (event.getType()) {
case NODE_ADDED:
case NODE_UPDATED:
case NODE_REMOVED:
nodeChangeCallback.run();
}
};
}

/**
* Returns a {@link TreeCacheSelector} that only accepts a specific node.
*
* @param fullPath node to accept
* @return tree cache selector
*/
private static TreeCacheSelector treeCacheSelectorForPath(String fullPath) {
return new TreeCacheSelector() {
@Override
public boolean traverseChildren(String childPath) {
return false;
}

@Override
public boolean acceptChild(String childPath) {
return fullPath.equals(childPath);
}
};
}

/** Secure {@link ACLProvider} implementation. */
public static class SecureAclProvider implements ACLProvider {
@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.util;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.core.testutils.FlinkMatchers;
import org.apache.flink.util.TestLogger;

import org.apache.flink.shaded.curator4.org.apache.curator.framework.CuratorFramework;
import org.apache.flink.shaded.curator4.org.apache.curator.framework.recipes.cache.TreeCache;
import org.apache.flink.shaded.curator4.org.apache.curator.framework.recipes.cache.TreeCacheEvent;
import org.apache.flink.shaded.curator4.org.apache.curator.framework.recipes.cache.TreeCacheListener;
import org.apache.flink.shaded.guava18.com.google.common.io.Closer;

import org.apache.curator.test.TestingServer;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

import static org.hamcrest.MatcherAssert.assertThat;

/**
* Tests for {@link ZooKeeperUtils#createTreeCache(CuratorFramework, String,
* org.apache.flink.util.function.RunnableWithException)}.
*/
public class ZooKeeperUtilsTreeCacheTest extends TestLogger {

private static final String PARENT_PATH = "/foo";
private static final String CHILD_PATH = PARENT_PATH + "/bar";

private Closer closer;
private CuratorFramework client;

private final AtomicReference<CompletableFuture<Void>> callbackFutureReference =
new AtomicReference<>();

@Before
public void setUp() throws Exception {
closer = Closer.create();
final TestingServer testingServer = closer.register(new TestingServer());

Configuration configuration = new Configuration();
configuration.set(
HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, testingServer.getConnectString());

client = closer.register(ZooKeeperUtils.startCuratorFramework(configuration));
client = closer.register(ZooKeeperUtils.startCuratorFramework(configuration));
final TreeCache cache =
closer.register(
ZooKeeperUtils.createTreeCache(
client,
CHILD_PATH,
() -> callbackFutureReference.get().complete(null)));
cache.start();
}

@After
public void tearDown() throws Exception {
closer.close();
callbackFutureReference.set(null);
}

@Test
public void testCallbackCalledOnNodeCreation() throws Exception {
client.create().forPath(PARENT_PATH);
callbackFutureReference.set(new CompletableFuture<>());
client.create().forPath(CHILD_PATH);
callbackFutureReference.get().get();
}

@Test
public void testCallbackCalledOnNodeModification() throws Exception {
testCallbackCalledOnNodeCreation();

callbackFutureReference.set(new CompletableFuture<>());
client.setData().forPath(CHILD_PATH, new byte[1]);
callbackFutureReference.get().get();
}

@Test
public void testCallbackCalledOnNodeDeletion() throws Exception {
testCallbackCalledOnNodeCreation();

callbackFutureReference.set(new CompletableFuture<>());
client.delete().forPath(CHILD_PATH);
callbackFutureReference.get().get();
}

@Test
public void testCallbackNotCalledOnCreationOfParents() throws Exception {
callbackFutureReference.set(new CompletableFuture<>());
client.create().forPath(PARENT_PATH);
assertThat(
callbackFutureReference.get(),
FlinkMatchers.willNotComplete(Duration.ofMillis(20)));
}

@Test
public void testCallbackNotCalledOnCreationOfChildren() throws Exception {
testCallbackCalledOnNodeCreation();

callbackFutureReference.set(new CompletableFuture<>());
client.create().forPath(CHILD_PATH + "/baz");
assertThat(
callbackFutureReference.get(),
FlinkMatchers.willNotComplete(Duration.ofMillis(20)));
}

@Test
public void testCallbackNotCalledOnCreationOfSimilarPaths() throws Exception {
callbackFutureReference.set(new CompletableFuture<>());
client.create()
.creatingParentContainersIfNeeded()
.forPath(CHILD_PATH.substring(0, CHILD_PATH.length() - 1));
assertThat(
callbackFutureReference.get(),
FlinkMatchers.willNotComplete(Duration.ofMillis(20)));
}

@Test
public void testCallbackNotCalledOnConnectionOrInitializationEvents() throws Exception {
final TreeCacheListener treeCacheListener =
ZooKeeperUtils.createTreeCacheListener(
() -> {
throw new AssertionError("Should not be called.");
});

treeCacheListener.childEvent(
client, new TreeCacheEvent(TreeCacheEvent.Type.INITIALIZED, null));
treeCacheListener.childEvent(
client, new TreeCacheEvent(TreeCacheEvent.Type.CONNECTION_RECONNECTED, null));
treeCacheListener.childEvent(
client, new TreeCacheEvent(TreeCacheEvent.Type.CONNECTION_LOST, null));
treeCacheListener.childEvent(
client, new TreeCacheEvent(TreeCacheEvent.Type.CONNECTION_SUSPENDED, null));
}
}

0 comments on commit 6d0e2cf

Please sign in to comment.