Skip to content

Commit

Permalink
Delete associated ledgers before deleting cluster metadata (apache#8244)
Browse files Browse the repository at this point in the history
### Motivation

apache#8169 introduced a command tool to delete a cluster's metadata from ZK. This PR intends to delete the cluster's ledgers from BK.

### Modifications

- Retrieve ledger ids from related ZK nodes
- Add an optional argument to specify BK metadata service URI, then delete these ledgers if it's specified
  • Loading branch information
BewareMyPower authored Oct 27, 2020
1 parent cd8e22e commit 9922249
Show file tree
Hide file tree
Showing 5 changed files with 332 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,15 @@

import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import com.google.protobuf.InvalidProtocolBufferException;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
import org.apache.pulsar.broker.service.schema.SchemaStorageFormat.SchemaLocator;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.zookeeper.ZooKeeperClientFactory;
import org.apache.pulsar.zookeeper.ZookeeperClientFactoryImpl;
import org.apache.zookeeper.KeeperException;
Expand All @@ -28,6 +37,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;

/**
Expand All @@ -51,11 +62,17 @@ private static class Arguments {
@Parameter(names = { "-cs", "--configuration-store" }, description = "Configuration Store connection string")
private String configurationStore;

@Parameter(names = { "--bookkeeper-metadata-service-uri" }, description = "Metadata service uri of BookKeeper")
private String bkMetadataServiceUri;

@Parameter(names = { "-h", "--help" }, description = "Show this help message")
private boolean help = false;
}

public static void main(String[] args) throws InterruptedException {
public static String[] localZkNodes = {
"bookies", "counters", "loadbalance", "managed-ledgers", "namespace", "schemas", "stream" };

public static void main(String[] args) throws Exception {
Arguments arguments = new Arguments();
JCommander jcommander = new JCommander();
try {
Expand All @@ -70,22 +87,32 @@ public static void main(String[] args) throws InterruptedException {
throw e;
}

if (arguments.bkMetadataServiceUri != null) {
BookKeeper bookKeeper = new BookKeeper(new ClientConfiguration().setMetadataServiceUri(arguments.bkMetadataServiceUri));
ZooKeeper localZk = initZk(arguments.zookeeper, arguments.zkSessionTimeoutMillis);
ManagedLedgerFactory managedLedgerFactory = new ManagedLedgerFactoryImpl(bookKeeper, localZk);

deleteManagedLedgers(localZk, managedLedgerFactory);
deleteSchemaLedgers(localZk, bookKeeper);

managedLedgerFactory.shutdown(); // `localZk` would be closed here
bookKeeper.close();
}

ZooKeeper localZk = initZk(arguments.zookeeper, arguments.zkSessionTimeoutMillis);

deleteZkNodeRecursively(localZk, "/bookies");
deleteZkNodeRecursively(localZk, "/counters");
deleteZkNodeRecursively(localZk, "/loadbalance");
deleteZkNodeRecursively(localZk, "/managed-ledgers");
deleteZkNodeRecursively(localZk, "/namespace");
deleteZkNodeRecursively(localZk, "/schemas");
deleteZkNodeRecursively(localZk, "/stream");
for (String localZkNode : localZkNodes) {
deleteZkNodeRecursively(localZk, "/" + localZkNode);
}

if (arguments.configurationStore != null && arguments.cluster != null) {
// Should it be done by REST API before broker is down?
ZooKeeper configStoreZk = initZk(arguments.configurationStore, arguments.zkSessionTimeoutMillis);
deleteZkNodeRecursively(configStoreZk, "/admin/clusters/" + arguments.cluster);
configStoreZk.close();
}

localZk.close();
log.info("Cluster metadata for '{}' teardown.", arguments.cluster);
}

Expand All @@ -107,5 +134,77 @@ public static void deleteZkNodeRecursively(ZooKeeper zooKeeper, String path) thr
}
}

private static List<String> getChildren(ZooKeeper zooKeeper, String path) {
try {
return zooKeeper.getChildren(path, null);
} catch (InterruptedException | KeeperException e) {
if (e instanceof KeeperException.NoNodeException) {
return new ArrayList<>();
}
log.error("Failed to get children of {}: {}", path, e);
throw new RuntimeException(e);
}
}

private static byte[] getData(ZooKeeper zooKeeper, String path) {
try {
return zooKeeper.getData(path, null, null);
} catch (KeeperException | InterruptedException e) {
log.error("Failed to get data from {}: {}", path, e);
throw new RuntimeException(e);
}
}

private static void deleteLedger(BookKeeper bookKeeper, long ledgerId) {
try {
bookKeeper.deleteLedger(ledgerId);
if (log.isDebugEnabled()) {
log.debug("Delete ledger id: {}", ledgerId);
}
} catch (InterruptedException | BKException e) {
log.error("Failed to delete ledger {}: {}", ledgerId, e);
throw new RuntimeException(e);
}
}

private static void deleteManagedLedgers(ZooKeeper zooKeeper, ManagedLedgerFactory managedLedgerFactory) {
final String managedLedgersRoot = "/managed-ledgers";
getChildren(zooKeeper, managedLedgersRoot).forEach(tenant -> {
final String tenantRoot = managedLedgersRoot + "/" + tenant;
getChildren(zooKeeper, tenantRoot).forEach(namespace -> {
final String namespaceRoot = String.join("/", tenantRoot, namespace, "persistent");
getChildren(zooKeeper, namespaceRoot).forEach(topic -> {
final TopicName topicName = TopicName.get(String.join("/", tenant, namespace, topic));
try {
managedLedgerFactory.delete(topicName.getPersistenceNamingEncoding());
} catch (InterruptedException | ManagedLedgerException e) {
log.error("Failed to delete ledgers of {}: {}", topicName, e);
throw new RuntimeException(e);
}
});
});
});
}

private static void deleteSchemaLedgers(ZooKeeper zooKeeper, BookKeeper bookKeeper) {
final String schemaLedgersRoot = "/schemas";
getChildren(zooKeeper, schemaLedgersRoot).forEach(tenant -> {
final String tenantRoot = schemaLedgersRoot + "/" + tenant;
getChildren(zooKeeper, tenantRoot).forEach(namespace -> {
final String namespaceRoot = tenantRoot + "/" + namespace;
getChildren(zooKeeper, namespaceRoot).forEach(topic -> {
final String topicRoot = namespaceRoot + "/" + topic;
try {
SchemaLocator.parseFrom(getData(zooKeeper, topicRoot)).getIndexList().stream()
.map(indexEntry -> indexEntry.getPosition().getLedgerId())
.forEach(ledgerId -> deleteLedger(bookKeeper, ledgerId));
} catch (InvalidProtocolBufferException e) {
log.warn("Invalid data format from {}: {}", topicRoot, e);
}
});
});
});
}

private static final Logger log = LoggerFactory.getLogger(PulsarClusterMetadataTeardown.class);
}
12 changes: 12 additions & 0 deletions tests/integration/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,18 @@
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-broker</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-common</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-client</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,208 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.tests.integration.cli;

import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.meta.LedgerManager;
import org.apache.bookkeeper.meta.MetadataBookieDriver;
import org.apache.bookkeeper.meta.MetadataDrivers;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.pulsar.PulsarClusterMetadataTeardown;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.tests.integration.containers.ChaosContainer;
import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
import org.apache.pulsar.tests.integration.topologies.PulsarClusterSpec;
import org.apache.zookeeper.ZooKeeper;
import org.testng.annotations.AfterSuite;
import org.testng.annotations.BeforeSuite;
import org.testng.annotations.Test;

import java.io.IOException;
import java.net.URI;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;

@Slf4j
public class ClusterMetadataTearDownTest {

private final PulsarClusterSpec spec = PulsarClusterSpec.builder()
.clusterName("ClusterMetadataTearDownTest-" + UUID.randomUUID().toString().substring(0, 8))
.numProxies(0)
.numFunctionWorkers(0)
.enablePrestoWorker(false)
.build();

private final PulsarCluster pulsarCluster = PulsarCluster.forSpec(spec);

private ZooKeeper localZk;
private ZooKeeper configStoreZk;

private String metadataServiceUri;
private MetadataBookieDriver driver;
private LedgerManager ledgerManager;

private PulsarClient client;
private PulsarAdmin admin;

@BeforeSuite
public void setupCluster() throws Exception {
pulsarCluster.start();
metadataServiceUri = "zk+null://" + pulsarCluster.getZKConnString() + "/ledgers";

final int sessionTimeoutMs = 30000;
localZk = PulsarClusterMetadataTeardown.initZk(pulsarCluster.getZKConnString(), sessionTimeoutMs);
configStoreZk = PulsarClusterMetadataTeardown.initZk(pulsarCluster.getCSConnString(), sessionTimeoutMs);

driver = MetadataDrivers.getBookieDriver(URI.create(metadataServiceUri));
driver.initialize(new ServerConfiguration().setMetadataServiceUri(metadataServiceUri), () -> {}, NullStatsLogger.INSTANCE);
ledgerManager = driver.getLedgerManagerFactory().newLedgerManager();

client = PulsarClient.builder().serviceUrl(pulsarCluster.getPlainTextServiceUrl()).build();
admin = PulsarAdmin.builder().serviceHttpUrl(pulsarCluster.getHttpServiceUrl()).build();
}

@AfterSuite
public void tearDownCluster() {
try {
ledgerManager.close();
} catch (IOException e) {
log.warn("Failed to close ledger manager: ", e);
}
driver.close();
try {
configStoreZk.close();
} catch (InterruptedException ignored) {
}
try {
localZk.close();
} catch (InterruptedException ignored) {
}
pulsarCluster.stop();
}

@Test
public void testDeleteCluster() throws Exception {
assertEquals(getNumOfLedgers(), 0);
final String tenant = "my-tenant";
final String namespace = tenant + "/my-ns";

admin.tenants().createTenant(tenant,
new TenantInfo(new HashSet<>(), Collections.singleton(pulsarCluster.getClusterName())));
admin.namespaces().createNamespace(namespace);

String[] topics = { "topic-1", "topic-2", namespace + "/topic-1" };
for (String topic : topics) {
try (Producer<String> producer = client.newProducer(Schema.STRING).topic(topic).create()) {
producer.send("msg");
}
String[] subscriptions = { "sub-1", "sub-2" };
for (String subscription : subscriptions) {
try (Consumer<String> consumer = client.newConsumer(Schema.STRING)
.topic(topic)
.subscriptionName(subscription)
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscribe()) {
Message<String> msg = consumer.receive(5, TimeUnit.SECONDS);
consumer.acknowledge(msg);
}
}
}

final String partitionedTopic = namespace + "/par-topic";
admin.topics().createPartitionedTopic(partitionedTopic, 3);

// TODO: the schema ledgers of a partitioned topic cannot be deleted completely now,
// so we create producers/consumers without schema here
try (Producer<byte[]> producer = client.newProducer().topic(partitionedTopic).create()) {
producer.send("msg".getBytes());
try (Consumer<byte[]> consumer = client.newConsumer()
.topic(partitionedTopic)
.subscriptionName("my-sub")
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscribe()) {
Message<byte[]> msg = consumer.receive(5, TimeUnit.SECONDS);
consumer.acknowledge(msg);
}
}

pulsarCluster.getBrokers().forEach(ChaosContainer::stop);

assertTrue(getNumOfLedgers() > 0);
log.info("Before delete, cluster name: {}, num of ledgers: {}", pulsarCluster.getClusterName(), getNumOfLedgers());

String[] args = { "-zk", pulsarCluster.getZKConnString(),
"-cs", pulsarCluster.getCSConnString(),
"-c", pulsarCluster.getClusterName(),
"--bookkeeper-metadata-service-uri", metadataServiceUri };
PulsarClusterMetadataTeardown.main(args);


// 1. Check Bookie for number of ledgers
assertEquals(getNumOfLedgers(), 0);

// 2. Check ZooKeeper for relative nodes
final int zkOpTimeoutMs = 10000;
List<String> localZkNodes = ZkUtils.getChildrenInSingleNode(localZk, "/", zkOpTimeoutMs);
for (String node : PulsarClusterMetadataTeardown.localZkNodes) {
assertFalse(localZkNodes.contains(node));
}
List<String> clusterNodes = ZkUtils.getChildrenInSingleNode(configStoreZk, "/admin/clusters", zkOpTimeoutMs);
assertFalse(clusterNodes.contains(pulsarCluster.getClusterName()));
}

private long getNumOfLedgers() {
final AtomicInteger returnCode = new AtomicInteger(BKException.Code.OK);
final CountDownLatch processDone = new CountDownLatch(1);
final AtomicLong numOfLedgers = new AtomicLong(0L);

ledgerManager.asyncProcessLedgers((ledgerId, cb) -> numOfLedgers.incrementAndGet(), (rc, path, ctx) -> {
returnCode.set(rc);
processDone.countDown();
}, null, BKException.Code.OK, BKException.Code.ReadException);

try {
processDone.await(5, TimeUnit.SECONDS); // a timeout which is long enough
} catch (InterruptedException e) {
fail("asyncProcessLedgers failed", e);
}
return numOfLedgers.get();
}

}
Loading

0 comments on commit 9922249

Please sign in to comment.