diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java index c5e1e95e07abf..89da470137450 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java @@ -93,7 +93,7 @@ protected void internalGetCoordinatorStats(AsyncResponse asyncResponse, boolean for (int i = 0; i < partitionMetadata.partitions; i++) { try { transactionMetadataStoreInfoFutures - .add(pulsar().getAdminClient().transactions().getCoordinatorStatsById(i)); + .add(pulsar().getAdminClient().transactions().getCoordinatorStatsByIdAsync(i)); } catch (PulsarServerException e) { asyncResponse.resume(new RestException(e)); return; @@ -312,7 +312,7 @@ private void getTransactionMetadata(TxnMeta txnMeta, String topic = transactionSubscription.getTopic(); String subName = transactionSubscription.getSubscription(); CompletableFuture future = - transactions.getTransactionInPendingAckStats(txnID, topic, subName); + transactions.getTransactionInPendingAckStatsAsync(txnID, topic, subName); ackedPartitionsFutures.add(future); if (ackFutures.containsKey(topic)) { ackFutures.get(topic) @@ -329,7 +329,7 @@ private void getTransactionMetadata(TxnMeta txnMeta, Map> produceFutures = new HashMap<>(); txnMeta.producedPartitions().forEach(topic -> { CompletableFuture future = - transactions.getTransactionInBufferStats(txnID, topic); + transactions.getTransactionInBufferStatsAsync(txnID, topic); producedPartitionsFutures.add(future); produceFutures.put(topic, future); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java index 34ec6c0005cf0..a35557435bad4 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java @@ -92,14 +92,14 @@ public void testGetTransactionCoordinatorStats() throws Exception { getTransaction().commit().get(); getTransaction().abort().get(); TransactionCoordinatorStats transactionCoordinatorstats = - admin.transactions().getCoordinatorStatsById(1).get(); + admin.transactions().getCoordinatorStatsByIdAsync(1).get(); verifyCoordinatorStats(transactionCoordinatorstats.state, transactionCoordinatorstats.leastSigBits, transactionCoordinatorstats.lowWaterMark); - transactionCoordinatorstats = admin.transactions().getCoordinatorStatsById(0).get(); + transactionCoordinatorstats = admin.transactions().getCoordinatorStatsByIdAsync(0).get(); verifyCoordinatorStats(transactionCoordinatorstats.state, transactionCoordinatorstats.leastSigBits, transactionCoordinatorstats.lowWaterMark); - Map stats = admin.transactions().getCoordinatorStats().get(); + Map stats = admin.transactions().getCoordinatorStatsAsync().get(); assertEquals(stats.size(), 2); @@ -121,7 +121,7 @@ public void testGetTransactionInBufferStats() throws Exception { Producer producer = pulsarClient.newProducer(Schema.BYTES).topic(topic).sendTimeout(0, TimeUnit.SECONDS).create(); MessageId messageId = producer.newMessage(transaction).value("Hello pulsar!".getBytes()).send(); TransactionInBufferStats transactionInBufferStats = admin.transactions() - .getTransactionInBufferStats(new TxnID(transaction.getTxnIdMostBits(), + .getTransactionInBufferStatsAsync(new TxnID(transaction.getTxnIdMostBits(), transaction.getTxnIdLeastBits()), topic).get(); PositionImpl position = PositionImpl.get(((MessageIdImpl) messageId).getLedgerId(), ((MessageIdImpl) messageId).getEntryId()); @@ -131,7 +131,7 @@ public void testGetTransactionInBufferStats() throws Exception { transaction.abort().get(); transactionInBufferStats = admin.transactions() - .getTransactionInBufferStats(new TxnID(transaction.getTxnIdMostBits(), + .getTransactionInBufferStatsAsync(new TxnID(transaction.getTxnIdMostBits(), transaction.getTxnIdLeastBits()), topic).get(); assertNull(transactionInBufferStats.startPosition); assertTrue(transactionInBufferStats.aborted); @@ -152,7 +152,7 @@ public void testGetTransactionPendingAckStats() throws Exception { producer.sendAsync("Hello pulsar!".getBytes()); TransactionImpl transaction = (TransactionImpl) getTransaction(); TransactionInPendingAckStats transactionInPendingAckStats = admin.transactions() - .getTransactionInPendingAckStats(new TxnID(transaction.getTxnIdMostBits(), + .getTransactionInPendingAckStatsAsync(new TxnID(transaction.getTxnIdMostBits(), transaction.getTxnIdLeastBits()), topic, subName).get(); assertNull(transactionInPendingAckStats.cumulativeAckPosition); @@ -163,7 +163,7 @@ public void testGetTransactionPendingAckStats() throws Exception { consumer.acknowledgeCumulativeAsync(batchMessageId, transaction).get(); transactionInPendingAckStats = admin.transactions() - .getTransactionInPendingAckStats(new TxnID(transaction.getTxnIdMostBits(), + .getTransactionInPendingAckStatsAsync(new TxnID(transaction.getTxnIdMostBits(), transaction.getTxnIdLeastBits()), topic, subName).get(); assertEquals(transactionInPendingAckStats.cumulativeAckPosition, @@ -212,7 +212,7 @@ public void testGetTransactionMetadata() throws Exception { consumer3.acknowledgeCumulativeAsync(messageId2, transaction).get(); TxnID txnID = new TxnID(transaction.getTxnIdMostBits(), transaction.getTxnIdLeastBits()); TransactionMetadata transactionMetadata = admin.transactions() - .getTransactionMetadata(new TxnID(transaction.getTxnIdMostBits(), + .getTransactionMetadataAsync(new TxnID(transaction.getTxnIdMostBits(), transaction.getTxnIdLeastBits())).get(); assertEquals(transactionMetadata.txnId, txnID.toString()); @@ -266,7 +266,7 @@ public void testGetTransactionBufferStats() throws Exception { consumer2.acknowledgeAsync(messageId, transaction).get(); TransactionBufferStats transactionBufferStats = admin.transactions(). - getTransactionBufferStats(topic).get(); + getTransactionBufferStatsAsync(topic).get(); assertEquals(transactionBufferStats.state, "Ready"); assertEquals(transactionBufferStats.maxReadPosition, @@ -286,7 +286,7 @@ public void testGetPendingAckStats() throws Exception { .subscriptionName(subName).subscribe(); TransactionPendingAckStats transactionPendingAckStats = admin.transactions(). - getPendingAckStats(topic, subName).get(); + getPendingAckStatsAsync(topic, subName).get(); assertEquals(transactionPendingAckStats.state, "Ready"); } diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Transactions.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Transactions.java index 829494bc3e076..b082f1c178081 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Transactions.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Transactions.java @@ -37,16 +37,31 @@ public interface Transactions { * Get transaction metadataStore stats. * * @param coordinatorId the id which get transaction coordinator - * @return the list future of transaction metadata store stats. + * @return the future of transaction metadata store stats. */ - CompletableFuture getCoordinatorStatsById(int coordinatorId); + CompletableFuture getCoordinatorStatsByIdAsync(int coordinatorId); + + /** + * Get transaction metadataStore stats. + * + * @param coordinatorId the id which get transaction coordinator + * @return the transaction metadata store stats. + */ + TransactionCoordinatorStats getCoordinatorStatsById(int coordinatorId) throws PulsarAdminException; /** * Get transaction metadataStore stats. * * @return the map future of transaction metadata store stats. */ - CompletableFuture> getCoordinatorStats(); + CompletableFuture> getCoordinatorStatsAsync(); + + /** + * Get transaction metadataStore stats. + * + * @return the map of transaction metadata store stats. + */ + Map getCoordinatorStats() throws PulsarAdminException; /** * Get transaction in buffer stats. @@ -55,7 +70,16 @@ public interface Transactions { * @param topic the produce topic * @return the future stats of transaction in buffer. */ - CompletableFuture getTransactionInBufferStats(TxnID txnID, String topic); + CompletableFuture getTransactionInBufferStatsAsync(TxnID txnID, String topic); + + /** + * Get transaction in buffer stats. + * + * @param txnID the txnId + * @param topic the produce topic + * @return the stats of transaction in buffer. + */ + TransactionInBufferStats getTransactionInBufferStats(TxnID txnID, String topic) throws PulsarAdminException; /** * Get transaction in pending ack stats. @@ -65,16 +89,33 @@ public interface Transactions { * @param subName the subscription name of this transaction ack * @return the future stats of transaction in pending ack. */ - CompletableFuture getTransactionInPendingAckStats(TxnID txnID, String topic, - String subName); - + CompletableFuture getTransactionInPendingAckStatsAsync(TxnID txnID, String topic, + String subName); + /** + * Get transaction in pending ack stats. + * + * @param txnID the txnId + * @param topic the ack topic + * @param subName the subscription name of this transaction ack + * @return the stats of transaction in pending ack. + */ + TransactionInPendingAckStats getTransactionInPendingAckStats(TxnID txnID, String topic, + String subName) throws PulsarAdminException; /** * Get transaction metadata. * * @param txnID the ID of this transaction * @return the future metadata of this transaction. */ - CompletableFuture getTransactionMetadata(TxnID txnID); + CompletableFuture getTransactionMetadataAsync(TxnID txnID); + + /** + * Get transaction metadata. + * + * @param txnID the ID of this transaction + * @return the metadata of this transaction. + */ + TransactionMetadata getTransactionMetadata(TxnID txnID) throws PulsarAdminException; /** * Get transaction buffer stats. @@ -82,16 +123,33 @@ CompletableFuture getTransactionInPendingAckStats( * @param topic the topic of getting transaction buffer stats * @return the future stats of transaction buffer in topic. */ - CompletableFuture getTransactionBufferStats(String topic); + CompletableFuture getTransactionBufferStatsAsync(String topic); + + /** + * Get transaction buffer stats. + * + * @param topic the topic of getting transaction buffer stats + * @return the stats of transaction buffer in topic. + */ + TransactionBufferStats getTransactionBufferStats(String topic) throws PulsarAdminException; + + /** + * Get transaction pending ack stats. + * + * @param topic the topic of this transaction pending ack stats + * @param subName the subscription name of this transaction pending ack stats + * @return the stats of transaction pending ack. + */ + CompletableFuture getPendingAckStatsAsync(String topic, String subName); /** * Get transaction pending ack stats. * * @param topic the topic of this transaction pending ack stats * @param subName the subscription name of this transaction pending ack stats - * @return the future stats of transaction pending ack. + * @return the stats of transaction pending ack. */ - CompletableFuture getPendingAckStats(String topic, String subName); + TransactionPendingAckStats getPendingAckStats(String topic, String subName) throws PulsarAdminException; /** * Get slow transactions by coordinator id. diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TransactionsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TransactionsImpl.java index 30d41d9f624f4..984c17b2e67fb 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TransactionsImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TransactionsImpl.java @@ -48,7 +48,7 @@ public TransactionsImpl(WebTarget web, Authentication auth, long readTimeoutMs) } @Override - public CompletableFuture getCoordinatorStatsById(int coordinatorId) { + public CompletableFuture getCoordinatorStatsByIdAsync(int coordinatorId) { WebTarget path = adminV3Transactions.path("coordinatorStats"); path = path.queryParam("coordinatorId", coordinatorId); final CompletableFuture future = new CompletableFuture<>(); @@ -68,7 +68,22 @@ public void failed(Throwable throwable) { } @Override - public CompletableFuture> getCoordinatorStats() { + public TransactionCoordinatorStats getCoordinatorStatsById(int coordinatorId) throws PulsarAdminException { + try { + return getCoordinatorStatsByIdAsync(coordinatorId) + .get(this.readTimeoutMs, TimeUnit.MILLISECONDS); + } catch (ExecutionException e) { + throw (PulsarAdminException) e.getCause(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new PulsarAdminException(e); + } catch (TimeoutException e) { + throw new PulsarAdminException.TimeoutException(e); + } + } + + @Override + public CompletableFuture> getCoordinatorStatsAsync() { WebTarget path = adminV3Transactions.path("coordinatorStats"); final CompletableFuture> future = new CompletableFuture<>(); asyncGetRequest(path, @@ -87,7 +102,21 @@ public void failed(Throwable throwable) { } @Override - public CompletableFuture getTransactionInBufferStats(TxnID txnID, String topic) { + public Map getCoordinatorStats() throws PulsarAdminException { + try { + return getCoordinatorStatsAsync().get(this.readTimeoutMs, TimeUnit.MILLISECONDS); + } catch (ExecutionException e) { + throw (PulsarAdminException) e.getCause(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new PulsarAdminException(e); + } catch (TimeoutException e) { + throw new PulsarAdminException.TimeoutException(e); + } + } + + @Override + public CompletableFuture getTransactionInBufferStatsAsync(TxnID txnID, String topic) { WebTarget path = adminV3Transactions.path("transactionInBufferStats"); path = path.queryParam("mostSigBits", txnID.getMostSigBits()); path = path.queryParam("leastSigBits", txnID.getLeastSigBits()); @@ -109,8 +138,23 @@ public void failed(Throwable throwable) { } @Override - public CompletableFuture getTransactionInPendingAckStats(TxnID txnID, String topic, - String subName) { + public TransactionInBufferStats getTransactionInBufferStats(TxnID txnID, String topic) throws PulsarAdminException { + try { + return getTransactionInBufferStatsAsync(txnID, topic).get(this.readTimeoutMs, TimeUnit.MILLISECONDS); + } catch (ExecutionException e) { + throw (PulsarAdminException) e.getCause(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new PulsarAdminException(e); + } catch (TimeoutException e) { + throw new PulsarAdminException.TimeoutException(e); + } + } + + @Override + public CompletableFuture getTransactionInPendingAckStatsAsync(TxnID txnID, + String topic, + String subName) { WebTarget path = adminV3Transactions.path("transactionInPendingAckStats"); path = path.queryParam("mostSigBits", txnID.getMostSigBits()); path = path.queryParam("leastSigBits", txnID.getLeastSigBits()); @@ -133,7 +177,23 @@ public void failed(Throwable throwable) { } @Override - public CompletableFuture getTransactionMetadata(TxnID txnID) { + public TransactionInPendingAckStats getTransactionInPendingAckStats(TxnID txnID, String topic, + String subName) throws PulsarAdminException { + try { + return getTransactionInPendingAckStatsAsync(txnID, topic, subName) + .get(this.readTimeoutMs, TimeUnit.MILLISECONDS); + } catch (ExecutionException e) { + throw (PulsarAdminException) e.getCause(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new PulsarAdminException(e); + } catch (TimeoutException e) { + throw new PulsarAdminException.TimeoutException(e); + } + } + + @Override + public CompletableFuture getTransactionMetadataAsync(TxnID txnID) { WebTarget path = adminV3Transactions.path("transactionMetadata"); path = path.queryParam("mostSigBits", txnID.getMostSigBits()); path = path.queryParam("leastSigBits", txnID.getLeastSigBits()); @@ -154,7 +214,22 @@ public void failed(Throwable throwable) { } @Override - public CompletableFuture getTransactionBufferStats(String topic) { + public TransactionMetadata getTransactionMetadata(TxnID txnID) throws PulsarAdminException { + try { + return getTransactionMetadataAsync(txnID) + .get(this.readTimeoutMs, TimeUnit.MILLISECONDS); + } catch (ExecutionException e) { + throw (PulsarAdminException) e.getCause(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new PulsarAdminException(e); + } catch (TimeoutException e) { + throw new PulsarAdminException.TimeoutException(e); + } + } + + @Override + public CompletableFuture getTransactionBufferStatsAsync(String topic) { WebTarget path = adminV3Transactions.path("transactionBufferStats"); path = path.queryParam("topic", topic); final CompletableFuture future = new CompletableFuture<>(); @@ -174,7 +249,21 @@ public void failed(Throwable throwable) { } @Override - public CompletableFuture getPendingAckStats(String topic, String subName) { + public TransactionBufferStats getTransactionBufferStats(String topic) throws PulsarAdminException { + try { + return getTransactionBufferStatsAsync(topic).get(this.readTimeoutMs, TimeUnit.MILLISECONDS); + } catch (ExecutionException e) { + throw (PulsarAdminException) e.getCause(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new PulsarAdminException(e); + } catch (TimeoutException e) { + throw new PulsarAdminException.TimeoutException(e); + } + } + + @Override + public CompletableFuture getPendingAckStatsAsync(String topic, String subName) { WebTarget path = adminV3Transactions.path("pendingAckStats"); path = path.queryParam("topic", topic); path = path.queryParam("subName", subName); @@ -194,6 +283,20 @@ public void failed(Throwable throwable) { return future; } + @Override + public TransactionPendingAckStats getPendingAckStats(String topic, String subName) throws PulsarAdminException { + try { + return getPendingAckStatsAsync(topic, subName).get(this.readTimeoutMs, TimeUnit.MILLISECONDS); + } catch (ExecutionException e) { + throw (PulsarAdminException) e.getCause(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new PulsarAdminException(e); + } catch (TimeoutException e) { + throw new PulsarAdminException.TimeoutException(e); + } + } + @Override public CompletableFuture> getSlowTransactionsByCoordinatorIdAsync( Integer coordinatorId, long timeout, TimeUnit timeUnit) {