Skip to content

Commit

Permalink
Setup transaction metadata with metdata store (apache#10677)
Browse files Browse the repository at this point in the history
  • Loading branch information
fantapsody authored May 22, 2021
1 parent 9e76161 commit 6347b52
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 174 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,13 @@
import com.beust.jcommander.Parameter;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import org.apache.bookkeeper.client.BookKeeperAdmin;
import org.apache.bookkeeper.common.net.ServiceURI;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.stream.storage.api.cluster.ClusterInitializer;
import org.apache.bookkeeper.stream.storage.impl.cluster.ZkClusterInitializer;
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.pulsar.broker.admin.ZkAdminPaths;
import org.apache.pulsar.common.conf.InternalConfigurationData;
import org.apache.pulsar.common.naming.NamespaceName;
Expand All @@ -50,16 +48,6 @@
import org.apache.pulsar.metadata.api.MetadataStoreLifecycle;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.apache.pulsar.zookeeper.ZkBookieRackAffinityMapping;
import org.apache.pulsar.zookeeper.ZooKeeperClientFactory;
import org.apache.pulsar.zookeeper.ZooKeeperClientFactory.SessionType;
import org.apache.pulsar.zookeeper.ZookeeperClientFactoryImpl;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.NodeExistsException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -136,20 +124,6 @@ private static class Arguments {
private boolean help = false;
}

/**
* a wrapper for ZkUtils.createFullPathOptimistic but ignore exception of node exists.
*/
private static void createZkNode(ZooKeeper zkc, String path,
byte[] data, final List<ACL> acl, final CreateMode createMode)
throws KeeperException, InterruptedException {

try {
ZkUtils.createFullPathOptimistic(zkc, path, data, acl, createMode);
} catch (NodeExistsException e) {
// Ignore
}
}

/**
* a wrapper for creating a persistent node with store.put but ignore exception of node exists.
*/
Expand Down Expand Up @@ -297,33 +271,6 @@ public static void main(String[] args) throws Exception {
log.info("Cluster metadata for '{}' setup correctly", arguments.cluster);
}

static void createTenantIfAbsent(ZooKeeper configStoreZk, String tenant, String cluster) throws IOException,
KeeperException, InterruptedException {

String tenantPath = POLICIES_ROOT + "/" + tenant;

Stat stat = configStoreZk.exists(tenantPath, false);
if (stat == null) {
TenantInfo publicTenant = new TenantInfo(Collections.emptySet(), Collections.singleton(cluster));

createZkNode(configStoreZk, tenantPath,
ObjectMapperFactory.getThreadLocal().writeValueAsBytes(publicTenant),
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
} else {
// Update existing public tenant with new cluster
byte[] content = configStoreZk.getData(tenantPath, false, null);
TenantInfo publicTenant = ObjectMapperFactory.getThreadLocal().readValue(content, TenantInfo.class);

// Only update z-node if the list of clusters should be modified
if (!publicTenant.getAllowedClusters().contains(cluster)) {
publicTenant.getAllowedClusters().add(cluster);

configStoreZk.setData(tenantPath, ObjectMapperFactory.getThreadLocal().writeValueAsBytes(publicTenant),
stat.getVersion());
}
}
}

static void createTenantIfAbsent(MetadataStore configStore, String tenant, String cluster) throws IOException,
InterruptedException, ExecutionException {

Expand All @@ -350,36 +297,6 @@ static void createTenantIfAbsent(MetadataStore configStore, String tenant, Strin
}
}

static void createNamespaceIfAbsent(ZooKeeper configStoreZk, NamespaceName namespaceName, String cluster)
throws KeeperException, InterruptedException, IOException {
String namespacePath = POLICIES_ROOT + "/" + namespaceName.toString();
Policies policies;
Stat stat = configStoreZk.exists(namespacePath, false);
if (stat == null) {
policies = new Policies();
policies.bundles = getBundles(16);
policies.replication_clusters = Collections.singleton(cluster);

createZkNode(
configStoreZk,
namespacePath,
ObjectMapperFactory.getThreadLocal().writeValueAsBytes(policies),
ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
} else {
byte[] content = configStoreZk.getData(namespacePath, false, null);
policies = ObjectMapperFactory.getThreadLocal().readValue(content, Policies.class);

// Only update z-node if the list of clusters should be modified
if (!policies.replication_clusters.contains(cluster)) {
policies.replication_clusters.add(cluster);

configStoreZk.setData(namespacePath, ObjectMapperFactory.getThreadLocal().writeValueAsBytes(policies),
stat.getVersion());
}
}
}

static void createNamespaceIfAbsent(MetadataStore configStore, NamespaceName namespaceName, String cluster)
throws InterruptedException, IOException, ExecutionException {
String namespacePath = POLICIES_ROOT + "/" + namespaceName.toString();
Expand All @@ -406,35 +323,6 @@ static void createNamespaceIfAbsent(MetadataStore configStore, NamespaceName nam
}
}

static void createPartitionedTopic(ZooKeeper configStoreZk, TopicName topicName, int numPartitions)
throws KeeperException, InterruptedException, IOException {
String partitionedTopicPath = ZkAdminPaths.partitionedTopicPath(topicName);
Stat stat = configStoreZk.exists(partitionedTopicPath, false);
PartitionedTopicMetadata metadata = new PartitionedTopicMetadata(numPartitions);
if (stat == null) {
createZkNode(
configStoreZk,
partitionedTopicPath,
ObjectMapperFactory.getThreadLocal().writeValueAsBytes(metadata),
ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT
);
} else {
byte[] content = configStoreZk.getData(partitionedTopicPath, false, null);
PartitionedTopicMetadata existsMeta =
ObjectMapperFactory.getThreadLocal().readValue(content, PartitionedTopicMetadata.class);

// Only update z-node if the partitions should be modified
if (existsMeta.partitions < numPartitions) {
configStoreZk.setData(
partitionedTopicPath,
ObjectMapperFactory.getThreadLocal().writeValueAsBytes(metadata),
stat.getVersion()
);
}
}
}

static void createPartitionedTopic(MetadataStore configStore, TopicName topicName, int numPartitions)
throws InterruptedException, IOException, ExecutionException {
String partitionedTopicPath = ZkAdminPaths.partitionedTopicPath(topicName);
Expand All @@ -458,25 +346,6 @@ static void createPartitionedTopic(MetadataStore configStore, TopicName topicNam
}
}

public static ZooKeeper initZk(String connection, int sessionTimeout) throws Exception {
ZooKeeperClientFactory zkfactory = new ZookeeperClientFactoryImpl();
int chrootIndex = connection.indexOf("/");
if (chrootIndex > 0) {
String chrootPath = connection.substring(chrootIndex);
String zkConnectForChrootCreation = connection.substring(0, chrootIndex);
ZooKeeper chrootZk = zkfactory.create(
zkConnectForChrootCreation, SessionType.ReadWrite, sessionTimeout).get();
if (chrootZk.exists(chrootPath, false) == null) {
createZkNode(chrootZk, chrootPath, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
log.info("Created zookeeper chroot path {} successfully", chrootPath);
}
chrootZk.close();
}
ZooKeeper zkConnect = zkfactory.create(connection, SessionType.ReadWrite, sessionTimeout).get();
return zkConnect;
}

public static MetadataStoreExtended initMetadataStore(String connection, int sessionTimeout) throws Exception {
MetadataStoreExtended store = MetadataStoreExtended.create(connection, MetadataStoreConfig.builder()
.sessionTimeoutMillis(sessionTimeout)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import com.beust.jcommander.Parameter;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.zookeeper.ZooKeeper;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;

/**
* Setup the transaction coordinator metadata for a cluster, the setup will create pulsar/system namespace and create
Expand Down Expand Up @@ -80,20 +80,20 @@ public static void main(String[] args) throws Exception {
System.exit(1);
}

ZooKeeper configStoreZk = PulsarClusterMetadataSetup
.initZk(arguments.configurationStore, arguments.zkSessionTimeoutMillis);
try (MetadataStoreExtended configStore = PulsarClusterMetadataSetup
.initMetadataStore(arguments.configurationStore, arguments.zkSessionTimeoutMillis)) {
// Create system tenant
PulsarClusterMetadataSetup
.createTenantIfAbsent(configStore, NamespaceName.SYSTEM_NAMESPACE.getTenant(), arguments.cluster);

// Create system tenant
PulsarClusterMetadataSetup
.createTenantIfAbsent(configStoreZk, NamespaceName.SYSTEM_NAMESPACE.getTenant(), arguments.cluster);
// Create system namespace
PulsarClusterMetadataSetup.createNamespaceIfAbsent(configStore, NamespaceName.SYSTEM_NAMESPACE,
arguments.cluster);

// Create system namespace
PulsarClusterMetadataSetup.createNamespaceIfAbsent(configStoreZk, NamespaceName.SYSTEM_NAMESPACE,
arguments.cluster);

// Create transaction coordinator assign partitioned topic
PulsarClusterMetadataSetup.createPartitionedTopic(configStoreZk, TopicName.TRANSACTION_COORDINATOR_ASSIGN,
arguments.numTransactionCoordinators);
// Create transaction coordinator assign partitioned topic
PulsarClusterMetadataSetup.createPartitionedTopic(configStore, TopicName.TRANSACTION_COORDINATOR_ASSIGN,
arguments.numTransactionCoordinators);
}

System.out.println("Transaction coordinator metadata setup success");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,9 @@
import java.net.InetSocketAddress;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;

@Slf4j
@Test(groups = "broker")
Expand Down Expand Up @@ -120,40 +121,44 @@ public void testSetupWithBkMetadataServiceUri() throws Exception {

PulsarClusterMetadataSetup.main(args);

ZooKeeper localZk = PulsarClusterMetadataSetup.initZk(zkConnection, 30000);
// expected not exist
assertNull(localZk.exists("/ledgers", false));
try (MetadataStoreExtended localStore = PulsarClusterMetadataSetup
.initMetadataStore(zkConnection, 30000)) {
// expected not exist
assertFalse(localStore.exists("/ledgers").get());

String[] bookkeeperMetadataServiceUriArgs = {
"--cluster", "testReSetupClusterMetadata-cluster",
"--zookeeper", zkConnection,
"--configuration-store", zkConnection,
"--bookkeeper-metadata-service-uri", "zk+null://" + zkConnection + "/chroot/ledgers",
"--web-service-url", "http://127.0.0.1:8080",
"--web-service-url-tls", "https://127.0.0.1:8443",
"--broker-service-url", "pulsar://127.0.0.1:6650",
"--broker-service-url-tls","pulsar+ssl://127.0.0.1:6651"
};
String[] bookkeeperMetadataServiceUriArgs = {
"--cluster", "testReSetupClusterMetadata-cluster",
"--zookeeper", zkConnection,
"--configuration-store", zkConnection,
"--bookkeeper-metadata-service-uri", "zk+null://" + zkConnection + "/chroot/ledgers",
"--web-service-url", "http://127.0.0.1:8080",
"--web-service-url-tls", "https://127.0.0.1:8443",
"--broker-service-url", "pulsar://127.0.0.1:6650",
"--broker-service-url-tls", "pulsar+ssl://127.0.0.1:6651"
};

PulsarClusterMetadataSetup.main(bookkeeperMetadataServiceUriArgs);
ZooKeeper bookkeeperMetadataServiceUriZk = PulsarClusterMetadataSetup.initZk(zkConnection, 30000);
// expected not exist
assertNull(bookkeeperMetadataServiceUriZk.exists("/ledgers", false));
PulsarClusterMetadataSetup.main(bookkeeperMetadataServiceUriArgs);
try (MetadataStoreExtended bookkeeperMetadataServiceUriStore = PulsarClusterMetadataSetup
.initMetadataStore(zkConnection, 30000)) {
// expected not exist
assertFalse(bookkeeperMetadataServiceUriStore.exists("/ledgers").get());
}

String[] args1 = {
"--cluster", "testReSetupClusterMetadata-cluster",
"--zookeeper", zkConnection,
"--configuration-store", zkConnection,
"--web-service-url", "http://127.0.0.1:8080",
"--web-service-url-tls", "https://127.0.0.1:8443",
"--broker-service-url", "pulsar://127.0.0.1:6650",
"--broker-service-url-tls","pulsar+ssl://127.0.0.1:6651"
};
String[] args1 = {
"--cluster", "testReSetupClusterMetadata-cluster",
"--zookeeper", zkConnection,
"--configuration-store", zkConnection,
"--web-service-url", "http://127.0.0.1:8080",
"--web-service-url-tls", "https://127.0.0.1:8443",
"--broker-service-url", "pulsar://127.0.0.1:6650",
"--broker-service-url-tls", "pulsar+ssl://127.0.0.1:6651"
};

PulsarClusterMetadataSetup.main(args1);
PulsarClusterMetadataSetup.main(args1);

// expected exist
assertNotNull(localZk.exists("/ledgers", false));
// expected exist
assertTrue(localStore.exists("/ledgers").get());
}
}

@Test
Expand Down

0 comments on commit 6347b52

Please sign in to comment.