Skip to content

Commit

Permalink
[Transaction] Transaction log low water mark optimization. (apache#10422
Browse files Browse the repository at this point in the history
)

## Motivation
`txnIdSortedSet` is redundant, `txnMetaMap` can replace it.
  • Loading branch information
congbobo184 authored Apr 29, 2021
1 parent 5238822 commit bdb0990
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.mledger.Position;
Expand Down Expand Up @@ -164,8 +164,8 @@ public void testTimeoutTracker() throws Exception {
checkTransactionMetadataStoreReady(transactionMetadataStore);
Field field = MLTransactionMetadataStore.class.getDeclaredField("txnMetaMap");
field.setAccessible(true);
ConcurrentMap<TxnID, Pair<TxnMeta, List<Position>>> txnMap =
(ConcurrentMap<TxnID, Pair<TxnMeta, List<Position>>>) field.get(transactionMetadataStore);
ConcurrentSkipListMap<Long, Pair<TxnMeta, List<Position>>> txnMap =
(ConcurrentSkipListMap<Long, Pair<TxnMeta, List<Position>>>) field.get(transactionMetadataStore);
int i = -1;
while (++i < 1000) {
try {
Expand All @@ -192,8 +192,8 @@ public void testTimeoutTrackerExpired() throws Exception {
checkTransactionMetadataStoreReady(transactionMetadataStore);
Field field = MLTransactionMetadataStore.class.getDeclaredField("txnMetaMap");
field.setAccessible(true);
ConcurrentMap<TxnID, Pair<TxnMeta, List<Position>>> txnMap =
(ConcurrentMap<TxnID, Pair<TxnMeta, List<Position>>>) field.get(transactionMetadataStore);
ConcurrentSkipListMap<Long, Pair<TxnMeta, List<Position>>> txnMap =
(ConcurrentSkipListMap<Long, Pair<TxnMeta, List<Position>>>) field.get(transactionMetadataStore);

transactionMetadataStore.newTransaction(2000).get();

Expand Down Expand Up @@ -224,8 +224,8 @@ public void testTimeoutTrackerMultiThreading() throws Exception {
checkTransactionMetadataStoreReady(transactionMetadataStore);
Field field = MLTransactionMetadataStore.class.getDeclaredField("txnMetaMap");
field.setAccessible(true);
ConcurrentMap<TxnID, Pair<TxnMeta, List<Position>>> txnMap =
(ConcurrentMap<TxnID, Pair<TxnMeta, List<Position>>>) field.get(transactionMetadataStore);
ConcurrentSkipListMap<Long, Pair<TxnMeta, List<Position>>> txnMap =
(ConcurrentSkipListMap<Long, Pair<TxnMeta, List<Position>>>) field.get(transactionMetadataStore);
new Thread(() -> {
int i = -1;
while (++i < 100) {
Expand Down Expand Up @@ -276,7 +276,7 @@ public void testTimeoutTrackerMultiThreading() throws Exception {
checkoutTimeout(txnMap, 0);
}

private void checkoutTimeout(ConcurrentMap<TxnID, Pair<TxnMeta, List<Position>>> txnMap, int time) {
private void checkoutTimeout(ConcurrentSkipListMap<Long, Pair<TxnMeta, List<Position>>> txnMap, int time) {
Awaitility.await().atLeast(1000, TimeUnit.MICROSECONDS)
.until(() -> txnMap.size() == time);
}
Expand Down Expand Up @@ -310,8 +310,8 @@ public void transactionTimeoutRecoverTest() throws Exception {

Field field = MLTransactionMetadataStore.class.getDeclaredField("txnMetaMap");
field.setAccessible(true);
ConcurrentMap<TxnID, Pair<TxnMeta, List<Position>>> txnMap =
(ConcurrentMap<TxnID, Pair<TxnMeta, List<Position>>>) field.get(transactionMetadataStore);
ConcurrentSkipListMap<Long, Pair<TxnMeta, List<Position>>> txnMap =
(ConcurrentSkipListMap<Long, Pair<TxnMeta, List<Position>>>) field.get(transactionMetadataStore);
Awaitility.await().until(() -> txnMap.size() == 0);

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ protected void cleanup() throws Exception {
}

@Test
public void testLowWaterMark() throws Exception {
public void testTransactionBufferLowWaterMark() throws Exception {
Transaction txn = pulsarClient.newTransaction()
.withTransactionTimeout(5, TimeUnit.SECONDS)
.build().get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,7 @@
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.bookkeeper.mledger.Position;
import org.apache.commons.lang3.tuple.MutablePair;
Expand Down Expand Up @@ -62,8 +60,7 @@ public class MLTransactionMetadataStore
private final AtomicLong sequenceId = new AtomicLong(TC_ID_NOT_USED);
private final MLTransactionLogImpl transactionLog;
private static final long TC_ID_NOT_USED = -1L;
private final ConcurrentMap<TxnID, Pair<TxnMeta, List<Position>>> txnMetaMap = new ConcurrentHashMap<>();
private final ConcurrentSkipListSet<Long> txnIdSortedSet = new ConcurrentSkipListSet<>();
private final ConcurrentSkipListMap<Long, Pair<TxnMeta, List<Position>>> txnMetaMap = new ConcurrentSkipListMap<>();
private final TransactionTimeoutTracker timeoutTracker;

public MLTransactionMetadataStore(TransactionCoordinatorID tcID,
Expand Down Expand Up @@ -102,55 +99,54 @@ public void handleMetadataEntry(Position position, TransactionMetadataEntry tran

TxnID txnID = new TxnID(transactionMetadataEntry.getTxnidMostBits(),
transactionMetadataEntry.getTxnidLeastBits());
long transactionId = transactionMetadataEntry.getTxnidLeastBits();
switch (transactionMetadataEntry.getMetadataOp()) {
case NEW:
long txnSequenceId = transactionMetadataEntry.getTxnidLeastBits();
if (txnMetaMap.containsKey(txnID)) {
txnMetaMap.get(txnID).getRight().add(position);
if (txnMetaMap.containsKey(transactionId)) {
txnMetaMap.get(transactionId).getRight().add(position);
} else {
List<Position> positions = new ArrayList<>();
positions.add(position);
txnMetaMap.put(txnID, MutablePair.of(new TxnMetaImpl(txnID), positions));
txnIdSortedSet.add(transactionMetadataEntry.getTxnidLeastBits());
txnMetaMap.put(transactionId, MutablePair.of(new TxnMetaImpl(txnID), positions));
recoverTracker.handleOpenStatusTransaction(txnSequenceId,
transactionMetadataEntry.getTimeoutMs()
+ transactionMetadataEntry.getStartTime());
}
break;
case ADD_PARTITION:
if (!txnMetaMap.containsKey(txnID)) {
if (!txnMetaMap.containsKey(transactionId)) {
transactionLog.deletePosition(Collections.singletonList(position));
} else {
txnMetaMap.get(txnID).getLeft()
txnMetaMap.get(transactionId).getLeft()
.addProducedPartitions(transactionMetadataEntry.getPartitionsList());
txnMetaMap.get(txnID).getRight().add(position);
txnMetaMap.get(transactionId).getRight().add(position);
}
break;
case ADD_SUBSCRIPTION:
if (!txnMetaMap.containsKey(txnID)) {
if (!txnMetaMap.containsKey(transactionId)) {
transactionLog.deletePosition(Collections.singletonList(position));
} else {
txnMetaMap.get(txnID).getLeft()
txnMetaMap.get(transactionId).getLeft()
.addAckedPartitions(subscriptionToTxnSubscription(
transactionMetadataEntry.getSubscriptionsList()));
txnMetaMap.get(txnID).getRight().add(position);
txnMetaMap.get(transactionId).getRight().add(position);
}
break;
case UPDATE:
if (!txnMetaMap.containsKey(txnID)) {
if (!txnMetaMap.containsKey(transactionId)) {
transactionLog.deletePosition(Collections.singletonList(position));
} else {
TxnStatus newStatus = transactionMetadataEntry.getNewStatus();
txnMetaMap.get(txnID).getLeft()
txnMetaMap.get(transactionId).getLeft()
.updateTxnStatus(transactionMetadataEntry.getNewStatus(),
transactionMetadataEntry.getExpectedStatus());
txnMetaMap.get(txnID).getRight().add(position);
txnMetaMap.get(transactionId).getRight().add(position);
recoverTracker.updateTransactionStatus(txnID.getLeastSigBits(), newStatus);
if (newStatus == TxnStatus.COMMITTED || newStatus == TxnStatus.ABORTED) {
transactionLog.deletePosition(txnMetaMap.get(txnID).getRight()).thenAccept(v -> {
txnMetaMap.remove(txnID).getLeft();
txnIdSortedSet.remove(transactionMetadataEntry.getTxnidLeastBits());
});
transactionLog.deletePosition(txnMetaMap
.get(transactionId).getRight()).thenAccept(v ->
txnMetaMap.remove(transactionId).getLeft());
}
}
break;
Expand All @@ -169,12 +165,12 @@ public void handleMetadataEntry(Position position, TransactionMetadataEntry tran

@Override
public CompletableFuture<TxnStatus> getTxnStatus(TxnID txnID) {
return CompletableFuture.completedFuture(txnMetaMap.get(txnID).getLeft().status());
return CompletableFuture.completedFuture(txnMetaMap.get(txnID.getLeastSigBits()).getLeft().status());
}

@Override
public CompletableFuture<TxnMeta> getTxnMeta(TxnID txnID) {
Pair<TxnMeta, List<Position>> txnMetaListPair = txnMetaMap.get(txnID);
Pair<TxnMeta, List<Position>> txnMetaListPair = txnMetaMap.get(txnID.getLeastSigBits());
CompletableFuture<TxnMeta> completableFuture = new CompletableFuture<>();
if (txnMetaListPair == null) {
completableFuture.completeExceptionally(new TransactionNotFoundException(txnID));
Expand Down Expand Up @@ -210,9 +206,8 @@ public synchronized CompletableFuture<TxnID> newTransaction(long timeOut) {
List<Position> positions = new ArrayList<>();
positions.add(position);
Pair<TxnMeta, List<Position>> pair = MutablePair.of(txn, positions);
txnMetaMap.put(txnID, pair);
txnMetaMap.put(leastSigBits, pair);
this.timeoutTracker.addTransaction(leastSigBits, timeOut);
this.txnIdSortedSet.add(leastSigBits);
return CompletableFuture.completedFuture(txnID);
});
}
Expand All @@ -238,7 +233,7 @@ public synchronized CompletableFuture<Void> addProducedPartitionToTxn(TxnID txnI
try {
synchronized (txnMetaListPair.getLeft()) {
txnMetaListPair.getLeft().addProducedPartitions(partitions);
txnMetaMap.get(txnID).getRight().add(position);
txnMetaMap.get(txnID.getLeastSigBits()).getRight().add(position);
}
return CompletableFuture.completedFuture(null);
} catch (InvalidTxnStatusException e) {
Expand Down Expand Up @@ -274,7 +269,7 @@ public synchronized CompletableFuture<Void> addAckedPartitionToTxn(TxnID txnID,
try {
synchronized (txnMetaListPair.getLeft()) {
txnMetaListPair.getLeft().addAckedPartitions(txnSubscriptions);
txnMetaMap.get(txnID).getRight().add(position);
txnMetaMap.get(txnID.getLeastSigBits()).getRight().add(position);
}
return CompletableFuture.completedFuture(null);
} catch (InvalidTxnStatusException e) {
Expand Down Expand Up @@ -317,8 +312,7 @@ public synchronized CompletableFuture<Void> updateTxnStatus(TxnID txnID, TxnStat
}
if (newStatus == TxnStatus.COMMITTED || newStatus == TxnStatus.ABORTED) {
return transactionLog.deletePosition(txnMetaListPair.getRight()).thenCompose(v -> {
txnMetaMap.remove(txnID);
txnIdSortedSet.remove(txnID.getLeastSigBits());
txnMetaMap.remove(txnID.getLeastSigBits());
return CompletableFuture.completedFuture(null);
});
}
Expand All @@ -337,7 +331,7 @@ public synchronized CompletableFuture<Void> updateTxnStatus(TxnID txnID, TxnStat
@Override
public long getLowWaterMark() {
try {
return this.txnIdSortedSet.first() - 1;
return this.txnMetaMap.firstKey() - 1;
} catch (NoSuchElementException e) {
return 0L;
}
Expand All @@ -350,7 +344,7 @@ public TransactionCoordinatorID getTransactionCoordinatorID() {

private CompletableFuture<Pair<TxnMeta, List<Position>>> getTxnPositionPair(TxnID txnID) {
CompletableFuture<Pair<TxnMeta, List<Position>>> completableFuture = new CompletableFuture<>();
Pair<TxnMeta, List<Position>> txnMetaListPair = txnMetaMap.get(txnID);
Pair<TxnMeta, List<Position>> txnMetaListPair = txnMetaMap.get(txnID.getLeastSigBits());
if (txnMetaListPair == null) {
completableFuture.completeExceptionally(new TransactionNotFoundException(txnID));
} else {
Expand Down

0 comments on commit bdb0990

Please sign in to comment.