Skip to content

Commit

Permalink
Ensure BookKeeperClientFactory is only instantiated once in PulsarSer…
Browse files Browse the repository at this point in the history
…vice (apache#1804)
  • Loading branch information
merlimat authored May 18, 2018
1 parent e6bcae5 commit 655b7c0
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
Expand All @@ -33,8 +34,8 @@

public class BookKeeperClientFactoryImpl implements BookKeeperClientFactory {

private ZooKeeperCache rackawarePolicyZkCache;
private ZooKeeperCache clientIsolationZkCache;
private final AtomicReference<ZooKeeperCache> rackawarePolicyZkCache = new AtomicReference<>();
private final AtomicReference<ZooKeeperCache> clientIsolationZkCache = new AtomicReference<>();

@Override
public BookKeeper create(ServiceConfiguration conf, ZooKeeper zkClient) throws IOException {
Expand Down Expand Up @@ -67,18 +68,27 @@ public BookKeeper create(ServiceConfiguration conf, ZooKeeper zkClient) throws I
bkConf.setEnsemblePlacementPolicy(RackawareEnsemblePlacementPolicy.class);
bkConf.setProperty(RackawareEnsemblePlacementPolicy.REPP_DNS_RESOLVER_CLASS,
ZkBookieRackAffinityMapping.class.getName());
this.rackawarePolicyZkCache = new ZooKeeperCache(zkClient) {

ZooKeeperCache zkc = new ZooKeeperCache(zkClient) {
};
bkConf.setProperty(ZooKeeperCache.ZK_CACHE_INSTANCE, this.rackawarePolicyZkCache);
if (!rackawarePolicyZkCache.compareAndSet(null, zkc)) {
zkc.stop();
}

bkConf.setProperty(ZooKeeperCache.ZK_CACHE_INSTANCE, this.rackawarePolicyZkCache.get());
}

if (conf.getBookkeeperClientIsolationGroups() != null && !conf.getBookkeeperClientIsolationGroups().isEmpty()) {
bkConf.setEnsemblePlacementPolicy(ZkIsolatedBookieEnsemblePlacementPolicy.class);
bkConf.setProperty(ZkIsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS,
conf.getBookkeeperClientIsolationGroups());
if (bkConf.getProperty(ZooKeeperCache.ZK_CACHE_INSTANCE) == null) {
this.clientIsolationZkCache = new ZooKeeperCache(zkClient) {
ZooKeeperCache zkc = new ZooKeeperCache(zkClient) {
};

if (!clientIsolationZkCache.compareAndSet(null, zkc)) {
zkc.stop();
}
bkConf.setProperty(ZooKeeperCache.ZK_CACHE_INSTANCE, this.clientIsolationZkCache);
}
}
Expand All @@ -91,11 +101,11 @@ public BookKeeper create(ServiceConfiguration conf, ZooKeeper zkClient) throws I
}

public void close() {
if (this.rackawarePolicyZkCache != null) {
this.rackawarePolicyZkCache.stop();
if (this.rackawarePolicyZkCache.get() != null) {
this.rackawarePolicyZkCache.get().stop();
}
if (this.clientIsolationZkCache != null) {
this.clientIsolationZkCache.stop();
if (this.clientIsolationZkCache.get() != null) {
this.clientIsolationZkCache.get().stop();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ public void start() throws PulsarServerException {
// Initialize and start service to access configuration repository.
this.startZkCacheService();

this.bkClientFactory = getBookKeeperClientFactory();
this.bkClientFactory = newBookKeeperClientFactory();
managedLedgerClientFactory = new ManagedLedgerClientFactory(config, getZkClient(), bkClientFactory);

this.brokerService = new BrokerService(this);
Expand Down Expand Up @@ -695,10 +695,14 @@ public ZooKeeperClientFactory getZooKeeperClientFactory() {
return zkClientFactory;
}

public BookKeeperClientFactory getBookKeeperClientFactory() {
public BookKeeperClientFactory newBookKeeperClientFactory() {
return new BookKeeperClientFactoryImpl();
}

public BookKeeperClientFactory getBookKeeperClientFactory() {
return bkClientFactory;
}

protected synchronized ScheduledExecutorService getCompactorExecutor() {
if (this.compactorExecutor == null) {
compactorExecutor = Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("compaction"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ protected PulsarService startBroker(ServiceConfiguration conf) throws Exception
protected void setupBrokerMocks(PulsarService pulsar) throws Exception {
// Override default providers with mocked ones
doReturn(mockZooKeeperClientFactory).when(pulsar).getZooKeeperClientFactory();
doReturn(mockBookKeeperClientFactory).when(pulsar).getBookKeeperClientFactory();
doReturn(mockBookKeeperClientFactory).when(pulsar).newBookKeeperClientFactory();

Supplier<NamespaceService> namespaceServiceSupplier = () -> spy(new NamespaceService(pulsar));
doReturn(namespaceServiceSupplier).when(pulsar).getNamespaceServiceProvider();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ private void setupEnv(boolean enableFilter, String minApiVersion, boolean allowU
config.setZookeeperServers("localhost:2181");
pulsar = spy(new PulsarService(config));
doReturn(new MockedZooKeeperClientFactoryImpl()).when(pulsar).getZooKeeperClientFactory();
doReturn(new MockedBookKeeperClientFactory()).when(pulsar).getBookKeeperClientFactory();
doReturn(new MockedBookKeeperClientFactory()).when(pulsar).newBookKeeperClientFactory();
pulsar.start();

try {
Expand Down

0 comments on commit 655b7c0

Please sign in to comment.