From a95809856ff0bab415ecb8df4cb9454e610e22cb Mon Sep 17 00:00:00 2001 From: Jia Zhai Date: Fri, 20 Dec 2019 13:09:13 +0800 Subject: [PATCH] fix 5825: make PulsarClusterMetadataSetup idempotent (#5879) Fixes #5825 ### Motivation PulsarClusterMetadataSetup runs on the second time will failed because some zk nodes already exists. This change will fix this issue. ### Modifications - catch and ignore exception for exists zk nodes. - add test for it. --- .../pulsar/PulsarClusterMetadataSetup.java | 112 +++++++----------- .../zookeeper/ClusterMetadataSetupTest.java | 60 ++++++++++ .../zookeeper/ZooKeeperClientAspectJTest.java | 2 +- 3 files changed, 107 insertions(+), 67 deletions(-) create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/broker/zookeeper/ClusterMetadataSetupTest.java diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java index 889a2201a747d..0033624152ed8 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java @@ -53,6 +53,7 @@ import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.data.ACL; import org.apache.zookeeper.data.Stat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -113,6 +114,20 @@ private static class Arguments { private boolean help = false; } + /** + * a wrapper for ZkUtils.createFullPathOptimistic but ignore exception of node exists + */ + private static void createZkNode(ZooKeeper zkc, String path, + byte[] data, final List acl, final CreateMode createMode) + throws KeeperException, InterruptedException { + + try { + ZkUtils.createFullPathOptimistic(zkc, path, data, acl, createMode); + } catch (NodeExistsException e) { + // Ignore + } + } + public static void main(String[] args) throws Exception { Arguments arguments = new Arguments(); JCommander jcommander = new JCommander(); @@ -172,53 +187,34 @@ public static void main(String[] args) throws Exception { } if (localZk.exists(ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, false) == null) { - try { - localZk.create(ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, "{}".getBytes(), Ids.OPEN_ACL_UNSAFE, - CreateMode.PERSISTENT); - } catch (NodeExistsException e) { - // Ignore - } + createZkNode(localZk, ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, + "{}".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } - try { - localZk.create("/managed-ledgers", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - } catch (NodeExistsException e) { - // Ignore - } + createZkNode(localZk, "/managed-ledgers", new byte[0], + ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - localZk.create("/namespace", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + createZkNode(localZk, "/namespace", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - try { - ZkUtils.createFullPathOptimistic(configStoreZk, POLICIES_ROOT, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, - CreateMode.PERSISTENT); - } catch (NodeExistsException e) { - // Ignore - } + createZkNode(configStoreZk, POLICIES_ROOT, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT); - try { - ZkUtils.createFullPathOptimistic(configStoreZk, "/admin/clusters", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, - CreateMode.PERSISTENT); - } catch (NodeExistsException e) { - // Ignore - } + createZkNode(configStoreZk, "/admin/clusters", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT); ClusterData clusterData = new ClusterData(arguments.clusterWebServiceUrl, arguments.clusterWebServiceUrlTls, arguments.clusterBrokerServiceUrl, arguments.clusterBrokerServiceUrlTls); byte[] clusterDataJson = ObjectMapperFactory.getThreadLocal().writeValueAsBytes(clusterData); - configStoreZk.create("/admin/clusters/" + arguments.cluster, clusterDataJson, ZooDefs.Ids.OPEN_ACL_UNSAFE, - CreateMode.PERSISTENT); + createZkNode(configStoreZk,"/admin/clusters/" + arguments.cluster, clusterDataJson, ZooDefs.Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT); // Create marker for "global" cluster ClusterData globalClusterData = new ClusterData(null, null); byte[] globalClusterDataJson = ObjectMapperFactory.getThreadLocal().writeValueAsBytes(globalClusterData); - try { - configStoreZk.create("/admin/clusters/global", globalClusterDataJson, ZooDefs.Ids.OPEN_ACL_UNSAFE, - CreateMode.PERSISTENT); - } catch (NodeExistsException e) { - // Ignore - } + createZkNode(configStoreZk, "/admin/clusters/global", globalClusterDataJson, ZooDefs.Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT); // Create public tenant, whitelisted to use the this same cluster, along with other clusters createTenantIfAbsent(configStoreZk, TopicName.PUBLIC_TENANT, arguments.cluster); @@ -248,13 +244,9 @@ static void createTenantIfAbsent(ZooKeeper configStoreZk, String tenant, String if (stat == null) { TenantInfo publicTenant = new TenantInfo(Collections.emptySet(), Collections.singleton(cluster)); - try { - ZkUtils.createFullPathOptimistic(configStoreZk, tenantPath, - ObjectMapperFactory.getThreadLocal().writeValueAsBytes(publicTenant), - ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - } catch (NodeExistsException e) { - // Ignore - } + createZkNode(configStoreZk, tenantPath, + ObjectMapperFactory.getThreadLocal().writeValueAsBytes(publicTenant), + ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } else { // Update existing public tenant with new cluster byte[] content = configStoreZk.getData(tenantPath, false, null); @@ -280,16 +272,12 @@ static void createNamespaceIfAbsent(ZooKeeper configStoreZk, NamespaceName names policies.bundles = getBundles(16); policies.replication_clusters = Collections.singleton(cluster); - try { - ZkUtils.createFullPathOptimistic( - configStoreZk, - namespacePath, - ObjectMapperFactory.getThreadLocal().writeValueAsBytes(policies), - ZooDefs.Ids.OPEN_ACL_UNSAFE, - CreateMode.PERSISTENT); - } catch (NodeExistsException e) { - // Ignore - } + createZkNode( + configStoreZk, + namespacePath, + ObjectMapperFactory.getThreadLocal().writeValueAsBytes(policies), + ZooDefs.Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT); } else { byte[] content = configStoreZk.getData(namespacePath, false, null); policies = ObjectMapperFactory.getThreadLocal().readValue(content, Policies.class); @@ -309,17 +297,13 @@ static void createPartitionedTopic(ZooKeeper configStoreZk, TopicName topicName, Stat stat = configStoreZk.exists(partitionedTopicPath, false); PartitionedTopicMetadata metadata = new PartitionedTopicMetadata(numPartitions); if (stat == null) { - try { - ZkUtils.createFullPathOptimistic( - configStoreZk, - partitionedTopicPath, - ObjectMapperFactory.getThreadLocal().writeValueAsBytes(metadata), - ZooDefs.Ids.OPEN_ACL_UNSAFE, - CreateMode.PERSISTENT - ); - } catch (NodeExistsException e) { - // Ignore - } + createZkNode( + configStoreZk, + partitionedTopicPath, + ObjectMapperFactory.getThreadLocal().writeValueAsBytes(metadata), + ZooDefs.Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT + ); } else { byte[] content = configStoreZk.getData(partitionedTopicPath, false, null); PartitionedTopicMetadata existsMeta = ObjectMapperFactory.getThreadLocal().readValue(content, PartitionedTopicMetadata.class); @@ -344,12 +328,8 @@ public static ZooKeeper initZk(String connection, int sessionTimeout) throws Exc ZooKeeper chrootZk = zkfactory.create( zkConnectForChrootCreation, SessionType.ReadWrite, sessionTimeout).get(); if (chrootZk.exists(chrootPath, false) == null) { - try { - ZkUtils.createFullPathOptimistic(chrootZk, chrootPath, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, - CreateMode.PERSISTENT); - } catch (NodeExistsException e) { - // Ignore - } + createZkNode(chrootZk, chrootPath, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT); log.info("Created zookeeper chroot path {} successfully", chrootPath); } chrootZk.close(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/zookeeper/ClusterMetadataSetupTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/zookeeper/ClusterMetadataSetupTest.java new file mode 100644 index 0000000000000..c48f231dc9c71 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/zookeeper/ClusterMetadataSetupTest.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.zookeeper; + +import org.apache.bookkeeper.test.PortManager; +import org.apache.pulsar.PulsarClusterMetadataSetup; +import org.apache.pulsar.broker.zookeeper.ZooKeeperClientAspectJTest.ZookeeperServerTest; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +public class ClusterMetadataSetupTest { + private ZookeeperServerTest localZkS; + private final int LOCAL_ZOOKEEPER_PORT = PortManager.nextFreePort(); + + // test SetupClusterMetadata several times, all should be suc + @Test + public void testReSetupClusterMetadata() throws Exception { + String[] args = { + "--cluster", "testReSetupClusterMetadata-cluster", + "--zookeeper", "127.0.0.1:" + LOCAL_ZOOKEEPER_PORT, + "--configuration-store", "127.0.0.1:" + LOCAL_ZOOKEEPER_PORT, + "--web-service-url", "http://127.0.0.1:8080", + "--web-service-url-tls", "https://127.0.0.1:8443", + "--broker-service-url", "pulsar://127.0.0.1:6650", + "--broker-service-url-tls","pulsar+ssl://127.0.0.1:6651" + }; + PulsarClusterMetadataSetup.main(args); + PulsarClusterMetadataSetup.main(args); + PulsarClusterMetadataSetup.main(args); + } + + @BeforeMethod + void setup() throws Exception { + localZkS = new ZookeeperServerTest(LOCAL_ZOOKEEPER_PORT); + localZkS.start(); + } + + @AfterMethod + void teardown() throws Exception { + localZkS.close(); + } + +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/zookeeper/ZooKeeperClientAspectJTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/zookeeper/ZooKeeperClientAspectJTest.java index 4abfa13a37b55..4d52ff81a0d21 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/zookeeper/ZooKeeperClientAspectJTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/zookeeper/ZooKeeperClientAspectJTest.java @@ -294,7 +294,7 @@ private Metrics getMetric(PulsarService pulsar, String dimension) { return null; } - class ZookeeperServerTest implements Closeable { + static class ZookeeperServerTest implements Closeable { private final File zkTmpDir; private ZooKeeperServer zks; private NIOServerCnxnFactory serverFactory;