Skip to content

Commit

Permalink
[Transaction] Fix transaction buffer op fail handle. (apache#10322)
Browse files Browse the repository at this point in the history
  • Loading branch information
congbobo184 authored Apr 22, 2021
1 parent e0fefe2 commit 37e3180
Show file tree
Hide file tree
Showing 2 changed files with 112 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@
import org.apache.pulsar.common.api.proto.ProtocolVersion;
import org.apache.pulsar.common.api.proto.Schema;
import org.apache.pulsar.common.api.proto.ServerError;
import org.apache.pulsar.common.api.proto.TxnAction;
import org.apache.pulsar.common.intercept.InterceptException;
import org.apache.pulsar.common.naming.Metadata;
import org.apache.pulsar.common.naming.NamespaceName;
Expand Down Expand Up @@ -1824,8 +1825,9 @@ protected void handleAddPartitionToTxn(CommandAddPartitionToTxn command) {
final TxnID txnID = new TxnID(command.getTxnidMostBits(), command.getTxnidLeastBits());
final long requestId = command.getRequestId();
if (log.isDebugEnabled()) {
log.debug("Receive add published partition to txn request {} from {} with txnId {}",
requestId, remoteAddress, txnID);
command.getPartitionsList().forEach(partion ->
log.debug("Receive add published partition to txn request {} "
+ "from {} with txnId {}, topic: [{}]", requestId, remoteAddress, txnID, partion));
}
service.pulsar().getTransactionMetadataStoreService().addProducedPartitionToTxn(txnID,
command.getPartitionsList())
Expand Down Expand Up @@ -1860,7 +1862,7 @@ protected void handleEndTxn(CommandEndTxn command) {
.exceptionally(throwable -> {
log.error("Send response error for end txn request.", throwable);
ctx.writeAndFlush(Commands.newEndTxnResponse(requestId, txnID.getMostSigBits(),
BrokerServiceException.getClientErrorCode(throwable), throwable.getMessage()));
BrokerServiceException.getClientErrorCode(throwable.getCause()), throwable.getMessage()));
return null; });
}

Expand All @@ -1871,26 +1873,42 @@ protected void handleEndTxnOnPartition(CommandEndTxnOnPartition command) {
final int txnAction = command.getTxnAction().getValue();
TxnID txnID = new TxnID(command.getTxnidMostBits(), command.getTxnidLeastBits());

service.getTopics().get(TopicName.get(topic).toString()).whenComplete((optionalTopic, t) -> {
if (!optionalTopic.isPresent()) {
ctx.writeAndFlush(Commands.newEndTxnOnPartitionResponse(
requestId, ServerError.TopicNotFound,
"Topic " + topic + " is not found."));
return;
}
optionalTopic.get().endTxn(txnID, txnAction, command.getTxnidLeastBitsOfLowWatermark())
.whenComplete((ignored, throwable) -> {
if (throwable != null) {
log.error("Handle endTxnOnPartition {} failed.", topic, throwable);
ctx.writeAndFlush(Commands.newEndTxnOnPartitionResponse(
requestId, BrokerServiceException.getClientErrorCode(throwable),
throwable.getMessage()));
return;
}
ctx.writeAndFlush(Commands.newEndTxnOnPartitionResponse(requestId,
txnID.getLeastSigBits(), txnID.getMostSigBits()));
});
});
if (log.isDebugEnabled()) {
log.debug("[{}] handleEndTxnOnPartition txnId: [{}], txnAction: [{}]", topic,
txnID, txnAction);
}
CompletableFuture<Optional<Topic>> topicFuture = service.getTopics().get(TopicName.get(topic).toString());
if (topicFuture != null) {
topicFuture.whenComplete((optionalTopic, t) -> {
if (!optionalTopic.isPresent()) {
log.error("handleEndTxnOnPartition faile ! The topic {} does not exist in broker, "
+ "txnId: [{}], txnAction: [{}]", topic, txnID, TxnAction.valueOf(txnAction));
ctx.writeAndFlush(Commands.newEndTxnOnPartitionResponse(
requestId, ServerError.ServiceNotReady,
"Topic " + topic + " is not found."));
return;
}
optionalTopic.get().endTxn(txnID, txnAction, command.getTxnidLeastBitsOfLowWatermark())
.whenComplete((ignored, throwable) -> {
if (throwable != null) {
log.error("Handle endTxnOnPartition {} failed.", topic, throwable);
ctx.writeAndFlush(Commands.newEndTxnOnPartitionResponse(
requestId, BrokerServiceException.getClientErrorCode(throwable),
throwable.getMessage()));
return;
}
ctx.writeAndFlush(Commands.newEndTxnOnPartitionResponse(requestId,
txnID.getLeastSigBits(), txnID.getMostSigBits()));
});
});
} else {
log.error("handleEndTxnOnPartition faile ! The topic {} does not exist in broker, "
+ "txnId: [{}], txnAction: [{}]", topic, txnID, TxnAction.valueOf(txnAction));
ctx.writeAndFlush(Commands.newEndTxnOnSubscriptionResponse(
requestId, txnID.getLeastSigBits(), txnID.getMostSigBits(),
ServerError.ServiceNotReady,
"The topic " + topic + " is not exist in broker."));
}
}

@Override
Expand All @@ -1902,13 +1920,22 @@ protected void handleEndTxnOnSubscription(CommandEndTxnOnSubscription command) {
final String subName = command.getSubscription().getSubscription();
final int txnAction = command.getTxnAction().getValue();

service.getTopics().get(TopicName.get(topic).toString())
.thenAccept(optionalTopic -> {
if (log.isDebugEnabled()) {
log.debug("[{}] handleEndTxnOnSubscription txnId: [{}], txnAction: [{}]", topic,
new TxnID(txnidMostBits, txnidLeastBits), txnAction);
}

CompletableFuture<Optional<Topic>> topicFuture = service.getTopics().get(TopicName.get(topic).toString());
if (topicFuture != null) {
topicFuture.thenAccept(optionalTopic -> {

if (!optionalTopic.isPresent()) {
log.error("The topic {} is not exist in broker.", topic);
log.error("handleEndTxnOnSubscription fail! The topic {} does not exist in broker, txnId: "
+ "[{}], txnAction: [{}]", topic,
new TxnID(txnidMostBits, txnidLeastBits), TxnAction.valueOf(txnAction));
ctx.writeAndFlush(Commands.newEndTxnOnSubscriptionResponse(
requestId, txnidLeastBits, txnidMostBits,
ServerError.UnknownError,
ServerError.ServiceNotReady,
"The topic " + topic + " is not exist in broker."));
return;
}
Expand All @@ -1918,7 +1945,7 @@ protected void handleEndTxnOnSubscription(CommandEndTxnOnSubscription command) {
log.error("Topic {} subscription {} is not exist.", optionalTopic.get().getName(), subName);
ctx.writeAndFlush(Commands.newEndTxnOnSubscriptionResponse(
requestId, txnidLeastBits, txnidMostBits,
ServerError.UnknownError,
ServerError.ServiceNotReady,
"Topic " + optionalTopic.get().getName()
+ " subscription " + subName + " is not exist."));
return;
Expand All @@ -1929,17 +1956,33 @@ protected void handleEndTxnOnSubscription(CommandEndTxnOnSubscription command) {
command.getTxnidLeastBitsOfLowWatermark());
completableFuture.whenComplete((ignored, throwable) -> {
if (throwable != null) {
log.error("Handle end txn on subscription failed for request {}", requestId);
log.error("Handle end txn on subscription failed for request {}", requestId, throwable);
ctx.writeAndFlush(Commands.newEndTxnOnSubscriptionResponse(
requestId, txnidLeastBits, txnidMostBits,
ServerError.UnknownError,
BrokerServiceException.getClientErrorCode(throwable),
"Handle end txn on subscription failed."));
return;
}
ctx.writeAndFlush(
Commands.newEndTxnOnSubscriptionResponse(requestId, txnidLeastBits, txnidMostBits));
});
}).exceptionally(e -> {
log.error("Handle end txn on subscription failed for request {}", requestId, e);
ctx.writeAndFlush(Commands.newEndTxnOnSubscriptionResponse(
requestId, txnidLeastBits, txnidMostBits,
ServerError.ServiceNotReady,
"Handle end txn on subscription failed."));
return null;
});
} else {
log.error("handleEndTxnOnSubscription fail! The topic {} does not exist in broker, txnId: "
+ "[{}], txnAction: [{}]", topic,
new TxnID(txnidMostBits, txnidLeastBits), TxnAction.valueOf(txnAction));
ctx.writeAndFlush(Commands.newEndTxnOnSubscriptionResponse(
requestId, txnidLeastBits, txnidMostBits,
ServerError.ServiceNotReady,
"The topic " + topic + " is not exist in broker."));
}
}

private CompletableFuture<SchemaVersion> tryAddSchema(Topic topic, SchemaData schema) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferClientImpl;
import org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferHandlerImpl;
import org.apache.pulsar.broker.transaction.coordinator.TransactionMetaStoreTestBase;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.transaction.TransactionBufferClient;
import org.apache.pulsar.client.api.transaction.TransactionBufferClientException;
import org.apache.pulsar.client.api.transaction.TxnID;
Expand All @@ -57,6 +58,7 @@
import static org.mockito.ArgumentMatchers.anyObject;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
Expand Down Expand Up @@ -176,6 +178,43 @@ public void testAbortOnSubscription() throws ExecutionException, InterruptedExce
}
}

@Test
public void testTransactionBufferOpFail() throws InterruptedException, ExecutionException {
ConcurrentOpenHashMap<String, CompletableFuture<Optional<Topic>>>[] originalMaps =
new ConcurrentOpenHashMap[brokerServices.length];
ConcurrentOpenHashMap<String, CompletableFuture<Optional<Topic>>> topicMap = new ConcurrentOpenHashMap<>();
for (int i = 0; i < brokerServices.length; i++) {
originalMaps[i] = brokerServices[i].getTopics();
when(brokerServices[i].getTopics()).thenReturn(topicMap);
}

try {
tbClient.abortTxnOnSubscription(
partitionedTopicName.getPartition(0).toString(), "test", 1L, 1, -1L).get();
fail();
} catch (ExecutionException e) {
assertTrue(e.getCause() instanceof PulsarClientException.LookupException);
}

try {
tbClient.abortTxnOnTopic(
partitionedTopicName.getPartition(0).toString(), 1L, 1, -1L).get();
fail();
} catch (ExecutionException e) {
assertTrue(e.getCause() instanceof PulsarClientException.LookupException);
}

for (int i = 0; i < brokerServices.length; i++) {
when(brokerServices[i].getTopics()).thenReturn(originalMaps[i]);
}

tbClient.abortTxnOnSubscription(
partitionedTopicName.getPartition(0).toString(), "test", 1L, 1, -1L).get();

tbClient.abortTxnOnTopic(
partitionedTopicName.getPartition(0).toString(), 1L, 1, -1L).get();
}

@Test
public void testTransactionBufferClientTimeout() throws Exception {
ConnectionPool connectionPool = mock(ConnectionPool.class);
Expand Down

0 comments on commit 37e3180

Please sign in to comment.