Skip to content

Commit

Permalink
[Transaction]Txn client check timeout (apache#12521)
Browse files Browse the repository at this point in the history
### Motivation
Optimize the logic on the Transaction Client side.
Avoid sending and acking messages with timeout  transactions.

### Modifications

* TransactionImp

     *  Add a tool field for CAS to replace State : STATE_UPDATE.
**When committing and aborted, only the successful cas operation will make subsequent judgments, otherwise it will return a failure future**
     *   Implement TimerTasker to execute tasks that replace the state of the transaction as Aborted.
* TransactionBuildImpl
     * In the callback of the build method, call the timer of PulsarClient to start a Timeout. Pass in the corresponding transactionImpl (TimeTasker has been implemented)
  • Loading branch information
liangyepianzhou authored Dec 18, 2021
1 parent dbb5081 commit c5d7a84
Show file tree
Hide file tree
Showing 9 changed files with 137 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,10 @@ public void setup() throws Exception {
super.producerBaseSetup();
doReturn(1L).when(transaction).getTxnIdLeastBits();
doReturn(1L).when(transaction).getTxnIdMostBits();
doReturn(TransactionImpl.State.OPEN).when(transaction).getState();
CompletableFuture<Void> completableFuture = CompletableFuture.completedFuture(null);
doNothing().when(transaction).registerAckOp(any());
doReturn(true).when(transaction).checkIfOpen(any());
doReturn(completableFuture).when(transaction).registerAckedTopic(any(), any());

Thread.sleep(1000 * 3);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
import java.util.Collection;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.ArrayList;
Expand Down Expand Up @@ -770,18 +772,45 @@ public void produceAndConsumeCloseStateTxnTest() throws Exception {
}
});

Class<TransactionImpl> transactionClass = TransactionImpl.class;
Constructor<TransactionImpl> constructor = transactionClass
.getDeclaredConstructor(PulsarClientImpl.class, long.class, long.class, long.class);
constructor.setAccessible(true);

TransactionImpl timeoutTxnSkipClientTimeout = constructor.newInstance(pulsarClient, 5,
timeoutTxn.getTxnID().getLeastSigBits(), timeoutTxn.getTxnID().getMostSigBits());

try {
timeoutTxn.commit().get();
timeoutTxnSkipClientTimeout.commit().get();
fail();
} catch (Exception e) {
assertTrue(e.getCause() instanceof TransactionNotFoundException);
}
Field field = TransactionImpl.class.getDeclaredField("state");
field.setAccessible(true);
TransactionImpl.State state = (TransactionImpl.State) field.get(timeoutTxn);
TransactionImpl.State state = (TransactionImpl.State) field.get(timeoutTxnSkipClientTimeout);
assertEquals(state, TransactionImpl.State.ERROR);
}

@Test
public void testTxnTimeoutAtTransactionMetadataStore() throws Exception{
TxnID txnID = pulsarServiceList.get(0).getTransactionMetadataStoreService()
.newTransaction(new TransactionCoordinatorID(0), 1).get();
Awaitility.await().until(() -> {
try {
getPulsarServiceList().get(0).getTransactionMetadataStoreService().getTxnMeta(txnID).get();
return false;
} catch (Exception e) {
return true;
}
});
Collection<TransactionMetadataStore> transactionMetadataStores =
getPulsarServiceList().get(0).getTransactionMetadataStoreService().getStores().values();
long timeoutCount = transactionMetadataStores.stream()
.mapToLong(store -> store.getMetadataStoreStats().timeoutCount).sum();
Assert.assertEquals(timeoutCount, 1);
}

@Test
public void transactionTimeoutTest() throws Exception {
String topic = NAMESPACE1 + "/txn-timeout";
Expand Down Expand Up @@ -943,4 +972,38 @@ public void oneTransactionOneTopicWithMultiSubTest() throws Exception {
}
assertTrue(flag);
}

@Test
public void testTxnTimeOutInClient() throws Exception{
String topic = NAMESPACE1 + "/testTxnTimeOutInClient";
@Cleanup
Producer<String> producer = pulsarClient.newProducer(Schema.STRING).producerName("testTxnTimeOut_producer")
.topic(topic).sendTimeout(0, TimeUnit.SECONDS).enableBatching(false).create();
@Cleanup
Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING).consumerName("testTxnTimeOut_consumer")
.topic(topic).subscriptionName("testTxnTimeOut_sub").subscribe();

Transaction transaction = pulsarClient.newTransaction().withTransactionTimeout(1, TimeUnit.SECONDS)
.build().get();
producer.newMessage().send();
Awaitility.await().untilAsserted(() -> {
Assert.assertEquals(((TransactionImpl)transaction).getState(), TransactionImpl.State.TIMEOUT);
});

try {
producer.newMessage(transaction).send();
Assert.fail();
} catch (Exception e) {
Assert.assertTrue(e.getCause().getCause() instanceof TransactionCoordinatorClientException
.InvalidTxnStatusException);
}
try {
Message<String> message = consumer.receive();
consumer.acknowledgeAsync(message.getMessageId(), transaction).get();
Assert.fail();
} catch (Exception e) {
Assert.assertTrue(e.getCause() instanceof TransactionCoordinatorClientException
.InvalidTxnStatusException);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,11 @@ public static class InvalidTxnStatusException extends TransactionCoordinatorClie
public InvalidTxnStatusException(String message) {
super(message);
}

public InvalidTxnStatusException(String txnId, String actualState, String expectState) {
super("["+ txnId +"] with unexpected state : "
+ actualState + ", expect " + expectState + " state!");
}
}

/**
Expand All @@ -93,6 +98,21 @@ public MetaStoreHandlerNotExistsException(String message) {
}
}


/**
* Thrown when transaction meta was timeout.
*/
public static class TransactionTimeotException extends TransactionCoordinatorClientException {

public TransactionTimeotException(Throwable t) {
super(t);
}

public TransactionTimeotException(String transactionId) {
super("The transaction " + transactionId + " is timeout.");
}
}

/**
* Thrown when send request to transaction meta store but the transaction meta store handler not ready.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -487,6 +487,10 @@ public CompletableFuture<Void> acknowledgeAsync(MessageId messageId,
if (null != txn) {
checkArgument(txn instanceof TransactionImpl);
txnImpl = (TransactionImpl) txn;
CompletableFuture<Void> completableFuture = new CompletableFuture<>();
if (!txnImpl.checkIfOpen(completableFuture)) {
return completableFuture;
}
}
return doAcknowledgeWithTxn(messageId, AckType.Individual, Collections.emptyMap(), txnImpl);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.apache.pulsar.client.api.TopicMetadata;
import org.apache.pulsar.client.api.transaction.Transaction;
import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
import org.apache.pulsar.client.impl.transaction.TransactionImpl;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
Expand Down Expand Up @@ -191,6 +192,10 @@ CompletableFuture<MessageId> internalSendAsync(Message<?> message) {

@Override
CompletableFuture<MessageId> internalSendWithTxnAsync(Message<?> message, Transaction txn) {
CompletableFuture<MessageId> completableFuture = new CompletableFuture<>();
if (txn != null && !((TransactionImpl)txn).checkIfOpen(completableFuture)) {
return completableFuture;
}
int partition = routerPolicy.choosePartition(message, topicMetadata);
checkArgument(partition >= 0 && partition < topicMetadata.numPartitions(),
"Illegal partition index chosen by the message routing policy: " + partition);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,10 @@ CompletableFuture<MessageId> internalSendWithTxnAsync(Message<?> message, Transa
if (txn == null) {
return internalSendAsync(message);
} else {
CompletableFuture<MessageId> completableFuture = new CompletableFuture<>();
if (!((TransactionImpl)txn).checkIfOpen(completableFuture)) {
return completableFuture;
}
return ((TransactionImpl) txn).registerProducedTopic(topic)
.thenCompose(ignored -> internalSendAsync(message));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ public class PulsarClientImpl implements PulsarClient {
protected final ClientConfigurationData conf;
private LookupService lookup;
private final ConnectionPool cnxPool;
@Getter
private final Timer timer;
private boolean needStopTimer;
private final ExecutorProvider externalExecutorProvider;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
*/
package org.apache.pulsar.client.impl.transaction;

import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -67,8 +69,11 @@ public CompletableFuture<Transaction> build() {
future.completeExceptionally(throwable);
return;
}
future.complete(new TransactionImpl(client, txnTimeout,
txnID.getLeastSigBits(), txnID.getMostSigBits()));
TransactionImpl transaction = new TransactionImpl(client, txnTimeout,
txnID.getLeastSigBits(), txnID.getMostSigBits());
client.getTimer().newTimeout(transaction,
txnTimeout, timeUnit);
future.complete(transaction);
});
return future;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,16 @@
*/
package org.apache.pulsar.client.impl.transaction;

import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import com.google.common.collect.Lists;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.tuple.Pair;
Expand All @@ -48,7 +51,7 @@
*/
@Slf4j
@Getter
public class TransactionImpl implements Transaction {
public class TransactionImpl implements Transaction , TimerTask {

private final PulsarClientImpl client;
private final long transactionTimeoutMs;
Expand All @@ -63,14 +66,22 @@ public class TransactionImpl implements Transaction {
private final ArrayList<CompletableFuture<MessageId>> sendFutureList;
private final ArrayList<CompletableFuture<Void>> ackFutureList;
private volatile State state;
private final AtomicReferenceFieldUpdater<TransactionImpl, State> STATE_UPDATE =
AtomicReferenceFieldUpdater.newUpdater(TransactionImpl.class, State.class, "state");

@Override
public void run(Timeout timeout) throws Exception {
STATE_UPDATE.compareAndSet(this, State.OPEN, State.TIMEOUT);
}

public enum State {
OPEN,
COMMITTING,
ABORTING,
COMMITTED,
ABORTED,
ERROR
ERROR,
TIMEOUT
}

TransactionImpl(PulsarClientImpl client,
Expand All @@ -93,7 +104,8 @@ public enum State {

// register the topics that will be modified by this transaction
public CompletableFuture<Void> registerProducedTopic(String topic) {
return checkIfOpen().thenCompose(value -> {
CompletableFuture<Void> completableFuture = new CompletableFuture<>();
if (checkIfOpen(completableFuture)) {
synchronized (TransactionImpl.this) {
// we need to issue the request to TC to register the produced topic
return registerPartitionMap.compute(topic, (key, future) -> {
Expand All @@ -106,7 +118,9 @@ public CompletableFuture<Void> registerProducedTopic(String topic) {
}
});
}
});
} else {
return completableFuture;
}
}

public synchronized void registerSendOp(CompletableFuture<MessageId> sendFuture) {
Expand All @@ -115,7 +129,8 @@ public synchronized void registerSendOp(CompletableFuture<MessageId> sendFuture)

// register the topics that will be modified by this transaction
public CompletableFuture<Void> registerAckedTopic(String topic, String subscription) {
return checkIfOpen().thenCompose(value -> {
CompletableFuture<Void> completableFuture = new CompletableFuture<>();
if (checkIfOpen(completableFuture)) {
synchronized (TransactionImpl.this) {
// we need to issue the request to TC to register the acked topic
return registerSubscriptionMap.compute(Pair.of(topic, subscription), (key, future) -> {
Expand All @@ -128,7 +143,9 @@ public CompletableFuture<Void> registerAckedTopic(String topic, String subscript
}
});
}
});
} else {
return completableFuture;
}
}

public synchronized void registerAckOp(CompletableFuture<Void> ackFuture) {
Expand Down Expand Up @@ -213,11 +230,14 @@ public TxnID getTxnID() {
return new TxnID(txnIdMostBits, txnIdLeastBits);
}

private CompletableFuture<Void> checkIfOpen() {
public <T> boolean checkIfOpen(CompletableFuture<T> completableFuture) {
if (state == State.OPEN) {
return CompletableFuture.completedFuture(null);
return true;
} else {
return invalidTxnStatusFuture();
completableFuture
.completeExceptionally(new InvalidTxnStatusException(
new TxnID(txnIdMostBits, txnIdLeastBits).toString(), state.name(), State.OPEN.name()));
return false;
}
}

Expand Down

0 comments on commit c5d7a84

Please sign in to comment.