Skip to content

Commit

Permalink
[improve][transaction] support configurable ``transactionBufferClient…
Browse files Browse the repository at this point in the history
…OperationTimeoutInMills`` (apache#15011)
  • Loading branch information
mattisonchao authored Apr 6, 2022
1 parent cdb67e4 commit 1c6ea12
Show file tree
Hide file tree
Showing 6 changed files with 20 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2489,6 +2489,12 @@ public class ServiceConfiguration implements PulsarConfiguration {
)
private int transactionBufferClientMaxConcurrentRequests = 1000;

@FieldContext(
category = CATEGORY_TRANSACTION,
doc = "The transaction buffer client's operation timeout in milliseconds."
)
private long transactionBufferClientOperationTimeoutInMills = 3000L;

/**** --- KeyStore TLS config variables. --- ****/
@FieldContext(
category = CATEGORY_KEYSTORE_TLS,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -752,7 +752,8 @@ public void start() throws PulsarServerException {
this.transactionTimer =
new HashedWheelTimer(new DefaultThreadFactory("pulsar-transaction-timer"));
transactionBufferClient = TransactionBufferClientImpl.create(getClient(), transactionTimer,
config.getTransactionBufferClientMaxConcurrentRequests());
config.getTransactionBufferClientMaxConcurrentRequests(),
config.getTransactionBufferClientOperationTimeoutInMills());

transactionMetadataStoreService = new TransactionMetadataStoreService(TransactionMetadataStoreProvider
.newProvider(config.getTransactionMetadataStoreProviderClassName()), this,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,9 @@ private TransactionBufferClientImpl(TransactionBufferHandler tbHandler) {
}

public static TransactionBufferClient create(PulsarClient pulsarClient, HashedWheelTimer timer,
int maxConcurrentRequests) {
int maxConcurrentRequests, long operationTimeoutInMills) {
TransactionBufferHandler handler = new TransactionBufferHandlerImpl(pulsarClient, timer,
maxConcurrentRequests);
maxConcurrentRequests, operationTimeoutInMills);
return new TransactionBufferClientImpl(handler);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,12 +75,12 @@ public CompletableFuture<ClientCnx> load(String topic) {
}
});

public TransactionBufferHandlerImpl(PulsarClient pulsarClient,
HashedWheelTimer timer, int maxConcurrentRequests) {
public TransactionBufferHandlerImpl(PulsarClient pulsarClient, HashedWheelTimer timer,
int maxConcurrentRequests, long operationTimeoutInMills) {
this.pulsarClient = pulsarClient;
this.outstandingRequests = new ConcurrentSkipListMap<>();
this.pendingRequests = new GrowableArrayBlockingQueue<>();
this.operationTimeoutInMills = 3000L;
this.operationTimeoutInMills = operationTimeoutInMills;
this.timer = timer;
this.requestCredits = Math.max(100, maxConcurrentRequests);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ protected void setup() throws Exception {
admin.namespaces().createNamespace(namespace, 10);
admin.topics().createPartitionedTopic(partitionedTopicName.getPartitionedTopicName(), partitions);
tbClient = TransactionBufferClientImpl.create(pulsarClient,
new HashedWheelTimer(new DefaultThreadFactory("transaction-buffer")), 1000);
new HashedWheelTimer(new DefaultThreadFactory("transaction-buffer")), 1000, 3000);
}

@Override
Expand Down Expand Up @@ -160,7 +160,7 @@ public void testTransactionBufferClientTimeout() throws Exception {
@Cleanup("stop")
HashedWheelTimer hashedWheelTimer = new HashedWheelTimer();
TransactionBufferHandlerImpl transactionBufferHandler =
new TransactionBufferHandlerImpl(mockClient, hashedWheelTimer, 1000);
new TransactionBufferHandlerImpl(mockClient, hashedWheelTimer, 1000, 3000);
CompletableFuture<TxnID> endFuture =
transactionBufferHandler.endTxnOnTopic("test", 1, 1, TxnAction.ABORT, 1);

Expand Down Expand Up @@ -203,7 +203,7 @@ public void testTransactionBufferChannelUnActive() {
@Cleanup("stop")
HashedWheelTimer hashedWheelTimer = new HashedWheelTimer();
TransactionBufferHandlerImpl transactionBufferHandler =
new TransactionBufferHandlerImpl(mockClient, hashedWheelTimer, 1000);
new TransactionBufferHandlerImpl(mockClient, hashedWheelTimer, 1000, 3000);
try {
transactionBufferHandler.endTxnOnTopic("test", 1, 1, TxnAction.ABORT, 1).get();
fail();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ public class TransactionBufferHandlerImplTest {
public void testRequestCredits() {
PulsarClientImpl pulsarClient = mock(PulsarClientImpl.class);
when(pulsarClient.getConnection(anyString())).thenReturn(CompletableFuture.completedFuture(mock(ClientCnx.class)));
TransactionBufferHandlerImpl handler = spy(new TransactionBufferHandlerImpl(pulsarClient, null, 1000));
TransactionBufferHandlerImpl handler = spy(
new TransactionBufferHandlerImpl(pulsarClient, null, 1000, 3000));
doNothing().when(handler).endTxn(any());
for (int i = 0; i < 500; i++) {
handler.endTxnOnTopic("t", 1L, 1L, TxnAction.COMMIT, 1L);
Expand All @@ -61,7 +62,8 @@ public void testRequestCredits() {

@Test
public void testMinRequestCredits() {
TransactionBufferHandlerImpl handler = spy(new TransactionBufferHandlerImpl(null, null, 50));
TransactionBufferHandlerImpl handler = spy(
new TransactionBufferHandlerImpl(null, null, 50, 3000));
assertEquals(handler.getAvailableRequestCredits(), 100);
}
}

0 comments on commit 1c6ea12

Please sign in to comment.