Skip to content

Commit

Permalink
fix 5825: make PulsarClusterMetadataSetup idempotent (apache#5879)
Browse files Browse the repository at this point in the history
Fixes apache#5825

### Motivation

PulsarClusterMetadataSetup runs on the second time will failed because some zk nodes already exists. This change will fix this issue.

### Modifications

- catch and ignore exception for exists zk nodes.
- add test for it.
  • Loading branch information
jiazhai authored and sijie committed Dec 20, 2019
1 parent b9aa4ce commit a958098
Show file tree
Hide file tree
Showing 3 changed files with 107 additions and 67 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -113,6 +114,20 @@ private static class Arguments {
private boolean help = false;
}

/**
* a wrapper for ZkUtils.createFullPathOptimistic but ignore exception of node exists
*/
private static void createZkNode(ZooKeeper zkc, String path,
byte[] data, final List<ACL> acl, final CreateMode createMode)
throws KeeperException, InterruptedException {

try {
ZkUtils.createFullPathOptimistic(zkc, path, data, acl, createMode);
} catch (NodeExistsException e) {
// Ignore
}
}

public static void main(String[] args) throws Exception {
Arguments arguments = new Arguments();
JCommander jcommander = new JCommander();
Expand Down Expand Up @@ -172,53 +187,34 @@ public static void main(String[] args) throws Exception {
}

if (localZk.exists(ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, false) == null) {
try {
localZk.create(ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, "{}".getBytes(), Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
} catch (NodeExistsException e) {
// Ignore
}
createZkNode(localZk, ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH,
"{}".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}

try {
localZk.create("/managed-ledgers", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
} catch (NodeExistsException e) {
// Ignore
}
createZkNode(localZk, "/managed-ledgers", new byte[0],
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

localZk.create("/namespace", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
createZkNode(localZk, "/namespace", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

try {
ZkUtils.createFullPathOptimistic(configStoreZk, POLICIES_ROOT, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
} catch (NodeExistsException e) {
// Ignore
}
createZkNode(configStoreZk, POLICIES_ROOT, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);

try {
ZkUtils.createFullPathOptimistic(configStoreZk, "/admin/clusters", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
} catch (NodeExistsException e) {
// Ignore
}
createZkNode(configStoreZk, "/admin/clusters", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);

ClusterData clusterData = new ClusterData(arguments.clusterWebServiceUrl, arguments.clusterWebServiceUrlTls,
arguments.clusterBrokerServiceUrl, arguments.clusterBrokerServiceUrlTls);
byte[] clusterDataJson = ObjectMapperFactory.getThreadLocal().writeValueAsBytes(clusterData);

configStoreZk.create("/admin/clusters/" + arguments.cluster, clusterDataJson, ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
createZkNode(configStoreZk,"/admin/clusters/" + arguments.cluster, clusterDataJson, ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);

// Create marker for "global" cluster
ClusterData globalClusterData = new ClusterData(null, null);
byte[] globalClusterDataJson = ObjectMapperFactory.getThreadLocal().writeValueAsBytes(globalClusterData);

try {
configStoreZk.create("/admin/clusters/global", globalClusterDataJson, ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
} catch (NodeExistsException e) {
// Ignore
}
createZkNode(configStoreZk, "/admin/clusters/global", globalClusterDataJson, ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);

// Create public tenant, whitelisted to use the this same cluster, along with other clusters
createTenantIfAbsent(configStoreZk, TopicName.PUBLIC_TENANT, arguments.cluster);
Expand Down Expand Up @@ -248,13 +244,9 @@ static void createTenantIfAbsent(ZooKeeper configStoreZk, String tenant, String
if (stat == null) {
TenantInfo publicTenant = new TenantInfo(Collections.emptySet(), Collections.singleton(cluster));

try {
ZkUtils.createFullPathOptimistic(configStoreZk, tenantPath,
ObjectMapperFactory.getThreadLocal().writeValueAsBytes(publicTenant),
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
} catch (NodeExistsException e) {
// Ignore
}
createZkNode(configStoreZk, tenantPath,
ObjectMapperFactory.getThreadLocal().writeValueAsBytes(publicTenant),
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
} else {
// Update existing public tenant with new cluster
byte[] content = configStoreZk.getData(tenantPath, false, null);
Expand All @@ -280,16 +272,12 @@ static void createNamespaceIfAbsent(ZooKeeper configStoreZk, NamespaceName names
policies.bundles = getBundles(16);
policies.replication_clusters = Collections.singleton(cluster);

try {
ZkUtils.createFullPathOptimistic(
configStoreZk,
namespacePath,
ObjectMapperFactory.getThreadLocal().writeValueAsBytes(policies),
ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
} catch (NodeExistsException e) {
// Ignore
}
createZkNode(
configStoreZk,
namespacePath,
ObjectMapperFactory.getThreadLocal().writeValueAsBytes(policies),
ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
} else {
byte[] content = configStoreZk.getData(namespacePath, false, null);
policies = ObjectMapperFactory.getThreadLocal().readValue(content, Policies.class);
Expand All @@ -309,17 +297,13 @@ static void createPartitionedTopic(ZooKeeper configStoreZk, TopicName topicName,
Stat stat = configStoreZk.exists(partitionedTopicPath, false);
PartitionedTopicMetadata metadata = new PartitionedTopicMetadata(numPartitions);
if (stat == null) {
try {
ZkUtils.createFullPathOptimistic(
configStoreZk,
partitionedTopicPath,
ObjectMapperFactory.getThreadLocal().writeValueAsBytes(metadata),
ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT
);
} catch (NodeExistsException e) {
// Ignore
}
createZkNode(
configStoreZk,
partitionedTopicPath,
ObjectMapperFactory.getThreadLocal().writeValueAsBytes(metadata),
ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT
);
} else {
byte[] content = configStoreZk.getData(partitionedTopicPath, false, null);
PartitionedTopicMetadata existsMeta = ObjectMapperFactory.getThreadLocal().readValue(content, PartitionedTopicMetadata.class);
Expand All @@ -344,12 +328,8 @@ public static ZooKeeper initZk(String connection, int sessionTimeout) throws Exc
ZooKeeper chrootZk = zkfactory.create(
zkConnectForChrootCreation, SessionType.ReadWrite, sessionTimeout).get();
if (chrootZk.exists(chrootPath, false) == null) {
try {
ZkUtils.createFullPathOptimistic(chrootZk, chrootPath, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
} catch (NodeExistsException e) {
// Ignore
}
createZkNode(chrootZk, chrootPath, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
log.info("Created zookeeper chroot path {} successfully", chrootPath);
}
chrootZk.close();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.zookeeper;

import org.apache.bookkeeper.test.PortManager;
import org.apache.pulsar.PulsarClusterMetadataSetup;
import org.apache.pulsar.broker.zookeeper.ZooKeeperClientAspectJTest.ZookeeperServerTest;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

public class ClusterMetadataSetupTest {
private ZookeeperServerTest localZkS;
private final int LOCAL_ZOOKEEPER_PORT = PortManager.nextFreePort();

// test SetupClusterMetadata several times, all should be suc
@Test
public void testReSetupClusterMetadata() throws Exception {
String[] args = {
"--cluster", "testReSetupClusterMetadata-cluster",
"--zookeeper", "127.0.0.1:" + LOCAL_ZOOKEEPER_PORT,
"--configuration-store", "127.0.0.1:" + LOCAL_ZOOKEEPER_PORT,
"--web-service-url", "http://127.0.0.1:8080",
"--web-service-url-tls", "https://127.0.0.1:8443",
"--broker-service-url", "pulsar://127.0.0.1:6650",
"--broker-service-url-tls","pulsar+ssl://127.0.0.1:6651"
};
PulsarClusterMetadataSetup.main(args);
PulsarClusterMetadataSetup.main(args);
PulsarClusterMetadataSetup.main(args);
}

@BeforeMethod
void setup() throws Exception {
localZkS = new ZookeeperServerTest(LOCAL_ZOOKEEPER_PORT);
localZkS.start();
}

@AfterMethod
void teardown() throws Exception {
localZkS.close();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ private Metrics getMetric(PulsarService pulsar, String dimension) {
return null;
}

class ZookeeperServerTest implements Closeable {
static class ZookeeperServerTest implements Closeable {
private final File zkTmpDir;
private ZooKeeperServer zks;
private NIOServerCnxnFactory serverFactory;
Expand Down

0 comments on commit a958098

Please sign in to comment.