Skip to content

Commit

Permalink
[Transaction Coordinator] Bootstrap pulsar system namespace and creat…
Browse files Browse the repository at this point in the history
…e TC assign topic. (apache#5515)

### Motivation

Introduce pulsar system namespace and init transaction coordinator assign topic when init pulsar cluster.

Broker will bootstrap the system namespace if broker enable transaction coordinator.

### Verifying this change

Added new unit tests for system namespace Bootstrap
  • Loading branch information
codelipenghui authored and sijie committed Nov 1, 2019
1 parent 89a9aaa commit f4d954d
Show file tree
Hide file tree
Showing 13 changed files with 303 additions and 29 deletions.
3 changes: 3 additions & 0 deletions bin/pulsar
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ where command is one of:
standalone Run a broker server with local bookies and local zookeeper
initialize-cluster-metadata One-time metadata initialization
initialize-transaction-coordinator-metadata One-time transaction coordinator metadata initialization
compact-topic Run compaction against a topic
zookeeper-shell Open a ZK shell client
tokens Utility to create authentication tokens
Expand Down Expand Up @@ -328,6 +329,8 @@ elif [ $COMMAND == "standalone" ]; then
exec $JAVA $OPTS $ASPECTJ_AGENT ${ZK_OPTS} -Dpulsar.log.file=$PULSAR_LOG_FILE org.apache.pulsar.PulsarStandaloneStarter --config $PULSAR_STANDALONE_CONF $@
elif [ $COMMAND == "initialize-cluster-metadata" ]; then
exec $JAVA $OPTS org.apache.pulsar.PulsarClusterMetadataSetup $@
elif [ $COMMAND == "initialize-transaction-coordinator-metadata" ]; then
exec $JAVA $OPTS org.apache.pulsar.PulsarTransactionCoordinatorMetadataSetup $@
elif [ $COMMAND == "zookeeper-shell" ]; then
exec $JAVA $OPTS org.apache.zookeeper.ZooKeeperMain $@
elif [ $COMMAND == "compact-topic" ]; then
Expand Down
5 changes: 5 additions & 0 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -799,3 +799,8 @@ globalZookeeperServers=

# Deprecated - Enable TLS when talking with other clusters to replicate messages
replicationTlsEnabled=false

### --- Transaction config variables --- ###

# Enable transaction coordinator in broker
transactionCoordinatorEnabled=true
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ public class ServiceConfiguration implements PulsarConfiguration {
private static final String CATEGORY_SASL_AUTH = "SASL Authentication Provider";
@Category
private static final String CATEGORY_HTTP = "HTTP";
@Category
private static final String CATEGORY_TRANSACTION = "Transaction";

/***** --- pulsar configuration --- ****/
@FieldContext(
Expand Down Expand Up @@ -1320,6 +1322,13 @@ public class ServiceConfiguration implements PulsarConfiguration {
)
private int managedLedgerOffloadMaxThreads = 2;

/**** --- Transaction config variables --- ****/
@FieldContext(
category = CATEGORY_TRANSACTION,
doc = "Enable transaction coordinator in broker"
)
private boolean transactionCoordinatorEnabled = true;

/**
* @deprecated See {@link #getConfigurationStoreServers}
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,10 @@
import org.apache.bookkeeper.stream.storage.api.cluster.ClusterInitializer;
import org.apache.bookkeeper.stream.storage.impl.cluster.ZkClusterInitializer;
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.pulsar.broker.admin.ZkAdminPaths;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.BundlesData;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.Policies;
Expand All @@ -45,6 +48,7 @@
import org.apache.pulsar.zookeeper.ZooKeeperClientFactory.SessionType;
import org.apache.pulsar.zookeeper.ZookeeperClientFactoryImpl;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.NodeExistsException;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooDefs;
Expand Down Expand Up @@ -100,6 +104,11 @@ private static class Arguments {
}, description = "Num storage containers of BookKeeper stream storage")
private int numStreamStorageContainers = 16;

@Parameter(names = {
"--initial-num-transaction-coordinators"
}, description = "Num transaction coordinators will assigned in cluster")
private int numTransactionCoordinators = 16;

@Parameter(names = { "-h", "--help" }, description = "Show this help message")
private boolean help = false;
}
Expand Down Expand Up @@ -135,6 +144,11 @@ public static void main(String[] args) throws Exception {
arguments.configurationStore = arguments.globalZookeeper;
}

if (arguments.numTransactionCoordinators <= 0) {
System.err.println("Number of transaction coordinators must greater than 0");
System.exit(1);
}

log.info("Setting up cluster {} with zk={} configuration-store={}", arguments.cluster, arguments.zookeeper,
arguments.configurationStore);

Expand Down Expand Up @@ -207,67 +221,118 @@ public static void main(String[] args) throws Exception {
}

// Create public tenant, whitelisted to use the this same cluster, along with other clusters
String publicTenantPath = POLICIES_ROOT + "/" + TopicName.PUBLIC_TENANT;
createTenantIfAbsent(configStoreZk, TopicName.PUBLIC_TENANT, arguments.cluster);

// Create system tenant
createTenantIfAbsent(configStoreZk, NamespaceName.SYSTEM_NAMESPACE.getTenant(), arguments.cluster);

// Create default namespace
createNamespaceIfAbsent(configStoreZk, NamespaceName.get(TopicName.PUBLIC_TENANT, TopicName.DEFAULT_NAMESPACE),
arguments.cluster);

Stat stat = configStoreZk.exists(publicTenantPath, false);
// Create system namespace
createNamespaceIfAbsent(configStoreZk, NamespaceName.SYSTEM_NAMESPACE, arguments.cluster);

// Create transaction coordinator assign partitioned topic
createPartitionedTopic(configStoreZk, TopicName.TRANSACTION_COORDINATOR_ASSIGN, arguments.numTransactionCoordinators);

log.info("Cluster metadata for '{}' setup correctly", arguments.cluster);
}

static void createTenantIfAbsent(ZooKeeper configStoreZk, String tenant, String cluster) throws IOException,
KeeperException, InterruptedException {

String tenantPath = POLICIES_ROOT + "/" + tenant;

Stat stat = configStoreZk.exists(tenantPath, false);
if (stat == null) {
TenantInfo publicTenant = new TenantInfo(Collections.emptySet(), Collections.singleton(arguments.cluster));
TenantInfo publicTenant = new TenantInfo(Collections.emptySet(), Collections.singleton(cluster));

try {
ZkUtils.createFullPathOptimistic(configStoreZk, publicTenantPath,
ZkUtils.createFullPathOptimistic(configStoreZk, tenantPath,
ObjectMapperFactory.getThreadLocal().writeValueAsBytes(publicTenant),
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
} catch (NodeExistsException e) {
// Ignore
}
} else {
// Update existing public tenant with new cluster
byte[] content = configStoreZk.getData(publicTenantPath, false, null);
byte[] content = configStoreZk.getData(tenantPath, false, null);
TenantInfo publicTenant = ObjectMapperFactory.getThreadLocal().readValue(content, TenantInfo.class);

// Only update z-node if the list of clusters should be modified
if (!publicTenant.getAllowedClusters().contains(arguments.cluster)) {
publicTenant.getAllowedClusters().add(arguments.cluster);
if (!publicTenant.getAllowedClusters().contains(cluster)) {
publicTenant.getAllowedClusters().add(cluster);

configStoreZk.setData(publicTenantPath, ObjectMapperFactory.getThreadLocal().writeValueAsBytes(publicTenant),
configStoreZk.setData(tenantPath, ObjectMapperFactory.getThreadLocal().writeValueAsBytes(publicTenant),
stat.getVersion());
}
}
}

// Create default namespace
String defaultNamespacePath = POLICIES_ROOT + "/" + TopicName.PUBLIC_TENANT + "/" + TopicName.DEFAULT_NAMESPACE;
static void createNamespaceIfAbsent(ZooKeeper configStoreZk, NamespaceName namespaceName, String cluster)
throws KeeperException, InterruptedException, IOException {
String namespacePath = POLICIES_ROOT + "/" +namespaceName.toString();
Policies policies;

stat = configStoreZk.exists(defaultNamespacePath, false);
Stat stat = configStoreZk.exists(namespacePath, false);
if (stat == null) {
policies = new Policies();
policies.bundles = getBundles(16);
policies.replication_clusters = Collections.singleton(arguments.cluster);
policies.replication_clusters = Collections.singleton(cluster);

try {
ZkUtils.createFullPathOptimistic(
configStoreZk,
defaultNamespacePath,
ObjectMapperFactory.getThreadLocal().writeValueAsBytes(policies),
ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
configStoreZk,
namespacePath,
ObjectMapperFactory.getThreadLocal().writeValueAsBytes(policies),
ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
} catch (NodeExistsException e) {
// Ignore
}
} else {
byte[] content = configStoreZk.getData(defaultNamespacePath, false, null);
byte[] content = configStoreZk.getData(namespacePath, false, null);
policies = ObjectMapperFactory.getThreadLocal().readValue(content, Policies.class);

// Only update z-node if the list of clusters should be modified
if (!policies.replication_clusters.contains(arguments.cluster)) {
policies.replication_clusters.add(arguments.cluster);
if (!policies.replication_clusters.contains(cluster)) {
policies.replication_clusters.add(cluster);

configStoreZk.setData(defaultNamespacePath, ObjectMapperFactory.getThreadLocal().writeValueAsBytes(policies),
configStoreZk.setData(namespacePath, ObjectMapperFactory.getThreadLocal().writeValueAsBytes(policies),
stat.getVersion());
}
}
}

log.info("Cluster metadata for '{}' setup correctly", arguments.cluster);
static void createPartitionedTopic(ZooKeeper configStoreZk, TopicName topicName, int numPartitions) throws KeeperException, InterruptedException, IOException {
String partitionedTopicPath = ZkAdminPaths.partitionedTopicPath(topicName);
Stat stat = configStoreZk.exists(partitionedTopicPath, false);
PartitionedTopicMetadata metadata = new PartitionedTopicMetadata(numPartitions);
if (stat == null) {
try {
ZkUtils.createFullPathOptimistic(
configStoreZk,
partitionedTopicPath,
ObjectMapperFactory.getThreadLocal().writeValueAsBytes(metadata),
ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT
);
} catch (NodeExistsException e) {
// Ignore
}
} else {
byte[] content = configStoreZk.getData(partitionedTopicPath, false, null);
PartitionedTopicMetadata existsMeta = ObjectMapperFactory.getThreadLocal().readValue(content, PartitionedTopicMetadata.class);

// Only update z-node if the partitions should be modified
if (existsMeta.partitions < numPartitions) {
configStoreZk.setData(
partitionedTopicPath,
ObjectMapperFactory.getThreadLocal().writeValueAsBytes(metadata),
stat.getVersion()
);
}
}
}

public static ZooKeeper initZk(String connection, int sessionTimeout) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar;

import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.zookeeper.ZooKeeper;

/**
* Setup the transaction coordinator metadata for a cluster, the setup will create pulsar/system namespace and create
* partitioned topic for transaction coordinator assign
*/
public class PulsarTransactionCoordinatorMetadataSetup {

private static class Arguments {

@Parameter(names = { "-c", "--cluster" }, description = "Cluster name", required = true)
private String cluster;

@Parameter(names = { "-cs",
"--configuration-store" }, description = "Configuration Store connection string", required = true)
private String configurationStore;

@Parameter(names = {
"--zookeeper-session-timeout-ms"
}, description = "Local zookeeper session timeout ms")
private int zkSessionTimeoutMillis = 30000;

@Parameter(names = {
"--initial-num-transaction-coordinators"
}, description = "Num transaction coordinators will assigned in cluster")
private int numTransactionCoordinators = 16;

@Parameter(names = { "-h", "--help" }, description = "Show this help message")
private boolean help = false;

}

public static void main(String[] args) throws Exception {
Arguments arguments = new Arguments();
JCommander jcommander = new JCommander();
try {
jcommander.addObject(arguments);
jcommander.parse(args);
if (arguments.help) {
jcommander.usage();
return;
}
} catch (Exception e) {
jcommander.usage();
throw e;
}

if (arguments.configurationStore == null) {
System.err.println("Configuration store address argument is required (--configuration-store)");
jcommander.usage();
System.exit(1);
}

if (arguments.numTransactionCoordinators <= 0) {
System.err.println("Number of transaction coordinators must greater than 0");
System.exit(1);
}

ZooKeeper configStoreZk = PulsarClusterMetadataSetup
.initZk(arguments.configurationStore, arguments.zkSessionTimeoutMillis);

// Create system tenant
PulsarClusterMetadataSetup
.createTenantIfAbsent(configStoreZk, NamespaceName.SYSTEM_NAMESPACE.getTenant(), arguments.cluster);

// Create system namespace
PulsarClusterMetadataSetup.createNamespaceIfAbsent(configStoreZk, NamespaceName.SYSTEM_NAMESPACE,
arguments.cluster);

// Create transaction coordinator assign partitioned topic
PulsarClusterMetadataSetup.createPartitionedTopic(configStoreZk, TopicName.TRANSACTION_COORDINATOR_ASSIGN,
arguments.numTransactionCoordinators);

System.out.println("Transaction coordinator metadata setup success");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -472,6 +472,11 @@ public Boolean get() {
// Register heartbeat and bootstrap namespaces.
this.nsService.registerBootstrapNamespaces();

// Register pulsar system namespaces
if (config.isTransactionCoordinatorEnabled()) {
this.nsService.registerNamespace(NamespaceName.SYSTEM_NAMESPACE.toString(), false);
}

this.metricsGenerator = new MetricsGenerator(this);

// By starting the Load manager service, the broker will also become visible
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ public void registerBootstrapNamespaces() throws PulsarServerException {
* @throws PulsarServerException
* @throws Exception
*/
private boolean registerNamespace(String namespace, boolean ensureOwned) throws PulsarServerException {
public boolean registerNamespace(String namespace, boolean ensureOwned) throws PulsarServerException {

String myUrl = pulsar.getSafeBrokerServiceUrl();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ public void testOwnedNamespaces() {

Map<String, NamespaceOwnershipStatus> nsMap = pulsarAdmins[i].brokers().getOwnedNamespaces("my-cluster",
list.get(0));
Assert.assertEquals(2, nsMap.size());
Assert.assertEquals(nsMap.size(), 3);
}
} catch (Exception e) {
e.printStackTrace();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -399,7 +399,7 @@ public void brokers() throws Exception {

Map<String, NamespaceOwnershipStatus> nsMap = admin.brokers().getOwnedNamespaces("test", list.get(0));
// since sla-monitor ns is not created nsMap.size() == 1 (for HeartBeat Namespace)
Assert.assertEquals(1, nsMap.size());
Assert.assertEquals(nsMap.size(), 2);
for (String ns : nsMap.keySet()) {
NamespaceOwnershipStatus nsStatus = nsMap.get(ns);
if (ns.equals(
Expand All @@ -415,7 +415,7 @@ public void brokers() throws Exception {
Assert.assertEquals(parts.length, 2);
Map<String, NamespaceOwnershipStatus> nsMap2 = adminTls.brokers().getOwnedNamespaces("test",
String.format("%s:%d", parts[0], BROKER_WEBSERVICE_PORT_TLS));
Assert.assertEquals(nsMap2.size(), 1);
Assert.assertEquals(nsMap2.size(), 2);

admin.namespaces().deleteNamespace("prop-xyz/ns1");
admin.clusters().deleteCluster("test");
Expand Down
Loading

0 comments on commit f4d954d

Please sign in to comment.