[pulsar-broker] close managed-ledgers before giving up bundle ownership to avoid bad zk-version (apache#5599)

### Motivation

We have seen multiple occurrences, like the one below, where unloading a topic does not complete and gets stuck. The broker then gives up ownership after a timeout, and closing the ml-factory closes the still-unclosed managed-ledger, which corrupts the metadata zk-version. The topic, now owned by a new broker, keeps failing with the exception `ManagedLedgerException$BadVersionException`.

Right now, while unloading a bundle, the broker removes ownership of the bundle after a timeout even if the topic's managed-ledger has not been closed successfully. `ManagedLedgerFactoryImpl` then closes the unclosed managed-ledger on broker shutdown, which causes a bad zk-version at the new broker, and because of that cursors are not able to update cursor-metadata in zk.

```
01:01:13.452 [shutdown-thread-57-1] INFO  org.apache.pulsar.broker.namespace.OwnedBundle - Disabling ownership: my-property/my-cluster/my-ns/0xd0000000_0xe0000000
:
01:01:13.653 [shutdown-thread-57-1] INFO  org.apache.pulsar.broker.service.BrokerService - [persistent://my-property/my-cluster/my-ns/topic-partition-53] Unloading topic
:
01:02:13.677 [shutdown-thread-57-1] INFO  org.apache.pulsar.broker.namespace.OwnedBundle - Unloading my-property/my-cluster/my-ns/0xd0000000_0xe0000000 namespace-bundle with 0 topics completed in 60225.0 ms
:
01:02:13.675 [shutdown-thread-57-1] ERROR org.apache.pulsar.broker.namespace.OwnedBundle - Failed to close topics in namespace my-property/my-cluster/my-ns/0xd0000000_0xe0000000 in 1/MINUTES timeout
01:02:13.677 [pulsar-ordered-OrderedExecutor-7-0-EventThread] INFO  org.apache.pulsar.broker.namespace.OwnershipCache - [/namespace/my-property/my-cluster/my-ns/0xd0000000_0xe0000000] Removed zk lock for service unit: OK
:
01:02:14.404 [shutdown-thread-57-1] INFO  org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl - [my-property/my-cluster/my-ns/persistent/topic-partition-53] Closing managed ledger
```
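
The sequence in the log above (zk lock removed at 01:02:13, managed ledger closed only at 01:02:14) matters because ZooKeeper metadata writes are version-checked: a `setData` call carries an expected version and is rejected with `BadVersion` if the znode has changed since. The following is a minimal in-memory sketch of that mechanism, assuming the late close by the old broker is what bumps the version. It is not Pulsar or ZooKeeper code; `VersionedStore` and this `BadVersionException` are hypothetical stand-ins.

```java
public class VersionedStoreSketch {
    /** Hypothetical stand-in for ZooKeeper's KeeperException.BadVersionException. */
    static class BadVersionException extends Exception {
        BadVersionException(int expected, int actual) {
            super("expected version " + expected + " but znode is at version " + actual);
        }
    }

    /** Hypothetical single-znode store whose writes are compare-and-set on the version. */
    static class VersionedStore {
        private int version = 0;
        private String data = "";

        synchronized int getVersion() {
            return version;
        }

        synchronized int setData(String newData, int expectedVersion) throws BadVersionException {
            if (expectedVersion != version) {
                throw new BadVersionException(expectedVersion, version);
            }
            data = newData;
            return ++version;
        }
    }

    public static void main(String[] args) throws Exception {
        VersionedStore znode = new VersionedStore();

        // The old broker still holds an open managed-ledger that knows version 0.
        int oldBrokerVersion = znode.getVersion();

        // Ownership moves; the new broker loads the topic and also sees version 0.
        int newBrokerVersion = znode.getVersion();

        // The old broker closes its managed-ledger late and writes metadata,
        // which bumps the znode version behind the new owner's back.
        znode.setData("closed-by-old-broker", oldBrokerVersion);

        // The new owner's next metadata update now carries a stale version.
        try {
            znode.setData("cursor-update-by-new-broker", newBrokerVersion);
            System.out.println("write accepted");
        } catch (BadVersionException e) {
            System.out.println("BadVersion: " + e.getMessage());
        }
    }
}
```

In this sketch the new owner cannot recover without re-reading the version, which mirrors why the commit closes the managed-ledger before ownership is released rather than after.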

### Modification

This fix makes sure that the broker closes the managed-ledger before giving up bundle ownership, to avoid the exception below at the new broker to which the bundle moves:
```

01:02:30.995 [bookkeeper-ml-workers-OrderedExecutor-3-0] ERROR org.apache.bookkeeper.mledger.impl.ManagedCursorImpl - [my-property/my-cluster/my-ns/persistent/topic-partition-53][my-sub] Metadata ledger creation failed
org.apache.bookkeeper.mledger.ManagedLedgerException$BadVersionException: org.apache.zookeeper.KeeperException$BadVersionException: KeeperErrorCode = BadVersion
Caused by: org.apache.zookeeper.KeeperException$BadVersionException: KeeperErrorCode = BadVersion
        at org.apache.zookeeper.KeeperException.create(KeeperException.java:118) ~[zookeeper-3.4.13.jar:3.4.13-2d71af4dbe22557fda74f9a9b4309b15a7487f03]
        at org.apache.bookkeeper.mledger.impl.MetaStoreImplZookeeper.lambda$null$125(MetaStoreImplZookeeper.java:288) ~[managed-ledger-original-2.4.5-yahoo.jar:2.4.5-yahoo]
        at org.apache.bookkeeper.mledger.util.SafeRun$1.safeRun(SafeRun.java:32) [managed-ledger-original-2.4.5-yahoo.jar:2.4.5-yahoo]
        at org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36) [bookkeeper-common-4.9.0.jar:4.9.0]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [netty-all-4.1.32.Final.jar:4.1.32.Final]
        at java.lang.Thread.run(Thread.java:834) [?:?]
```
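
The modification can be summarized as a two-phase unload: try a graceful close that waits for clients to disconnect, and on timeout retry with a forced close that skips the wait so the managed-ledger is closed before ownership is released. Below is a simplified, self-contained sketch of that control flow. `SketchTopic`, `ledgerClosed`, and `unload` are hypothetical names for illustration; only the `closeWithoutWaitingClientDisconnect` flag and the overall shape are taken from the diff.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class ForcedUnloadSketch {
    /** Hypothetical, simplified topic: a ledger-closed flag plus client-disconnect futures. */
    static class SketchTopic {
        final List<CompletableFuture<Void>> clientDisconnects = new ArrayList<>();
        volatile boolean ledgerClosed = false;
        private boolean fenced = false;

        synchronized CompletableFuture<Void> close(boolean closeWithoutWaitingClientDisconnect) {
            // A forced close must get through even if an earlier, stuck graceful
            // close has already fenced the topic.
            if (fenced && !closeWithoutWaitingClientDisconnect) {
                CompletableFuture<Void> failed = new CompletableFuture<>();
                failed.completeExceptionally(new IllegalStateException("already closing"));
                return failed;
            }
            fenced = true;

            // Graceful: wait for every client to disconnect. Forced: do not wait.
            CompletableFuture<Void> clientsGone = closeWithoutWaitingClientDisconnect
                    ? CompletableFuture.completedFuture(null)
                    : CompletableFuture.allOf(clientDisconnects.toArray(new CompletableFuture[0]));

            // Stand-in for closing the managed-ledger.
            return clientsGone.thenRun(() -> ledgerClosed = true);
        }
    }

    /** Returns true if the ledger was closed by the time ownership would be released. */
    static boolean unload(SketchTopic topic, long timeout, TimeUnit unit) {
        try {
            topic.close(false).get(timeout, unit);
        } catch (TimeoutException e) {
            try {
                // Graceful close is stuck: close forcefully before giving up ownership.
                topic.close(true).get(timeout, unit);
            } catch (Exception forcedFailure) {
                System.err.println("forced close failed: " + forcedFailure);
            }
        } catch (Exception e) {
            System.err.println("close failed: " + e);
        }
        return topic.ledgerClosed;
    }

    public static void main(String[] args) {
        SketchTopic topic = new SketchTopic();
        topic.clientDisconnects.add(new CompletableFuture<>()); // a client that never disconnects
        boolean closed = unload(topic, 200, TimeUnit.MILLISECONDS);
        System.out.println("ledger closed before giving up ownership: " + closed);
    }
}
```

The fence check is the subtle part: without letting the forced call bypass it, the retry would be rejected as "already closing" because the first, stuck attempt has already fenced the topic.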
rdhabalia authored and jiazhai committed Jan 6, 2020
1 parent 77a997f commit 0a259ab
Showing 11 changed files with 113 additions and 42 deletions.
@@ -1880,7 +1880,7 @@ protected void unloadTopic(TopicName topicName, boolean authoritative) {
validateTopicOwnership(topicName, authoritative);
try {
Topic topic = getTopicReference(topicName);
topic.close().get();
topic.close(false).get();
log.info("[{}] Successfully unloaded topic {}", clientAppId(), topicName);
} catch (NullPointerException e) {
log.error("[{}] topic {} not found", clientAppId(), topicName);
@@ -122,11 +122,17 @@ public void handleUnloadRequest(PulsarService pulsar, long timeout, TimeUnit tim

// close topics forcefully
try {
unloadedTopics = pulsar.getBrokerService().unloadServiceUnit(bundle).get(timeout, timeoutUnit);
unloadedTopics = pulsar.getBrokerService().unloadServiceUnit(bundle, false).get(timeout, timeoutUnit);
} catch (TimeoutException e) {
// ignore topic-close failure to unload bundle
LOG.error("Failed to close topics in namespace {} in {}/{} timeout", bundle.toString(), timeout,
timeoutUnit);
try {
LOG.info("Forcefully close topics for bundle {}", bundle);
pulsar.getBrokerService().unloadServiceUnit(bundle, true).get(timeout, timeoutUnit);
} catch (Exception e1) {
LOG.error("Failed to close topics forcefully under bundle {}", bundle, e1);
}
} catch (Exception e) {
// ignore topic-close failure to unload bundle
LOG.error("Failed to close topics under namespace {}", bundle.toString(), e);
@@ -1216,9 +1216,10 @@ public void checkTopicNsOwnership(final String topic) throws RuntimeException {
* Unload all the topic served by the broker service under the given service unit
*
* @param serviceUnit
* @param closeWithoutWaitingClientDisconnect don't wait for clients to disconnect and forcefully close managed-ledger
* @return
*/
public CompletableFuture<Integer> unloadServiceUnit(NamespaceBundle serviceUnit) {
public CompletableFuture<Integer> unloadServiceUnit(NamespaceBundle serviceUnit, boolean closeWithoutWaitingClientDisconnect) {
CompletableFuture<Integer> result = new CompletableFuture<Integer>();
List<CompletableFuture<Void>> closeFutures = Lists.newArrayList();
topics.forEach((name, topicFuture) -> {
@@ -1227,7 +1228,7 @@ public CompletableFuture<Integer> unloadServiceUnit(NamespaceBundle serviceUnit)
// Topic needs to be unloaded
log.info("[{}] Unloading topic", topicName);
closeFutures.add(topicFuture
.thenCompose(t -> t.isPresent() ? t.get().close() : CompletableFuture.completedFuture(null)));
.thenCompose(t -> t.isPresent() ? t.get().close(closeWithoutWaitingClientDisconnect) : CompletableFuture.completedFuture(null)));
}
});
CompletableFuture<Void> aggregator = FutureUtil.waitForAll(closeFutures);
@@ -121,7 +121,7 @@ CompletableFuture<Subscription> createSubscription(String subscriptionName, Init

CompletableFuture<Void> checkReplication();

CompletableFuture<Void> close();
CompletableFuture<Void> close(boolean closeWithoutWaitingClientDisconnect);

void checkGC(int gcInterval);

@@ -149,7 +149,8 @@ public NonPersistentTopic(String topic, BrokerService brokerService) {
schemaValidationEnforced = policies.schema_validation_enforced;

} catch (Exception e) {
log.warn("[{}] Error getting policies {} and isEncryptionRequired will be set to false", topic, e.getMessage());
log.warn("[{}] Error getting policies {} and isEncryptionRequired will be set to false", topic,
e.getMessage());
isEncryptionRequired = false;
}
}
@@ -285,8 +286,8 @@ public CompletableFuture<Consumer> subscribe(final ServerCnx cnx, String subscri
name -> new NonPersistentSubscription(this, subscriptionName));

try {
Consumer consumer = new Consumer(subscription, subType, topic, consumerId, priorityLevel, consumerName, 0, cnx,
cnx.getRole(), metadata, readCompacted, initialPosition, keySharedMeta);
Consumer consumer = new Consumer(subscription, subType, topic, consumerId, priorityLevel, consumerName, 0,
cnx, cnx.getRole(), metadata, readCompacted, initialPosition, keySharedMeta);
subscription.addConsumer(consumer);
if (!cnx.isActive()) {
consumer.close();
@@ -316,7 +317,8 @@ public CompletableFuture<Consumer> subscribe(final ServerCnx cnx, String subscri
}

@Override
public CompletableFuture<Subscription> createSubscription(String subscriptionName, InitialPosition initialPosition, boolean replicateSubscriptionState) {
public CompletableFuture<Subscription> createSubscription(String subscriptionName, InitialPosition initialPosition,
boolean replicateSubscriptionState) {
return CompletableFuture.completedFuture(new NonPersistentSubscription(this, subscriptionName));
}

@@ -335,9 +337,8 @@ public CompletableFuture<Void> deleteForcefully() {
return delete(false, true, false);
}

private CompletableFuture<Void> delete(boolean failIfHasSubscriptions,
boolean closeIfClientsConnected,
boolean deleteSchema) {
private CompletableFuture<Void> delete(boolean failIfHasSubscriptions, boolean closeIfClientsConnected,
boolean deleteSchema) {
CompletableFuture<Void> deleteFuture = new CompletableFuture<>();

lock.writeLock().lock();
@@ -418,16 +419,18 @@ private CompletableFuture<Void> delete(boolean failIfHasSubscriptions,

/**
* Close this topic - close all producers and subscriptions associated with this topic
*
*
* @param closeWithoutWaitingClientDisconnect
* don't wait for client disconnect and forcefully close managed-ledger
* @return Completable future indicating completion of close operation
*/
@Override
public CompletableFuture<Void> close() {
public CompletableFuture<Void> close(boolean closeWithoutWaitingClientDisconnect) {
CompletableFuture<Void> closeFuture = new CompletableFuture<>();

lock.writeLock().lock();
try {
if (!isFenced) {
if (!isFenced || closeWithoutWaitingClientDisconnect) {
isFenced = true;
} else {
log.warn("[{}] Topic is already being closed or deleted", topic);
@@ -444,7 +447,10 @@ public CompletableFuture<Void> close() {
producers.values().forEach(producer -> futures.add(producer.disconnect()));
subscriptions.forEach((s, sub) -> futures.add(sub.disconnect()));

FutureUtil.waitForAll(futures).thenRun(() -> {
CompletableFuture<Void> clientCloseFuture = closeWithoutWaitingClientDisconnect ? CompletableFuture.completedFuture(null)
: FutureUtil.waitForAll(futures);

clientCloseFuture.thenRun(() -> {
log.info("[{}] Topic closed", topic);
// unload topic iterates over topics map and removing from the map with the same thread creates deadlock.
// so, execute it in different thread
@@ -531,10 +537,11 @@ public CompletableFuture<Void> checkReplication() {
boolean startReplicator(String remoteCluster) {
log.info("[{}] Starting replicator to remote: {}", topic, remoteCluster);
String localCluster = brokerService.pulsar().getConfiguration().getClusterName();
return addReplicationCluster(remoteCluster,NonPersistentTopic.this, localCluster);
return addReplicationCluster(remoteCluster, NonPersistentTopic.this, localCluster);
}

protected boolean addReplicationCluster(String remoteCluster, NonPersistentTopic nonPersistentTopic, String localCluster) {
protected boolean addReplicationCluster(String remoteCluster, NonPersistentTopic nonPersistentTopic,
String localCluster) {
AtomicBoolean isReplicatorStarted = new AtomicBoolean(true);
replicators.computeIfAbsent(remoteCluster, r -> {
try {
@@ -618,8 +625,9 @@ public Replicator getPersistentReplicator(String remoteCluster) {
return replicators.get(remoteCluster);
}

public void updateRates(NamespaceStats nsStats, NamespaceBundleStats bundleStats, StatsOutputStream topicStatsStream,
ClusterReplicationMetrics replStats, String namespace, boolean hydratePublishers) {
public void updateRates(NamespaceStats nsStats, NamespaceBundleStats bundleStats,
StatsOutputStream topicStatsStream, ClusterReplicationMetrics replStats, String namespace,
boolean hydratePublishers) {

TopicStats topicStats = threadLocalTopicStats.get();
topicStats.reset();
@@ -648,7 +656,6 @@ public void updateRates(NamespaceStats nsStats, NamespaceBundleStats bundleStats
});
topicStatsStream.endList();


// Start replicator stats
topicStatsStream.startObject("replication");
nsStats.replicatorCount += topicStats.remotePublishersStats.size();
@@ -859,7 +866,8 @@ public void checkInactiveSubscriptions() {
@Override
public CompletableFuture<Void> onPoliciesUpdate(Policies data) {
if (log.isDebugEnabled()) {
log.debug("[{}] isEncryptionRequired changes: {} -> {}", topic, isEncryptionRequired, data.encryption_required);
log.debug("[{}] isEncryptionRequired changes: {} -> {}", topic, isEncryptionRequired,
data.encryption_required);
}
isEncryptionRequired = data.encryption_required;
setSchemaCompatibilityStrategy(data);
@@ -912,17 +920,14 @@ public Position getLastMessageId() {

private static final Logger log = LoggerFactory.getLogger(NonPersistentTopic.class);


@Override
public CompletableFuture<Void> addSchemaIfIdleOrCheckCompatible(SchemaData schema) {
return hasSchema()
.thenCompose((hasSchema) -> {
if (hasSchema || isActive() || ENTRIES_ADDED_COUNTER_UPDATER.get(this) != 0) {
return checkSchemaCompatibleForConsumer(schema);
} else {
return addSchema(schema).thenCompose(schemaVersion->
CompletableFuture.completedFuture(null));
}
});
return hasSchema().thenCompose((hasSchema) -> {
if (hasSchema || isActive() || ENTRIES_ADDED_COUNTER_UPDATER.get(this) != 0) {
return checkSchemaCompatibleForConsumer(schema);
} else {
return addSchema(schema).thenCompose(schemaVersion -> CompletableFuture.completedFuture(null));
}
});
}
}
@@ -901,18 +901,25 @@ public void deleteLedgerFailed(ManagedLedgerException exception, Object ctx) {
return deleteFuture;
}

public CompletableFuture<Void> close() {
return close(false);
}

/**
* Close this topic - close all producers and subscriptions associated with this topic
*
* @param closeWithoutWaitingClientDisconnect don't wait for client disconnect and forcefully close managed-ledger
* @return Completable future indicating completion of close operation
*/
@Override
public CompletableFuture<Void> close() {
public CompletableFuture<Void> close(boolean closeWithoutWaitingClientDisconnect) {
CompletableFuture<Void> closeFuture = new CompletableFuture<>();

lock.writeLock().lock();
try {
if (!isFenced) {
// closing managed-ledger waits until all producers/consumers/replicators get closed. Sometimes, broker
// forcefully wants to close managed-ledger without waiting all resources to be closed.
if (!isFenced || closeWithoutWaitingClientDisconnect) {
isFenced = true;
} else {
log.warn("[{}] Topic is already being closed or deleted", topic);
@@ -928,8 +935,11 @@ public CompletableFuture<Void> close() {
replicators.forEach((cluster, replicator) -> futures.add(replicator.disconnect()));
producers.values().forEach(producer -> futures.add(producer.disconnect()));
subscriptions.forEach((s, sub) -> futures.add(sub.disconnect()));

CompletableFuture<Void> clientCloseFuture = closeWithoutWaitingClientDisconnect ? CompletableFuture.completedFuture(null)
: FutureUtil.waitForAll(futures);

FutureUtil.waitForAll(futures).thenRun(() -> {
clientCloseFuture.thenRun(() -> {
// After having disconnected all producers/consumers, close the managed ledger
ledger.asyncClose(new CloseCallback() {
@Override
@@ -277,7 +277,7 @@ public CompletableFuture<Void> answer(InvocationOnMock invocation) throws Throwa
result.completeExceptionally(new RuntimeException("first time failed"));
return result;
}
}).when(spyTopic).close();
}).when(spyTopic).close(false);
NamespaceBundle bundle = pulsar.getNamespaceService().getBundle(TopicName.get(topicName));
try {
pulsar.getNamespaceService().unloadNamespaceBundle(bundle);
@@ -316,7 +316,7 @@ public void testUnloadNamespaceBundleWithStuckTopic() throws Exception {
public CompletableFuture<Void> answer(InvocationOnMock invocation) throws Throwable {
return new CompletableFuture<Void>();
}
}).when(spyTopic).close();
}).when(spyTopic).close(false);
NamespaceBundle bundle = pulsar.getNamespaceService().getBundle(TopicName.get(topicName));

// try to unload bundle whose topic will be stuck
@@ -19,8 +19,8 @@
package org.apache.pulsar.broker.namespace;

import static com.google.common.base.Preconditions.checkNotNull;
import static org.apache.pulsar.broker.PulsarService.webAddress;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.anyBoolean;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
@@ -88,7 +88,7 @@ public void setup() throws Exception {
bundleFactory = new NamespaceBundleFactory(pulsar, Hashing.crc32());
nsService = mock(NamespaceService.class);
brokerService = mock(BrokerService.class);
doReturn(CompletableFuture.completedFuture(1)).when(brokerService).unloadServiceUnit(any());
doReturn(CompletableFuture.completedFuture(1)).when(brokerService).unloadServiceUnit(any(), anyBoolean());

doReturn(zkCache).when(pulsar).getLocalZkCache();
doReturn(localCache).when(pulsar).getLocalZkCacheService();
@@ -24,6 +24,7 @@
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
@@ -49,6 +50,7 @@

import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.pulsar.broker.service.BrokerServiceException.PersistenceException;
@@ -58,6 +60,7 @@
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.auth.AuthenticationTls;
@@ -66,6 +69,7 @@
import org.apache.pulsar.common.policies.data.BundlesData;
import org.apache.pulsar.common.policies.data.LocalPolicies;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
import org.apache.pulsar.common.policies.data.SubscriptionStats;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
@@ -912,4 +916,49 @@ public void testCreateNamespacePolicy() throws Exception {
assertEquals(policy.get().bundles.numBundles, totalBundle);
}

/**
* It verifies that unloading bundle gracefully closes managed-ledger before removing ownership to avoid bad-zk
* version.
*
* @throws Exception
*/
@Test
public void testStuckTopicUnloading() throws Exception {
final String namespace = "prop/ns-abc";
final String topicName = "persistent://" + namespace + "/unoadTopic";
final String topicMlName = namespace + "/persistent/unoadTopic";
Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName("my-subscriber-name")
.subscribe();
consumer.close();

ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer().topic(topicName).sendTimeout(5,
TimeUnit.SECONDS);

Producer<byte[]> producer = producerBuilder.create();

PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get();

ManagedLedgerFactoryImpl mlFactory = (ManagedLedgerFactoryImpl) pulsar.getManagedLedgerClientFactory()
.getManagedLedgerFactory();
Field ledgersField = ManagedLedgerFactoryImpl.class.getDeclaredField("ledgers");
ledgersField.setAccessible(true);
ConcurrentHashMap<String, CompletableFuture<ManagedLedgerImpl>> ledgers = (ConcurrentHashMap<String, CompletableFuture<ManagedLedgerImpl>>) ledgersField
.get(mlFactory);
assertNotNull(ledgers.get(topicMlName));

org.apache.pulsar.broker.service.Producer prod = spy(topic.producers.values().get(0));
topic.producers.clear();
topic.producers.add(prod);
CompletableFuture<Void> waitFuture = new CompletableFuture<Void>();
doReturn(waitFuture).when(prod).disconnect();
Set<NamespaceBundle> bundles = pulsar.getNamespaceService().getOwnedServiceUnits();
for (NamespaceBundle bundle : bundles) {
String ns = bundle.getNamespaceObject().toString();
System.out.println();
if (namespace.equals(ns)) {
pulsar.getNamespaceService().unloadNamespaceBundle(bundle, 2, TimeUnit.SECONDS);
}
}
assertNull(ledgers.get(topicMlName));
}
}
@@ -513,7 +513,7 @@ public void testMessageAvailableAfterRestart() throws Exception {
}

// cause broker to drop topic. Will be loaded next time we access it
pulsar.getBrokerService().getTopicReference(topic).get().close().get();
pulsar.getBrokerService().getTopicReference(topic).get().close(false).get();

try (Reader<byte[]> reader = pulsarClient.newReader().topic(topic)
.startMessageId(MessageId.earliest).create()) {