Skip to content

Commit

Permalink
[Improve] [txn] add txn admin cmd and swagger config for transaction …
Browse files Browse the repository at this point in the history
…admin (apache#16396)

### Motivation &  Modifications
1. Added missing admin cmd  
2. Add swagger config for transaction admin
3. optimize description and URL
Due to the [PR](apache#15682) not being cherry-picked, there are no break changes.
  • Loading branch information
liangyepianzhou authored Jul 7, 2022
1 parent cd9eaca commit 318432e
Show file tree
Hide file tree
Showing 7 changed files with 84 additions and 39 deletions.
17 changes: 17 additions & 0 deletions pulsar-broker/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -708,6 +708,23 @@
<swaggerDirectory>${basedir}/target/docs</swaggerDirectory>
<swaggerFileName>swaggerfunctions</swaggerFileName>
</apiSource>
<apiSource>
<springmvc>false</springmvc>
<locations>org.apache.pulsar.broker.admin.v3.Transactions</locations>
<schemes>http,https</schemes>
<basePath>/admin/v3</basePath>
<info>
<title>Pulsar Transactions REST API</title>
<version>v3</version>
<description>This provides the REST API for Pulsar Transactions operations</description>
<license>
<url>http://www.apache.org/licenses/LICENSE-2.0.html</url>
<name>Apache 2.0</name>
</license>
</info>
<swaggerDirectory>${basedir}/target/docs</swaggerDirectory>
<swaggerFileName>swaggertransactions</swaggerFileName>
</apiSource>
<apiSource>
<springmvc>false</springmvc>
<locations>org.apache.pulsar.broker.admin.v3.Sources</locations>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,7 @@ public void scaleTransactionCoordinators(@Suspended final AsyncResponse asyncRes
}

@GET
@Path("/pendingAckStats/{tenant}/{namespace}/{topic}/{subName}/{ledgerId}/{entryId}")
@Path("/positionStatsInPendingAck/{tenant}/{namespace}/{topic}/{subName}/{ledgerId}/{entryId}")
@ApiOperation(value = "Get position stats in pending ack.")
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -703,16 +703,16 @@ public void testCheckPositionInPendingAckState() throws Exception {
Message<byte[]> message = consumer.receive(5, TimeUnit.SECONDS);
MessageIdImpl messageId = (MessageIdImpl) message.getMessageId();

PositionInPendingAckStats result = admin.transactions().checkPositionInPendingAckState(topic, subName,
PositionInPendingAckStats result = admin.transactions().getPositionStatsInPendingAck(topic, subName,
messageId.getLedgerId(), messageId.getEntryId(), null);
assertEquals(result.state, PositionInPendingAckStats.State.PendingAckNotReady);

consumer.acknowledgeAsync(messageId, transaction).get();
result = admin.transactions().checkPositionInPendingAckState(topic, subName,
result = admin.transactions().getPositionStatsInPendingAck(topic, subName,
messageId.getLedgerId(), messageId.getEntryId(), null);
assertEquals(result.state, PositionInPendingAckStats.State.PendingAck);
transaction.commit().get();
result = admin.transactions().checkPositionInPendingAckState(topic, subName,
result = admin.transactions().getPositionStatsInPendingAck(topic, subName,
messageId.getLedgerId(), messageId.getEntryId(), null);
assertEquals(result.state, PositionInPendingAckStats.State.MarkDelete);
}
Expand Down Expand Up @@ -771,16 +771,16 @@ public void testGetPositionStatsInPendingAckStatsFroBatch() throws Exception {
consumer.acknowledgeAsync(messageId, transaction).get();

PositionInPendingAckStats positionStatsInPendingAckStats =
admin.transactions().checkPositionInPendingAckState(topic, subscriptionName,
admin.transactions().getPositionStatsInPendingAck(topic, subscriptionName,
messageId.getLedgerId(), messageId.getEntryId(), 1);
assertEquals(positionStatsInPendingAckStats.state, PositionInPendingAckStats.State.PendingAck);

positionStatsInPendingAckStats =
admin.transactions().checkPositionInPendingAckState(topic, subscriptionName,
admin.transactions().getPositionStatsInPendingAck(topic, subscriptionName,
messageId.getLedgerId(), messageId.getEntryId(), 2);
assertEquals(positionStatsInPendingAckStats.state, PositionInPendingAckStats.State.NotInPendingAck);
positionStatsInPendingAckStats =
admin.transactions().checkPositionInPendingAckState(topic, subscriptionName,
admin.transactions().getPositionStatsInPendingAck(topic, subscriptionName,
messageId.getLedgerId(), messageId.getEntryId(), 10);
assertEquals(positionStatsInPendingAckStats.state, PositionInPendingAckStats.State.InvalidPosition);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -308,20 +308,19 @@ TransactionPendingAckInternalStats getPendingAckInternalStats(String topic, Stri
CompletableFuture<Void> scaleTransactionCoordinatorsAsync(int replicas);

/**
* Check whether the position is in pending ack stats.
*
* Get the position stats in transaction pending ack.
* @param topic the topic of checking position in pending ack state
* @param subName the subscription name of this pending ack
* @param ledgerId the ledger id of the message position.
* @param entryId the entry id of the message position.
* @param batchIndex the batch index of the message position, `null` means not batch message.
* @return {@link PositionInPendingAckStats} a state identified whether the position state.
*/
PositionInPendingAckStats checkPositionInPendingAckState(String topic, String subName, Long ledgerId, Long entryId,
Integer batchIndex) throws PulsarAdminException;
PositionInPendingAckStats getPositionStatsInPendingAck(String topic, String subName, Long ledgerId, Long entryId,
Integer batchIndex) throws PulsarAdminException;

/**
* Check whether the position is in pending ack stats.
* Get the position stats in transaction pending ack.
*
* @param topic the topic of checking position in pending ack state
* @param subName the subscription name of this pending ack
Expand All @@ -330,7 +329,7 @@ PositionInPendingAckStats checkPositionInPendingAckState(String topic, String su
* @param batchIndex the batch index of the message position, `null` means not batch message.
* @return {@link PositionInPendingAckStats} a state identified whether the position state.
*/
CompletableFuture<PositionInPendingAckStats> checkPositionInPendingAckStateAsync(String topic, String subName,
Long ledgerId, Long entryId,
Integer batchIndex);
CompletableFuture<PositionInPendingAckStats> getPositionStatsInPendingAckAsync(String topic, String subName,
Long ledgerId, Long entryId,
Integer batchIndex);
}
Original file line number Diff line number Diff line change
Expand Up @@ -359,13 +359,13 @@ public CompletableFuture<Void> scaleTransactionCoordinatorsAsync(int replicas) {
}

@Override
public CompletableFuture<PositionInPendingAckStats> checkPositionInPendingAckStateAsync(String topic,
String subName,
Long ledgerId,
Long entryId,
Integer batchIndex) {
public CompletableFuture<PositionInPendingAckStats> getPositionStatsInPendingAckAsync(String topic,
String subName,
Long ledgerId,
Long entryId,
Integer batchIndex) {
TopicName tn = TopicName.get(topic);
WebTarget path = adminV3Transactions.path("pendingAckStats");
WebTarget path = adminV3Transactions.path("positionStatsInPendingAck");
path = path.path(tn.getRestPath(false));
path = path.path(subName);
path = path.path(ledgerId.toString());
Expand All @@ -389,9 +389,9 @@ public void failed(Throwable throwable) {


@Override
public PositionInPendingAckStats checkPositionInPendingAckState(String topic, String subName, Long ledgerId,
Long entryId, Integer batchIndex)
public PositionInPendingAckStats getPositionStatsInPendingAck(String topic, String subName, Long ledgerId,
Long entryId, Integer batchIndex)
throws PulsarAdminException {
return sync(() -> checkPositionInPendingAckStateAsync(topic, subName, ledgerId, entryId, batchIndex));
return sync(() -> getPositionStatsInPendingAckAsync(topic, subName, ledgerId, entryId, batchIndex));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2171,6 +2171,10 @@ void transactions() throws Exception {
cmdTransactions = new CmdTransactions(() -> admin);
cmdTransactions.run(split("scale-transactionCoordinators -r 3"));
verify(transactions).scaleTransactionCoordinators(3);

cmdTransactions = new CmdTransactions(() -> admin);
cmdTransactions.run(split("position-stats-in-pending-ack -t test -s test -l 1 -e 1 -b 1"));
verify(transactions).getPositionStatsInPendingAck("test", "test", 1L, 1L, 1);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public class CmdTransactions extends CmdBase {

@Parameters(commandDescription = "Get transaction coordinator stats")
private class GetCoordinatorStats extends CliCommand {
@Parameter(names = {"-c", "--coordinator-id"}, description = "the coordinator id", required = false)
@Parameter(names = {"-c", "--coordinator-id"}, description = "The coordinator id", required = false)
private Integer coordinatorId;

@Override
Expand All @@ -47,7 +47,7 @@ void run() throws Exception {

@Parameters(commandDescription = "Get transaction buffer stats")
private class GetTransactionBufferStats extends CliCommand {
@Parameter(names = {"-t", "--topic"}, description = "the topic", required = true)
@Parameter(names = {"-t", "--topic"}, description = "The topic", required = true)
private String topic;

@Parameter(names = {"-l", "--low-water-mark"},
Expand All @@ -62,10 +62,10 @@ void run() throws Exception {

@Parameters(commandDescription = "Get transaction pending ack stats")
private class GetPendingAckStats extends CliCommand {
@Parameter(names = {"-t", "--topic"}, description = "the topic", required = true)
@Parameter(names = {"-t", "--topic"}, description = "The topic name", required = true)
private String topic;

@Parameter(names = {"-s", "--sub-name"}, description = "the subscription name", required = true)
@Parameter(names = {"-s", "--sub-name"}, description = "The subscription name", required = true)
private String subName;

@Parameter(names = {"-l", "--low-water-mark"},
Expand All @@ -80,16 +80,16 @@ void run() throws Exception {

@Parameters(commandDescription = "Get transaction in pending ack stats")
private class GetTransactionInPendingAckStats extends CliCommand {
@Parameter(names = {"-m", "--most-sig-bits"}, description = "the most sig bits", required = true)
@Parameter(names = {"-m", "--most-sig-bits"}, description = "The most sig bits", required = true)
private int mostSigBits;

@Parameter(names = {"-l", "--least-sig-bits"}, description = "the least sig bits", required = true)
@Parameter(names = {"-l", "--least-sig-bits"}, description = "The least sig bits", required = true)
private long leastSigBits;

@Parameter(names = {"-t", "--topic"}, description = "the topic name", required = true)
@Parameter(names = {"-t", "--topic"}, description = "The topic name", required = true)
private String topic;

@Parameter(names = {"-s", "--sub-name"}, description = "the subscription name", required = true)
@Parameter(names = {"-s", "--sub-name"}, description = "The subscription name", required = true)
private String subName;

@Override
Expand All @@ -102,13 +102,13 @@ void run() throws Exception {

@Parameters(commandDescription = "Get transaction in buffer stats")
private class GetTransactionInBufferStats extends CliCommand {
@Parameter(names = {"-m", "--most-sig-bits"}, description = "the most sig bits", required = true)
@Parameter(names = {"-m", "--most-sig-bits"}, description = "The most sig bits", required = true)
private int mostSigBits;

@Parameter(names = {"-l", "--least-sig-bits"}, description = "the least sig bits", required = true)
@Parameter(names = {"-l", "--least-sig-bits"}, description = "The least sig bits", required = true)
private long leastSigBits;

@Parameter(names = {"-t", "--topic"}, description = "the topic", required = true)
@Parameter(names = {"-t", "--topic"}, description = "The topic name", required = true)
private String topic;

@Override
Expand All @@ -119,10 +119,10 @@ void run() throws Exception {

@Parameters(commandDescription = "Get transaction metadata")
private class GetTransactionMetadata extends CliCommand {
@Parameter(names = {"-m", "--most-sig-bits"}, description = "the most sig bits", required = true)
@Parameter(names = {"-m", "--most-sig-bits"}, description = "The most sig bits", required = true)
private int mostSigBits;

@Parameter(names = {"-l", "--least-sig-bits"}, description = "the least sig bits", required = true)
@Parameter(names = {"-l", "--least-sig-bits"}, description = "The least sig bits", required = true)
private long leastSigBits;

@Override
Expand Down Expand Up @@ -172,10 +172,10 @@ void run() throws Exception {

@Parameters(commandDescription = "Get pending ack internal stats")
private class GetPendingAckInternalStats extends CliCommand {
@Parameter(names = {"-t", "--topic"}, description = "the topic name", required = true)
@Parameter(names = {"-t", "--topic"}, description = "Topic name", required = true)
private String topic;

@Parameter(names = {"-s", "--sub-name"}, description = "the subscription name", required = true)
@Parameter(names = {"-s", "--subscription-name"}, description = "Subscription name", required = true)
private String subName;

@Parameter(names = { "-m", "--metadata" }, description = "Flag to include ledger metadata")
Expand All @@ -196,6 +196,29 @@ void run() throws Exception {
}
}

@Parameters(commandDescription = "Get the position stats in transaction pending ack")
private class GetPositionStatsInPendingAck extends CliCommand {
@Parameter(names = {"-t", "--topic"}, description = "The topic name", required = true)
private String topic;

@Parameter(names = {"-s", "--subscription-name"}, description = "Subscription name", required = true)
private String subName;

@Parameter(names = {"-l", "--ledger-id"}, description = "Ledger ID of the position", required = true)
private Long ledgerId;

@Parameter(names = {"-e", "--entry-id"}, description = "Entry ID of the position", required = true)
private Long entryId;

@Parameter(names = {"-b", "--batch-index"}, description = "Batch index of the position")
private Integer batchIndex;

@Override
void run() throws Exception {
getAdmin().transactions().getPositionStatsInPendingAck(topic, subName, ledgerId, entryId, batchIndex);
}
}


public CmdTransactions(Supplier<PulsarAdmin> admin) {
super("transactions", admin);
Expand All @@ -209,5 +232,7 @@ public CmdTransactions(Supplier<PulsarAdmin> admin) {
jcommander.addCommand("transaction-metadata", new GetTransactionMetadata());
jcommander.addCommand("slow-transactions", new GetSlowTransactions());
jcommander.addCommand("scale-transactionCoordinators", new ScaleTransactionCoordinators());
jcommander.addCommand("position-stats-in-pending-ack", new GetPositionStatsInPendingAck());

}
}

0 comments on commit 318432e

Please sign in to comment.