Skip to content

Commit

Permalink
[Transaction] Fix transactionMetadata store recover timeout problem. (a…
Browse files Browse the repository at this point in the history
…pache#10162)

## Motivation
1. now recover tc use timeout is ```currentTime``` + ```transactionTimeout```, it is not right. it need to use ```startTransactionTime``` + ```transactionTimeout```.
2. fix lose time out, the original logical will pop the useful transaction from the ```priorityQueue```
  • Loading branch information
congbobo184 authored Apr 21, 2021
1 parent ef06691 commit be9d4d4
Show file tree
Hide file tree
Showing 8 changed files with 146 additions and 73 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
*/
package org.apache.pulsar.broker;


import static org.apache.pulsar.transaction.coordinator.proto.TxnStatus.ABORTING;
import static org.apache.pulsar.transaction.coordinator.proto.TxnStatus.COMMITTING;
import com.google.common.annotations.VisibleForTesting;
Expand All @@ -36,8 +35,10 @@
import org.apache.pulsar.broker.transaction.buffer.exceptions.UnsupportedTxnActionException;
import org.apache.pulsar.broker.transaction.recover.TransactionRecoverTrackerImpl;
import org.apache.pulsar.broker.transaction.timeout.TransactionTimeoutTrackerFactoryImpl;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.PulsarClientException.BrokerPersistenceException;
import org.apache.pulsar.client.api.PulsarClientException.LookupException;
import org.apache.pulsar.client.api.transaction.TransactionBufferClient;
import org.apache.pulsar.client.api.transaction.TransactionBufferClientException.ReachMaxPendingOpsException;
import org.apache.pulsar.client.api.transaction.TransactionBufferClientException.RequestTimeoutException;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.common.api.proto.TxnAction;
Expand All @@ -56,7 +57,6 @@
import org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException.CoordinatorNotFoundException;
import org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException.InvalidTxnStatusException;
import org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException.TransactionMetadataStoreStateException;
import org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException.TransactionNotFoundException;
import org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogImpl;
import org.apache.pulsar.transaction.coordinator.proto.TxnStatus;
import org.slf4j.Logger;
Expand Down Expand Up @@ -249,7 +249,7 @@ public CompletableFuture<Void> endTransaction(TxnID txnID, int txnAction) {
updateTxnStatus(txnID, newStatus, TxnStatus.OPEN).thenAccept(v ->
endTxnInTransactionBuffer(txnID, txnAction).thenAccept(a ->
completableFuture.complete(null)).exceptionally(e -> {
if (!isRetryableException(e)) {
if (!isRetryableException(e.getCause())) {
LOG.error("EndTxnInTransactionBuffer fail! TxnId : {}, "
+ "TxnAction : {}", txnID, txnAction, e);
} else {
Expand All @@ -265,7 +265,7 @@ public CompletableFuture<Void> endTransaction(TxnID txnID, int txnAction) {
completableFuture.completeExceptionally(e);
return null;
})).exceptionally(e -> {
if (!isRetryableException(e)) {
if (!isRetryableException(e.getCause())) {
LOG.error("EndTransaction UpdateTxnStatus fail! TxnId : {}, "
+ "TxnAction : {}", txnID, txnAction, e);
} else {
Expand All @@ -283,19 +283,19 @@ public CompletableFuture<Void> endTransaction(TxnID txnID, int txnAction) {
} else {
if ((txnStatus == COMMITTING && txnAction == TxnAction.COMMIT.getValue())
|| (txnStatus == ABORTING && txnAction == TxnAction.ABORT.getValue())) {
endTxnInTransactionBuffer(txnID, txnAction).exceptionally(e -> {
if (!isRetryableException(e.getCause())) {
LOG.error("EndTxnInTransactionBuffer fail! TxnId : {}, "
+ "TxnAction : {}", txnID, txnAction, e);
} else {
endTxnInTransactionBuffer(txnID, txnAction).thenAccept(k ->
completableFuture.complete(null)).exceptionally(e -> {
if (isRetryableException(e.getCause())) {
if (LOG.isDebugEnabled()) {
LOG.debug("EndTxnInTransactionBuffer retry! TxnId : {}, "
+ "TxnAction : {}", txnID, txnAction, e);
}
transactionOpRetryTimer.newTimeout(timeout ->
endTransaction(txnID, txnAction),
endTransaction(txnID, txnAction),
endTransactionRetryIntervalTime, TimeUnit.MILLISECONDS);

} else {
LOG.error("EndTxnInTransactionBuffer fail! TxnId : {}, "
+ "TxnAction : {}", txnID, txnAction, e);
}
completableFuture.completeExceptionally(e);
return null;
Expand All @@ -308,7 +308,7 @@ public CompletableFuture<Void> endTransaction(TxnID txnID, int txnAction) {
}
}
}).exceptionally(e -> {
if (!isRetryableException(e)) {
if (isRetryableException(e.getCause())) {
if (LOG.isDebugEnabled()) {
LOG.debug("End transaction op retry! TxnId : {}, TxnAction : {}", txnID, txnAction, e);
}
Expand All @@ -329,7 +329,7 @@ public CompletableFuture<Void> endTransactionForTimeout(TxnID txnID) {
return null;
}
}).exceptionally(e -> {
if (!(e instanceof TransactionNotFoundException)) {
if (isRetryableException(e.getCause())) {
endTransaction(txnID, TxnAction.ABORT_VALUE);
} else {
if (LOG.isDebugEnabled()) {
Expand Down Expand Up @@ -401,7 +401,9 @@ private static boolean isRetryableException(Throwable e) {
if (e instanceof TransactionMetadataStoreStateException
|| e instanceof RequestTimeoutException
|| e instanceof ManagedLedgerException
|| e instanceof PulsarClientException.BrokerPersistenceException) {
|| e instanceof BrokerPersistenceException
|| e instanceof LookupException
|| e instanceof ReachMaxPendingOpsException) {
return true;
} else {
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,13 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.client.api.transaction.TransactionBufferClientException;
import org.apache.pulsar.client.api.transaction.TransactionBufferClientException.ReachMaxPendingOpsException;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.client.impl.ClientCnx;
import org.apache.pulsar.client.impl.ConnectionPool;
import org.apache.pulsar.client.impl.transaction.TransactionBufferHandler;
import org.apache.pulsar.common.api.proto.CommandEndTxnOnPartitionResponse;
import org.apache.pulsar.common.api.proto.CommandEndTxnOnSubscriptionResponse;
import org.apache.pulsar.common.api.proto.ServerError;
import org.apache.pulsar.common.api.proto.TxnAction;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.protocol.Commands;
Expand Down Expand Up @@ -154,7 +154,7 @@ public void handleEndTxnOnTopicResponse(long requestId, CommandEndTxnOnPartition
} else {
log.error("[{}] Got end txn on topic response for request {} error {}", op.topic, response.getRequestId(),
response.getError());
op.cb.completeExceptionally(getException(response.getError(), response.getMessage()));
op.cb.completeExceptionally(ClientCnx.getPulsarClientException(response.getError(), response.getMessage()));
}
op.recycle();
}
Expand All @@ -180,7 +180,7 @@ public void handleEndTxnOnSubscriptionResponse(long requestId,
} else {
log.error("[{}] Got end txn on subscription response for request {} error {}",
op.topic, response.getRequestId(), response.getError());
op.cb.completeExceptionally(getException(response.getError(), response.getMessage()));
op.cb.completeExceptionally(ClientCnx.getPulsarClientException(response.getError(), response.getMessage()));
}
op.recycle();
}
Expand Down Expand Up @@ -216,17 +216,13 @@ private CompletableFuture<String> getServiceUrl(String topic) {
});
}

private TransactionBufferClientException getException(ServerError serverError, String msg) {
return new TransactionBufferClientException(msg);
}

private boolean canSendRequest(CompletableFuture<?> callback) {
try {
if (blockIfReachMaxPendingOps) {
semaphore.acquire();
} else {
if (!semaphore.tryAcquire()) {
callback.completeExceptionally(new TransactionBufferClientException("Reach max pending ops."));
callback.completeExceptionally(new ReachMaxPendingOpsException("Reach max pending ops."));
return false;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,8 @@ public class TransactionTimeoutTrackerImpl implements TransactionTimeoutTracker,
private final Clock clock;
private Timeout currentTimeout;
private final static long INITIAL_TIMEOUT = 1L;
// The timeout may wait time longer than the new transaction timeout time, so we should cancel the current timeout
// and create a timeout wait time is the new transaction timeout time.
private long nowTaskTimeoutTime = INITIAL_TIMEOUT;

private volatile long nowTaskTimeoutTime = INITIAL_TIMEOUT;
private final long tcId;
private final TransactionMetadataStoreService transactionMetadataStoreService;

Expand All @@ -65,15 +64,15 @@ public CompletableFuture<Boolean> addTransaction(long sequenceId, long timeout)
}
synchronized (this){
long nowTime = clock.millis();
priorityQueue.add(timeout + nowTime, tcId, sequenceId);
long nowTransactionTimeoutTime = nowTime + timeout;
if (nowTaskTimeoutTime == INITIAL_TIMEOUT) {
long transactionTimeoutTime = nowTime + timeout;
priorityQueue.add(transactionTimeoutTime, tcId, sequenceId);
if (this.currentTimeout == null) {
currentTimeout = timer.newTimeout(this, timeout, TimeUnit.MILLISECONDS);
nowTaskTimeoutTime = nowTransactionTimeoutTime;
} else if (nowTaskTimeoutTime > nowTransactionTimeoutTime) {
nowTaskTimeoutTime = transactionTimeoutTime;
} else if (nowTaskTimeoutTime > transactionTimeoutTime) {
if (currentTimeout.cancel()) {
currentTimeout = timer.newTimeout(this, timeout, TimeUnit.MILLISECONDS);
nowTaskTimeoutTime = nowTransactionTimeoutTime;
nowTaskTimeoutTime = transactionTimeoutTime;
}
}
}
Expand All @@ -82,13 +81,18 @@ public CompletableFuture<Boolean> addTransaction(long sequenceId, long timeout)

@Override
public void replayAddTransaction(long sequenceId, long timeout) {
long nowTime = clock.millis();
priorityQueue.add(timeout + nowTime, tcId, sequenceId);
priorityQueue.add(timeout, tcId, sequenceId);
}

@Override
public void start() {
run(null);
synchronized (this) {
if (currentTimeout == null && !priorityQueue.isEmpty()) {
this.currentTimeout = this.timer.newTimeout(this,
priorityQueue.peekN1() - this.clock.millis(), TimeUnit.MILLISECONDS);
this.nowTaskTimeoutTime = priorityQueue.peekN1();
}
}
}

@Override
Expand All @@ -108,14 +112,13 @@ public void run(Timeout timeout) {
if (timeoutTime < nowTime){
transactionMetadataStoreService.endTransactionForTimeout(new TxnID(priorityQueue.peekN2(),
priorityQueue.peekN3()));
priorityQueue.pop();
} else {
currentTimeout = timer
.newTimeout(this,
timeoutTime - clock.millis(), TimeUnit.MILLISECONDS);
nowTaskTimeoutTime = nowTime + timeoutTime;
.newTimeout(this, timeoutTime - clock.millis(), TimeUnit.MILLISECONDS);
nowTaskTimeoutTime = timeoutTime;
break;
}
priorityQueue.pop();
}
}
}
Expand Down
Loading

0 comments on commit be9d4d4

Please sign in to comment.