Skip to content

Commit

Permalink
[fix][txn] optimize the ack/send future in TransactionImpl (apache#17777
Browse files Browse the repository at this point in the history
)

### Motivation
The TransactionImpl stores a lot of future. This uses a lot of memory and can be optimized to two futures.
### Modification
Optimize the future list to a single future.
  • Loading branch information
liangyepianzhou authored Oct 12, 2022
1 parent 7ac8e3e commit 92b4708
Show file tree
Hide file tree
Showing 3 changed files with 160 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,10 @@
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -319,6 +321,86 @@ public Consumer<byte[]> getConsumer(String topicName, String subName) throws Pul
.subscribe();
}

@Test
public void testAsyncSendOrAckForSingleFuture() throws Exception {
String topic = NAMESPACE1 + "/testSingleFuture";
int totalMessage = 10;
int threadSize = 30;
String topicName = "subscription";
getPulsarServiceList().get(0).getConfig().setBrokerDeduplicationEnabled(false);
ExecutorService executorService = Executors.newFixedThreadPool(threadSize);

//build producer/consumer
Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topic)
.producerName("producer")
.sendTimeout(0, TimeUnit.SECONDS)
.create();

Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic(topic)
.subscriptionType(SubscriptionType.Exclusive)
.subscriptionName(topicName)
.subscribe();
//store the send/ack result futures
CopyOnWriteArrayList<CompletableFuture<MessageId>> sendFutures = new CopyOnWriteArrayList<>();
CopyOnWriteArrayList<CompletableFuture<Void>> ackFutures = new CopyOnWriteArrayList<>();

//send and ack messages with transaction
Transaction transaction1 = pulsarClient.newTransaction()
.withTransactionTimeout(10, TimeUnit.SECONDS)
.build()
.get();

for (int i = 0; i < totalMessage * threadSize; i++) {
producer.newMessage().send();
}

CountDownLatch countDownLatch = new CountDownLatch(threadSize);
for (int i = 0; i < threadSize; i++) {
executorService.submit(() -> {
try {
for (int j = 0; j < totalMessage; j++) {
CompletableFuture<MessageId> sendFuture = producer.newMessage(transaction1).sendAsync();
sendFutures.add(sendFuture);
Message<byte[]> message = consumer.receive();
CompletableFuture<Void> ackFuture = consumer.acknowledgeAsync(message.getMessageId(),
transaction1);
ackFutures.add(ackFuture);
}
countDownLatch.countDown();
} catch (Exception e) {
log.error("Failed to send/ack messages with transaction.", e);
countDownLatch.countDown();
}
});
}
//wait the all send/ack op is executed and store its futures in the arraylist.
countDownLatch.await(10, TimeUnit.SECONDS);
transaction1.commit().get();

//verify the final status is right.
Field ackCountField = TransactionImpl.class.getDeclaredField("opCount");
ackCountField.setAccessible(true);
long ackCount = (long) ackCountField.get(transaction1);
Assert.assertEquals(ackCount, 0L);

for (int i = 0; i < totalMessage * threadSize; i++) {
Assert.assertTrue(sendFutures.get(i).isDone());
Assert.assertTrue(ackFutures.get(i).isDone());
}

//verify opFuture without any operation.
Transaction transaction2 = pulsarClient.newTransaction()
.withTransactionTimeout(10, TimeUnit.SECONDS)
.build()
.get();
Awaitility.await().until(() -> {
transaction2.commit().get();
return true;
});
}

@Test
public void testGetTxnID() throws Exception {
Transaction transaction = pulsarClient.newTransaction()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -910,6 +910,23 @@ public TransactionConflictException(String msg) {
}
}

public static class TransactionHasOperationFailedException extends PulsarClientException {
/**
* Constructs an {@code TransactionHasOperationFailedException}.
*/
public TransactionHasOperationFailedException() {
super("Now allowed to commit the transaction due to failed operations of producing or acknowledgment");
}

/**
* Constructs an {@code TransactionHasOperationFailedException} with the specified detail message.
* @param msg The detail message.
*/
public TransactionHasOperationFailedException(String msg) {
super(msg);
}
}

// wrap an exception to enriching more info messages.
public static Throwable wrap(Throwable t, String msg) {
msg += "\n" + t.getMessage();
Expand Down Expand Up @@ -972,6 +989,8 @@ public static Throwable wrap(Throwable t, String msg) {
return new MessageAcknowledgeException(msg);
} else if (t instanceof TransactionConflictException) {
return new TransactionConflictException(msg);
} else if (t instanceof TransactionHasOperationFailedException) {
return new TransactionHasOperationFailedException(msg);
} else if (t instanceof PulsarClientException) {
return new PulsarClientException(msg);
} else if (t instanceof CompletionException) {
Expand Down Expand Up @@ -1070,6 +1089,8 @@ public static PulsarClientException unwrap(Throwable t) {
newException = new MemoryBufferIsFullError(msg);
} else if (cause instanceof NotFoundException) {
newException = new NotFoundException(msg);
} else if (cause instanceof TransactionHasOperationFailedException) {
newException = new TransactionHasOperationFailedException(msg);
} else {
newException = new PulsarClientException(t);
}
Expand Down Expand Up @@ -1133,7 +1154,8 @@ public static boolean isRetriableError(Throwable t) {
|| t instanceof MessageAcknowledgeException
|| t instanceof TransactionConflictException
|| t instanceof ProducerBusyException
|| t instanceof ConsumerBusyException) {
|| t instanceof ConsumerBusyException
|| t instanceof TransactionHasOperationFailedException) {
return false;
}
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,17 @@
import com.google.common.collect.Lists;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.transaction.Transaction;
import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException.InvalidTxnStatusException;
import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException.TransactionNotFoundException;
Expand Down Expand Up @@ -61,11 +61,18 @@ public class TransactionImpl implements Transaction , TimerTask {
private final Map<Pair<String, String>, CompletableFuture<Void>> registerSubscriptionMap;
private final TransactionCoordinatorClientImpl tcClient;

private final ArrayList<CompletableFuture<MessageId>> sendFutureList;
private final ArrayList<CompletableFuture<Void>> ackFutureList;
private CompletableFuture<Void> opFuture;

private volatile long opCount = 0L;
private static final AtomicLongFieldUpdater<TransactionImpl> OP_COUNT_UPDATE =
AtomicLongFieldUpdater.newUpdater(TransactionImpl.class, "opCount");


private volatile State state;
private static final AtomicReferenceFieldUpdater<TransactionImpl, State> STATE_UPDATE =
AtomicReferenceFieldUpdater.newUpdater(TransactionImpl.class, State.class, "state");

private volatile boolean hasOpsFailed = false;
private final Timeout timeout;

@Override
Expand All @@ -86,9 +93,7 @@ public void run(Timeout timeout) throws Exception {
this.registerPartitionMap = new ConcurrentHashMap<>();
this.registerSubscriptionMap = new ConcurrentHashMap<>();
this.tcClient = client.getTcClient();

this.sendFutureList = new ArrayList<>();
this.ackFutureList = new ArrayList<>();
this.opFuture = CompletableFuture.completedFuture(null);
this.timeout = client.getTimer().newTimeout(this, transactionTimeoutMs, TimeUnit.MILLISECONDS);

}
Expand All @@ -113,8 +118,25 @@ public CompletableFuture<Void> registerProducedTopic(String topic) {
return completableFuture;
}

public synchronized void registerSendOp(CompletableFuture<MessageId> sendFuture) {
sendFutureList.add(sendFuture);
public void registerSendOp(CompletableFuture<MessageId> newSendFuture) {
if (OP_COUNT_UPDATE.getAndIncrement(this) == 0) {
opFuture = new CompletableFuture<>();
}
// the opCount is always bigger than 0 if there is an exception,
// and then the opFuture will never be replaced.
newSendFuture.whenComplete((messageId, e) -> {
if (e != null) {
log.error("The transaction [{}:{}] get an exception when send messages.",
txnIdMostBits, txnIdLeastBits, e);
if (!hasOpsFailed) {
hasOpsFailed = true;
}
}
CompletableFuture<Void> future = opFuture;
if (OP_COUNT_UPDATE.decrementAndGet(this) == 0) {
future.complete(null);
}
});
}

// register the topics that will be modified by this transaction
Expand All @@ -137,8 +159,25 @@ public CompletableFuture<Void> registerAckedTopic(String topic, String subscript
return completableFuture;
}

public synchronized void registerAckOp(CompletableFuture<Void> ackFuture) {
ackFutureList.add(ackFuture);
public void registerAckOp(CompletableFuture<Void> newAckFuture) {
if (OP_COUNT_UPDATE.getAndIncrement(this) == 0) {
opFuture = new CompletableFuture<>();
}
// the opCount is always bigger than 0 if there is an exception,
// and then the opFuture will never be replaced.
newAckFuture.whenComplete((ignore, e) -> {
if (e != null) {
log.error("The transaction [{}:{}] get an exception when ack messages.",
txnIdMostBits, txnIdLeastBits, e);
if (!hasOpsFailed) {
hasOpsFailed = true;
}
}
CompletableFuture<Void> future = opFuture;
if (OP_COUNT_UPDATE.decrementAndGet(this) == 0) {
future.complete(null);
}
});
}

@Override
Expand All @@ -147,9 +186,10 @@ public CompletableFuture<Void> commit() {
return checkIfOpenOrCommitting().thenCompose((value) -> {
CompletableFuture<Void> commitFuture = new CompletableFuture<>();
this.state = State.COMMITTING;
allOpComplete().whenComplete((v, e) -> {
if (e != null) {
abort().whenComplete((vx, ex) -> commitFuture.completeExceptionally(e));
opFuture.whenComplete((v, e) -> {
if (hasOpsFailed) {
abort().whenComplete((vx, ex) -> commitFuture.completeExceptionally(new PulsarClientException
.TransactionHasOperationFailedException()));
} else {
tcClient.commitAsync(new TxnID(txnIdMostBits, txnIdLeastBits))
.whenComplete((vx, ex) -> {
Expand All @@ -176,10 +216,7 @@ public CompletableFuture<Void> abort() {
return checkIfOpenOrAborting().thenCompose(value -> {
CompletableFuture<Void> abortFuture = new CompletableFuture<>();
this.state = State.ABORTING;
allOpComplete().whenComplete((v, e) -> {
if (e != null) {
log.error(e.getMessage());
}
opFuture.whenComplete((v, e) -> {
tcClient.abortAsync(new TxnID(txnIdMostBits, txnIdLeastBits)).whenComplete((vx, ex) -> {

if (ex != null) {
Expand Down Expand Up @@ -242,12 +279,4 @@ private CompletableFuture<Void> invalidTxnStatusFuture() {
+ txnIdLeastBits + "] with unexpected state : "
+ state.name() + ", expect " + State.OPEN + " state!"));
}


private CompletableFuture<Void> allOpComplete() {
List<CompletableFuture<?>> futureList = new ArrayList<>();
futureList.addAll(sendFutureList);
futureList.addAll(ackFutureList);
return CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0]));
}
}

0 comments on commit 92b4708

Please sign in to comment.