Skip to content

Commit

Permalink
[Transaction] Fix transaction ack one topic with multi sub. (apache#1…
Browse files Browse the repository at this point in the history
  • Loading branch information
congbobo184 authored May 25, 2021
1 parent 4d4f624 commit 3311c22
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.google.common.collect.Sets;

import java.lang.reflect.Field;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.ArrayList;
Expand All @@ -40,12 +41,14 @@
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.TransactionMetadataStoreService;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.transaction.TransactionTestBase;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.PulsarClient;
Expand All @@ -69,6 +72,8 @@
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore;
import org.apache.pulsar.transaction.coordinator.TransactionSubscription;
import org.apache.pulsar.transaction.coordinator.TxnMeta;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
Expand Down Expand Up @@ -899,4 +904,61 @@ public void txnTransactionRedeliverNullDispatcher(CommandAck.AckType ackType) th
txn.abort().get();
assertTrue(exist);
}

@Test
public void oneTransactionOneTopicWithMultiSubTest() throws Exception {
String topic = NAMESPACE1 + "/oneTransactionOneTopicWithMultiSubTest";
final String subName1 = "test1";
final String subName2 = "test2";
@Cleanup
Consumer<byte[]> consumer1 = pulsarClient
.newConsumer()
.topic(topic)
.subscriptionName(subName1)
.acknowledgmentGroupTime(0, TimeUnit.MILLISECONDS)
.subscribe();
Awaitility.await().until(consumer1::isConnected);

@Cleanup
Consumer<byte[]> consumer2 = pulsarClient
.newConsumer()
.topic(topic)
.subscriptionName(subName2)
.acknowledgmentGroupTime(0, TimeUnit.MILLISECONDS)
.subscribe();
Awaitility.await().until(consumer2::isConnected);

@Cleanup
Producer<byte[]> producer = pulsarClient
.newProducer()
.topic(topic)
.sendTimeout(0, TimeUnit.SECONDS)
.enableBatching(false)
.create();

MessageId messageId = producer.send(("Hello Pulsar").getBytes(UTF_8));
TransactionImpl txn = (TransactionImpl) getTxn();
consumer1.acknowledgeAsync(messageId, txn).get();
consumer2.acknowledgeAsync(messageId, txn).get();

boolean flag = false;
for (int i = 0; i < getPulsarServiceList().size(); i++) {
TransactionMetadataStoreService transactionMetadataStoreService =
getPulsarServiceList().get(i).getTransactionMetadataStoreService();
if (transactionMetadataStoreService.getStores()
.containsKey(TransactionCoordinatorID.get(txn.getTxnIdMostBits()))) {
List<TransactionSubscription> list = transactionMetadataStoreService
.getTxnMeta(new TxnID(txn.getTxnIdMostBits(), txn.getTxnIdLeastBits())).get().ackedPartitions();
flag = true;
assertEquals(list.size(), 2);
if (list.get(0).getSubscription().equals(subName1)) {
assertEquals(list.get(1).getSubscription(), subName2);
} else {
assertEquals(list.get(0).getSubscription(), subName2);
assertEquals(list.get(1).getSubscription(), subName1);
}
}
}
assertTrue(flag);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.google.common.collect.Lists;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.transaction.Transaction;
import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException.InvalidTxnStatusException;
Expand Down Expand Up @@ -55,7 +56,7 @@ public class TransactionImpl implements Transaction {
private final long txnIdMostBits;

private final Map<String, CompletableFuture<Void>> registerPartitionMap;
private final Map<String, CompletableFuture<Void>> registerSubscriptionMap;
private final Map<Pair<String, String>, CompletableFuture<Void>> registerSubscriptionMap;
private final TransactionCoordinatorClientImpl tcClient;
private Map<ConsumerImpl<?>, Integer> cumulativeAckConsumers;

Expand Down Expand Up @@ -117,7 +118,7 @@ public CompletableFuture<Void> registerAckedTopic(String topic, String subscript
return checkIfOpen().thenCompose(value -> {
synchronized (TransactionImpl.this) {
// we need to issue the request to TC to register the acked topic
return registerSubscriptionMap.compute(topic, (key, future) -> {
return registerSubscriptionMap.compute(Pair.of(topic, subscription), (key, future) -> {
if (future != null) {
return future.thenCompose(ignored -> CompletableFuture.completedFuture(null));
} else {
Expand Down

0 comments on commit 3311c22

Please sign in to comment.