Skip to content

Commit

Permalink
[Transaction] Transaction admin api get pending ack internal stats (a…
Browse files Browse the repository at this point in the history
…pache#10725)

## Motivation
Transaction add admin api `getPendingAckInternalStats`
## implement
```
/**
 * Transaction pending ack internal stats.
 */
public class TransactionPendingAckInternalStats {

    /** The manage ledger internal stats*/
    public ManagedLedgerInternalStats managedLedgerInternalStats;
}
```
  • Loading branch information
congbobo184 authored May 28, 2021
1 parent 6967714 commit 2fce656
Show file tree
Hide file tree
Showing 13 changed files with 297 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.service.BrokerServiceException.NotAllowedException;
import org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException;
import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionNotFoundException;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.web.RestException;
Expand All @@ -50,6 +53,7 @@
import org.apache.pulsar.common.policies.data.TransactionInPendingAckStats;
import org.apache.pulsar.common.policies.data.TransactionLogStats;
import org.apache.pulsar.common.policies.data.TransactionMetadata;
import org.apache.pulsar.common.policies.data.TransactionPendingAckInternalStats;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore;
Expand Down Expand Up @@ -504,4 +508,69 @@ protected void internalGetCoordinatorInternalStats(AsyncResponse asyncResponse,
asyncResponse.resume(new RestException(e.getCause()));
}
}

protected void internalGetPendingAckInternalStats(AsyncResponse asyncResponse, boolean authoritative,
TopicName topicName, String subName, boolean metadata) {
try {
if (pulsar().getConfig().isTransactionCoordinatorEnabled()) {
validateTopicOwnership(topicName, authoritative);
CompletableFuture<Optional<Topic>> topicFuture = pulsar().getBrokerService()
.getTopics().get(topicName.toString());
if (topicFuture != null) {
topicFuture.whenComplete((optionalTopic, e) -> {

if (e != null) {
asyncResponse.resume(new RestException(e));
return;
}
if (!optionalTopic.isPresent()) {
asyncResponse.resume(new RestException(TEMPORARY_REDIRECT,
"Topic is not owned by this broker!"));
return;
}
Topic topicObject = optionalTopic.get();
if (topicObject instanceof PersistentTopic) {
try {
ManagedLedger managedLedger =
((PersistentTopic) topicObject).getPendingAckManagedLedger(subName).get();
TransactionPendingAckInternalStats stats =
new TransactionPendingAckInternalStats();
TransactionLogStats pendingAckLogStats = new TransactionLogStats();
pendingAckLogStats.managedLedgerName = managedLedger.getName();
pendingAckLogStats.managedLedgerInternalStats =
managedLedger.getManagedLedgerInternalStats(metadata).get();
stats.pendingAckLogStats = pendingAckLogStats;
asyncResponse.resume(stats);
} catch (Exception exception) {
if (exception instanceof ExecutionException) {
if (exception.getCause() instanceof ServiceUnitNotReadyException) {
asyncResponse.resume(new RestException(SERVICE_UNAVAILABLE,
exception.getCause()));
return;
} else if (exception.getCause() instanceof NotAllowedException) {
asyncResponse.resume(new RestException(METHOD_NOT_ALLOWED,
exception.getCause()));
return;
} else if (exception.getCause() instanceof SubscriptionNotFoundException) {
asyncResponse.resume(new RestException(NOT_FOUND, exception.getCause()));
return;
}
}
asyncResponse.resume(new RestException(exception));
}
} else {
asyncResponse.resume(new RestException(BAD_REQUEST, "Topic is not a persistent topic!"));
}
});
} else {
asyncResponse.resume(new RestException(TEMPORARY_REDIRECT, "Topic is not owned by this broker!"));
}
} else {
asyncResponse.resume(new RestException(SERVICE_UNAVAILABLE,
"This Broker is not configured with transactionCoordinatorEnabled=true."));
}
} catch (Exception e) {
asyncResponse.resume(new RestException(e.getCause()));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import io.swagger.annotations.ApiResponses;
import javax.ws.rs.Consumes;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.Encoded;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
Expand All @@ -34,6 +35,8 @@
import javax.ws.rs.container.Suspended;
import javax.ws.rs.core.MediaType;
import org.apache.pulsar.broker.admin.impl.TransactionsBase;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;

@Path("/transactions")
@Produces(MediaType.APPLICATION_JSON)
Expand Down Expand Up @@ -200,4 +203,28 @@ public void getCoordinatorInternalStats(@Suspended final AsyncResponse asyncResp
@QueryParam("metadata") @DefaultValue("false") boolean metadata) {
internalGetCoordinatorInternalStats(asyncResponse, authoritative, metadata, Integer.parseInt(coordinatorId));
}

@GET
@Path("/pendingAckInternalStats/{tenant}/{namespace}/{topic}/{subName}")
@ApiOperation(value = "Get transaction pending ack internal stats.")
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic "
+ "or subscription name doesn't exist"),
@ApiResponse(code = 503, message = "This Broker is not configured "
+ "with transactionCoordinatorEnabled=true."),
@ApiResponse(code = 307, message = "Topic is not owned by this broker!"),
@ApiResponse(code = 405, message = "Pending ack handle don't use managedLedger!"),
@ApiResponse(code = 400, message = "Topic is not a persistent topic!"),
@ApiResponse(code = 409, message = "Concurrent modification")})
public void getPendingAckInternalStats(@Suspended final AsyncResponse asyncResponse,
@QueryParam("authoritative")
@DefaultValue("false") boolean authoritative,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic,
@PathParam("subName") String subName,
@QueryParam("metadata") @DefaultValue("false") boolean metadata) {
internalGetPendingAckInternalStats(asyncResponse, authoritative,
TopicName.get(TopicDomain.persistent.value(), tenant, namespace, encodedTopic), subName, metadata);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedCursor.IndividualDeletedEntries;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerException.ConcurrentFindCursorPositionException;
import org.apache.bookkeeper.mledger.ManagedLedgerException.InvalidCursorPositionException;
Expand Down Expand Up @@ -1155,5 +1156,13 @@ public TransactionInPendingAckStats getTransactionInPendingAckStats(TxnID txnID)
return this.pendingAckHandle.getTransactionInPendingAckStats(txnID);
}

public CompletableFuture<ManagedLedger> getPendingAckManageLedger() {
if (this.pendingAckHandle instanceof PendingAckHandleImpl) {
return ((PendingAckHandleImpl) this.pendingAckHandle).getStoreManageLedger();
} else {
return FutureUtil.failedFuture(new NotAllowedException("Pending ack handle don't use managedLedger!"));
}
}

private static final Logger log = LoggerFactory.getLogger(PersistentSubscription.class);
}
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@
import org.apache.pulsar.broker.service.BrokerServiceException.PersistenceException;
import org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException;
import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionBusyException;
import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionNotFoundException;
import org.apache.pulsar.broker.service.BrokerServiceException.TopicBusyException;
import org.apache.pulsar.broker.service.BrokerServiceException.TopicClosedException;
import org.apache.pulsar.broker.service.BrokerServiceException.TopicFencedException;
Expand Down Expand Up @@ -3087,4 +3088,13 @@ protected boolean isTerminated() {
public TransactionInPendingAckStats getTransactionInPendingAckStats(TxnID txnID, String subName) {
return this.subscriptions.get(subName).getTransactionInPendingAckStats(txnID);
}

public CompletableFuture<ManagedLedger> getPendingAckManagedLedger(String subName) {
PersistentSubscription subscription = subscriptions.get(subName);
if (subscription == null) {
return FutureUtil.failedFuture(new SubscriptionNotFoundException((topic
+ " not found subscription : " + subName)));
}
return subscription.getPendingAckManageLedger();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public class MLPendingAckStore implements PendingAckStore {

public static final String PENDING_ACK_STORE_SUFFIX = "__transaction_pending_ack";

private static final String PENDING_ACK_STORE_CURSOR_NAME = "__pending_ack_state";
public static final String PENDING_ACK_STORE_CURSOR_NAME = "__pending_ack_state";

private final SpscArrayQueue<Entry> entryQueue;

Expand Down Expand Up @@ -384,6 +384,10 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {

}

public CompletableFuture<ManagedLedger> getManagedLedger() {
return CompletableFuture.completedFuture(this.managedLedger);
}

public static String getTransactionPendingAckStoreSuffix(String originTopicName, String subName) {
return TopicName.get(originTopicName) + "-" + subName + PENDING_ACK_STORE_SUFFIX;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentSkipListMap;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
Expand Down Expand Up @@ -733,4 +734,19 @@ public TransactionInPendingAckStats getTransactionInPendingAckStats(TxnID txnID)
public CompletableFuture<Void> close() {
return this.pendingAckStoreFuture.thenAccept(PendingAckStore::closeAsync);
}

public CompletableFuture<ManagedLedger> getStoreManageLedger() {
if (this.pendingAckStoreFuture.isDone()) {
return this.pendingAckStoreFuture.thenCompose(pendingAckStore -> {
if (pendingAckStore instanceof MLPendingAckStore) {
return ((MLPendingAckStore) pendingAckStore).getManagedLedger();
} else {
return FutureUtil.failedFuture(
new NotAllowedException("Pending ack handle don't use managedLedger!"));
}
});
} else {
return FutureUtil.failedFuture(new ServiceUnitNotReadyException("Pending ack have not init success!"));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.google.common.collect.Sets;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStore;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
Expand All @@ -41,6 +42,7 @@
import org.apache.pulsar.common.policies.data.TransactionBufferStats;
import org.apache.pulsar.common.policies.data.TransactionCoordinatorInternalStats;
import org.apache.pulsar.common.policies.data.TransactionCoordinatorStats;
import org.apache.pulsar.common.policies.data.TransactionPendingAckInternalStats;
import org.apache.pulsar.common.policies.data.TransactionPendingAckStats;
import org.apache.pulsar.common.policies.data.TransactionInBufferStats;
import org.apache.pulsar.common.policies.data.TransactionInPendingAckStats;
Expand Down Expand Up @@ -343,6 +345,46 @@ public void testGetCoordinatorInternalStats() throws Exception {
stats.transactionLogStats.managedLedgerName);
}

@Test(timeOut = 20000)
public void testGetPendingAckInternalStats() throws Exception {
initTransaction(1);
TransactionImpl transaction = (TransactionImpl) getTransaction();
final String topic = "persistent://public/default/testGetPendingAckInternalStats";
final String subName = "test";
admin.topics().createNonPartitionedTopic(topic);
Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES).topic(topic).create();
Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES).topic(topic)
.subscriptionName(subName).subscribe();
MessageId messageId = producer.send("Hello pulsar!".getBytes());
consumer.acknowledgeAsync(messageId, transaction).get();

TransactionPendingAckInternalStats stats = admin.transactions()
.getPendingAckInternalStatsAsync(topic, subName, true).get();
ManagedLedgerInternalStats managedLedgerInternalStats = stats.pendingAckLogStats.managedLedgerInternalStats;
assertEquals(TopicName.get(TopicDomain.persistent.toString(), "public", "default",
"testGetPendingAckInternalStats" + "-"
+ subName + MLPendingAckStore.PENDING_ACK_STORE_SUFFIX).getPersistenceNamingEncoding(),
stats.pendingAckLogStats.managedLedgerName);

verifyManagedLegerInternalStats(managedLedgerInternalStats, 16);

ManagedLedgerInternalStats finalManagedLedgerInternalStats = managedLedgerInternalStats;
managedLedgerInternalStats.cursors.forEach((s, cursorStats) -> {
assertEquals(s, MLPendingAckStore.PENDING_ACK_STORE_CURSOR_NAME);
assertEquals(cursorStats.readPosition, finalManagedLedgerInternalStats.lastConfirmedEntry);
});

stats = admin.transactions()
.getPendingAckInternalStatsAsync(topic, subName, false).get();
managedLedgerInternalStats = stats.pendingAckLogStats.managedLedgerInternalStats;

assertEquals(TopicName.get(TopicDomain.persistent.toString(), "public", "default",
"testGetPendingAckInternalStats" + "-"
+ subName + MLPendingAckStore.PENDING_ACK_STORE_SUFFIX).getPersistenceNamingEncoding(),
stats.pendingAckLogStats.managedLedgerName);
assertNull(managedLedgerInternalStats.ledgers.get(0).metadata);
}

private static void verifyCoordinatorStats(String state,
long sequenceId, long lowWaterMark) {
assertEquals(state, "Ready");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.pulsar.common.policies.data.TransactionInBufferStats;
import org.apache.pulsar.common.policies.data.TransactionInPendingAckStats;
import org.apache.pulsar.common.policies.data.TransactionMetadata;
import org.apache.pulsar.common.policies.data.TransactionPendingAckInternalStats;
import org.apache.pulsar.common.policies.data.TransactionPendingAckStats;

public interface Transactions {
Expand Down Expand Up @@ -160,4 +161,28 @@ CompletableFuture<TransactionCoordinatorInternalStats> getCoordinatorInternalSta
TransactionCoordinatorInternalStats getCoordinatorInternalStats(int coordinatorId,
boolean metadata) throws PulsarAdminException;

/**
* Get pending ack internal stats.
*
* @param topic the topic of get pending ack internal stats
* @param subName the subscription name of this pending ack
* @param metadata whether to obtain ledger metadata
*
* @return the future internal stats of pending ack
*/
CompletableFuture<TransactionPendingAckInternalStats> getPendingAckInternalStatsAsync(String topic, String subName,
boolean metadata);

/**
* Get pending ack internal stats.
*
* @param topic the topic of get pending ack internal stats
* @param subName the subscription name of this pending ack
* @param metadata whether to obtain ledger metadata
*
* @return the internal stats of pending ack
*/
TransactionPendingAckInternalStats getPendingAckInternalStats(String topic, String subName,
boolean metadata) throws PulsarAdminException;

}
Loading

0 comments on commit 2fce656

Please sign in to comment.