Skip to content

Commit

Permalink
[pulsar-broker] Fix deadlock: add zk-operation timeout for blocking call on zk-cache (apache#3633)
Browse files: browse the repository at this point in the history

fix: compilation

fix test

fix test
  • Loading branch information
rdhabalia authored and merlimat committed Mar 19, 2019
1 parent b50760c commit a057a14
Show file tree
Hide file tree
Showing 29 changed files with 110 additions and 61 deletions.
3 changes: 3 additions & 0 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ failureDomainsEnabled=false
# Zookeeper session timeout in milliseconds
zooKeeperSessionTimeoutMillis=30000

# ZooKeeper operation timeout in seconds
zooKeeperOperationTimeoutSeconds=30

# Time to wait for broker graceful shutdown. After this time elapses, the process will be killed
brokerShutdownTimeoutMs=60000

Expand Down
3 changes: 3 additions & 0 deletions conf/standalone.conf
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ failureDomainsEnabled=false
# Zookeeper session timeout in milliseconds
zooKeeperSessionTimeoutMillis=30000

# ZooKeeper operation timeout in seconds
zooKeeperOperationTimeoutSeconds=30

# Time to wait for broker graceful shutdown. After this time elapses, the process will be killed
brokerShutdownTimeoutMs=60000

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,11 @@ public class ServiceConfiguration implements PulsarConfiguration {
doc = "ZooKeeper session timeout in milliseconds"
)
private long zooKeeperSessionTimeoutMillis = 30000;
@FieldContext(
category = CATEGORY_SERVER,
doc = "ZooKeeper operation timeout in seconds"
)
private int zooKeeperOperationTimeoutSeconds = 30;
@FieldContext(
category = CATEGORY_SERVER,
dynamic = true,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import java.util.function.Function;

import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.pulsar.zookeeper.ZooKeeperCache.cacheTimeOutInSec;

/**
* Authorization service that manages pluggable authorization provider and authorize requests accordingly.
Expand Down Expand Up @@ -215,9 +214,11 @@ public CompletableFuture<Boolean> canConsumeAsync(TopicName topicName, String ro
public boolean canProduce(TopicName topicName, String role, AuthenticationDataSource authenticationData)
throws Exception {
try {
return canProduceAsync(topicName, role, authenticationData).get(cacheTimeOutInSec, SECONDS);
return canProduceAsync(topicName, role, authenticationData).get(conf.getZooKeeperOperationTimeoutSeconds(),
SECONDS);
} catch (InterruptedException e) {
log.warn("Time-out {} sec while checking authorization on {} ", cacheTimeOutInSec, topicName);
log.warn("Time-out {} sec while checking authorization on {} ", conf.getZooKeeperOperationTimeoutSeconds(),
topicName);
throw e;
} catch (Exception e) {
log.warn("Producer-client with Role - {} failed to get permissions for topic - {}. {}", role, topicName,
Expand All @@ -229,13 +230,15 @@ public boolean canProduce(TopicName topicName, String role, AuthenticationDataSo
public boolean canConsume(TopicName topicName, String role, AuthenticationDataSource authenticationData,
String subscription) throws Exception {
try {
return canConsumeAsync(topicName, role, authenticationData, subscription).get(cacheTimeOutInSec, SECONDS);
return canConsumeAsync(topicName, role, authenticationData, subscription)
.get(conf.getZooKeeperOperationTimeoutSeconds(), SECONDS);
} catch (InterruptedException e) {
log.warn("Time-out {} sec while checking authorization on {} ", cacheTimeOutInSec, topicName);
log.warn("Time-out {} sec while checking authorization on {} ", conf.getZooKeeperOperationTimeoutSeconds(),
topicName);
throw e;
} catch (Exception e) {
log.warn("Consumer-client with Role - {} failed to get permissions for topic - {}. {}", role,
topicName, e.getMessage());
log.warn("Consumer-client with Role - {} failed to get permissions for topic - {}. {}", role, topicName,
e.getMessage());
throw e;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public BookKeeper create(ServiceConfiguration conf, ZooKeeper zkClient) throws I
bkConf.setProperty(RackawareEnsemblePlacementPolicy.REPP_DNS_RESOLVER_CLASS,
ZkBookieRackAffinityMapping.class.getName());

ZooKeeperCache zkc = new ZooKeeperCache(zkClient) {
ZooKeeperCache zkc = new ZooKeeperCache(zkClient, conf.getZooKeeperOperationTimeoutSeconds()) {
};
if (!rackawarePolicyZkCache.compareAndSet(null, zkc)) {
zkc.stop();
Expand All @@ -91,7 +91,7 @@ public BookKeeper create(ServiceConfiguration conf, ZooKeeper zkClient) throws I
bkConf.setProperty(ZkIsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS,
conf.getBookkeeperClientIsolationGroups());
if (bkConf.getProperty(ZooKeeperCache.ZK_CACHE_INSTANCE) == null) {
ZooKeeperCache zkc = new ZooKeeperCache(zkClient) {
ZooKeeperCache zkc = new ZooKeeperCache(zkClient, conf.getZooKeeperOperationTimeoutSeconds()) {
};

if (!clientIsolationZkCache.compareAndSet(null, zkc)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -538,9 +538,11 @@ private void startZkCacheService() throws PulsarServerException {

LOG.info("starting configuration cache service");

this.localZkCache = new LocalZooKeeperCache(getZkClient(), getOrderedExecutor());
this.localZkCache = new LocalZooKeeperCache(getZkClient(), config.getZooKeeperOperationTimeoutSeconds(),
getOrderedExecutor());
this.globalZkCache = new GlobalZooKeeperCache(getZooKeeperClientFactory(),
(int) config.getZooKeeperSessionTimeoutMillis(), config.getConfigurationStoreServers(),
(int) config.getZooKeeperSessionTimeoutMillis(),
config.getZooKeeperOperationTimeoutSeconds(), config.getConfigurationStoreServers(),
getOrderedExecutor(), this.cacheExecutor);
try {
this.globalZkCache.start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,6 @@
import static org.apache.pulsar.broker.cache.LocalZooKeeperCacheService.LOCAL_POLICIES_ROOT;
import static org.apache.pulsar.broker.web.PulsarWebResource.joinPath;
import static org.apache.pulsar.common.naming.NamespaceBundleFactory.getBundlesData;
import static org.apache.pulsar.zookeeper.ZooKeeperCache.cacheTimeOutInSec;

/**
* The <code>NamespaceService</code> provides resource ownership lookup as well as resource ownership claiming services
Expand Down Expand Up @@ -762,7 +761,8 @@ private void updateNamespaceBundles(NamespaceName nsname, NamespaceBundles nsBun

if (!policies.isPresent()) {
// if policies is not present into localZk then create new policies
this.pulsar.getLocalZkCacheService().createPolicies(path, false).get(cacheTimeOutInSec, SECONDS);
this.pulsar.getLocalZkCacheService().createPolicies(path, false)
.get(pulsar.getConfiguration().getZooKeeperOperationTimeoutSeconds(), SECONDS);
}

long version = nsBundles.getVersion();
Expand Down Expand Up @@ -828,17 +828,20 @@ private boolean isTopicOwned(TopicName topicName) throws Exception {
}

public void removeOwnedServiceUnit(NamespaceName nsName) throws Exception {
ownershipCache.removeOwnership(getFullBundle(nsName)).get(cacheTimeOutInSec, SECONDS);
ownershipCache.removeOwnership(getFullBundle(nsName))
.get(pulsar.getConfiguration().getZooKeeperOperationTimeoutSeconds(), SECONDS);
bundleFactory.invalidateBundleCache(nsName);
}

public void removeOwnedServiceUnit(NamespaceBundle nsBundle) throws Exception {
ownershipCache.removeOwnership(nsBundle).get(cacheTimeOutInSec, SECONDS);
ownershipCache.removeOwnership(nsBundle).get(pulsar.getConfiguration().getZooKeeperOperationTimeoutSeconds(),
SECONDS);
bundleFactory.invalidateBundleCache(nsBundle.getNamespaceObject());
}

public void removeOwnedServiceUnits(NamespaceName nsName, BundlesData bundleData) throws Exception {
ownershipCache.removeOwnership(bundleFactory.getBundles(nsName, bundleData)).get(cacheTimeOutInSec, SECONDS);
ownershipCache.removeOwnership(bundleFactory.getBundles(nsName, bundleData))
.get(pulsar.getConfiguration().getZooKeeperOperationTimeoutSeconds(), SECONDS);
bundleFactory.invalidateBundleCache(nsName);
}

Expand Down Expand Up @@ -896,7 +899,7 @@ public List<String> getListOfNonPersistentTopics(NamespaceName namespaceName) th
ClusterData peerClusterData;
try {
peerClusterData = PulsarWebResource.checkLocalOrGetPeerReplicationCluster(pulsar, namespaceName)
.get(cacheTimeOutInSec, SECONDS);
.get(pulsar.getConfiguration().getZooKeeperOperationTimeoutSeconds(), SECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
throw new RuntimeException("Failed to contact peer replication cluster.", e);
}
Expand Down Expand Up @@ -972,7 +975,7 @@ public PulsarClientImpl getNamespaceClient(ClusterData cluster) {

public Optional<NamespaceEphemeralData> getOwner(NamespaceBundle bundle) throws Exception {
// if there is no znode for the service unit, it is not owned by any broker
return getOwnerAsync(bundle).get(cacheTimeOutInSec, SECONDS);
return getOwnerAsync(bundle).get(pulsar.getConfiguration().getZooKeeperOperationTimeoutSeconds(), SECONDS);
}

public CompletableFuture<Optional<NamespaceEphemeralData>> getOwnerAsync(NamespaceBundle bundle) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.pulsar.broker.web.PulsarWebResource.path;
import static org.apache.pulsar.zookeeper.ZooKeeperCache.cacheTimeOutInSec;

import java.util.Optional;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -189,7 +188,7 @@ public static Optional<Policies> getPolicies(BrokerService brokerService, String
Optional<Policies> policies = Optional.empty();
try {
policies = brokerService.pulsar().getConfigurationCache().policiesCache().getAsync(path)
.get(cacheTimeOutInSec, SECONDS);
.get(brokerService.pulsar().getConfiguration().getZooKeeperOperationTimeoutSeconds(), SECONDS);
} catch (Exception e) {
log.warn("Failed to get message-rate for {} ", topicName, e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
import static org.apache.pulsar.broker.web.PulsarWebResource.path;
import static org.apache.pulsar.zookeeper.ZooKeeperCache.cacheTimeOutInSec;

public class SubscribeRateLimiter {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.commons.lang3.StringUtils.isBlank;
import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
import static org.apache.pulsar.zookeeper.ZooKeeperCache.cacheTimeOutInSec;

import java.net.MalformedURLException;
import java.net.URI;
Expand Down Expand Up @@ -612,9 +611,10 @@ protected void validateTopicOwnership(TopicName topicName, boolean authoritative
* @throws Exception
*/
protected void validateGlobalNamespaceOwnership(NamespaceName namespace) {
int timeout = pulsar().getConfiguration().getZooKeeperOperationTimeoutSeconds();
try {
ClusterData peerClusterData = checkLocalOrGetPeerReplicationCluster(pulsar(), namespace)
.get(cacheTimeOutInSec, SECONDS);
.get(timeout, SECONDS);
// if peer-cluster-data is present it means namespace is owned by that peer-cluster and request should be
// redirect to the peer-cluster
if (peerClusterData != null) {
Expand All @@ -627,7 +627,7 @@ protected void validateGlobalNamespaceOwnership(NamespaceName namespace) {
throw new WebApplicationException(Response.temporaryRedirect(redirect).build());
}
} catch (InterruptedException e) {
log.warn("Time-out {} sec while validating policy on {} ", cacheTimeOutInSec, namespace);
log.warn("Time-out {} sec while validating policy on {} ", timeout, namespace);
throw new RestException(Status.SERVICE_UNAVAILABLE, String.format(
"Failed to validate global cluster configuration : ns=%s emsg=%s", namespace, e.getMessage()));
} catch (WebApplicationException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public class ResourceQuotaCacheTest {
public void setup() throws Exception {
pulsar = mock(PulsarService.class);
executor = OrderedScheduler.newSchedulerBuilder().numThreads(1).name("test").build();
zkCache = new LocalZooKeeperCache(MockZooKeeper.newInstance(), executor);
zkCache = new LocalZooKeeperCache(MockZooKeeper.newInstance(), 30, executor);
localCache = new LocalZooKeeperCacheService(zkCache, null);

// set mock pulsar localzkcache
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public void setup() throws Exception {
pulsar = mock(PulsarService.class);
config = mock(ServiceConfiguration.class);
executor = OrderedScheduler.newSchedulerBuilder().numThreads(1).name("test").build();
zkCache = new LocalZooKeeperCache(MockZooKeeper.newInstance(), executor);
zkCache = new LocalZooKeeperCache(MockZooKeeper.newInstance(), 30, executor);
localCache = spy(new LocalZooKeeperCacheService(zkCache, null));
ZooKeeperDataCache<LocalPolicies> poilciesCache = mock(ZooKeeperDataCache.class);
when(pulsar.getLocalZkCacheService()).thenReturn(localCache);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream;
import org.apache.pulsar.zookeeper.ZooKeeperCache;
import org.apache.pulsar.zookeeper.ZooKeeperDataCache;
import org.apache.zookeeper.ZooKeeper;
import org.mockito.invocation.InvocationOnMock;
Expand Down Expand Up @@ -116,6 +117,10 @@ public void setup() throws Exception {
doReturn(createMockBookKeeper(mockZk, pulsar.getOrderedExecutor().chooseThread(0)))
.when(pulsar).getBookKeeperClient();

ZooKeeperCache cache = mock(ZooKeeperCache.class);
doReturn(30).when(cache).getZkOperationTimeoutSeconds();
doReturn(cache).when(pulsar).getLocalZkCache();

configCacheService = mock(ConfigurationCacheService.class);
@SuppressWarnings("unchecked")
ZooKeeperDataCache<Policies> zkDataCache = mock(ZooKeeperDataCache.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.compaction.CompactedTopic;
import org.apache.pulsar.compaction.Compactor;
import org.apache.pulsar.zookeeper.ZooKeeperCache;
import org.apache.pulsar.zookeeper.ZooKeeperDataCache;
import org.apache.zookeeper.ZooKeeper;
import org.mockito.ArgumentCaptor;
Expand Down Expand Up @@ -155,6 +156,10 @@ public void setup() throws Exception {
doReturn(createMockBookKeeper(mockZk, pulsar.getOrderedExecutor().chooseThread(0)))
.when(pulsar).getBookKeeperClient();

ZooKeeperCache cache = mock(ZooKeeperCache.class);
doReturn(30).when(cache).getZkOperationTimeoutSeconds();
doReturn(cache).when(pulsar).getLocalZkCache();

configCacheService = mock(ConfigurationCacheService.class);
@SuppressWarnings("unchecked")
ZooKeeperDataCache<Policies> zkDataCache = mock(ZooKeeperDataCache.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString;
import org.apache.pulsar.zookeeper.ZooKeeperCache;
import org.apache.pulsar.zookeeper.ZooKeeperDataCache;
import org.apache.zookeeper.ZooKeeper;
import org.mockito.Mockito;
Expand Down Expand Up @@ -157,6 +158,9 @@ public void setup() throws Exception {

mlFactoryMock = mock(ManagedLedgerFactory.class);
doReturn(mlFactoryMock).when(pulsar).getManagedLedgerFactory();
ZooKeeperCache cache = mock(ZooKeeperCache.class);
doReturn(30).when(cache).getZkOperationTimeoutSeconds();
doReturn(cache).when(pulsar).getLocalZkCache();

ZooKeeper mockZk = createMockZooKeeper();
doReturn(mockZk).when(pulsar).getZkClient();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.bookkeeper.common.util.OrderedScheduler;
Expand Down Expand Up @@ -72,6 +73,7 @@ public BrokerDiscoveryProvider(ServiceConfig config, ZooKeeperClientFactory zkCl
localZkCache = new ZookeeperCacheLoader(zkClientFactory, config.getZookeeperServers(),
config.getZookeeperSessionTimeoutMs());
globalZkCache = new GlobalZooKeeperCache(zkClientFactory, config.getZookeeperSessionTimeoutMs(),
(int) TimeUnit.MILLISECONDS.toSeconds(config.getZookeeperSessionTimeoutMs()),
config.getConfigurationStoreServers(), orderedExecutor, scheduledExecutorScheduler);
globalZkCache.start();
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.pulsar.common.util.ObjectMapperFactory;
Expand Down Expand Up @@ -68,7 +69,8 @@ public ZookeeperCacheLoader(ZooKeeperClientFactory zkClientFactory, String zooke
log.error("Shutting down ZK sessions: {}", exitCode);
});

this.localZkCache = new LocalZooKeeperCache(localZkConnectionSvc.getLocalZooKeeper(), this.orderedExecutor);
this.localZkCache = new LocalZooKeeperCache(localZkConnectionSvc.getLocalZooKeeper(),
(int) TimeUnit.MILLISECONDS.toSeconds(zookeeperSessionTimeoutMs), this.orderedExecutor);
localZkConnectionSvc.start(exitCode -> {
try {
localZkCache.getZooKeeper().close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.bookkeeper.common.util.OrderedScheduler;
Expand Down Expand Up @@ -71,6 +72,7 @@ public BrokerDiscoveryProvider(ProxyConfiguration config, ZooKeeperClientFactory
localZkCache = new ZookeeperCacheLoader(zkClientFactory, config.getZookeeperServers(),
config.getZookeeperSessionTimeoutMs());
globalZkCache = new GlobalZooKeeperCache(zkClientFactory, config.getZookeeperSessionTimeoutMs(),
(int) TimeUnit.MILLISECONDS.toSeconds(config.getZookeeperSessionTimeoutMs()),
config.getConfigurationStoreServers(), orderedExecutor, scheduledExecutorScheduler);
globalZkCache.start();
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.pulsar.common.util.ObjectMapperFactory;
Expand Down Expand Up @@ -65,7 +66,8 @@ public class ZookeeperCacheLoader implements Closeable {
*/
public ZookeeperCacheLoader(ZooKeeperClientFactory factory, String zookeeperServers, int zookeeperSessionTimeoutMs) throws Exception {
this.zkClient = factory.create(zookeeperServers, SessionType.AllowReadOnly, zookeeperSessionTimeoutMs).get();
this.localZkCache = new LocalZooKeeperCache(zkClient, this.orderedExecutor);
this.localZkCache = new LocalZooKeeperCache(zkClient,
(int) TimeUnit.MILLISECONDS.toSeconds(zookeeperSessionTimeoutMs), this.orderedExecutor);

this.brokerInfo = new ZooKeeperDataCache<LoadManagerReport>(localZkCache) {
@Override
Expand Down
Loading

0 comments on commit a057a14

Please sign in to comment.