Skip to content

Commit

Permalink
Avoid using zk-client thread to process zk-data result (apache#361)
Browse files Browse the repository at this point in the history
* Avoid using zk-client thread to process zk-data result

* add ThreadPool-name and stop executors on shutdown

* shutdown zkCache executor if zkCache has initialized it
  • Loading branch information
rdhabalia authored and merlimat committed Apr 21, 2017
1 parent 136fb7b commit 01b5104
Show file tree
Hide file tree
Showing 14 changed files with 147 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import org.apache.bookkeeper.mledger.util.Pair;
import org.apache.zookeeper.AsyncCallback.Children2Callback;
Expand Down Expand Up @@ -61,18 +60,24 @@ public class MockZooKeeper extends ZooKeeper {
private KeeperException.Code failReturnCode;
private Watcher sessionWatcher;
private long sessionId = 0L;
private int readOpDelayMs;

public static MockZooKeeper newInstance() {
return newInstance(null);
}

public static MockZooKeeper newInstance(ExecutorService executor) {
return newInstance(executor, -1);
}

public static MockZooKeeper newInstance(ExecutorService executor, int readOpDelayMs) {
try {
ReflectionFactory rf = ReflectionFactory.getReflectionFactory();
Constructor objDef = Object.class.getDeclaredConstructor(new Class[0]);
Constructor intConstr = rf.newConstructorForSerialization(MockZooKeeper.class, objDef);
MockZooKeeper zk = MockZooKeeper.class.cast(intConstr.newInstance());
zk.init(executor);
zk.readOpDelayMs = readOpDelayMs;
return zk;
} catch (RuntimeException e) {
throw e;
Expand Down Expand Up @@ -192,7 +197,6 @@ public synchronized void create(final String path, final byte[] data, final List
@Override
public synchronized byte[] getData(String path, Watcher watcher, Stat stat) throws KeeperException {
checkProgrammedFail();

Pair<String, Integer> value = tree.get(path);
if (value == null) {
throw new KeeperException.NoNodeException(path);
Expand All @@ -210,6 +214,7 @@ public synchronized byte[] getData(String path, Watcher watcher, Stat stat) thro
@Override
public void getData(final String path, boolean watch, final DataCallback cb, final Object ctx) {
executor.execute(() -> {
checkReadOpDelay();
if (getProgrammedFailStatus()) {
cb.processResult(failReturnCode.intValue(), path, ctx, null, null);
return;
Expand All @@ -236,6 +241,7 @@ public void getData(final String path, boolean watch, final DataCallback cb, fin
@Override
public void getData(final String path, final Watcher watcher, final DataCallback cb, final Object ctx) {
executor.execute(() -> {
checkReadOpDelay();
synchronized (MockZooKeeper.this) {
if (getProgrammedFailStatus()) {
cb.processResult(failReturnCode.intValue(), path, ctx, null, null);
Expand Down Expand Up @@ -719,5 +725,15 @@ public String toString() {
return "MockZookeeper";
}

private void checkReadOpDelay() {
if (readOpDelayMs > 0) {
try {
Thread.sleep(readOpDelayMs);
} catch (InterruptedException e) {
// Ok
}
}
}

private static final Logger log = LoggerFactory.getLogger(MockZooKeeper.class);
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,5 @@
*/
public interface BookKeeperClientFactory {
BookKeeper create(ServiceConfiguration conf, ZooKeeper zkClient) throws IOException;
void close();
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@

public class BookKeeperClientFactoryImpl implements BookKeeperClientFactory {

private ZooKeeperCache rackawarePolicyZkCache;
private ZooKeeperCache clientIsolationZkCache;

@Override
public BookKeeper create(ServiceConfiguration conf, ZooKeeper zkClient) throws IOException {
ClientConfiguration bkConf = new ClientConfiguration();
Expand Down Expand Up @@ -61,17 +64,19 @@ public BookKeeper create(ServiceConfiguration conf, ZooKeeper zkClient) throws I
bkConf.setEnsemblePlacementPolicy(RackawareEnsemblePlacementPolicy.class);
bkConf.setProperty(RackawareEnsemblePlacementPolicy.REPP_DNS_RESOLVER_CLASS,
ZkBookieRackAffinityMapping.class.getName());
bkConf.setProperty(ZooKeeperCache.ZK_CACHE_INSTANCE, new ZooKeeperCache(zkClient) {
});
this.rackawarePolicyZkCache = new ZooKeeperCache(zkClient) {
};
bkConf.setProperty(ZooKeeperCache.ZK_CACHE_INSTANCE, this.rackawarePolicyZkCache);
}

if (conf.getBookkeeperClientIsolationGroups() != null && !conf.getBookkeeperClientIsolationGroups().isEmpty()) {
bkConf.setEnsemblePlacementPolicy(ZkIsolatedBookieEnsemblePlacementPolicy.class);
bkConf.setProperty(ZkIsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS,
conf.getBookkeeperClientIsolationGroups());
if (bkConf.getProperty(ZooKeeperCache.ZK_CACHE_INSTANCE) == null) {
bkConf.setProperty(ZooKeeperCache.ZK_CACHE_INSTANCE, new ZooKeeperCache(zkClient) {
});
this.clientIsolationZkCache = new ZooKeeperCache(zkClient) {
};
bkConf.setProperty(ZooKeeperCache.ZK_CACHE_INSTANCE, this.clientIsolationZkCache);
}
}

Expand All @@ -81,4 +86,13 @@ public BookKeeper create(ServiceConfiguration conf, ZooKeeper zkClient) throws I
throw new IOException(e);
}
}

public void close() {
if (this.rackawarePolicyZkCache != null) {
this.rackawarePolicyZkCache.stop();
}
if (this.clientIsolationZkCache != null) {
this.clientIsolationZkCache.stop();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ public class PulsarService implements AutoCloseable {
private WebSocketService webSocketService = null;
private ConfigurationCacheService configurationCacheService = null;
private LocalZooKeeperCacheService localZkCacheService = null;
private BookKeeperClientFactory bkClientFactory;
private ZooKeeperCache localZkCache;
private GlobalZooKeeperCache globalZkCache;
private LocalZooKeeperConnectionService localZooKeeperConnectionProvider;
Expand Down Expand Up @@ -163,6 +164,11 @@ public void close() throws PulsarServerException {
this.managedLedgerClientFactory = null;
}

if (bkClientFactory != null) {
this.bkClientFactory.close();
this.bkClientFactory = null;
}

if (this.leaderElectionService != null) {
this.leaderElectionService.stop();
this.leaderElectionService = null;
Expand Down Expand Up @@ -235,8 +241,8 @@ public void start() throws PulsarServerException {
// Initialize and start service to access configuration repository.
this.startZkCacheService();

managedLedgerClientFactory = new ManagedLedgerClientFactory(config, getZkClient(),
getBookKeeperClientFactory());
this.bkClientFactory = getBookKeeperClientFactory();
managedLedgerClientFactory = new ManagedLedgerClientFactory(config, getZkClient(), bkClientFactory);

this.brokerService = new BrokerService(this);

Expand Down Expand Up @@ -376,7 +382,7 @@ private void startZkCacheService() throws PulsarServerException {

LOG.info("starting configuration cache service");

this.localZkCache = new LocalZooKeeperCache(getZkClient(), getOrderedExecutor());
this.localZkCache = new LocalZooKeeperCache(getZkClient(), getOrderedExecutor(), this.executor);
this.globalZkCache = new GlobalZooKeeperCache(getZooKeeperClientFactory(),
(int) config.getZooKeeperSessionTimeoutMillis(), config.getGlobalZookeeperServers(),
getOrderedExecutor(), this.executor);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,5 +216,10 @@ public BookKeeper create(ServiceConfiguration conf, ZooKeeper zkClient) throws I
// Always return the same instance (so that we don't loose the mock BK content on broker restart
return mockBookKeeper;
}

@Override
public void close() {
// no-op
}
};
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public class ResourceQuotaCacheTest {
public void setup() throws Exception {
pulsar = mock(PulsarService.class);
OrderedSafeExecutor executor = new OrderedSafeExecutor(1, "test");
zkCache = new LocalZooKeeperCache(MockZooKeeper.newInstance(), executor);
zkCache = new LocalZooKeeperCache(MockZooKeeper.newInstance(), executor, null);
localCache = new LocalZooKeeperCacheService(zkCache, null);
bundleFactory = new NamespaceBundleFactory(pulsar, Hashing.crc32());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public void setup() throws Exception {
pulsar = mock(PulsarService.class);
config = mock(ServiceConfiguration.class);
executor = new OrderedSafeExecutor(1, "test");
zkCache = new LocalZooKeeperCache(MockZooKeeper.newInstance(), executor);
zkCache = new LocalZooKeeperCache(MockZooKeeper.newInstance(), executor, null);
localCache = new LocalZooKeeperCacheService(zkCache, null);
bundleFactory = new NamespaceBundleFactory(pulsar, Hashing.crc32());
nsService = mock(NamespaceService.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@ public ZookeeperCacheLoader(ZooKeeperClientFactory zkClientFactory, String zooke
log.error("Shutting down ZK sessions: {}", exitCode);
});

this.localZkCache = new LocalZooKeeperCache(localZkConnectionSvc.getLocalZooKeeper(), this.orderedExecutor);
this.localZkCache = new LocalZooKeeperCache(localZkConnectionSvc.getLocalZooKeeper(), this.orderedExecutor,
null/* cache uses ForkJoinPool if provided scheduler is null to load data-async */);
localZkConnectionSvc.start(exitCode -> {
try {
localZkCache.getZooKeeper().close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public class GlobalZooKeeperCache extends ZooKeeperCache implements Closeable {

public GlobalZooKeeperCache(ZooKeeperClientFactory zkClientFactory, int zkSessionTimeoutMillis,
String globalZkConnect, OrderedSafeExecutor orderedExecutor, ScheduledExecutorService scheduledExecutor) {
super(null, orderedExecutor);
super(null, orderedExecutor, scheduledExecutor);
this.zlClientFactory = zkClientFactory;
this.zkSessionTimeoutMillis = zkSessionTimeoutMillis;
this.globalZkConnect = globalZkConnect;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
*/
package com.yahoo.pulsar.zookeeper;

import java.util.concurrent.ScheduledExecutorService;

import org.apache.bookkeeper.util.OrderedSafeExecutor;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.ZooKeeper;
Expand All @@ -32,8 +34,9 @@ public class LocalZooKeeperCache extends ZooKeeperCache {

private static final Logger LOG = LoggerFactory.getLogger(LocalZooKeeperCache.class);

public LocalZooKeeperCache(final ZooKeeper zk, final OrderedSafeExecutor executor) {
super(zk, executor);
public LocalZooKeeperCache(final ZooKeeper zk, final OrderedSafeExecutor executor,
ScheduledExecutorService scheduledExecutor) {
super(zk, executor, scheduledExecutor);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,17 @@

import static com.google.common.base.Preconditions.checkNotNull;

import java.util.AbstractMap;
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

Expand All @@ -46,6 +49,8 @@
import com.google.common.cache.CacheBuilder;
import com.google.common.collect.Sets;

import io.netty.util.concurrent.DefaultThreadFactory;

/**
* Per ZK client ZooKeeper cache supporting ZNode data and children list caches. A cache entry is identified, accessed
* and invalidated by the ZNode path. For the data cache, ZNode data parsing is done at request time with the given
Expand Down Expand Up @@ -73,13 +78,16 @@ public static interface CacheUpdater<T> {
protected final AsyncLoadingCache<String, Entry<Object, Stat>> dataCache;
protected final Cache<String, Set<String>> childrenCache;
protected final Cache<String, Boolean> existsCache;
protected final OrderedSafeExecutor executor;
private final OrderedSafeExecutor executor;
private final ScheduledExecutorService scheduledExecutor;
private boolean shouldShutdownExecutor = false;
public static final int cacheTimeOutInSec = 30;

protected AtomicReference<ZooKeeper> zkSession = new AtomicReference<ZooKeeper>(null);

public ZooKeeperCache(ZooKeeper zkSession, OrderedSafeExecutor executor) {
public ZooKeeperCache(ZooKeeper zkSession, OrderedSafeExecutor executor, ScheduledExecutorService scheduledExecutor) {
this.executor = executor;
this.scheduledExecutor = scheduledExecutor;
this.zkSession.set(zkSession);

this.dataCache = Caffeine.newBuilder().expireAfterAccess(1, TimeUnit.HOURS)
Expand All @@ -90,7 +98,9 @@ public ZooKeeperCache(ZooKeeper zkSession, OrderedSafeExecutor executor) {
}

public ZooKeeperCache(ZooKeeper zkSession) {
this(zkSession, new OrderedSafeExecutor(1, "zk-cache-executor"));
this(zkSession, new OrderedSafeExecutor(1, "zk-cache-executor"),
Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("zk-cache-callback-executor")));
this.shouldShutdownExecutor = true;
}

public ZooKeeper getZooKeeper() {
Expand Down Expand Up @@ -266,18 +276,20 @@ public <T> CompletableFuture<Optional<Entry<T, Stat>>> getDataAsync(final String
CompletableFuture<Entry<Object, Stat>> zkFuture = new CompletableFuture<>();

this.zkSession.get().getData(path, watcher, (rc, path1, ctx, content, stat) -> {
Executor exec = scheduledExecutor != null ? scheduledExecutor : executor;
if (rc == Code.OK.intValue()) {
try {
T obj = deserializer.deserialize(path, content);
zkFuture.complete(new AbstractMap.SimpleImmutableEntry<Object, Stat>(obj, stat));
// avoid using the zk-client thread to process the result
exec.execute(() -> zkFuture.complete(new SimpleImmutableEntry<Object, Stat>(obj, stat)));
} catch (Exception e) {
zkFuture.completeExceptionally(e);
exec.execute(() -> zkFuture.completeExceptionally(e));
}
} else if (rc == Code.NONODE.intValue()) {
// Return null values for missing z-nodes, as this is not "exceptional" condition
zkFuture.complete(null);
exec.execute(() -> zkFuture.complete(null));
} else {
zkFuture.completeExceptionally(KeeperException.create(rc));
exec.execute(() -> zkFuture.completeExceptionally(KeeperException.create(rc)));
}
}, null);

Expand Down Expand Up @@ -364,4 +376,11 @@ public void invalidateRoot(String root) {
}
}
}

public void stop() {
if (shouldShutdownExecutor) {
this.executor.shutdown();
this.scheduledExecutor.shutdown();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public void testBasic() throws Exception {
// Case1: ZKCache is given
ZkBookieRackAffinityMapping mapping1 = new ZkBookieRackAffinityMapping();
ClientConfiguration bkClientConf1 = new ClientConfiguration();
bkClientConf1.setProperty(ZooKeeperCache.ZK_CACHE_INSTANCE, new ZooKeeperCache(localZkc, null) {
bkClientConf1.setProperty(ZooKeeperCache.ZK_CACHE_INSTANCE, new ZooKeeperCache(localZkc, null, null) {
});
mapping1.setConf(bkClientConf1);
List<String> racks1 = mapping1.resolve(Lists.newArrayList(BOOKIE1, BOOKIE2, BOOKIE3));
Expand All @@ -104,7 +104,7 @@ public void testBasic() throws Exception {
public void testNoBookieInfo() throws Exception {
ZkBookieRackAffinityMapping mapping = new ZkBookieRackAffinityMapping();
ClientConfiguration bkClientConf = new ClientConfiguration();
bkClientConf.setProperty(ZooKeeperCache.ZK_CACHE_INSTANCE, new ZooKeeperCache(localZkc, null) {
bkClientConf.setProperty(ZooKeeperCache.ZK_CACHE_INSTANCE, new ZooKeeperCache(localZkc, null, null) {
});
mapping.setConf(bkClientConf);
List<String> racks = mapping.resolve(Lists.newArrayList(BOOKIE1, BOOKIE2, BOOKIE3));
Expand Down Expand Up @@ -158,7 +158,7 @@ public void testBookieInfoChange() throws Exception {

ZkBookieRackAffinityMapping mapping = new ZkBookieRackAffinityMapping();
ClientConfiguration bkClientConf = new ClientConfiguration();
bkClientConf.setProperty(ZooKeeperCache.ZK_CACHE_INSTANCE, new ZooKeeperCache(localZkc, null) {
bkClientConf.setProperty(ZooKeeperCache.ZK_CACHE_INSTANCE, new ZooKeeperCache(localZkc, null, null) {
});
mapping.setConf(bkClientConf);
List<String> racks = mapping.resolve(Lists.newArrayList(BOOKIE1, BOOKIE2, BOOKIE3));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ public void testBasic() throws Exception {

ZkIsolatedBookieEnsemblePlacementPolicy isolationPolicy = new ZkIsolatedBookieEnsemblePlacementPolicy();
ClientConfiguration bkClientConf = new ClientConfiguration();
bkClientConf.setProperty(ZooKeeperCache.ZK_CACHE_INSTANCE, new ZooKeeperCache(localZkc, null) {
bkClientConf.setProperty(ZooKeeperCache.ZK_CACHE_INSTANCE, new ZooKeeperCache(localZkc, null, null) {
});
bkClientConf.setProperty(ZkIsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS, isolationGroups);
isolationPolicy.initialize(bkClientConf);
Expand Down Expand Up @@ -176,7 +176,7 @@ public void testBasic() throws Exception {
public void testNoBookieInfo() throws Exception {
ZkIsolatedBookieEnsemblePlacementPolicy isolationPolicy = new ZkIsolatedBookieEnsemblePlacementPolicy();
ClientConfiguration bkClientConf = new ClientConfiguration();
bkClientConf.setProperty(ZooKeeperCache.ZK_CACHE_INSTANCE, new ZooKeeperCache(localZkc, null) {
bkClientConf.setProperty(ZooKeeperCache.ZK_CACHE_INSTANCE, new ZooKeeperCache(localZkc, null, null) {
});
bkClientConf.setProperty(ZkIsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS, isolationGroups);
isolationPolicy.initialize(bkClientConf);
Expand Down Expand Up @@ -296,7 +296,7 @@ public void testNoIsolationGroup() throws Exception {

ZkIsolatedBookieEnsemblePlacementPolicy isolationPolicy = new ZkIsolatedBookieEnsemblePlacementPolicy();
ClientConfiguration bkClientConf = new ClientConfiguration();
bkClientConf.setProperty(ZooKeeperCache.ZK_CACHE_INSTANCE, new ZooKeeperCache(localZkc, null) {
bkClientConf.setProperty(ZooKeeperCache.ZK_CACHE_INSTANCE, new ZooKeeperCache(localZkc, null, null) {
});
isolationPolicy.initialize(bkClientConf);
isolationPolicy.onClusterChanged(writableBookies, readOnlyBookies);
Expand Down
Loading

0 comments on commit 01b5104

Please sign in to comment.