Skip to content

Commit

Permalink
[Transaction] Transaction admin api add synchronize method. (apache#1…
Browse files Browse the repository at this point in the history
…0745)

## Motivation
Transaction admin api add synchronize method

## implement
add synchronize method
  • Loading branch information
congbobo184 authored May 31, 2021
1 parent 5cdb421 commit eeaa695
Show file tree
Hide file tree
Showing 4 changed files with 193 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ protected void internalGetCoordinatorStats(AsyncResponse asyncResponse, boolean
for (int i = 0; i < partitionMetadata.partitions; i++) {
try {
transactionMetadataStoreInfoFutures
.add(pulsar().getAdminClient().transactions().getCoordinatorStatsById(i));
.add(pulsar().getAdminClient().transactions().getCoordinatorStatsByIdAsync(i));
} catch (PulsarServerException e) {
asyncResponse.resume(new RestException(e));
return;
Expand Down Expand Up @@ -312,7 +312,7 @@ private void getTransactionMetadata(TxnMeta txnMeta,
String topic = transactionSubscription.getTopic();
String subName = transactionSubscription.getSubscription();
CompletableFuture<TransactionInPendingAckStats> future =
transactions.getTransactionInPendingAckStats(txnID, topic, subName);
transactions.getTransactionInPendingAckStatsAsync(txnID, topic, subName);
ackedPartitionsFutures.add(future);
if (ackFutures.containsKey(topic)) {
ackFutures.get(topic)
Expand All @@ -329,7 +329,7 @@ private void getTransactionMetadata(TxnMeta txnMeta,
Map<String, CompletableFuture<TransactionInBufferStats>> produceFutures = new HashMap<>();
txnMeta.producedPartitions().forEach(topic -> {
CompletableFuture<TransactionInBufferStats> future =
transactions.getTransactionInBufferStats(txnID, topic);
transactions.getTransactionInBufferStatsAsync(txnID, topic);
producedPartitionsFutures.add(future);
produceFutures.put(topic, future);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,14 +92,14 @@ public void testGetTransactionCoordinatorStats() throws Exception {
getTransaction().commit().get();
getTransaction().abort().get();
TransactionCoordinatorStats transactionCoordinatorstats =
admin.transactions().getCoordinatorStatsById(1).get();
admin.transactions().getCoordinatorStatsByIdAsync(1).get();
verifyCoordinatorStats(transactionCoordinatorstats.state,
transactionCoordinatorstats.leastSigBits, transactionCoordinatorstats.lowWaterMark);

transactionCoordinatorstats = admin.transactions().getCoordinatorStatsById(0).get();
transactionCoordinatorstats = admin.transactions().getCoordinatorStatsByIdAsync(0).get();
verifyCoordinatorStats(transactionCoordinatorstats.state,
transactionCoordinatorstats.leastSigBits, transactionCoordinatorstats.lowWaterMark);
Map<Integer, TransactionCoordinatorStats> stats = admin.transactions().getCoordinatorStats().get();
Map<Integer, TransactionCoordinatorStats> stats = admin.transactions().getCoordinatorStatsAsync().get();

assertEquals(stats.size(), 2);

Expand All @@ -121,7 +121,7 @@ public void testGetTransactionInBufferStats() throws Exception {
Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES).topic(topic).sendTimeout(0, TimeUnit.SECONDS).create();
MessageId messageId = producer.newMessage(transaction).value("Hello pulsar!".getBytes()).send();
TransactionInBufferStats transactionInBufferStats = admin.transactions()
.getTransactionInBufferStats(new TxnID(transaction.getTxnIdMostBits(),
.getTransactionInBufferStatsAsync(new TxnID(transaction.getTxnIdMostBits(),
transaction.getTxnIdLeastBits()), topic).get();
PositionImpl position =
PositionImpl.get(((MessageIdImpl) messageId).getLedgerId(), ((MessageIdImpl) messageId).getEntryId());
Expand All @@ -131,7 +131,7 @@ public void testGetTransactionInBufferStats() throws Exception {
transaction.abort().get();

transactionInBufferStats = admin.transactions()
.getTransactionInBufferStats(new TxnID(transaction.getTxnIdMostBits(),
.getTransactionInBufferStatsAsync(new TxnID(transaction.getTxnIdMostBits(),
transaction.getTxnIdLeastBits()), topic).get();
assertNull(transactionInBufferStats.startPosition);
assertTrue(transactionInBufferStats.aborted);
Expand All @@ -152,7 +152,7 @@ public void testGetTransactionPendingAckStats() throws Exception {
producer.sendAsync("Hello pulsar!".getBytes());
TransactionImpl transaction = (TransactionImpl) getTransaction();
TransactionInPendingAckStats transactionInPendingAckStats = admin.transactions()
.getTransactionInPendingAckStats(new TxnID(transaction.getTxnIdMostBits(),
.getTransactionInPendingAckStatsAsync(new TxnID(transaction.getTxnIdMostBits(),
transaction.getTxnIdLeastBits()), topic, subName).get();
assertNull(transactionInPendingAckStats.cumulativeAckPosition);

Expand All @@ -163,7 +163,7 @@ public void testGetTransactionPendingAckStats() throws Exception {
consumer.acknowledgeCumulativeAsync(batchMessageId, transaction).get();

transactionInPendingAckStats = admin.transactions()
.getTransactionInPendingAckStats(new TxnID(transaction.getTxnIdMostBits(),
.getTransactionInPendingAckStatsAsync(new TxnID(transaction.getTxnIdMostBits(),
transaction.getTxnIdLeastBits()), topic, subName).get();

assertEquals(transactionInPendingAckStats.cumulativeAckPosition,
Expand Down Expand Up @@ -212,7 +212,7 @@ public void testGetTransactionMetadata() throws Exception {
consumer3.acknowledgeCumulativeAsync(messageId2, transaction).get();
TxnID txnID = new TxnID(transaction.getTxnIdMostBits(), transaction.getTxnIdLeastBits());
TransactionMetadata transactionMetadata = admin.transactions()
.getTransactionMetadata(new TxnID(transaction.getTxnIdMostBits(),
.getTransactionMetadataAsync(new TxnID(transaction.getTxnIdMostBits(),
transaction.getTxnIdLeastBits())).get();

assertEquals(transactionMetadata.txnId, txnID.toString());
Expand Down Expand Up @@ -266,7 +266,7 @@ public void testGetTransactionBufferStats() throws Exception {
consumer2.acknowledgeAsync(messageId, transaction).get();

TransactionBufferStats transactionBufferStats = admin.transactions().
getTransactionBufferStats(topic).get();
getTransactionBufferStatsAsync(topic).get();

assertEquals(transactionBufferStats.state, "Ready");
assertEquals(transactionBufferStats.maxReadPosition,
Expand All @@ -286,7 +286,7 @@ public void testGetPendingAckStats() throws Exception {
.subscriptionName(subName).subscribe();

TransactionPendingAckStats transactionPendingAckStats = admin.transactions().
getPendingAckStats(topic, subName).get();
getPendingAckStatsAsync(topic, subName).get();

assertEquals(transactionPendingAckStats.state, "Ready");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,16 +37,31 @@ public interface Transactions {
* Get transaction metadataStore stats.
*
* @param coordinatorId the id which get transaction coordinator
* @return the list future of transaction metadata store stats.
* @return the future of transaction metadata store stats.
*/
CompletableFuture<TransactionCoordinatorStats> getCoordinatorStatsById(int coordinatorId);
CompletableFuture<TransactionCoordinatorStats> getCoordinatorStatsByIdAsync(int coordinatorId);

/**
* Get transaction metadataStore stats.
*
* @param coordinatorId the id which get transaction coordinator
* @return the transaction metadata store stats.
*/
TransactionCoordinatorStats getCoordinatorStatsById(int coordinatorId) throws PulsarAdminException;

/**
* Get transaction metadataStore stats.
*
* @return the map future of transaction metadata store stats.
*/
CompletableFuture<Map<Integer, TransactionCoordinatorStats>> getCoordinatorStats();
CompletableFuture<Map<Integer, TransactionCoordinatorStats>> getCoordinatorStatsAsync();

/**
* Get transaction metadataStore stats.
*
* @return the map of transaction metadata store stats.
*/
Map<Integer, TransactionCoordinatorStats> getCoordinatorStats() throws PulsarAdminException;

/**
* Get transaction in buffer stats.
Expand All @@ -55,7 +70,16 @@ public interface Transactions {
* @param topic the produce topic
* @return the future stats of transaction in buffer.
*/
CompletableFuture<TransactionInBufferStats> getTransactionInBufferStats(TxnID txnID, String topic);
CompletableFuture<TransactionInBufferStats> getTransactionInBufferStatsAsync(TxnID txnID, String topic);

/**
* Get transaction in buffer stats.
*
* @param txnID the txnId
* @param topic the produce topic
* @return the stats of transaction in buffer.
*/
TransactionInBufferStats getTransactionInBufferStats(TxnID txnID, String topic) throws PulsarAdminException;

/**
* Get transaction in pending ack stats.
Expand All @@ -65,33 +89,67 @@ public interface Transactions {
* @param subName the subscription name of this transaction ack
* @return the future stats of transaction in pending ack.
*/
CompletableFuture<TransactionInPendingAckStats> getTransactionInPendingAckStats(TxnID txnID, String topic,
String subName);

CompletableFuture<TransactionInPendingAckStats> getTransactionInPendingAckStatsAsync(TxnID txnID, String topic,
String subName);
/**
* Get transaction in pending ack stats.
*
* @param txnID the txnId
* @param topic the ack topic
* @param subName the subscription name of this transaction ack
* @return the stats of transaction in pending ack.
*/
TransactionInPendingAckStats getTransactionInPendingAckStats(TxnID txnID, String topic,
String subName) throws PulsarAdminException;
/**
* Get transaction metadata.
*
* @param txnID the ID of this transaction
* @return the future metadata of this transaction.
*/
CompletableFuture<TransactionMetadata> getTransactionMetadata(TxnID txnID);
CompletableFuture<TransactionMetadata> getTransactionMetadataAsync(TxnID txnID);

/**
* Get transaction metadata.
*
* @param txnID the ID of this transaction
* @return the metadata of this transaction.
*/
TransactionMetadata getTransactionMetadata(TxnID txnID) throws PulsarAdminException;

/**
* Get transaction buffer stats.
*
* @param topic the topic of getting transaction buffer stats
* @return the future stats of transaction buffer in topic.
*/
CompletableFuture<TransactionBufferStats> getTransactionBufferStats(String topic);
CompletableFuture<TransactionBufferStats> getTransactionBufferStatsAsync(String topic);

/**
* Get transaction buffer stats.
*
* @param topic the topic of getting transaction buffer stats
* @return the stats of transaction buffer in topic.
*/
TransactionBufferStats getTransactionBufferStats(String topic) throws PulsarAdminException;

/**
* Get transaction pending ack stats.
*
* @param topic the topic of this transaction pending ack stats
* @param subName the subscription name of this transaction pending ack stats
* @return the stats of transaction pending ack.
*/
CompletableFuture<TransactionPendingAckStats> getPendingAckStatsAsync(String topic, String subName);

/**
* Get transaction pending ack stats.
*
* @param topic the topic of this transaction pending ack stats
* @param subName the subscription name of this transaction pending ack stats
* @return the future stats of transaction pending ack.
* @return the stats of transaction pending ack.
*/
CompletableFuture<TransactionPendingAckStats> getPendingAckStats(String topic, String subName);
TransactionPendingAckStats getPendingAckStats(String topic, String subName) throws PulsarAdminException;

/**
* Get slow transactions by coordinator id.
Expand Down
Loading

0 comments on commit eeaa695

Please sign in to comment.