diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMetadataStoreServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMetadataStoreServiceTest.java index 0eba23b496f95..97c7f4426604a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMetadataStoreServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMetadataStoreServiceTest.java @@ -25,7 +25,7 @@ import java.lang.reflect.Method; import java.util.ArrayList; import java.util.List; -import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import org.apache.bookkeeper.mledger.Position; @@ -164,8 +164,8 @@ public void testTimeoutTracker() throws Exception { checkTransactionMetadataStoreReady(transactionMetadataStore); Field field = MLTransactionMetadataStore.class.getDeclaredField("txnMetaMap"); field.setAccessible(true); - ConcurrentMap>> txnMap = - (ConcurrentMap>>) field.get(transactionMetadataStore); + ConcurrentSkipListMap>> txnMap = + (ConcurrentSkipListMap>>) field.get(transactionMetadataStore); int i = -1; while (++i < 1000) { try { @@ -192,8 +192,8 @@ public void testTimeoutTrackerExpired() throws Exception { checkTransactionMetadataStoreReady(transactionMetadataStore); Field field = MLTransactionMetadataStore.class.getDeclaredField("txnMetaMap"); field.setAccessible(true); - ConcurrentMap>> txnMap = - (ConcurrentMap>>) field.get(transactionMetadataStore); + ConcurrentSkipListMap>> txnMap = + (ConcurrentSkipListMap>>) field.get(transactionMetadataStore); transactionMetadataStore.newTransaction(2000).get(); @@ -224,8 +224,8 @@ public void testTimeoutTrackerMultiThreading() throws Exception { checkTransactionMetadataStoreReady(transactionMetadataStore); Field field = MLTransactionMetadataStore.class.getDeclaredField("txnMetaMap"); field.setAccessible(true); - ConcurrentMap>> txnMap = - (ConcurrentMap>>) field.get(transactionMetadataStore); + ConcurrentSkipListMap>> txnMap = + (ConcurrentSkipListMap>>) field.get(transactionMetadataStore); new Thread(() -> { int i = -1; while (++i < 100) { @@ -276,7 +276,7 @@ public void testTimeoutTrackerMultiThreading() throws Exception { checkoutTimeout(txnMap, 0); } - private void checkoutTimeout(ConcurrentMap>> txnMap, int time) { + private void checkoutTimeout(ConcurrentSkipListMap>> txnMap, int time) { Awaitility.await().atLeast(1000, TimeUnit.MICROSECONDS) .until(() -> txnMap.size() == time); } @@ -310,8 +310,8 @@ public void transactionTimeoutRecoverTest() throws Exception { Field field = MLTransactionMetadataStore.class.getDeclaredField("txnMetaMap"); field.setAccessible(true); - ConcurrentMap>> txnMap = - (ConcurrentMap>>) field.get(transactionMetadataStore); + ConcurrentSkipListMap>> txnMap = + (ConcurrentSkipListMap>>) field.get(transactionMetadataStore); Awaitility.await().until(() -> txnMap.size() == 0); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionLowWaterMarkTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionLowWaterMarkTest.java index 9c7d575edfb7c..e7f1b8c7814c4 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionLowWaterMarkTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionLowWaterMarkTest.java @@ -127,7 +127,7 @@ protected void cleanup() throws Exception { } @Test - public void testLowWaterMark() throws Exception { + public void testTransactionBufferLowWaterMark() throws Exception { Transaction txn = pulsarClient.newTransaction() .withTransactionTimeout(5, TimeUnit.SECONDS) .build().get(); diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java index d51f857ea4d7e..a4dec91b703c1 100644 --- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java +++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java @@ -23,9 +23,7 @@ import java.util.List; import java.util.NoSuchElementException; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ConcurrentSkipListSet; +import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.atomic.AtomicLong; import org.apache.bookkeeper.mledger.Position; import org.apache.commons.lang3.tuple.MutablePair; @@ -62,8 +60,7 @@ public class MLTransactionMetadataStore private final AtomicLong sequenceId = new AtomicLong(TC_ID_NOT_USED); private final MLTransactionLogImpl transactionLog; private static final long TC_ID_NOT_USED = -1L; - private final ConcurrentMap>> txnMetaMap = new ConcurrentHashMap<>(); - private final ConcurrentSkipListSet txnIdSortedSet = new ConcurrentSkipListSet<>(); + private final ConcurrentSkipListMap>> txnMetaMap = new ConcurrentSkipListMap<>(); private final TransactionTimeoutTracker timeoutTracker; public MLTransactionMetadataStore(TransactionCoordinatorID tcID, @@ -102,55 +99,54 @@ public void handleMetadataEntry(Position position, TransactionMetadataEntry tran TxnID txnID = new TxnID(transactionMetadataEntry.getTxnidMostBits(), transactionMetadataEntry.getTxnidLeastBits()); + long transactionId = transactionMetadataEntry.getTxnidLeastBits(); switch (transactionMetadataEntry.getMetadataOp()) { case NEW: long txnSequenceId = transactionMetadataEntry.getTxnidLeastBits(); - if (txnMetaMap.containsKey(txnID)) { - txnMetaMap.get(txnID).getRight().add(position); + if (txnMetaMap.containsKey(transactionId)) { + txnMetaMap.get(transactionId).getRight().add(position); } else { List positions = new ArrayList<>(); positions.add(position); - txnMetaMap.put(txnID, MutablePair.of(new TxnMetaImpl(txnID), positions)); - txnIdSortedSet.add(transactionMetadataEntry.getTxnidLeastBits()); + txnMetaMap.put(transactionId, MutablePair.of(new TxnMetaImpl(txnID), positions)); recoverTracker.handleOpenStatusTransaction(txnSequenceId, transactionMetadataEntry.getTimeoutMs() + transactionMetadataEntry.getStartTime()); } break; case ADD_PARTITION: - if (!txnMetaMap.containsKey(txnID)) { + if (!txnMetaMap.containsKey(transactionId)) { transactionLog.deletePosition(Collections.singletonList(position)); } else { - txnMetaMap.get(txnID).getLeft() + txnMetaMap.get(transactionId).getLeft() .addProducedPartitions(transactionMetadataEntry.getPartitionsList()); - txnMetaMap.get(txnID).getRight().add(position); + txnMetaMap.get(transactionId).getRight().add(position); } break; case ADD_SUBSCRIPTION: - if (!txnMetaMap.containsKey(txnID)) { + if (!txnMetaMap.containsKey(transactionId)) { transactionLog.deletePosition(Collections.singletonList(position)); } else { - txnMetaMap.get(txnID).getLeft() + txnMetaMap.get(transactionId).getLeft() .addAckedPartitions(subscriptionToTxnSubscription( transactionMetadataEntry.getSubscriptionsList())); - txnMetaMap.get(txnID).getRight().add(position); + txnMetaMap.get(transactionId).getRight().add(position); } break; case UPDATE: - if (!txnMetaMap.containsKey(txnID)) { + if (!txnMetaMap.containsKey(transactionId)) { transactionLog.deletePosition(Collections.singletonList(position)); } else { TxnStatus newStatus = transactionMetadataEntry.getNewStatus(); - txnMetaMap.get(txnID).getLeft() + txnMetaMap.get(transactionId).getLeft() .updateTxnStatus(transactionMetadataEntry.getNewStatus(), transactionMetadataEntry.getExpectedStatus()); - txnMetaMap.get(txnID).getRight().add(position); + txnMetaMap.get(transactionId).getRight().add(position); recoverTracker.updateTransactionStatus(txnID.getLeastSigBits(), newStatus); if (newStatus == TxnStatus.COMMITTED || newStatus == TxnStatus.ABORTED) { - transactionLog.deletePosition(txnMetaMap.get(txnID).getRight()).thenAccept(v -> { - txnMetaMap.remove(txnID).getLeft(); - txnIdSortedSet.remove(transactionMetadataEntry.getTxnidLeastBits()); - }); + transactionLog.deletePosition(txnMetaMap + .get(transactionId).getRight()).thenAccept(v -> + txnMetaMap.remove(transactionId).getLeft()); } } break; @@ -169,12 +165,12 @@ public void handleMetadataEntry(Position position, TransactionMetadataEntry tran @Override public CompletableFuture getTxnStatus(TxnID txnID) { - return CompletableFuture.completedFuture(txnMetaMap.get(txnID).getLeft().status()); + return CompletableFuture.completedFuture(txnMetaMap.get(txnID.getLeastSigBits()).getLeft().status()); } @Override public CompletableFuture getTxnMeta(TxnID txnID) { - Pair> txnMetaListPair = txnMetaMap.get(txnID); + Pair> txnMetaListPair = txnMetaMap.get(txnID.getLeastSigBits()); CompletableFuture completableFuture = new CompletableFuture<>(); if (txnMetaListPair == null) { completableFuture.completeExceptionally(new TransactionNotFoundException(txnID)); @@ -210,9 +206,8 @@ public synchronized CompletableFuture newTransaction(long timeOut) { List positions = new ArrayList<>(); positions.add(position); Pair> pair = MutablePair.of(txn, positions); - txnMetaMap.put(txnID, pair); + txnMetaMap.put(leastSigBits, pair); this.timeoutTracker.addTransaction(leastSigBits, timeOut); - this.txnIdSortedSet.add(leastSigBits); return CompletableFuture.completedFuture(txnID); }); } @@ -238,7 +233,7 @@ public synchronized CompletableFuture addProducedPartitionToTxn(TxnID txnI try { synchronized (txnMetaListPair.getLeft()) { txnMetaListPair.getLeft().addProducedPartitions(partitions); - txnMetaMap.get(txnID).getRight().add(position); + txnMetaMap.get(txnID.getLeastSigBits()).getRight().add(position); } return CompletableFuture.completedFuture(null); } catch (InvalidTxnStatusException e) { @@ -274,7 +269,7 @@ public synchronized CompletableFuture addAckedPartitionToTxn(TxnID txnID, try { synchronized (txnMetaListPair.getLeft()) { txnMetaListPair.getLeft().addAckedPartitions(txnSubscriptions); - txnMetaMap.get(txnID).getRight().add(position); + txnMetaMap.get(txnID.getLeastSigBits()).getRight().add(position); } return CompletableFuture.completedFuture(null); } catch (InvalidTxnStatusException e) { @@ -317,8 +312,7 @@ public synchronized CompletableFuture updateTxnStatus(TxnID txnID, TxnStat } if (newStatus == TxnStatus.COMMITTED || newStatus == TxnStatus.ABORTED) { return transactionLog.deletePosition(txnMetaListPair.getRight()).thenCompose(v -> { - txnMetaMap.remove(txnID); - txnIdSortedSet.remove(txnID.getLeastSigBits()); + txnMetaMap.remove(txnID.getLeastSigBits()); return CompletableFuture.completedFuture(null); }); } @@ -337,7 +331,7 @@ public synchronized CompletableFuture updateTxnStatus(TxnID txnID, TxnStat @Override public long getLowWaterMark() { try { - return this.txnIdSortedSet.first() - 1; + return this.txnMetaMap.firstKey() - 1; } catch (NoSuchElementException e) { return 0L; } @@ -350,7 +344,7 @@ public TransactionCoordinatorID getTransactionCoordinatorID() { private CompletableFuture>> getTxnPositionPair(TxnID txnID) { CompletableFuture>> completableFuture = new CompletableFuture<>(); - Pair> txnMetaListPair = txnMetaMap.get(txnID); + Pair> txnMetaListPair = txnMetaMap.get(txnID.getLeastSigBits()); if (txnMetaListPair == null) { completableFuture.completeExceptionally(new TransactionNotFoundException(txnID)); } else {