Skip to content

Commit

Permalink
[Transaction] Transaction admin api get slow transaction metadata. (a…
Browse files Browse the repository at this point in the history
…pache#10701)

Co-authored-by: congbo <[email protected]>
## Motivation
Transaction add admin api `getSlowTransactions`
Transaction add admin api `getSlowTransactionsByCoordinatorId`

## implement
@DaTa
public class TransactionMetadata {

    /** The txnId of this transaction. */
    public String txnId;

    /** The status of this transaction. */
    public String status;

    /** The open time of this transaction. */
    public long openTimestamp;

    /** The timeout of this transaction. */
    public long timeoutAt;

    /** The producedPartitions of this transaction. */
    public Map<String, TransactionInBufferStats> producedPartitions;

    /** The ackedPartitions of this transaction. */
    public Map<String, Map<String, TransactionInPendingAckStats>> ackedPartitions;
}
  • Loading branch information
congbobo184 authored May 27, 2021
1 parent 2cfed8c commit 4b86c26
Show file tree
Hide file tree
Showing 10 changed files with 388 additions and 78 deletions.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.container.AsyncResponse;
Expand Down Expand Up @@ -151,7 +152,7 @@ public void getPendingAckStats(@Suspended final AsyncResponse asyncResponse,
@ApiResponse(code = 503, message = "This Broker is not configured "
+ "with transactionCoordinatorEnabled=true."),
@ApiResponse(code = 307, message = "Topic is not owned by this broker!"),
@ApiResponse(code = 501, message = "Topic is not a persistent topic!"),
@ApiResponse(code = 400, message = "Topic is not a persistent topic!"),
@ApiResponse(code = 409, message = "Concurrent modification")})
public void getTransactionMetadata(@Suspended final AsyncResponse asyncResponse,
@QueryParam("authoritative")
Expand All @@ -163,4 +164,23 @@ public void getTransactionMetadata(@Suspended final AsyncResponse asyncResponse,
@QueryParam("leastSigBits") long leastSigBits) {
internalGetTransactionMetadata(asyncResponse, authoritative, mostSigBits, leastSigBits);
}

@GET
@Path("/slowTransactions/{timeout}")
@ApiOperation(value = "Get slow transactions.")
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic "
+ "or coordinator or transaction doesn't exist"),
@ApiResponse(code = 503, message = "This Broker is not configured "
+ "with transactionCoordinatorEnabled=true."),
@ApiResponse(code = 307, message = "Topic don't owner by this broker!"),
@ApiResponse(code = 400, message = "Topic is not a persistent topic!"),
@ApiResponse(code = 409, message = "Concurrent modification")})
public void getSlowTransactions(@Suspended final AsyncResponse asyncResponse,
@QueryParam("authoritative")
@DefaultValue("false") boolean authoritative,
@PathParam("timeout") String timeout,
@QueryParam("coordinatorId") Integer coordinatorId) {
internalGetSlowTransactions(asyncResponse, authoritative, Long.parseLong(timeout), coordinatorId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import java.util.Map;
import java.util.concurrent.TimeUnit;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
Expand Down Expand Up @@ -170,14 +171,14 @@ public void testGetTransactionPendingAckStats() throws Exception {
public void testGetTransactionMetadata() throws Exception {
initTransaction(2);
long currentTime = System.currentTimeMillis();
TransactionImpl transaction = (TransactionImpl) getTransaction();
final String topic1 = "persistent://public/default/testGetTransactionMetadata-1";
final String subName1 = "test1";
final String topic2 = "persistent://public/default/testGetTransactionMetadata-2";
final String subName2 = "test2";
final String subName3 = "test3";
admin.topics().createNonPartitionedTopic(topic1);
admin.topics().createNonPartitionedTopic(topic2);
TransactionImpl transaction = (TransactionImpl) getTransaction();

Producer<byte[]> producer1 = pulsarClient.newProducer(Schema.BYTES)
.sendTimeout(0, TimeUnit.SECONDS).topic(topic1).create();
Expand Down Expand Up @@ -283,6 +284,33 @@ public void testGetPendingAckStats() throws Exception {
assertEquals(transactionPendingAckStats.state, "Ready");
}

@Test(timeOut = 20000)
public void testGetSlowTransactions() throws Exception {
initTransaction(2);
TransactionImpl transaction1 = (TransactionImpl) pulsarClient.newTransaction()
.withTransactionTimeout(60, TimeUnit.SECONDS).build().get();
TransactionImpl transaction2 = (TransactionImpl) pulsarClient.newTransaction()
.withTransactionTimeout(60, TimeUnit.SECONDS).build().get();
pulsarClient.newTransaction().withTransactionTimeout(20, TimeUnit.SECONDS).build();
pulsarClient.newTransaction().withTransactionTimeout(20, TimeUnit.SECONDS).build();

Map<String, TransactionMetadata> transactionMetadataMap = admin.transactions()
.getSlowTransactionsAsync(30, TimeUnit.SECONDS).get();

assertEquals(transactionMetadataMap.size(), 2);

TxnID txnID1 = new TxnID(transaction1.getTxnIdMostBits(), transaction1.getTxnIdLeastBits());
TxnID txnID2 = new TxnID(transaction2.getTxnIdMostBits(), transaction2.getTxnIdLeastBits());

TransactionMetadata transactionMetadata = transactionMetadataMap.get(txnID1.toString());
assertNotNull(transactionMetadata);
assertEquals(transactionMetadata.timeoutAt, 60000);

transactionMetadata = transactionMetadataMap.get(txnID2.toString());
assertNotNull(transactionMetadata);
assertEquals(transactionMetadata.timeoutAt, 60000);
}

private static PositionImpl getPositionByMessageId(MessageId messageId) {
return PositionImpl.get(((MessageIdImpl) messageId).getLedgerId(), ((MessageIdImpl) messageId).getEntryId());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.common.policies.data.TransactionBufferStats;
import org.apache.pulsar.common.policies.data.TransactionCoordinatorStats;
Expand Down Expand Up @@ -90,4 +91,53 @@ CompletableFuture<TransactionInPendingAckStats> getTransactionInPendingAckStats(
*/
CompletableFuture<TransactionPendingAckStats> getPendingAckStats(String topic, String subName);

/**
* Get slow transactions by coordinator id.
*
* @param coordinatorId the coordinator id of getting slow transaction status.
* @param timeout the timeout
* @param timeUnit the timeout timeUnit
* @return the future metadata of slow transactions.
*/
CompletableFuture<Map<String, TransactionMetadata>> getSlowTransactionsByCoordinatorIdAsync(Integer coordinatorId,
long timeout,
TimeUnit timeUnit);

/**
* Get slow transactions by coordinator id.
*
* @param coordinatorId the coordinator id of getting slow transaction status.
* @param timeout the timeout
* @param timeUnit the timeout timeUnit
* @return the metadata of slow transactions.
*/
Map<String, TransactionMetadata> getSlowTransactionsByCoordinatorId(Integer coordinatorId,
long timeout,
TimeUnit timeUnit) throws Exception;

/**
* Get slow transactions.
*
* @param timeout the timeout
* @param timeUnit the timeout timeUnit
*
* @return the future metadata of slow transactions.
*/
CompletableFuture<Map<String, TransactionMetadata>> getSlowTransactionsAsync(long timeout,
TimeUnit timeUnit);


/**
* Get slow transactions.
*
* @param timeout the timeout
* @param timeUnit the timeout timeUnit
*
* @return the metadata of slow transactions.
*/
CompletableFuture<Map<String, TransactionMetadata>> getSlowTransactions(long timeout,
TimeUnit timeUnit) throws Exception;



}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import javax.ws.rs.client.InvocationCallback;
import javax.ws.rs.client.WebTarget;
import org.apache.pulsar.client.admin.Transactions;
Expand Down Expand Up @@ -187,5 +188,47 @@ public void failed(Throwable throwable) {
return future;
}

@Override
public CompletableFuture<Map<String, TransactionMetadata>> getSlowTransactionsByCoordinatorIdAsync(
Integer coordinatorId, long timeout, TimeUnit timeUnit) {
WebTarget path = adminV3Transactions.path("slowTransactions");
path = path.path(timeUnit.toMillis(timeout) + "");
if (coordinatorId != null) {
path = path.queryParam("coordinatorId", coordinatorId);
}
final CompletableFuture<Map<String, TransactionMetadata>> future = new CompletableFuture<>();
asyncGetRequest(path,
new InvocationCallback<Map<String, TransactionMetadata>>() {
@Override
public void completed(Map<String, TransactionMetadata> metadataMap) {
future.complete(metadataMap);
}

@Override
public void failed(Throwable throwable) {
future.completeExceptionally(getApiException(throwable.getCause()));
}
});
return future;
}

@Override
public Map<String, TransactionMetadata> getSlowTransactionsByCoordinatorId(Integer coordinatorId,
long timeout,
TimeUnit timeUnit) throws Exception {
return getSlowTransactionsByCoordinatorIdAsync(coordinatorId, timeout, timeUnit).get();
}

@Override
public CompletableFuture<Map<String, TransactionMetadata>> getSlowTransactionsAsync(long timeout,
TimeUnit timeUnit) {
return getSlowTransactionsByCoordinatorIdAsync(null, timeout, timeUnit);
}

@Override
public CompletableFuture<Map<String, TransactionMetadata>> getSlowTransactions(long timeout,
TimeUnit timeUnit) throws Exception {
return getSlowTransactions(timeout, timeUnit);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -1438,6 +1438,15 @@ void transactions() throws Exception {
cmdTransactions.run(split("transaction-metadata -m 1 -l 2"));
verify(transactions).getTransactionMetadata(new TxnID(1, 2));

cmdTransactions = new CmdTransactions(() -> admin);
cmdTransactions.run(split("slow-transactions -c 1 -t 1h"));
verify(transactions).getSlowTransactionsByCoordinatorId(
1, 3600000, TimeUnit.MILLISECONDS);

cmdTransactions = new CmdTransactions(() -> admin);
cmdTransactions.run(split("slow-transactions -t 1h"));
verify(transactions).getSlowTransactions(3600000, TimeUnit.MILLISECONDS);

cmdTransactions = new CmdTransactions(() -> admin);
cmdTransactions.run(split("transaction-buffer-stats -t test"));
verify(transactions).getTransactionBufferStats("test");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@

import com.beust.jcommander.Parameter;
import com.beust.jcommander.Parameters;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.common.util.RelativeTimeUtil;

@Parameters(commandDescription = "Operations on transactions")
public class CmdTransactions extends CmdBase {
Expand Down Expand Up @@ -120,6 +122,28 @@ void run() throws Exception {
}
}

@Parameters(commandDescription = "Get slow transactions.")
private class GetSlowTransactions extends CliCommand {
@Parameter(names = {"-c", "--coordinator-id"}, description = "The coordinator id", required = false)
private Integer coordinatorId;

@Parameter(names = { "-t", "--time" }, description = "The transaction timeout time. "
+ "(eg: 1s, 10s, 1m, 5h, 3d)", required = true)
private String timeoutStr = "1s";

@Override
void run() throws Exception {
long timeout =
TimeUnit.SECONDS.toMillis(RelativeTimeUtil.parseRelativeTimeInSeconds(timeoutStr));
if (coordinatorId != null) {
print(getAdmin().transactions().getSlowTransactionsByCoordinatorId(coordinatorId,
timeout, TimeUnit.MILLISECONDS));
} else {
print(getAdmin().transactions().getSlowTransactions(timeout, TimeUnit.MILLISECONDS));
}
}
}

public CmdTransactions(Supplier<PulsarAdmin> admin) {
super("transactions", admin);
jcommander.addCommand("coordinator-stats", new GetCoordinatorStats());
Expand All @@ -128,5 +152,6 @@ public CmdTransactions(Supplier<PulsarAdmin> admin) {
jcommander.addCommand("transaction-in-buffer-stats", new GetTransactionInBufferStats());
jcommander.addCommand("transaction-in-pending-ack-stats", new GetTransactionInPendingAckStats());
jcommander.addCommand("transaction-metadata", new GetTransactionMetadata());
jcommander.addCommand("slow-transactions", new GetSlowTransactions());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -131,4 +131,11 @@ default long getLowWaterMark() {
* @return TransactionMetadataStoreStats {@link TransactionMetadataStoreStats}
*/
TransactionMetadataStoreStats getMetadataStoreStats();

/**
* Get the transactions witch timeout is bigger than given timeout.
*
* @return {@link TxnMeta} the txnMetas of slow transactions
*/
List<TxnMeta> getSlowTransactions(long timeout);
}
Original file line number Diff line number Diff line change
Expand Up @@ -155,4 +155,9 @@ public TransactionMetadataStoreStats getMetadataStoreStats() {
this.transactionMetadataStoreStats.setAbortedCount(this.abortTransactionCount.longValue());
return transactionMetadataStoreStats;
}

@Override
public List<TxnMeta> getSlowTransactions(long timeout) {
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -419,6 +419,17 @@ public TransactionMetadataStoreStats getMetadataStoreStats() {
return transactionMetadataStoreStats;
}

@Override
public List<TxnMeta> getSlowTransactions(long timeout) {
List<TxnMeta> txnMetas = new ArrayList<>();
txnMetaMap.forEach((k, v) -> {
if (v.getLeft().getTimeoutAt() > timeout) {
txnMetas.add(v.getLeft());
}
});
return txnMetas;
}

public static List<Subscription> txnSubscriptionToSubscription(List<TransactionSubscription> tnxSubscriptions) {
List<Subscription> subscriptions = new ArrayList<>(tnxSubscriptions.size());
for (TransactionSubscription transactionSubscription : tnxSubscriptions) {
Expand Down

0 comments on commit 4b86c26

Please sign in to comment.