From 2fce656dcf68b66ee25452d4bb1501fa44af633f Mon Sep 17 00:00:00 2001 From: congbo <39078850+congbobo184@users.noreply.github.com> Date: Fri, 28 May 2021 12:24:50 +0800 Subject: [PATCH] [Transaction] Transaction admin api get pending ack internal stats (#10725) ## Motivation Transaction add admin api `getPendingAckInternalStats` ## implement ``` /** * Transaction pending ack internal stats. */ public class TransactionPendingAckInternalStats { /** The manage ledger internal stats*/ public ManagedLedgerInternalStats managedLedgerInternalStats; } ``` --- .../broker/admin/impl/TransactionsBase.java | 69 +++++++++++++++++++ .../pulsar/broker/admin/v3/Transactions.java | 27 ++++++++ .../persistent/PersistentSubscription.java | 9 +++ .../service/persistent/PersistentTopic.java | 10 +++ .../pendingack/impl/MLPendingAckStore.java | 6 +- .../pendingack/impl/PendingAckHandleImpl.java | 16 +++++ .../admin/v3/AdminApiTransactionTest.java | 42 +++++++++++ .../pulsar/client/admin/Transactions.java | 25 +++++++ .../admin/internal/TransactionsImpl.java | 44 ++++++++++++ .../pulsar/admin/cli/PulsarAdminToolTest.java | 4 ++ .../pulsar/admin/cli/CmdTransactions.java | 17 +++++ .../TransactionCoordinatorInternalStats.java | 2 +- .../TransactionPendingAckInternalStats.java | 28 ++++++++ 13 files changed, 297 insertions(+), 2 deletions(-) create mode 100644 pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TransactionPendingAckInternalStats.java diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java index 980096c58a845..c5e1e95e07abf 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java @@ -38,6 +38,9 @@ import org.apache.bookkeeper.mledger.ManagedLedger; import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.admin.AdminResource; +import org.apache.pulsar.broker.service.BrokerServiceException.NotAllowedException; +import org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException; +import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionNotFoundException; import org.apache.pulsar.broker.service.Topic; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.broker.web.RestException; @@ -50,6 +53,7 @@ import org.apache.pulsar.common.policies.data.TransactionInPendingAckStats; import org.apache.pulsar.common.policies.data.TransactionLogStats; import org.apache.pulsar.common.policies.data.TransactionMetadata; +import org.apache.pulsar.common.policies.data.TransactionPendingAckInternalStats; import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID; import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore; @@ -504,4 +508,69 @@ protected void internalGetCoordinatorInternalStats(AsyncResponse asyncResponse, asyncResponse.resume(new RestException(e.getCause())); } } + + protected void internalGetPendingAckInternalStats(AsyncResponse asyncResponse, boolean authoritative, + TopicName topicName, String subName, boolean metadata) { + try { + if (pulsar().getConfig().isTransactionCoordinatorEnabled()) { + validateTopicOwnership(topicName, authoritative); + CompletableFuture> topicFuture = pulsar().getBrokerService() + .getTopics().get(topicName.toString()); + if (topicFuture != null) { + topicFuture.whenComplete((optionalTopic, e) -> { + + if (e != null) { + asyncResponse.resume(new RestException(e)); + return; + } + if (!optionalTopic.isPresent()) { + asyncResponse.resume(new RestException(TEMPORARY_REDIRECT, + "Topic is not owned by this broker!")); + return; + } + Topic topicObject = optionalTopic.get(); + if (topicObject instanceof PersistentTopic) { + try { + ManagedLedger managedLedger = + ((PersistentTopic) topicObject).getPendingAckManagedLedger(subName).get(); + TransactionPendingAckInternalStats stats = + new TransactionPendingAckInternalStats(); + TransactionLogStats pendingAckLogStats = new TransactionLogStats(); + pendingAckLogStats.managedLedgerName = managedLedger.getName(); + pendingAckLogStats.managedLedgerInternalStats = + managedLedger.getManagedLedgerInternalStats(metadata).get(); + stats.pendingAckLogStats = pendingAckLogStats; + asyncResponse.resume(stats); + } catch (Exception exception) { + if (exception instanceof ExecutionException) { + if (exception.getCause() instanceof ServiceUnitNotReadyException) { + asyncResponse.resume(new RestException(SERVICE_UNAVAILABLE, + exception.getCause())); + return; + } else if (exception.getCause() instanceof NotAllowedException) { + asyncResponse.resume(new RestException(METHOD_NOT_ALLOWED, + exception.getCause())); + return; + } else if (exception.getCause() instanceof SubscriptionNotFoundException) { + asyncResponse.resume(new RestException(NOT_FOUND, exception.getCause())); + return; + } + } + asyncResponse.resume(new RestException(exception)); + } + } else { + asyncResponse.resume(new RestException(BAD_REQUEST, "Topic is not a persistent topic!")); + } + }); + } else { + asyncResponse.resume(new RestException(TEMPORARY_REDIRECT, "Topic is not owned by this broker!")); + } + } else { + asyncResponse.resume(new RestException(SERVICE_UNAVAILABLE, + "This Broker is not configured with transactionCoordinatorEnabled=true.")); + } + } catch (Exception e) { + asyncResponse.resume(new RestException(e.getCause())); + } + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Transactions.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Transactions.java index ef23682905e3a..1184cc91418b4 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Transactions.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Transactions.java @@ -25,6 +25,7 @@ import io.swagger.annotations.ApiResponses; import javax.ws.rs.Consumes; import javax.ws.rs.DefaultValue; +import javax.ws.rs.Encoded; import javax.ws.rs.GET; import javax.ws.rs.Path; import javax.ws.rs.PathParam; @@ -34,6 +35,8 @@ import javax.ws.rs.container.Suspended; import javax.ws.rs.core.MediaType; import org.apache.pulsar.broker.admin.impl.TransactionsBase; +import org.apache.pulsar.common.naming.TopicDomain; +import org.apache.pulsar.common.naming.TopicName; @Path("/transactions") @Produces(MediaType.APPLICATION_JSON) @@ -200,4 +203,28 @@ public void getCoordinatorInternalStats(@Suspended final AsyncResponse asyncResp @QueryParam("metadata") @DefaultValue("false") boolean metadata) { internalGetCoordinatorInternalStats(asyncResponse, authoritative, metadata, Integer.parseInt(coordinatorId)); } + + @GET + @Path("/pendingAckInternalStats/{tenant}/{namespace}/{topic}/{subName}") + @ApiOperation(value = "Get transaction pending ack internal stats.") + @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"), + @ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic " + + "or subscription name doesn't exist"), + @ApiResponse(code = 503, message = "This Broker is not configured " + + "with transactionCoordinatorEnabled=true."), + @ApiResponse(code = 307, message = "Topic is not owned by this broker!"), + @ApiResponse(code = 405, message = "Pending ack handle don't use managedLedger!"), + @ApiResponse(code = 400, message = "Topic is not a persistent topic!"), + @ApiResponse(code = 409, message = "Concurrent modification")}) + public void getPendingAckInternalStats(@Suspended final AsyncResponse asyncResponse, + @QueryParam("authoritative") + @DefaultValue("false") boolean authoritative, + @PathParam("tenant") String tenant, + @PathParam("namespace") String namespace, + @PathParam("topic") @Encoded String encodedTopic, + @PathParam("subName") String subName, + @QueryParam("metadata") @DefaultValue("false") boolean metadata) { + internalGetPendingAckInternalStats(asyncResponse, authoritative, + TopicName.get(TopicDomain.persistent.value(), tenant, namespace, encodedTopic), subName, metadata); + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java index 25898780d94da..99f28201fd0af 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java @@ -38,6 +38,7 @@ import org.apache.bookkeeper.mledger.Entry; import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.bookkeeper.mledger.ManagedCursor.IndividualDeletedEntries; +import org.apache.bookkeeper.mledger.ManagedLedger; import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.bookkeeper.mledger.ManagedLedgerException.ConcurrentFindCursorPositionException; import org.apache.bookkeeper.mledger.ManagedLedgerException.InvalidCursorPositionException; @@ -1155,5 +1156,13 @@ public TransactionInPendingAckStats getTransactionInPendingAckStats(TxnID txnID) return this.pendingAckHandle.getTransactionInPendingAckStats(txnID); } + public CompletableFuture getPendingAckManageLedger() { + if (this.pendingAckHandle instanceof PendingAckHandleImpl) { + return ((PendingAckHandleImpl) this.pendingAckHandle).getStoreManageLedger(); + } else { + return FutureUtil.failedFuture(new NotAllowedException("Pending ack handle don't use managedLedger!")); + } + } + private static final Logger log = LoggerFactory.getLogger(PersistentSubscription.class); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 5d4b2bca56466..ae6aa4ae0f039 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -83,6 +83,7 @@ import org.apache.pulsar.broker.service.BrokerServiceException.PersistenceException; import org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException; import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionBusyException; +import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionNotFoundException; import org.apache.pulsar.broker.service.BrokerServiceException.TopicBusyException; import org.apache.pulsar.broker.service.BrokerServiceException.TopicClosedException; import org.apache.pulsar.broker.service.BrokerServiceException.TopicFencedException; @@ -3087,4 +3088,13 @@ protected boolean isTerminated() { public TransactionInPendingAckStats getTransactionInPendingAckStats(TxnID txnID, String subName) { return this.subscriptions.get(subName).getTransactionInPendingAckStats(txnID); } + + public CompletableFuture getPendingAckManagedLedger(String subName) { + PersistentSubscription subscription = subscriptions.get(subName); + if (subscription == null) { + return FutureUtil.failedFuture(new SubscriptionNotFoundException((topic + + " not found subscription : " + subName))); + } + return subscription.getPendingAckManageLedger(); + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java index 14aaa5feb3e26..b60f4ba21dc29 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java @@ -64,7 +64,7 @@ public class MLPendingAckStore implements PendingAckStore { public static final String PENDING_ACK_STORE_SUFFIX = "__transaction_pending_ack"; - private static final String PENDING_ACK_STORE_CURSOR_NAME = "__pending_ack_state"; + public static final String PENDING_ACK_STORE_CURSOR_NAME = "__pending_ack_state"; private final SpscArrayQueue entryQueue; @@ -384,6 +384,10 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { } + public CompletableFuture getManagedLedger() { + return CompletableFuture.completedFuture(this.managedLedger); + } + public static String getTransactionPendingAckStoreSuffix(String originTopicName, String subName) { return TopicName.get(originTopicName) + "-" + subName + PENDING_ACK_STORE_SUFFIX; } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java index b703c63c2a911..bf6e5748883d2 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java @@ -29,6 +29,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentSkipListMap; import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.mledger.ManagedLedger; import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl; import org.apache.bookkeeper.mledger.impl.PositionImpl; @@ -733,4 +734,19 @@ public TransactionInPendingAckStats getTransactionInPendingAckStats(TxnID txnID) public CompletableFuture close() { return this.pendingAckStoreFuture.thenAccept(PendingAckStore::closeAsync); } + + public CompletableFuture getStoreManageLedger() { + if (this.pendingAckStoreFuture.isDone()) { + return this.pendingAckStoreFuture.thenCompose(pendingAckStore -> { + if (pendingAckStore instanceof MLPendingAckStore) { + return ((MLPendingAckStore) pendingAckStore).getManagedLedger(); + } else { + return FutureUtil.failedFuture( + new NotAllowedException("Pending ack handle don't use managedLedger!")); + } + }); + } else { + return FutureUtil.failedFuture(new ServiceUnitNotReadyException("Pending ack have not init success!")); + } + } } \ No newline at end of file diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java index d48810867a775..34ec6c0005cf0 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java @@ -21,6 +21,7 @@ import com.google.common.collect.Sets; import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; +import org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStore; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; @@ -41,6 +42,7 @@ import org.apache.pulsar.common.policies.data.TransactionBufferStats; import org.apache.pulsar.common.policies.data.TransactionCoordinatorInternalStats; import org.apache.pulsar.common.policies.data.TransactionCoordinatorStats; +import org.apache.pulsar.common.policies.data.TransactionPendingAckInternalStats; import org.apache.pulsar.common.policies.data.TransactionPendingAckStats; import org.apache.pulsar.common.policies.data.TransactionInBufferStats; import org.apache.pulsar.common.policies.data.TransactionInPendingAckStats; @@ -343,6 +345,46 @@ public void testGetCoordinatorInternalStats() throws Exception { stats.transactionLogStats.managedLedgerName); } + @Test(timeOut = 20000) + public void testGetPendingAckInternalStats() throws Exception { + initTransaction(1); + TransactionImpl transaction = (TransactionImpl) getTransaction(); + final String topic = "persistent://public/default/testGetPendingAckInternalStats"; + final String subName = "test"; + admin.topics().createNonPartitionedTopic(topic); + Producer producer = pulsarClient.newProducer(Schema.BYTES).topic(topic).create(); + Consumer consumer = pulsarClient.newConsumer(Schema.BYTES).topic(topic) + .subscriptionName(subName).subscribe(); + MessageId messageId = producer.send("Hello pulsar!".getBytes()); + consumer.acknowledgeAsync(messageId, transaction).get(); + + TransactionPendingAckInternalStats stats = admin.transactions() + .getPendingAckInternalStatsAsync(topic, subName, true).get(); + ManagedLedgerInternalStats managedLedgerInternalStats = stats.pendingAckLogStats.managedLedgerInternalStats; + assertEquals(TopicName.get(TopicDomain.persistent.toString(), "public", "default", + "testGetPendingAckInternalStats" + "-" + + subName + MLPendingAckStore.PENDING_ACK_STORE_SUFFIX).getPersistenceNamingEncoding(), + stats.pendingAckLogStats.managedLedgerName); + + verifyManagedLegerInternalStats(managedLedgerInternalStats, 16); + + ManagedLedgerInternalStats finalManagedLedgerInternalStats = managedLedgerInternalStats; + managedLedgerInternalStats.cursors.forEach((s, cursorStats) -> { + assertEquals(s, MLPendingAckStore.PENDING_ACK_STORE_CURSOR_NAME); + assertEquals(cursorStats.readPosition, finalManagedLedgerInternalStats.lastConfirmedEntry); + }); + + stats = admin.transactions() + .getPendingAckInternalStatsAsync(topic, subName, false).get(); + managedLedgerInternalStats = stats.pendingAckLogStats.managedLedgerInternalStats; + + assertEquals(TopicName.get(TopicDomain.persistent.toString(), "public", "default", + "testGetPendingAckInternalStats" + "-" + + subName + MLPendingAckStore.PENDING_ACK_STORE_SUFFIX).getPersistenceNamingEncoding(), + stats.pendingAckLogStats.managedLedgerName); + assertNull(managedLedgerInternalStats.ledgers.get(0).metadata); + } + private static void verifyCoordinatorStats(String state, long sequenceId, long lowWaterMark) { assertEquals(state, "Ready"); diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Transactions.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Transactions.java index bd78e32ffe6b8..829494bc3e076 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Transactions.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Transactions.java @@ -28,6 +28,7 @@ import org.apache.pulsar.common.policies.data.TransactionInBufferStats; import org.apache.pulsar.common.policies.data.TransactionInPendingAckStats; import org.apache.pulsar.common.policies.data.TransactionMetadata; +import org.apache.pulsar.common.policies.data.TransactionPendingAckInternalStats; import org.apache.pulsar.common.policies.data.TransactionPendingAckStats; public interface Transactions { @@ -160,4 +161,28 @@ CompletableFuture getCoordinatorInternalSta TransactionCoordinatorInternalStats getCoordinatorInternalStats(int coordinatorId, boolean metadata) throws PulsarAdminException; + /** + * Get pending ack internal stats. + * + * @param topic the topic of get pending ack internal stats + * @param subName the subscription name of this pending ack + * @param metadata whether to obtain ledger metadata + * + * @return the future internal stats of pending ack + */ + CompletableFuture getPendingAckInternalStatsAsync(String topic, String subName, + boolean metadata); + + /** + * Get pending ack internal stats. + * + * @param topic the topic of get pending ack internal stats + * @param subName the subscription name of this pending ack + * @param metadata whether to obtain ledger metadata + * + * @return the internal stats of pending ack + */ + TransactionPendingAckInternalStats getPendingAckInternalStats(String topic, String subName, + boolean metadata) throws PulsarAdminException; + } diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TransactionsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TransactionsImpl.java index df541054a51fa..30d41d9f624f4 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TransactionsImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TransactionsImpl.java @@ -29,12 +29,14 @@ import org.apache.pulsar.client.admin.Transactions; import org.apache.pulsar.client.api.Authentication; import org.apache.pulsar.client.api.transaction.TxnID; +import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.TransactionBufferStats; import org.apache.pulsar.common.policies.data.TransactionCoordinatorInternalStats; import org.apache.pulsar.common.policies.data.TransactionCoordinatorStats; import org.apache.pulsar.common.policies.data.TransactionInBufferStats; import org.apache.pulsar.common.policies.data.TransactionInPendingAckStats; import org.apache.pulsar.common.policies.data.TransactionMetadata; +import org.apache.pulsar.common.policies.data.TransactionPendingAckInternalStats; import org.apache.pulsar.common.policies.data.TransactionPendingAckStats; public class TransactionsImpl extends BaseResource implements Transactions { @@ -294,4 +296,46 @@ public TransactionCoordinatorInternalStats getCoordinatorInternalStats(int coord } } + @Override + public CompletableFuture getPendingAckInternalStatsAsync(String topic, + String subName, + boolean metadata) { + TopicName tn = TopicName.get(topic); + WebTarget path = adminV3Transactions.path("pendingAckInternalStats"); + path = path.path(tn.getRestPath(false)); + path = path.path(subName); + path = path.queryParam("metadata", metadata); + final CompletableFuture future = new CompletableFuture<>(); + asyncGetRequest(path, + new InvocationCallback() { + @Override + public void completed(TransactionPendingAckInternalStats stats) { + future.complete(stats); + } + + @Override + public void failed(Throwable throwable) { + future.completeExceptionally(getApiException(throwable.getCause())); + } + }); + return future; + } + + @Override + public TransactionPendingAckInternalStats getPendingAckInternalStats(String topic, + String subName, + boolean metadata) throws PulsarAdminException { + try { + return getPendingAckInternalStatsAsync(topic, subName, metadata) + .get(this.readTimeoutMs, TimeUnit.MILLISECONDS); + } catch (ExecutionException e) { + throw (PulsarAdminException) e.getCause(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new PulsarAdminException(e); + } catch (TimeoutException e) { + throw new PulsarAdminException.TimeoutException(e); + } + } + } diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java index 95d5ddf0f7683..24a7f76db6811 100644 --- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java +++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java @@ -1459,6 +1459,10 @@ void transactions() throws Exception { cmdTransactions = new CmdTransactions(() -> admin); cmdTransactions.run(split("pending-ack-stats -t test -s test")); verify(transactions).getPendingAckStats("test", "test"); + + cmdTransactions = new CmdTransactions(() -> admin); + cmdTransactions.run(split("pending-ack-internal-stats -t test -s test")); + verify(transactions).getPendingAckInternalStats("test", "test", false); } String[] split(String s) { diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTransactions.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTransactions.java index e8a5eef4b029e..7faf006066ca6 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTransactions.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTransactions.java @@ -157,9 +157,26 @@ void run() throws Exception { } } + @Parameters(commandDescription = "Get pending ack internal stats") + private class GetPendingAckInternalStats extends CliCommand { + @Parameter(names = {"-t", "--topic"}, description = "the topic name", required = true) + private String topic; + + @Parameter(names = {"-s", "--sub-name"}, description = "the subscription name", required = true) + private String subName; + + @Parameter(names = { "-m", "--metadata" }, description = "Flag to include ledger metadata") + private boolean metadata = false; + @Override + void run() throws Exception { + print(getAdmin().transactions().getPendingAckInternalStats(topic, subName, metadata)); + } + } + public CmdTransactions(Supplier admin) { super("transactions", admin); jcommander.addCommand("coordinator-internal-stats", new GetCoordinatorInternalStats()); + jcommander.addCommand("pending-ack-internal-stats", new GetPendingAckInternalStats()); jcommander.addCommand("coordinator-stats", new GetCoordinatorStats()); jcommander.addCommand("transaction-buffer-stats", new GetTransactionBufferStats()); jcommander.addCommand("pending-ack-stats", new GetPendingAckStats()); diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TransactionCoordinatorInternalStats.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TransactionCoordinatorInternalStats.java index a4c685aa9183a..0d2a33471a558 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TransactionCoordinatorInternalStats.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TransactionCoordinatorInternalStats.java @@ -23,6 +23,6 @@ */ public class TransactionCoordinatorInternalStats { - /** The transaction log stats*/ + /** The transaction coordinator log stats */ public TransactionLogStats transactionLogStats; } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TransactionPendingAckInternalStats.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TransactionPendingAckInternalStats.java new file mode 100644 index 0000000000000..8a8163926ed0a --- /dev/null +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TransactionPendingAckInternalStats.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.policies.data; + +/** + * Transaction pending ack internal stats. + */ +public class TransactionPendingAckInternalStats { + + /** The transaction pending ack log stats */ + public TransactionLogStats pendingAckLogStats; +}