Skip to content

Commit

Permalink
Setup pulsar cluster with MetadataStore (apache#10600)
Browse files Browse the repository at this point in the history
### Motivation

Refactor the pulsar cluster setup command to use the new `MetadataStore` API.

### Modifications

- Refactored `PulsarClusterMetadataSetup` command to use the new `MetadataStore` API.
- Added a new `MetadataStoreLifecycle` interface for the `MetadataStore` implementations to do initialization as needed.
  - For zookeeper, it creates the root node in chroot mode
  • Loading branch information
fantapsody authored May 17, 2021
1 parent a6aed55 commit 76ae7b1
Show file tree
Hide file tree
Showing 4 changed files with 270 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import org.apache.bookkeeper.client.BookKeeperAdmin;
import org.apache.bookkeeper.common.net.ServiceURI;
import org.apache.bookkeeper.conf.ServerConfiguration;
Expand All @@ -41,6 +43,12 @@
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.functions.worker.WorkerUtils;
import org.apache.pulsar.metadata.api.GetResult;
import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.metadata.api.MetadataStoreConfig;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.MetadataStoreLifecycle;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.apache.pulsar.zookeeper.ZkBookieRackAffinityMapping;
import org.apache.pulsar.zookeeper.ZooKeeperClientFactory;
import org.apache.pulsar.zookeeper.ZooKeeperClientFactory.SessionType;
Expand All @@ -49,7 +57,6 @@
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.NodeExistsException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;
Expand Down Expand Up @@ -143,6 +150,21 @@ private static void createZkNode(ZooKeeper zkc, String path,
}
}

/**
 * Creates the node at {@code path} through {@link MetadataStore#put}, treating an already existing node as success.
 *
 * <p>The put is issued with an expected version of {@code -1}. A failure whose cause is a
 * {@link MetadataStoreException.BadVersionException} (the node is already there) is ignored; any other
 * failure is rethrown unchanged.
 *
 * @param store the metadata store to write to
 * @param path the path of the node to create
 * @param data the payload to store in the node
 * @throws InterruptedException if interrupted while waiting for the put to complete
 * @throws ExecutionException if the put fails for a reason other than a bad version
 */
private static void createMetadataNode(MetadataStore store, String path, byte[] data)
        throws InterruptedException, ExecutionException {
    try {
        store.put(path, data, Optional.of(-1L)).get();
    } catch (ExecutionException e) {
        boolean alreadyExists = e.getCause() instanceof MetadataStoreException.BadVersionException;
        if (alreadyExists) {
            // The node is already present; nothing to create.
            return;
        }
        throw e;
    }
}

private static void initialDlogNamespaceMetadata(String configurationStore, String bkMetadataServiceUri)
throws IOException {
InternalConfigurationData internalConf = new InternalConfigurationData(
Expand Down Expand Up @@ -195,15 +217,16 @@ public static void main(String[] args) throws Exception {
log.info("Setting up cluster {} with zk={} configuration-store={}", arguments.cluster, arguments.zookeeper,
arguments.configurationStore);

ZooKeeper localZk = initZk(arguments.zookeeper, arguments.zkSessionTimeoutMillis);
ZooKeeper configStoreZk = initZk(arguments.configurationStore, arguments.zkSessionTimeoutMillis);
MetadataStoreExtended localStore = initMetadataStore(arguments.zookeeper, arguments.zkSessionTimeoutMillis);
MetadataStoreExtended configStore = initMetadataStore(arguments.configurationStore,
arguments.zkSessionTimeoutMillis);

// Format BookKeeper ledger storage metadata
ServerConfiguration bkConf = new ServerConfiguration();
if (arguments.existingBkMetadataServiceUri == null && arguments.bookieMetadataServiceUri == null) {
bkConf.setZkServers(arguments.zookeeper);
bkConf.setZkTimeout(arguments.zkSessionTimeoutMillis);
if (localZk.exists("/ledgers", false) == null // only format if /ledgers doesn't exist
if (!localStore.exists("/ledgers").get() // only format if /ledgers doesn't exist
&& !BookKeeperAdmin.format(bkConf, false /* interactive */, false /* force */)) {
throw new IOException("Failed to initialize BookKeeper metadata");
}
Expand All @@ -227,56 +250,49 @@ public static void main(String[] args) throws Exception {
initializer.initializeCluster(bkMetadataServiceUri.getUri(), arguments.numStreamStorageContainers);
}

if (localZk.exists(ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, false) == null) {
createZkNode(localZk, ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH,
"{}".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
if (!localStore.exists(ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH).get()) {
createMetadataNode(localStore, ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, "{}".getBytes());
}

createZkNode(localZk, "/managed-ledgers", new byte[0],
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
createMetadataNode(localStore, "/managed-ledgers", new byte[0]);

createZkNode(localZk, "/namespace", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
createMetadataNode(localStore, "/namespace", new byte[0]);

createZkNode(configStoreZk, POLICIES_ROOT, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
createMetadataNode(configStore, POLICIES_ROOT, new byte[0]);

createZkNode(configStoreZk, "/admin/clusters", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
createMetadataNode(configStore, "/admin/clusters", new byte[0]);

ClusterData clusterData = new ClusterData(arguments.clusterWebServiceUrl, arguments.clusterWebServiceUrlTls,
arguments.clusterBrokerServiceUrl, arguments.clusterBrokerServiceUrlTls);
byte[] clusterDataJson = ObjectMapperFactory.getThreadLocal().writeValueAsBytes(clusterData);

createZkNode(configStoreZk, "/admin/clusters/" + arguments.cluster, clusterDataJson,
ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
createMetadataNode(configStore, "/admin/clusters/" + arguments.cluster, clusterDataJson);

// Create marker for "global" cluster
ClusterData globalClusterData = new ClusterData(null, null);
byte[] globalClusterDataJson = ObjectMapperFactory.getThreadLocal().writeValueAsBytes(globalClusterData);

createZkNode(configStoreZk, "/admin/clusters/global", globalClusterDataJson, ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
createMetadataNode(configStore, "/admin/clusters/global", globalClusterDataJson);

// Create public tenant, whitelisted to use this same cluster, along with other clusters
createTenantIfAbsent(configStoreZk, TopicName.PUBLIC_TENANT, arguments.cluster);
createTenantIfAbsent(configStore, TopicName.PUBLIC_TENANT, arguments.cluster);

// Create system tenant
createTenantIfAbsent(configStoreZk, NamespaceName.SYSTEM_NAMESPACE.getTenant(), arguments.cluster);
createTenantIfAbsent(configStore, NamespaceName.SYSTEM_NAMESPACE.getTenant(), arguments.cluster);

// Create default namespace
createNamespaceIfAbsent(configStoreZk, NamespaceName.get(TopicName.PUBLIC_TENANT, TopicName.DEFAULT_NAMESPACE),
createNamespaceIfAbsent(configStore, NamespaceName.get(TopicName.PUBLIC_TENANT, TopicName.DEFAULT_NAMESPACE),
arguments.cluster);

// Create system namespace
createNamespaceIfAbsent(configStoreZk, NamespaceName.SYSTEM_NAMESPACE, arguments.cluster);
createNamespaceIfAbsent(configStore, NamespaceName.SYSTEM_NAMESPACE, arguments.cluster);

// Create transaction coordinator assign partitioned topic
createPartitionedTopic(configStoreZk, TopicName.TRANSACTION_COORDINATOR_ASSIGN,
createPartitionedTopic(configStore, TopicName.TRANSACTION_COORDINATOR_ASSIGN,
arguments.numTransactionCoordinators);

localZk.close();
configStoreZk.close();
localStore.close();
configStore.close();

log.info("Cluster metadata for '{}' setup correctly", arguments.cluster);
}
Expand Down Expand Up @@ -308,6 +324,32 @@ static void createTenantIfAbsent(ZooKeeper configStoreZk, String tenant, String
}
}

/**
 * Ensures that {@code tenant} exists under {@code POLICIES_ROOT} and that {@code cluster} is one of its
 * allowed clusters.
 *
 * <p>If the tenant node is missing it is created with {@code cluster} as the only allowed cluster. If it
 * exists and does not list {@code cluster} yet, the cluster is added and the node is rewritten using the
 * version that was read, and this method waits for that write to complete so that failures are reported
 * to the caller.
 *
 * @param configStore the configuration metadata store
 * @param tenant the tenant name
 * @param cluster the cluster that the tenant must be allowed to use
 * @throws IOException if the tenant data cannot be serialized or deserialized
 * @throws InterruptedException if interrupted while waiting for a store operation
 * @throws ExecutionException if a store operation fails
 */
static void createTenantIfAbsent(MetadataStore configStore, String tenant, String cluster) throws IOException,
        InterruptedException, ExecutionException {

    String tenantPath = POLICIES_ROOT + "/" + tenant;

    Optional<GetResult> getResult = configStore.get(tenantPath).get();
    if (!getResult.isPresent()) {
        TenantInfo publicTenant = new TenantInfo(Collections.emptySet(), Collections.singleton(cluster));

        createMetadataNode(configStore, tenantPath,
                ObjectMapperFactory.getThreadLocal().writeValueAsBytes(publicTenant));
    } else {
        // Update the existing tenant with the new cluster
        byte[] content = getResult.get().getValue();
        TenantInfo publicTenant = ObjectMapperFactory.getThreadLocal().readValue(content, TenantInfo.class);

        // Only update the node if the list of clusters should be modified
        if (!publicTenant.getAllowedClusters().contains(cluster)) {
            publicTenant.getAllowedClusters().add(cluster);

            // Wait for the write: otherwise a failure would go unnoticed and the caller could close the
            // store before the update has been applied.
            configStore.put(tenantPath, ObjectMapperFactory.getThreadLocal().writeValueAsBytes(publicTenant),
                    Optional.of(getResult.get().getStat().getVersion())).get();
        }
    }
}

static void createNamespaceIfAbsent(ZooKeeper configStoreZk, NamespaceName namespaceName, String cluster)
throws KeeperException, InterruptedException, IOException {
String namespacePath = POLICIES_ROOT + "/" + namespaceName.toString();
Expand Down Expand Up @@ -338,6 +380,32 @@ static void createNamespaceIfAbsent(ZooKeeper configStoreZk, NamespaceName names
}
}

/**
 * Ensures that the policies node of {@code namespaceName} exists under {@code POLICIES_ROOT} and that
 * {@code cluster} is one of its replication clusters.
 *
 * <p>If the node is missing it is created with the bundles returned by {@code getBundles(16)} and
 * {@code cluster} as the only replication cluster. If it exists and does not list {@code cluster} yet,
 * the cluster is added and the node is rewritten using the version that was read, and this method waits
 * for that write to complete so that failures are reported to the caller.
 *
 * @param configStore the configuration metadata store
 * @param namespaceName the namespace to create or update
 * @param cluster the cluster that must be part of the replication clusters
 * @throws InterruptedException if interrupted while waiting for a store operation
 * @throws IOException if the policies cannot be serialized or deserialized
 * @throws ExecutionException if a store operation fails
 */
static void createNamespaceIfAbsent(MetadataStore configStore, NamespaceName namespaceName, String cluster)
        throws InterruptedException, IOException, ExecutionException {
    String namespacePath = POLICIES_ROOT + "/" + namespaceName.toString();
    Policies policies;
    Optional<GetResult> getResult = configStore.get(namespacePath).get();
    if (!getResult.isPresent()) {
        policies = new Policies();
        policies.bundles = getBundles(16);
        policies.replication_clusters = Collections.singleton(cluster);

        createMetadataNode(configStore, namespacePath,
                ObjectMapperFactory.getThreadLocal().writeValueAsBytes(policies));
    } else {
        byte[] content = getResult.get().getValue();
        policies = ObjectMapperFactory.getThreadLocal().readValue(content, Policies.class);

        // Only update the node if the list of clusters should be modified
        if (!policies.replication_clusters.contains(cluster)) {
            policies.replication_clusters.add(cluster);

            // Wait for the write: otherwise a failure would go unnoticed and the caller could close the
            // store before the update has been applied.
            configStore.put(namespacePath, ObjectMapperFactory.getThreadLocal().writeValueAsBytes(policies),
                    Optional.of(getResult.get().getStat().getVersion())).get();
        }
    }
}

static void createPartitionedTopic(ZooKeeper configStoreZk, TopicName topicName, int numPartitions)
throws KeeperException, InterruptedException, IOException {
String partitionedTopicPath = ZkAdminPaths.partitionedTopicPath(topicName);
Expand Down Expand Up @@ -367,6 +435,29 @@ static void createPartitionedTopic(ZooKeeper configStoreZk, TopicName topicName,
}
}

/**
 * Ensures that the partitioned-topic metadata node of {@code topicName} exists and records at least
 * {@code numPartitions} partitions.
 *
 * <p>If the node is missing it is created with {@code numPartitions}. If it exists with fewer partitions,
 * it is rewritten with {@code numPartitions} using the version that was read, and this method waits for
 * that write to complete so that failures are reported to the caller. An existing node with an equal or
 * larger partition count is left untouched.
 *
 * @param configStore the configuration metadata store
 * @param topicName the partitioned topic
 * @param numPartitions the desired minimum number of partitions
 * @throws InterruptedException if interrupted while waiting for a store operation
 * @throws IOException if the metadata cannot be serialized or deserialized
 * @throws ExecutionException if a store operation fails
 */
static void createPartitionedTopic(MetadataStore configStore, TopicName topicName, int numPartitions)
        throws InterruptedException, IOException, ExecutionException {
    String partitionedTopicPath = ZkAdminPaths.partitionedTopicPath(topicName);
    Optional<GetResult> getResult = configStore.get(partitionedTopicPath).get();
    PartitionedTopicMetadata metadata = new PartitionedTopicMetadata(numPartitions);
    if (!getResult.isPresent()) {
        createMetadataNode(configStore, partitionedTopicPath,
                ObjectMapperFactory.getThreadLocal().writeValueAsBytes(metadata));
    } else {
        byte[] content = getResult.get().getValue();
        PartitionedTopicMetadata existsMeta =
                ObjectMapperFactory.getThreadLocal().readValue(content, PartitionedTopicMetadata.class);

        // Only update the node if the number of partitions has to grow
        if (existsMeta.partitions < numPartitions) {
            // Wait for the write: otherwise a failure would go unnoticed and the caller could close the
            // store before the update has been applied.
            configStore.put(
                    partitionedTopicPath,
                    ObjectMapperFactory.getThreadLocal().writeValueAsBytes(metadata),
                    Optional.of(getResult.get().getStat().getVersion())).get();
        }
    }
}

public static ZooKeeper initZk(String connection, int sessionTimeout) throws Exception {
ZooKeeperClientFactory zkfactory = new ZookeeperClientFactoryImpl();
int chrootIndex = connection.indexOf("/");
Expand All @@ -386,5 +477,15 @@ public static ZooKeeper initZk(String connection, int sessionTimeout) throws Exc
return zkConnect;
}

/**
 * Creates a metadata store for {@code connection} and, when the implementation supports
 * {@link MetadataStoreLifecycle}, runs its cluster initialization before returning it.
 *
 * <p>If the initialization fails, the freshly created store is closed before the failure is rethrown so
 * that it is not leaked; a failure while closing is attached as a suppressed exception.
 *
 * @param connection the metadata store connection string
 * @param sessionTimeout the session timeout in milliseconds
 * @return the initialized metadata store; the caller is responsible for closing it
 * @throws Exception if the store cannot be created or initialized
 */
public static MetadataStoreExtended initMetadataStore(String connection, int sessionTimeout) throws Exception {
    MetadataStoreExtended store = MetadataStoreExtended.create(connection, MetadataStoreConfig.builder()
            .sessionTimeoutMillis(sessionTimeout)
            .build());
    try {
        if (store instanceof MetadataStoreLifecycle) {
            ((MetadataStoreLifecycle) store).initializeCluster().get();
        }
    } catch (Exception e) {
        // Do not leak the store when initialization fails.
        try {
            store.close();
        } catch (Exception closeFailure) {
            e.addSuppressed(closeFailure);
        }
        throw e;
    }
    return store;
}

private static final Logger log = LoggerFactory.getLogger(PulsarClusterMetadataSetup.class);
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,16 @@
*/
package org.apache.pulsar.broker.zookeeper;

import java.nio.charset.Charset;
import java.util.Arrays;
import java.util.HashSet;
import java.util.SortedMap;
import java.util.TreeMap;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
import org.apache.pulsar.PulsarClusterMetadataSetup;
import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.server.NIOServerCnxnFactory;
import org.apache.zookeeper.server.ZooKeeperServer;
Expand All @@ -32,6 +39,8 @@
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;

Expand All @@ -53,8 +62,38 @@ public void testReSetupClusterMetadata() throws Exception {
"--broker-service-url-tls","pulsar+ssl://127.0.0.1:6651"
};
PulsarClusterMetadataSetup.main(args);
SortedMap<String, String> data1 = localZkS.dumpData();
PulsarClusterMetadataSetup.main(args);
SortedMap<String, String> data2 = localZkS.dumpData();
assertEquals(data1, data2);
PulsarClusterMetadataSetup.main(args);
SortedMap<String, String> data3 = localZkS.dumpData();
assertEquals(data1, data3);
}

/**
 * Runs the cluster metadata setup against a chrooted ZooKeeper connection string and verifies, through a
 * non-chrooted client, that the chroot node exists and contains exactly the expected first-level nodes.
 */
@Test
public void testSetupClusterInChrootMode() throws Exception {
    String rootPath = "/test-prefix";
    String zkServers = "127.0.0.1:" + localZkS.getZookeeperPort();
    String chrootedServers = zkServers + rootPath;

    HashSet<String> firstLevelNodes = new HashSet<>(Arrays.asList(
            "admin", "bookies", "ledgers", "managed-ledgers", "namespace", "pulsar", "stream"
    ));

    String[] args = {
        "--cluster", "testReSetupClusterMetadata-cluster",
        "--zookeeper", chrootedServers,
        "--configuration-store", chrootedServers,
        "--web-service-url", "http://127.0.0.1:8080",
        "--web-service-url-tls", "https://127.0.0.1:8443",
        "--broker-service-url", "pulsar://127.0.0.1:6650",
        "--broker-service-url-tls", "pulsar+ssl://127.0.0.1:6651"
    };
    PulsarClusterMetadataSetup.main(args);

    // Connect without the chroot suffix so that the chroot node itself can be inspected.
    try (ZooKeeper zk = ZooKeeperClient.newBuilder()
            .connectString(zkServers)
            .build()) {
        assertNotNull(zk.exists(rootPath, false));
        HashSet<String> actualNodes = new HashSet<>(zk.getChildren(rootPath, false));
        assertEquals(actualNodes, firstLevelNodes);
    }
}

@Test
Expand Down Expand Up @@ -168,5 +207,31 @@ public void close() throws IOException {
public int getZookeeperPort() {
return serverFactory.getLocalPort();
}

/**
 * Dumps the content of this ZooKeeper server into a map from node path to node data.
 *
 * <p>Every first-level node except {@code zookeeper} is visited recursively via
 * {@code dumpPath}; the result is sorted by path.
 *
 * @return the sorted map of node path to node data
 */
public SortedMap<String, String> dumpData() throws IOException, InterruptedException, KeeperException {
    SortedMap<String, String> snapshot = new TreeMap<>();
    String connectString = "127.0.0.1:" + getZookeeperPort();
    try (ZooKeeper zk = ZooKeeperClient.newBuilder()
            .connectString(connectString)
            .sessionTimeoutMs(20000)
            .build()) {
        for (String topLevel : zk.getChildren("/", false)) {
            // Leave out the "zookeeper" node.
            if (!"zookeeper".equals(topLevel)) {
                dumpPath(zk, "/" + topLevel, snapshot);
            }
        }
    }
    return snapshot;
}

/**
 * Records the data of {@code path} and, recursively, of all of its descendants in {@code dataMap}.
 *
 * <p>A node that carries no data (ZooKeeper returns {@code null} for it) is recorded with a {@code null}
 * value instead of failing with a {@link NullPointerException} while decoding.
 *
 * @param zk the ZooKeeper client to read from
 * @param path the absolute path of the node to dump
 * @param dataMap the map that receives the node path to node data entries
 */
private void dumpPath(ZooKeeper zk, String path, SortedMap<String, String> dataMap)
        throws InterruptedException, KeeperException {
    byte[] raw = zk.getData(path, false, null);
    dataMap.put(path, raw == null ? null : new String(raw, Charset.defaultCharset()));
    for (String child : zk.getChildren(path, false)) {
        dumpPath(zk, path + "/" + child, dataMap);
    }
}
}


}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.metadata.api;

import java.util.concurrent.CompletableFuture;

/**
* Optional lifecycle operations for {@link MetadataStore} implementations.
*
* <p>This interface does not itself extend {@link MetadataStore}; it is meant to be implemented in addition to
* it by the implementations that support these operations, which might not be all of them. Callers are
* therefore expected to check whether a store implements this interface before using it.
*/
public interface MetadataStoreLifecycle {
/**
* Initialize the metadata store cluster if needed.
*
* <p>For example, if the backend metadata store is a zookeeper cluster and the pulsar cluster is configured to
* access the zookeeper cluster in the chroot mode, then this method could be used to initialize the root node
* during pulsar cluster metadata setup.
*
* @return a future to track the async request.
*/
CompletableFuture<Void> initializeCluster();
}
Loading

0 comments on commit 76ae7b1

Please sign in to comment.