diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java index ba5383786a96c..947ccb66de52b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java @@ -873,18 +873,24 @@ public void start() throws PulsarServerException { final String timeAverageZPath = TIME_AVERAGE_BROKER_ZPATH + "/" + lookupServiceAddress; updateLocalBrokerData(); try { - ZkUtils.createFullPathOptimistic(zkClient, brokerZnodePath, localData.getJsonBytes(), + if (!org.apache.pulsar.zookeeper.ZkUtils.checkNodeAndWaitExpired( + zkClient, brokerZnodePath, + pulsar.getConfig().getZooKeeperSessionTimeoutMillis())) { + ZkUtils.createFullPathOptimistic(zkClient, brokerZnodePath, localData.getJsonBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); - } catch (KeeperException.NodeExistsException e) { - long ownerZkSessionId = getBrokerZnodeOwner(); - if (ownerZkSessionId != 0 && ownerZkSessionId != zkClient.getSessionId()) { - log.error("Broker znode - [{}] is own by different zookeeper-ssession {} ", brokerZnodePath, - ownerZkSessionId); - throw new PulsarServerException( - "Broker-znode owned by different zk-session " + ownerZkSessionId); + } else { + // Node may already be created by another load manager: in this case update the data. + zkClient.setData(brokerZnodePath, localData.getJsonBytes(), -1); } - // Node may already be created by another load manager: in this case update the data. - zkClient.setData(brokerZnodePath, localData.getJsonBytes(), -1); + } catch (KeeperException.NodeExistsException e) { + log.error("Broker znode - [{}] is own by different zookeeper-session", brokerZnodePath); + throw new PulsarServerException( + "Broker znode - [" + brokerZnodePath + "] is owned by different zk-session"); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + // Catching exception here to print the right error message + log.error("Interrupted at creating znode - [{}] for load balance on zookeeper ", brokerZnodePath, ie); + throw ie; } catch (Exception e) { // Catching exception here to print the right error message log.error("Unable to create znode - [{}] for load balance on zookeeper ", brokerZnodePath, e); @@ -1037,17 +1043,6 @@ private void deleteBundleDataFromZookeeper(String bundle) { } } - private long getBrokerZnodeOwner() { - try { - Stat stat = new Stat(); - zkClient.getData(brokerZnodePath, false, stat); - return stat.getEphemeralOwner(); - } catch (Exception e) { - log.warn("Failed to get stat of {}", brokerZnodePath, e); - } - return 0; - } - private void refreshBrokerToFailureDomainMap() { if (!pulsar.getConfiguration().isFailureDomainsEnabled()) { return; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java index 3e6c32a2009a0..5469dee6ad896 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java @@ -301,20 +301,26 @@ public void start() throws PulsarServerException { loadReportJson = ObjectMapperFactory.getThreadLocal().writeValueAsString(loadReport); } try { - ZkUtils.createFullPathOptimistic(pulsar.getZkClient(), brokerZnodePath, + if (!org.apache.pulsar.zookeeper.ZkUtils.checkNodeAndWaitExpired( + pulsar.getZkClient(), brokerZnodePath, + pulsar.getConfig().getZooKeeperSessionTimeoutMillis())) { + ZkUtils.createFullPathOptimistic(pulsar.getZkClient(), brokerZnodePath, loadReportJson.getBytes(Charsets.UTF_8), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); - } catch (KeeperException.NodeExistsException e) { - long ownerZkSessionId = getBrokerZnodeOwner(); - if (ownerZkSessionId != 0 && ownerZkSessionId != pulsar.getZkClient().getSessionId()) { - log.error("Broker znode - [{}] is own by different zookeeper-ssession {} ", brokerZnodePath, - ownerZkSessionId); - throw new PulsarServerException("Broker-znode owned by different zk-session " + ownerZkSessionId); - } - // Node may already be created by another load manager: in this case update the data. - if (loadReport != null) { - pulsar.getZkClient().setData(brokerZnodePath, loadReportJson.getBytes(Charsets.UTF_8), -1); + } else { + // Node may already be created by another load manager: in this case update the data. + if (loadReport != null) { + pulsar.getZkClient().setData(brokerZnodePath, loadReportJson.getBytes(Charsets.UTF_8), -1); + } } - + } catch (KeeperException.NodeExistsException e) { + log.error("Broker znode - [{}] is own by different zookeeper-session", brokerZnodePath); + throw new PulsarServerException( + "Broker znode - [" + brokerZnodePath + "] is owned by different zk-session"); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + // Catching exception here to print the right error message + log.error("Interrupted at creating znode - [{}] for load balance on zookeeper ", brokerZnodePath, ie); + throw ie; } catch (Exception e) { // Catching excption here to print the right error message log.error("Unable to create znode - [{}] for load balance on zookeeper ", brokerZnodePath, e); diff --git a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZkUtils.java b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZkUtils.java new file mode 100644 index 0000000000000..d9e8ad4e11049 --- /dev/null +++ b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZkUtils.java @@ -0,0 +1,78 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.zookeeper; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.KeeperException.NodeExistsException; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.Watcher.Event.EventType; +import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.data.Stat; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * ZooKeeper utils. + */ +public final class ZkUtils { + + private static final Logger log = LoggerFactory.getLogger(ZkUtils.class); + + /** + * Check if the provided path exists or not and wait it expired if possible. + * + * @param zk the zookeeper client instance + * @param path the zookeeper path + * @param sessionTimeoutMs session timeout in milliseconds + * @return true if path exists, otherwise return false + * @throws KeeperException when failed to access zookeeper + * @throws InterruptedException interrupted when waiting for znode to be expired + */ + public static boolean checkNodeAndWaitExpired(ZooKeeper zk, + String path, + long sessionTimeoutMs) throws KeeperException, InterruptedException { + final CountDownLatch prevNodeLatch = new CountDownLatch(1); + Watcher zkPrevNodeWatcher = watchedEvent -> { + // check for prev node deletion. + if (EventType.NodeDeleted == watchedEvent.getType()) { + prevNodeLatch.countDown(); + } + }; + Stat stat = zk.exists(path, zkPrevNodeWatcher); + if (null != stat) { + // if the ephemeral owner isn't current zookeeper client + // wait for it to be expired + if (stat.getEphemeralOwner() != zk.getSessionId()) { + log.info("Previous znode : {} still exists, so waiting {} ms for znode deletion", + path, sessionTimeoutMs); + if (!prevNodeLatch.await(sessionTimeoutMs, TimeUnit.MILLISECONDS)) { + throw new NodeExistsException(path); + } else { + return false; + } + } + return true; + } else { + return false; + } + } + +}