Skip to content

Commit

Permalink
[Transaction] Transaction admin api transaction in pendingack stats (a…
Browse files Browse the repository at this point in the history
…pache#10648)

## Motivation
Transaction add admin api `getTransactionInPendingAckStats`
now only cumulativeAckPosition
individual position will add in next pr
## implement
```
public class TransactionInPendingAckStats {   
 /** The position of this transaction cumulative ack. */    
public String cumulativeAckPosition;
}
```
  • Loading branch information
congbobo184 authored May 24, 2021
1 parent 3e42503 commit 2ade027
Show file tree
Hide file tree
Showing 13 changed files with 248 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
*/
package org.apache.pulsar.broker.admin.impl;

import static javax.ws.rs.core.Response.Status.INTERNAL_SERVER_ERROR;
import static javax.ws.rs.core.Response.Status.SERVICE_UNAVAILABLE;
import static javax.ws.rs.core.Response.Status.TEMPORARY_REDIRECT;
import com.google.common.collect.Lists;
Expand Down Expand Up @@ -102,6 +101,42 @@ protected void internalGetCoordinatorStatus(AsyncResponse asyncResponse, boolean
}
}

protected void internalGetTransactionInPendingAckStats(AsyncResponse asyncResponse, boolean authoritative,
long mostSigBits, long leastSigBits, String topic,
String subName) {
if (pulsar().getConfig().isTransactionCoordinatorEnabled()) {
validateTopicOwnership(TopicName.get(topic), authoritative);
CompletableFuture<Optional<Topic>> topicFuture = pulsar().getBrokerService()
.getTopics().get(TopicName.get(topic).toString());
if (topicFuture != null) {
topicFuture.whenComplete((optionalTopic, e) -> {

if (e != null) {
asyncResponse.resume(new RestException(e));
return;
}
if (!optionalTopic.isPresent()) {
asyncResponse.resume(new RestException(TEMPORARY_REDIRECT,
"Topic don't owner by this broker!"));
return;
}
Topic topicObject = optionalTopic.get();
if (topicObject instanceof PersistentTopic) {
asyncResponse.resume(((PersistentTopic) topicObject)
.getTransactionInPendingAckStats(new TxnID(mostSigBits, leastSigBits), subName));
} else {
asyncResponse.resume(new RestException(NOT_IMPLEMENTED, "Topic is not a persistent topic!"));
}
});
} else {
asyncResponse.resume(new RestException(TEMPORARY_REDIRECT, "Topic don't owner by this broker!"));
}
} else {
asyncResponse.resume(new RestException(SERVICE_UNAVAILABLE,
"This Broker is not configured with transactionCoordinatorEnabled=true."));
}
}

protected void internalGetTransactionInBufferStats(AsyncResponse asyncResponse, boolean authoritative,
long mostSigBits, long leastSigBits,
String topic) {
Expand All @@ -116,7 +151,7 @@ protected void internalGetTransactionInBufferStats(AsyncResponse asyncResponse,
return;
}
if (!optionalTopic.isPresent()) {
asyncResponse.resume(new RestException(INTERNAL_SERVER_ERROR,
asyncResponse.resume(new RestException(TEMPORARY_REDIRECT,
"Topic don't owner by this broker!"));
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,4 +77,30 @@ public void getTransactionInBufferStats(@Suspended final AsyncResponse asyncResp
@QueryParam("topic") String topic) {
internalGetTransactionInBufferStats(asyncResponse, authoritative, mostSigBits, leastSigBits, topic);
}

@GET
@Path("/transactionInPendingAckStats")
@ApiOperation(value = "Get transaction state in pending ack.")
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist"),
@ApiResponse(code = 503, message = "This Broker is not configured "
+ "with transactionCoordinatorEnabled=true."),
@ApiResponse(code = 307, message = "Topic don't owner by this broker!"),
@ApiResponse(code = 501, message = "Topic is not a persistent topic!"),
@ApiResponse(code = 409, message = "Concurrent modification")})
public void getTransactionInPendingAckStats(@Suspended final AsyncResponse asyncResponse,
@QueryParam("authoritative")
@DefaultValue("false") boolean authoritative,
@QueryParam("mostSigBits")
@ApiParam(value = "Most sig bits of this transaction", required = true)
long mostSigBits,
@ApiParam(value = "Least sig bits of this transaction", required = true)
@QueryParam("leastSigBits") long leastSigBits,
@ApiParam(value = "Topic name", required = true)
@QueryParam("topic") String topic,
@ApiParam(value = "Subscription name", required = true)
@QueryParam("subName") String subName) {
internalGetTransactionInPendingAckStats(asyncResponse, authoritative, mostSigBits,
leastSigBits, topic, subName);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ConsumerStats;
import org.apache.pulsar.common.policies.data.SubscriptionStats;
import org.apache.pulsar.common.policies.data.TransactionInPendingAckStats;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.Markers;
import org.apache.pulsar.common.util.FutureUtil;
Expand Down Expand Up @@ -1145,5 +1146,9 @@ public boolean checkAndUnblockIfStuck() {
return dispatcher != null ? dispatcher.checkAndUnblockIfStuck() : false;
}

public TransactionInPendingAckStats getTransactionInPendingAckStats(TxnID txnID) {
return this.pendingAckHandle.getTransactionInPendingAckStats(txnID);
}

private static final Logger log = LoggerFactory.getLogger(PersistentSubscription.class);
}
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@
import org.apache.pulsar.common.policies.data.TopicPolicies;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.policies.data.TransactionInBufferStats;
import org.apache.pulsar.common.policies.data.TransactionInPendingAckStats;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.schema.SchemaData;
import org.apache.pulsar.common.protocol.schema.SchemaVersion;
Expand Down Expand Up @@ -3074,4 +3075,8 @@ public TransactionInBufferStats getTransactionInBufferStats(TxnID txnID) {
protected boolean isTerminated() {
return ledger.isTerminated();
}

public TransactionInPendingAckStats getTransactionInPendingAckStats(TxnID txnID, String subName) {
return this.subscriptions.get(subName).getTransactionInPendingAckStats(txnID);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.common.api.proto.CommandAck.AckType;
import org.apache.pulsar.common.policies.data.TransactionInPendingAckStats;
import org.apache.pulsar.transaction.common.exception.TransactionConflictException;

/**
Expand Down Expand Up @@ -129,6 +130,14 @@ CompletableFuture<Void> individualAcknowledgeMessage(TxnID txnID, List<MutablePa
*/
CompletableFuture<PendingAckHandle> pendingAckHandleFuture();

/**
* Get transaction in pending ack stats.
*
* @param txnID the txnID
* @return the stats of this transaction in pending ack.
*/
TransactionInPendingAckStats getTransactionInPendingAckStats(TxnID txnID);

/**
* Close the pending ack handle.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.transaction.pendingack.PendingAckHandle;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.common.policies.data.TransactionInPendingAckStats;
import org.apache.pulsar.common.util.FutureUtil;

/**
Expand Down Expand Up @@ -79,6 +80,11 @@ public CompletableFuture<PendingAckHandle> pendingAckHandleFuture() {
return pendingAckHandleCompletableFuture;
}

@Override
public TransactionInPendingAckStats getTransactionInPendingAckStats(TxnID txnID) {
return null;
}

@Override
public CompletableFuture<Void> close() {
return CompletableFuture.completedFuture(null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.apache.pulsar.broker.transaction.pendingack.TransactionPendingAckStoreProvider;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.common.api.proto.CommandAck.AckType;
import org.apache.pulsar.common.policies.data.TransactionInPendingAckStats;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.BitSetRecyclable;
import org.apache.pulsar.transaction.common.exception.TransactionConflictException;
Expand Down Expand Up @@ -699,6 +700,27 @@ public void completeHandleFuture() {
this.pendingAckHandleCompletableFuture.complete(PendingAckHandleImpl.this);
}

@Override
public TransactionInPendingAckStats getTransactionInPendingAckStats(TxnID txnID) {
TransactionInPendingAckStats transactionInPendingAckStats = new TransactionInPendingAckStats();
if (cumulativeAckOfTransaction != null && cumulativeAckOfTransaction.getLeft().equals(txnID)) {
PositionImpl position = cumulativeAckOfTransaction.getRight();
StringBuilder stringBuilder = new StringBuilder()
.append(position.getLedgerId())
.append(':')
.append(position.getEntryId());
if (cumulativeAckOfTransaction.getRight().hasAckSet()) {
BitSetRecyclable bitSetRecyclable =
BitSetRecyclable.valueOf(cumulativeAckOfTransaction.getRight().getAckSet());
if (!bitSetRecyclable.isEmpty()) {
stringBuilder.append(":").append(bitSetRecyclable.nextSetBit(0) - 1);
}
}
transactionInPendingAckStats.cumulativeAckPosition = stringBuilder.toString();
}
return transactionInPendingAckStats;
}

@Override
public CompletableFuture<Void> close() {
return this.pendingAckStoreFuture.thenAccept(PendingAckStore::closeAsync);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,23 @@
import com.google.common.collect.Sets;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.transaction.Transaction;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.transaction.TransactionImpl;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.policies.data.TransactionCoordinatorStatus;
import org.apache.pulsar.common.policies.data.TransactionInBufferStats;
import org.apache.pulsar.common.policies.data.TransactionInPendingAckStats;
import org.apache.pulsar.packages.management.core.MockedPackagesStorageProvider;
import org.awaitility.Awaitility;
import org.testng.annotations.AfterMethod;
Expand All @@ -42,8 +46,8 @@
import java.util.Map;
import java.util.concurrent.TimeUnit;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;

public class AdminApiTransactionTest extends MockedPulsarServiceBaseTest {
Expand Down Expand Up @@ -121,6 +125,43 @@ public void testGetTransactionInBufferStats() throws Exception {
assertTrue(transactionInBufferStats.aborted);
}

@Test(timeOut = 20000)
public void testGetTransactionPendingAckStats() throws Exception {
initTransaction(2);
final String topic = "persistent://public/default/testGetTransactionInBufferStats";
final String subName = "test";
admin.topics().createNonPartitionedTopic(topic);
Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES).topic(topic).create();
Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES).topic(topic)
.subscriptionName(subName).subscribe();
producer.sendAsync("Hello pulsar!".getBytes());
producer.sendAsync("Hello pulsar!".getBytes());
producer.sendAsync("Hello pulsar!".getBytes());
producer.sendAsync("Hello pulsar!".getBytes());
TransactionImpl transaction = (TransactionImpl) getTransaction();
TransactionInPendingAckStats transactionInPendingAckStats = admin.transactions()
.getTransactionInPendingAckStats(new TxnID(transaction.getTxnIdMostBits(),
transaction.getTxnIdLeastBits()), topic, subName).get();
assertNull(transactionInPendingAckStats.cumulativeAckPosition);

consumer.receive();
consumer.receive();
Message<byte[]> message = consumer.receive();
BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) message.getMessageId();
consumer.acknowledgeCumulativeAsync(batchMessageId, transaction).get();

transactionInPendingAckStats = admin.transactions()
.getTransactionInPendingAckStats(new TxnID(transaction.getTxnIdMostBits(),
transaction.getTxnIdLeastBits()), topic, subName).get();

assertEquals(transactionInPendingAckStats.cumulativeAckPosition,
String.valueOf(batchMessageId.getLedgerId()) +
':' +
batchMessageId.getEntryId() +
':' +
batchMessageId.getBatchIndex());
}

private static void verifyCoordinatorStatus(String state,
long sequenceId, long lowWaterMark) {
assertEquals(state, "Ready");
Expand All @@ -138,6 +179,6 @@ private void initTransaction(int coordinatorSize) throws Exception {

private Transaction getTransaction() throws Exception {
return pulsarClient.newTransaction()
.withTransactionTimeout(2, TimeUnit.SECONDS).build().get();
.withTransactionTimeout(5, TimeUnit.SECONDS).build().get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.common.policies.data.TransactionCoordinatorStatus;
import org.apache.pulsar.common.policies.data.TransactionInBufferStats;
import org.apache.pulsar.common.policies.data.TransactionInPendingAckStats;

public interface Transactions {

Expand Down Expand Up @@ -50,4 +51,15 @@ public interface Transactions {
*/
CompletableFuture<TransactionInBufferStats> getTransactionInBufferStats(TxnID txnID, String topic);

/**
* Get transaction in pending ack stats.
*
* @param txnID the txnId
* @param topic the ack topic
* @param subName the sub name of this transaction ack
* @return the future stats of transaction in pending ack.
*/
CompletableFuture<TransactionInPendingAckStats> getTransactionInPendingAckStats(TxnID txnID, String topic,
String subName);

}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.common.policies.data.TransactionCoordinatorStatus;
import org.apache.pulsar.common.policies.data.TransactionInBufferStats;
import org.apache.pulsar.common.policies.data.TransactionInPendingAckStats;

public class TransactionsImpl extends BaseResource implements Transactions {
private final WebTarget adminV3Transactions;
Expand Down Expand Up @@ -97,4 +98,28 @@ public void failed(Throwable throwable) {
return future;
}

@Override
public CompletableFuture<TransactionInPendingAckStats> getTransactionInPendingAckStats(TxnID txnID, String topic,
String subName) {
WebTarget path = adminV3Transactions.path("transactionInPendingAckStats");
path = path.queryParam("mostSigBits", txnID.getMostSigBits());
path = path.queryParam("leastSigBits", txnID.getLeastSigBits());
path = path.queryParam("topic", topic);
path = path.queryParam("subName", subName);
final CompletableFuture<TransactionInPendingAckStats> future = new CompletableFuture<>();
asyncGetRequest(path,
new InvocationCallback<TransactionInPendingAckStats>() {
@Override
public void completed(TransactionInPendingAckStats stats) {
future.complete(stats);
}

@Override
public void failed(Throwable throwable) {
future.completeExceptionally(getApiException(throwable.getCause()));
}
});
return future;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,6 @@
import org.apache.pulsar.common.policies.data.SubscribeRate;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.policies.data.TopicType;
import org.apache.pulsar.common.policies.data.TransactionCoordinatorStatus;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.mockito.ArgumentMatcher;
import org.mockito.Mockito;
Expand Down Expand Up @@ -1417,11 +1416,7 @@ void proxy() throws Exception {
void transactions() throws Exception {
PulsarAdmin admin = Mockito.mock(PulsarAdmin.class);
Transactions transactions = Mockito.mock(Transactions.class);
CompletableFuture<TransactionCoordinatorStatus> transactionMetadataStoreInfo = mock(CompletableFuture.class);
CompletableFuture<List<TransactionCoordinatorStatus>> lists = mock(CompletableFuture.class);
doReturn(transactions).when(admin).transactions();
doReturn(transactionMetadataStoreInfo).when(transactions).getCoordinatorStatusById(1);
doReturn(lists).when(transactions).getCoordinatorStatus();

CmdTransactions cmdTransactions = new CmdTransactions(() -> admin);

Expand All @@ -1435,6 +1430,11 @@ void transactions() throws Exception {
cmdTransactions = new CmdTransactions(() -> admin);
cmdTransactions.run(split("transaction-in-buffer-stats -m 1 -t test -l 2"));
verify(transactions).getTransactionInBufferStats(new TxnID(1, 2), "test");

cmdTransactions = new CmdTransactions(() -> admin);
cmdTransactions.run(split("transaction-in-pending-ack-stats -m 1 -l 2 -t test -s test"));
verify(transactions).getTransactionInPendingAckStats(
new TxnID(1, 2), "test", "test");
}

String[] split(String s) {
Expand Down
Loading

0 comments on commit 2ade027

Please sign in to comment.