Skip to content

Commit

Permalink
[Transaction] Fix delete sub then delete pending ack. (apache#11023)
Browse files Browse the repository at this point in the history
Fix delete sub then delete pending ack managedledger.
  • Loading branch information
congbobo184 authored Aug 9, 2021
1 parent 5ae0554 commit a50fe87
Show file tree
Hide file tree
Showing 5 changed files with 153 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
import org.apache.bookkeeper.mledger.ManagedLedgerException.ManagedLedgerAlreadyClosedException;
import org.apache.bookkeeper.mledger.ManagedLedgerException.ManagedLedgerFencedException;
import org.apache.bookkeeper.mledger.ManagedLedgerException.ManagedLedgerTerminatedException;
import org.apache.bookkeeper.mledger.ManagedLedgerException.MetadataNotFoundException;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.ManagedCursorContainer;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
Expand Down Expand Up @@ -107,6 +108,7 @@
import org.apache.pulsar.broker.stats.ReplicationMetrics;
import org.apache.pulsar.broker.transaction.buffer.TransactionBuffer;
import org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferDisable;
import org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStore;
import org.apache.pulsar.client.admin.LongRunningProcessStatus;
import org.apache.pulsar.client.admin.OffloadProcessStatus;
import org.apache.pulsar.client.api.MessageId;
Expand Down Expand Up @@ -941,7 +943,32 @@ public CompletableFuture<Subscription> createSubscription(String subscriptionNam
@Override
public CompletableFuture<Void> unsubscribe(String subscriptionName) {
CompletableFuture<Void> unsubscribeFuture = new CompletableFuture<>();
getBrokerService().getManagedLedgerFactory().asyncDelete(TopicName.get(MLPendingAckStore
.getTransactionPendingAckStoreSuffix(topic,
Codec.encode(subscriptionName))).getPersistenceNamingEncoding(),
new AsyncCallbacks.DeleteLedgerCallback() {
@Override
public void deleteLedgerComplete(Object ctx) {
asyncDeleteCursor(subscriptionName, unsubscribeFuture);
}

@Override
public void deleteLedgerFailed(ManagedLedgerException exception, Object ctx) {
if (exception instanceof MetadataNotFoundException) {
asyncDeleteCursor(subscriptionName, unsubscribeFuture);
return;
}

unsubscribeFuture.completeExceptionally(exception);
log.error("[{}][{}] Error deleting subscription pending ack store",
topic, subscriptionName, exception);
}
}, null);

return unsubscribeFuture;
}

private void asyncDeleteCursor(String subscriptionName, CompletableFuture<Void> unsubscribeFuture) {
ledger.asyncDeleteCursor(Codec.encode(subscriptionName), new DeleteCursorCallback() {
@Override
public void deleteCursorComplete(Object ctx) {
Expand All @@ -956,13 +983,12 @@ public void deleteCursorComplete(Object ctx) {
@Override
public void deleteCursorFailed(ManagedLedgerException exception, Object ctx) {
if (log.isDebugEnabled()) {
log.debug("[{}][{}] Error deleting cursor for subscription", topic, subscriptionName, exception);
log.debug("[{}][{}] Error deleting cursor for subscription",
topic, subscriptionName, exception);
}
unsubscribeFuture.completeExceptionally(new PersistenceException(exception));
}
}, null);

return unsubscribeFuture;
}

void removeSubscription(String subscriptionName) {
Expand Down Expand Up @@ -1069,32 +1095,46 @@ private CompletableFuture<Void> delete(boolean failIfHasSubscriptions,
unfenceTopicToResume();
deleteFuture.completeExceptionally(ex);
} else {
ledger.asyncDelete(new AsyncCallbacks.DeleteLedgerCallback() {
@Override
public void deleteLedgerComplete(Object ctx) {
brokerService.removeTopicFromCache(topic);

dispatchRateLimiter.ifPresent(DispatchRateLimiter::close);

subscribeRateLimiter.ifPresent(SubscribeRateLimiter::close);

brokerService.pulsar().getTopicPoliciesService().clean(TopicName.get(topic));
log.info("[{}] Topic deleted", topic);
deleteFuture.complete(null);
List<CompletableFuture<Void>> subsDeleteFutures = new ArrayList<>();
subscriptions.forEach((sub, p) -> subsDeleteFutures.add(unsubscribe(sub)));

FutureUtil.waitForAll(subsDeleteFutures).whenComplete((f, e) -> {
if (e != null) {
log.error("[{}] Error deleting topic", topic, e);
unfenceTopicToResume();
deleteFuture.completeExceptionally(e);
} else {
ledger.asyncDelete(new AsyncCallbacks.DeleteLedgerCallback() {
@Override
public void deleteLedgerComplete(Object ctx) {
brokerService.removeTopicFromCache(topic);

dispatchRateLimiter.ifPresent(DispatchRateLimiter::close);

subscribeRateLimiter.ifPresent(SubscribeRateLimiter::close);

brokerService.pulsar().getTopicPoliciesService()
.clean(TopicName.get(topic));
log.info("[{}] Topic deleted", topic);
deleteFuture.complete(null);
}

@Override
public void deleteLedgerFailed(ManagedLedgerException exception, Object ctx) {
if (exception.getCause()
instanceof MetadataStoreException.NotFoundException) {
log.info("[{}] Topic is already deleted {}",
topic, exception.getMessage());
deleteLedgerComplete(ctx);
} else {
unfenceTopicToResume();
log.error("[{}] Error deleting topic", topic, exception);
deleteFuture.completeExceptionally(new PersistenceException(exception));
}
}
}, null);
}

@Override
public void deleteLedgerFailed(ManagedLedgerException exception, Object ctx) {
if (exception.getCause() instanceof MetadataStoreException.NotFoundException) {
log.info("[{}] Topic is already deleted {}", topic, exception.getMessage());
deleteLedgerComplete(ctx);
} else {
unfenceTopicToResume();
log.error("[{}] Error deleting topic", topic, exception);
deleteFuture.completeExceptionally(new PersistenceException(exception));
}
}
}, null);
});
}
});
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,12 @@ public void setup() throws Exception {
doReturn(mlFactoryMock).when(pulsar).getManagedLedgerFactory();
doReturn(mock(PulsarClientImpl.class)).when(pulsar).getClient();

doAnswer(invocation -> {
DeleteLedgerCallback deleteLedgerCallback = invocation.getArgument(1);
deleteLedgerCallback.deleteLedgerComplete(null);
return null;
}).when(mlFactoryMock).asyncDelete(any(), any(), any());

ZooKeeper mockZk = createMockZooKeeper();
doReturn(mockZk).when(pulsar).getZkClient();
doReturn(createMockBookKeeper(executor))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -772,15 +772,15 @@ public void testDuplicateConcurrentSubscribeCommand() throws Exception {

// Create producer second time
clientCommand = Commands.newSubscribe(successTopicName, //
successSubName, 1 /* consumer id */, 1 /* request id */, SubType.Exclusive, 0,
successSubName, 2 /* consumer id */, 1 /* request id */, SubType.Exclusive, 0,
"test" /* consumer name */, 0 /* avoid reseting cursor */);
channel.writeInbound(clientCommand);

Awaitility.await().untilAsserted(() -> {
Object response = getResponse();
assertTrue(response instanceof CommandError, "Response is not CommandError but " + response);
CommandError error = (CommandError) response;
assertEquals(error.getError(), ServerError.ServiceNotReady);
assertEquals(error.getError(), ServerError.ConsumerBusy);
});
channel.finish();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferClientImpl;
import org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferHandlerImpl;
import org.apache.pulsar.broker.transaction.coordinator.TransactionMetaStoreTestBase;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.transaction.TransactionBufferClient;
import org.apache.pulsar.client.api.transaction.TransactionBufferClientException;
Expand Down Expand Up @@ -90,9 +91,11 @@ protected void afterSetup() throws Exception {
pulsarAdmins[0].tenants().createTenant("public", new TenantInfoImpl(Sets.newHashSet(), Sets.newHashSet("my-cluster")));
pulsarAdmins[0].namespaces().createNamespace(namespace, 10);
pulsarAdmins[0].topics().createPartitionedTopic(partitionedTopicName.getPartitionedTopicName(), partitions);
String subName = "test";
pulsarAdmins[0].topics().createSubscription(partitionedTopicName.getPartitionedTopicName(), subName, MessageId.latest);
pulsarClient.newConsumer()
.topic(partitionedTopicName.getPartitionedTopicName())
.subscriptionName("test").subscribe();
.subscriptionName(subName).subscribe();
tbClient = TransactionBufferClientImpl.create(
((PulsarClientImpl) pulsarClient),
new HashedWheelTimer(new DefaultThreadFactory("transaction-buffer")));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.transaction.pendingack;

import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import com.google.common.collect.Sets;
Expand Down Expand Up @@ -45,9 +46,11 @@
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.transaction.Transaction;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterDataImpl;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.awaitility.Awaitility;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
Expand All @@ -60,7 +63,11 @@
public class PendingAckPersistentTest extends TransactionTestBase {

private static final String PENDING_ACK_REPLAY_TOPIC = "persistent://public/txn/pending-ack-replay";

private static final String NAMESPACE = "public/txn";

private static final int NUM_PARTITIONS = 16;

@BeforeMethod
public void setup() throws Exception {
setBrokerCount(1);
Expand All @@ -75,7 +82,7 @@ public void setup() throws Exception {
admin.topics().createPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString(), 16);
admin.tenants().createTenant("public",
new TenantInfoImpl(Sets.newHashSet(), Sets.newHashSet(CLUSTER_NAME)));
admin.namespaces().createNamespace("public/txn", 10);
admin.namespaces().createNamespace(NAMESPACE, 10);
admin.topics().createNonPartitionedTopic(PENDING_ACK_REPLAY_TOPIC);

pulsarClient = PulsarClient.builder()
Expand Down Expand Up @@ -298,4 +305,69 @@ public void cumulativePendingAckReplayTest() throws Exception {
.until(() -> ((PositionImpl) managedCursor.getMarkDeletedPosition())
.compareTo((PositionImpl) managedCursor.getManagedLedger().getLastConfirmedEntry()) == -1);
}

@Test
private void testDeleteSubThenDeletePendingAckManagedLedger() throws Exception {

String subName = "test-delete";

String topic = TopicName.get(TopicDomain.persistent.toString(),
NamespaceName.get(NAMESPACE), "test-delete").toString();
@Cleanup
Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic(topic)
.subscriptionName(subName)
.subscriptionType(SubscriptionType.Failover)
.enableBatchIndexAcknowledgment(true)
.subscribe();

consumer.close();

admin.topics().deleteSubscription(topic, subName);

List<String> topics = admin.namespaces().getTopics(NAMESPACE);

TopicStats topicStats = admin.topics().getStats(topic, false);

assertFalse(topics.contains(MLPendingAckStore.getTransactionPendingAckStoreSuffix(topic, subName)));

assertTrue(topics.contains(topic));
}

@Test
private void testDeleteTopicThenDeletePendingAckManagedLedger() throws Exception {

String subName1 = "test-delete";
String subName2 = "test-delete";

String topic = TopicName.get(TopicDomain.persistent.toString(),
NamespaceName.get(NAMESPACE), "test-delete").toString();
@Cleanup
Consumer<byte[]> consumer1 = pulsarClient.newConsumer()
.topic(topic)
.subscriptionName(subName1)
.subscriptionType(SubscriptionType.Failover)
.enableBatchIndexAcknowledgment(true)
.subscribe();

consumer1.close();

@Cleanup
Consumer<byte[]> consumer2 = pulsarClient.newConsumer()
.topic(topic)
.subscriptionName(subName2)
.subscriptionType(SubscriptionType.Failover)
.enableBatchIndexAcknowledgment(true)
.subscribe();

consumer2.close();

admin.topics().delete(topic);

List<String> topics = admin.namespaces().getTopics(NAMESPACE);

assertFalse(topics.contains(MLPendingAckStore.getTransactionPendingAckStoreSuffix(topic, subName1)));
assertFalse(topics.contains(MLPendingAckStore.getTransactionPendingAckStoreSuffix(topic, subName2)));
assertFalse(topics.contains(topic));
}
}

0 comments on commit a50fe87

Please sign in to comment.