From 99222492ceb8aac8fcdfa72f8abe3f424c928bdb Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Tue, 27 Oct 2020 11:34:31 +0800 Subject: [PATCH] Delete associated ledgers before deleting cluster metadata (#8244) ### Motivation #8169 introduced a command tool to delete a cluster's metadata from ZK. This PR intends to delete the cluster's ledgers from BK. ### Modifications - Retrieve ledger ids from related ZK nodes - Add an optional argument to specify BK metadata service URI, then delete these ledgers if it's specified --- .../pulsar/PulsarClusterMetadataTeardown.java | 115 +++++++++- tests/integration/pom.xml | 12 + .../cli/ClusterMetadataTearDownTest.java | 208 ++++++++++++++++++ .../integration/topologies/PulsarCluster.java | 4 + .../src/test/resources/pulsar-cli.xml | 1 + 5 files changed, 332 insertions(+), 8 deletions(-) create mode 100644 tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/ClusterMetadataTearDownTest.java diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataTeardown.java b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataTeardown.java index a9a1c11b7df1c..1f25be38795f3 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataTeardown.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataTeardown.java @@ -20,6 +20,15 @@ import com.beust.jcommander.JCommander; import com.beust.jcommander.Parameter; +import com.google.protobuf.InvalidProtocolBufferException; +import org.apache.bookkeeper.client.BKException; +import org.apache.bookkeeper.client.BookKeeper; +import org.apache.bookkeeper.conf.ClientConfiguration; +import org.apache.bookkeeper.mledger.ManagedLedgerException; +import org.apache.bookkeeper.mledger.ManagedLedgerFactory; +import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl; +import org.apache.pulsar.broker.service.schema.SchemaStorageFormat.SchemaLocator; +import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.zookeeper.ZooKeeperClientFactory; import org.apache.pulsar.zookeeper.ZookeeperClientFactoryImpl; import org.apache.zookeeper.KeeperException; @@ -28,6 +37,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.ExecutionException; /** @@ -51,11 +62,17 @@ private static class Arguments { @Parameter(names = { "-cs", "--configuration-store" }, description = "Configuration Store connection string") private String configurationStore; + @Parameter(names = { "--bookkeeper-metadata-service-uri" }, description = "Metadata service uri of BookKeeper") + private String bkMetadataServiceUri; + @Parameter(names = { "-h", "--help" }, description = "Show this help message") private boolean help = false; } - public static void main(String[] args) throws InterruptedException { + public static String[] localZkNodes = { + "bookies", "counters", "loadbalance", "managed-ledgers", "namespace", "schemas", "stream" }; + + public static void main(String[] args) throws Exception { Arguments arguments = new Arguments(); JCommander jcommander = new JCommander(); try { @@ -70,22 +87,32 @@ public static void main(String[] args) throws InterruptedException { throw e; } + if (arguments.bkMetadataServiceUri != null) { + BookKeeper bookKeeper = new BookKeeper(new ClientConfiguration().setMetadataServiceUri(arguments.bkMetadataServiceUri)); + ZooKeeper localZk = initZk(arguments.zookeeper, arguments.zkSessionTimeoutMillis); + ManagedLedgerFactory managedLedgerFactory = new ManagedLedgerFactoryImpl(bookKeeper, localZk); + + deleteManagedLedgers(localZk, managedLedgerFactory); + deleteSchemaLedgers(localZk, bookKeeper); + + managedLedgerFactory.shutdown(); // `localZk` would be closed here + bookKeeper.close(); + } + ZooKeeper localZk = initZk(arguments.zookeeper, arguments.zkSessionTimeoutMillis); - deleteZkNodeRecursively(localZk, "/bookies"); - deleteZkNodeRecursively(localZk, "/counters"); - deleteZkNodeRecursively(localZk, "/loadbalance"); - deleteZkNodeRecursively(localZk, "/managed-ledgers"); - deleteZkNodeRecursively(localZk, "/namespace"); - deleteZkNodeRecursively(localZk, "/schemas"); - deleteZkNodeRecursively(localZk, "/stream"); + for (String localZkNode : localZkNodes) { + deleteZkNodeRecursively(localZk, "/" + localZkNode); + } if (arguments.configurationStore != null && arguments.cluster != null) { // Should it be done by REST API before broker is down? ZooKeeper configStoreZk = initZk(arguments.configurationStore, arguments.zkSessionTimeoutMillis); deleteZkNodeRecursively(configStoreZk, "/admin/clusters/" + arguments.cluster); + configStoreZk.close(); } + localZk.close(); log.info("Cluster metadata for '{}' teardown.", arguments.cluster); } @@ -107,5 +134,77 @@ public static void deleteZkNodeRecursively(ZooKeeper zooKeeper, String path) thr } } + private static List getChildren(ZooKeeper zooKeeper, String path) { + try { + return zooKeeper.getChildren(path, null); + } catch (InterruptedException | KeeperException e) { + if (e instanceof KeeperException.NoNodeException) { + return new ArrayList<>(); + } + log.error("Failed to get children of {}: {}", path, e); + throw new RuntimeException(e); + } + } + + private static byte[] getData(ZooKeeper zooKeeper, String path) { + try { + return zooKeeper.getData(path, null, null); + } catch (KeeperException | InterruptedException e) { + log.error("Failed to get data from {}: {}", path, e); + throw new RuntimeException(e); + } + } + + private static void deleteLedger(BookKeeper bookKeeper, long ledgerId) { + try { + bookKeeper.deleteLedger(ledgerId); + if (log.isDebugEnabled()) { + log.debug("Delete ledger id: {}", ledgerId); + } + } catch (InterruptedException | BKException e) { + log.error("Failed to delete ledger {}: {}", ledgerId, e); + throw new RuntimeException(e); + } + } + + private static void deleteManagedLedgers(ZooKeeper zooKeeper, ManagedLedgerFactory managedLedgerFactory) { + final String managedLedgersRoot = "/managed-ledgers"; + getChildren(zooKeeper, managedLedgersRoot).forEach(tenant -> { + final String tenantRoot = managedLedgersRoot + "/" + tenant; + getChildren(zooKeeper, tenantRoot).forEach(namespace -> { + final String namespaceRoot = String.join("/", tenantRoot, namespace, "persistent"); + getChildren(zooKeeper, namespaceRoot).forEach(topic -> { + final TopicName topicName = TopicName.get(String.join("/", tenant, namespace, topic)); + try { + managedLedgerFactory.delete(topicName.getPersistenceNamingEncoding()); + } catch (InterruptedException | ManagedLedgerException e) { + log.error("Failed to delete ledgers of {}: {}", topicName, e); + throw new RuntimeException(e); + } + }); + }); + }); + } + + private static void deleteSchemaLedgers(ZooKeeper zooKeeper, BookKeeper bookKeeper) { + final String schemaLedgersRoot = "/schemas"; + getChildren(zooKeeper, schemaLedgersRoot).forEach(tenant -> { + final String tenantRoot = schemaLedgersRoot + "/" + tenant; + getChildren(zooKeeper, tenantRoot).forEach(namespace -> { + final String namespaceRoot = tenantRoot + "/" + namespace; + getChildren(zooKeeper, namespaceRoot).forEach(topic -> { + final String topicRoot = namespaceRoot + "/" + topic; + try { + SchemaLocator.parseFrom(getData(zooKeeper, topicRoot)).getIndexList().stream() + .map(indexEntry -> indexEntry.getPosition().getLedgerId()) + .forEach(ledgerId -> deleteLedger(bookKeeper, ledgerId)); + } catch (InvalidProtocolBufferException e) { + log.warn("Invalid data format from {}: {}", topicRoot, e); + } + }); + }); + }); + } + private static final Logger log = LoggerFactory.getLogger(PulsarClusterMetadataTeardown.class); } diff --git a/tests/integration/pom.xml b/tests/integration/pom.xml index 3219d3a334e4f..55729a5a822fb 100644 --- a/tests/integration/pom.xml +++ b/tests/integration/pom.xml @@ -48,6 +48,18 @@ ${project.version} test + + org.apache.pulsar + pulsar-broker + ${project.version} + test + + + org.apache.pulsar + pulsar-common + ${project.version} + test + org.apache.pulsar pulsar-client diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/ClusterMetadataTearDownTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/ClusterMetadataTearDownTest.java new file mode 100644 index 0000000000000..96e908190351f --- /dev/null +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/ClusterMetadataTearDownTest.java @@ -0,0 +1,208 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.tests.integration.cli; + +import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.client.BKException; +import org.apache.bookkeeper.conf.ServerConfiguration; +import org.apache.bookkeeper.meta.LedgerManager; +import org.apache.bookkeeper.meta.MetadataBookieDriver; +import org.apache.bookkeeper.meta.MetadataDrivers; +import org.apache.bookkeeper.stats.NullStatsLogger; +import org.apache.bookkeeper.util.ZkUtils; +import org.apache.pulsar.PulsarClusterMetadataTeardown; +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.SubscriptionInitialPosition; +import org.apache.pulsar.common.policies.data.TenantInfo; +import org.apache.pulsar.tests.integration.containers.ChaosContainer; +import org.apache.pulsar.tests.integration.topologies.PulsarCluster; +import org.apache.pulsar.tests.integration.topologies.PulsarClusterSpec; +import org.apache.zookeeper.ZooKeeper; +import org.testng.annotations.AfterSuite; +import org.testng.annotations.BeforeSuite; +import org.testng.annotations.Test; + +import java.io.IOException; +import java.net.URI; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertTrue; +import static org.testng.Assert.fail; + +@Slf4j +public class ClusterMetadataTearDownTest { + + private final PulsarClusterSpec spec = PulsarClusterSpec.builder() + .clusterName("ClusterMetadataTearDownTest-" + UUID.randomUUID().toString().substring(0, 8)) + .numProxies(0) + .numFunctionWorkers(0) + .enablePrestoWorker(false) + .build(); + + private final PulsarCluster pulsarCluster = PulsarCluster.forSpec(spec); + + private ZooKeeper localZk; + private ZooKeeper configStoreZk; + + private String metadataServiceUri; + private MetadataBookieDriver driver; + private LedgerManager ledgerManager; + + private PulsarClient client; + private PulsarAdmin admin; + + @BeforeSuite + public void setupCluster() throws Exception { + pulsarCluster.start(); + metadataServiceUri = "zk+null://" + pulsarCluster.getZKConnString() + "/ledgers"; + + final int sessionTimeoutMs = 30000; + localZk = PulsarClusterMetadataTeardown.initZk(pulsarCluster.getZKConnString(), sessionTimeoutMs); + configStoreZk = PulsarClusterMetadataTeardown.initZk(pulsarCluster.getCSConnString(), sessionTimeoutMs); + + driver = MetadataDrivers.getBookieDriver(URI.create(metadataServiceUri)); + driver.initialize(new ServerConfiguration().setMetadataServiceUri(metadataServiceUri), () -> {}, NullStatsLogger.INSTANCE); + ledgerManager = driver.getLedgerManagerFactory().newLedgerManager(); + + client = PulsarClient.builder().serviceUrl(pulsarCluster.getPlainTextServiceUrl()).build(); + admin = PulsarAdmin.builder().serviceHttpUrl(pulsarCluster.getHttpServiceUrl()).build(); + } + + @AfterSuite + public void tearDownCluster() { + try { + ledgerManager.close(); + } catch (IOException e) { + log.warn("Failed to close ledger manager: ", e); + } + driver.close(); + try { + configStoreZk.close(); + } catch (InterruptedException ignored) { + } + try { + localZk.close(); + } catch (InterruptedException ignored) { + } + pulsarCluster.stop(); + } + + @Test + public void testDeleteCluster() throws Exception { + assertEquals(getNumOfLedgers(), 0); + final String tenant = "my-tenant"; + final String namespace = tenant + "/my-ns"; + + admin.tenants().createTenant(tenant, + new TenantInfo(new HashSet<>(), Collections.singleton(pulsarCluster.getClusterName()))); + admin.namespaces().createNamespace(namespace); + + String[] topics = { "topic-1", "topic-2", namespace + "/topic-1" }; + for (String topic : topics) { + try (Producer producer = client.newProducer(Schema.STRING).topic(topic).create()) { + producer.send("msg"); + } + String[] subscriptions = { "sub-1", "sub-2" }; + for (String subscription : subscriptions) { + try (Consumer consumer = client.newConsumer(Schema.STRING) + .topic(topic) + .subscriptionName(subscription) + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .subscribe()) { + Message msg = consumer.receive(5, TimeUnit.SECONDS); + consumer.acknowledge(msg); + } + } + } + + final String partitionedTopic = namespace + "/par-topic"; + admin.topics().createPartitionedTopic(partitionedTopic, 3); + + // TODO: the schema ledgers of a partitioned topic cannot be deleted completely now, + // so we create producers/consumers without schema here + try (Producer producer = client.newProducer().topic(partitionedTopic).create()) { + producer.send("msg".getBytes()); + try (Consumer consumer = client.newConsumer() + .topic(partitionedTopic) + .subscriptionName("my-sub") + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .subscribe()) { + Message msg = consumer.receive(5, TimeUnit.SECONDS); + consumer.acknowledge(msg); + } + } + + pulsarCluster.getBrokers().forEach(ChaosContainer::stop); + + assertTrue(getNumOfLedgers() > 0); + log.info("Before delete, cluster name: {}, num of ledgers: {}", pulsarCluster.getClusterName(), getNumOfLedgers()); + + String[] args = { "-zk", pulsarCluster.getZKConnString(), + "-cs", pulsarCluster.getCSConnString(), + "-c", pulsarCluster.getClusterName(), + "--bookkeeper-metadata-service-uri", metadataServiceUri }; + PulsarClusterMetadataTeardown.main(args); + + + // 1. Check Bookie for number of ledgers + assertEquals(getNumOfLedgers(), 0); + + // 2. Check ZooKeeper for relative nodes + final int zkOpTimeoutMs = 10000; + List localZkNodes = ZkUtils.getChildrenInSingleNode(localZk, "/", zkOpTimeoutMs); + for (String node : PulsarClusterMetadataTeardown.localZkNodes) { + assertFalse(localZkNodes.contains(node)); + } + List clusterNodes = ZkUtils.getChildrenInSingleNode(configStoreZk, "/admin/clusters", zkOpTimeoutMs); + assertFalse(clusterNodes.contains(pulsarCluster.getClusterName())); + } + + private long getNumOfLedgers() { + final AtomicInteger returnCode = new AtomicInteger(BKException.Code.OK); + final CountDownLatch processDone = new CountDownLatch(1); + final AtomicLong numOfLedgers = new AtomicLong(0L); + + ledgerManager.asyncProcessLedgers((ledgerId, cb) -> numOfLedgers.incrementAndGet(), (rc, path, ctx) -> { + returnCode.set(rc); + processDone.countDown(); + }, null, BKException.Code.OK, BKException.Code.ReadException); + + try { + processDone.await(5, TimeUnit.SECONDS); // a timeout which is long enough + } catch (InterruptedException e) { + fail("asyncProcessLedgers failed", e); + } + return numOfLedgers.get(); + } + +} diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java index db77ae61f84e9..b916eea5ad0bc 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java @@ -200,6 +200,10 @@ public String getZKConnString() { return zkContainer.getContainerIpAddress() + ":" + zkContainer.getMappedPort(ZK_PORT); } + public String getCSConnString() { + return csContainer.getContainerIpAddress() + ":" + csContainer.getMappedPort(CS_PORT); + } + public Network getNetwork() { return network; } diff --git a/tests/integration/src/test/resources/pulsar-cli.xml b/tests/integration/src/test/resources/pulsar-cli.xml index 56906a6581009..a5b86059742dd 100644 --- a/tests/integration/src/test/resources/pulsar-cli.xml +++ b/tests/integration/src/test/resources/pulsar-cli.xml @@ -22,6 +22,7 @@ +