Skip to content

Commit

Permalink
[Transaction]No TransactionCoordinatorNotFound, but automatic reconne…
Browse files Browse the repository at this point in the history
…ct (apache#13135)

### Motivation and Modification
We should not throw the following exceptions to the user to deal with.
1. `TransactionCoordinatorNotFound` or `ManagerLedgerFenceException`
           --- we should  retry the operation and reconnect to TC
2. `TransactionMetaStoreHandler` was connecting
          ---- add the operation into `pendingRequests`, and executed the requests in `pendingRequests` when the connected completely. 
3.  The complexity of concurrent operations is too high. For operations in a TransactionMetaStoreHandler, consider using single-threaded operations
        --- use `internalPinnedExecutor`
  • Loading branch information
liangyepianzhou authored Dec 14, 2021
1 parent f69a11e commit 56323e4
Show file tree
Hide file tree
Showing 4 changed files with 504 additions and 367 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,7 @@ public CompletableFuture<Void> endTransaction(TxnID txnID, int txnAction, boolea
endTransactionRetryIntervalTime, TimeUnit.MILLISECONDS);

}
completableFuture.completeExceptionally(e);
completableFuture.completeExceptionally(e.getCause());
return null;
})).exceptionally(e -> {
if (!isRetryableException(e.getCause())) {
Expand All @@ -371,7 +371,7 @@ public CompletableFuture<Void> endTransaction(TxnID txnID, int txnAction, boolea
endTransactionRetryIntervalTime, TimeUnit.MILLISECONDS);

}
completableFuture.completeExceptionally(e);
completableFuture.completeExceptionally(e.getCause());
return null;
});
} else {
Expand All @@ -391,7 +391,7 @@ public CompletableFuture<Void> endTransaction(TxnID txnID, int txnAction, boolea
LOG.error("EndTxnInTransactionBuffer fail! TxnId : {}, "
+ "TxnAction : {}", txnID, txnAction, e);
}
completableFuture.completeExceptionally(e);
completableFuture.completeExceptionally(e.getCause());
return null;
});
} else {
Expand All @@ -409,7 +409,7 @@ public CompletableFuture<Void> endTransaction(TxnID txnID, int txnAction, boolea
transactionOpRetryTimer.newTimeout(timeout -> endTransaction(txnID, txnAction, isTimeout),
endTransactionRetryIntervalTime, TimeUnit.MILLISECONDS);
}
completableFuture.completeExceptionally(e);
completableFuture.completeExceptionally(e.getCause());
return null;
});
return completableFuture;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2016,7 +2016,26 @@ private boolean checkTransactionEnableAndSendError(long requestId) {
return true;
}
}
private Throwable handleTxnException(Throwable ex, String op, long requestId) {
if (ex instanceof CoordinatorException.CoordinatorNotFoundException || ex != null
&& ex.getCause() instanceof CoordinatorException.CoordinatorNotFoundException) {
if (log.isDebugEnabled()) {
log.debug("The Coordinator was not found for the request {}", op);
}
return ex;
}
if (ex instanceof ManagedLedgerException.ManagedLedgerFencedException || ex != null
&& ex.getCause() instanceof ManagedLedgerException.ManagedLedgerFencedException) {
if (log.isDebugEnabled()) {
log.debug("Throw a CoordinatorNotFoundException to client "
+ "with the message got from a ManagedLedgerFencedException for the request {}", op);
}
return new CoordinatorException.CoordinatorNotFoundException(ex.getMessage());

}
log.error("Send response error for {} request {}.", op, requestId, ex);
return ex;
}
@Override
protected void handleNewTxn(CommandNewTxn command) {
final long requestId = command.getRequestId();
Expand All @@ -2041,9 +2060,7 @@ protected void handleNewTxn(CommandNewTxn command) {
ctx.writeAndFlush(Commands.newTxnResponse(requestId, txnID.getLeastSigBits(),
txnID.getMostSigBits()));
} else {
if (log.isDebugEnabled()) {
log.debug("Send response error for new txn request {}", requestId, ex);
}
ex = handleTxnException(ex, BaseCommand.Type.NEW_TXN.name(), requestId);

ctx.writeAndFlush(Commands.newTxnResponse(requestId, tcId.getId(),
BrokerServiceException.getClientErrorCode(ex), ex.getMessage()));
Expand Down Expand Up @@ -2079,19 +2096,11 @@ protected void handleAddPartitionToTxn(CommandAddPartitionToTxn command) {
ctx.writeAndFlush(Commands.newAddPartitionToTxnResponse(requestId,
txnID.getLeastSigBits(), txnID.getMostSigBits()));
} else {
if (log.isDebugEnabled()) {
log.debug("Send response error for add published partition to txn request {}", requestId,
ex);
}
ex = handleTxnException(ex, BaseCommand.Type.ADD_PARTITION_TO_TXN.name(), requestId);

if (ex instanceof CoordinatorException.CoordinatorNotFoundException) {
ctx.writeAndFlush(Commands.newAddPartitionToTxnResponse(requestId, txnID.getMostSigBits(),
BrokerServiceException.getClientErrorCode(ex), ex.getMessage()));
} else {
ctx.writeAndFlush(Commands.newAddPartitionToTxnResponse(requestId, txnID.getMostSigBits(),
BrokerServiceException.getClientErrorCode(ex.getCause()),
ex.getCause().getMessage()));
}
ctx.writeAndFlush(Commands.newAddPartitionToTxnResponse(requestId, txnID.getMostSigBits(),
BrokerServiceException.getClientErrorCode(ex),
ex.getMessage()));
transactionMetadataStoreService.handleOpFail(ex, tcId);
}
}));
Expand All @@ -2118,16 +2127,10 @@ protected void handleEndTxn(CommandEndTxn command) {
ctx.writeAndFlush(Commands.newEndTxnResponse(requestId,
txnID.getLeastSigBits(), txnID.getMostSigBits()));
} else {
log.error("Send response error for end txn request.", ex);
ex = handleTxnException(ex, BaseCommand.Type.END_TXN.name(), requestId);
ctx.writeAndFlush(Commands.newEndTxnResponse(requestId, txnID.getMostSigBits(),
BrokerServiceException.getClientErrorCode(ex), ex.getMessage()));

if (ex instanceof CoordinatorException.CoordinatorNotFoundException) {
ctx.writeAndFlush(Commands.newEndTxnResponse(requestId, txnID.getMostSigBits(),
BrokerServiceException.getClientErrorCode(ex), ex.getMessage()));
} else {
ctx.writeAndFlush(Commands.newEndTxnResponse(requestId, txnID.getMostSigBits(),
BrokerServiceException.getClientErrorCode(ex.getCause()),
ex.getCause().getMessage()));
}
transactionMetadataStoreService.handleOpFail(ex, tcId);
}
});
Expand Down Expand Up @@ -2338,20 +2341,11 @@ protected void handleAddSubscriptionToTxn(CommandAddSubscriptionToTxn command) {
txnID.getLeastSigBits(), txnID.getMostSigBits()));
log.info("handle add partition to txn finish.");
} else {
if (log.isDebugEnabled()) {
log.debug("Send response error for add published partition to txn request {}",
requestId, ex);
}
ex = handleTxnException(ex, BaseCommand.Type.ADD_SUBSCRIPTION_TO_TXN.name(), requestId);

if (ex instanceof CoordinatorException.CoordinatorNotFoundException) {
ctx.writeAndFlush(Commands.newAddSubscriptionToTxnResponse(requestId,
txnID.getMostSigBits(), BrokerServiceException.getClientErrorCode(ex),
ex.getMessage()));
} else {
ctx.writeAndFlush(Commands.newAddSubscriptionToTxnResponse(requestId,
txnID.getMostSigBits(), BrokerServiceException.getClientErrorCode(ex.getCause()),
ex.getCause().getMessage()));
}
ctx.writeAndFlush(Commands.newAddSubscriptionToTxnResponse(requestId,
txnID.getMostSigBits(), BrokerServiceException.getClientErrorCode(ex),
ex.getMessage()));
transactionMetadataStoreService.handleOpFail(ex, tcId);
}
}));
Expand Down
Loading

0 comments on commit 56323e4

Please sign in to comment.