diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java index 9dd065a42a7f8..35ef3c42e4e08 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java @@ -80,6 +80,11 @@ private static class Arguments { "--zookeeper" }, description = "Local ZooKeeper quorum connection string", required = true) private String zookeeper; + @Parameter(names = { + "--zookeeper-session-timeout-ms" + }, description = "Local zookeeper session timeout ms") + private int zkSessionTimeoutMillis = 30000; + @Parameter(names = { "-gzk", "--global-zookeeper" }, description = "Global ZooKeeper quorum connection string", required = false, hidden = true) private String globalZookeeper; @@ -126,13 +131,16 @@ public static void main(String[] args) throws Exception { log.info("Setting up cluster {} with zk={} configuration-store ={}", arguments.cluster, arguments.zookeeper, arguments.configurationStore); ZooKeeperClientFactory zkfactory = new ZookeeperClientFactoryImpl(); - ZooKeeper localZk = zkfactory.create(arguments.zookeeper, SessionType.ReadWrite, 30000).get(); - ZooKeeper configStoreZk = zkfactory.create(arguments.configurationStore, SessionType.ReadWrite, 30000).get(); + ZooKeeper localZk = zkfactory.create( + arguments.zookeeper, SessionType.ReadWrite, arguments.zkSessionTimeoutMillis).get(); + ZooKeeper configStoreZk = zkfactory.create( + arguments.configurationStore, SessionType.ReadWrite, arguments.zkSessionTimeoutMillis).get(); // Format BookKeeper metadata ServerConfiguration bkConf = new ServerConfiguration(); bkConf.setLedgerManagerFactoryClass(HierarchicalLedgerManagerFactory.class); bkConf.setZkServers(arguments.zookeeper); + bkConf.setZkTimeout(arguments.zkSessionTimeoutMillis); if (localZk.exists("/ledgers", false) == null // only format if /ledgers doesn't exist && !BookKeeperAdmin.format(bkConf, false /* interactive */, false /* force */)) { throw new IOException("Failed to initialize BookKeeper metadata"); diff --git a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/GlobalZooKeeperCache.java b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/GlobalZooKeeperCache.java index 905c21c4fe45e..deb74ac2e9c0c 100644 --- a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/GlobalZooKeeperCache.java +++ b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/GlobalZooKeeperCache.java @@ -65,7 +65,7 @@ public void start() throws IOException { // Initial session creation with global ZK must work try { - ZooKeeper newSession = zkFuture.get(10, TimeUnit.SECONDS); + ZooKeeper newSession = zkFuture.get(zkSessionTimeoutMillis, TimeUnit.MILLISECONDS); // Register self as a watcher to receive notification when session expires and trigger a new session to be // created newSession.register(this);