Skip to content

Commit

Permalink
[Transaction] Transaction admin api get transaction metadata (apache#…
Browse files Browse the repository at this point in the history
…10690)

## Motivation
Transaction add admin api `getTransactionMetadata`

## implement
```
@DaTa
public class TransactionMetadata {

    /** The txnId of this transaction. */
    public String txnId;

    /** The status of this transaction. */
    public String status;

    /** The open time of this transaction. */
    public long openTimestamp;

    /** The timeout of this transaction. */
    public long timeoutAt;

    /** The producedPartitions of this transaction. */
    public Map<String, TransactionInBufferStats> producedPartitions;

    /** The ackedPartitions of this transaction. */
    public Map<String, Map<String, TransactionInPendingAckStats>> ackedPartitions;
}
```
  • Loading branch information
congbobo184 authored May 25, 2021
1 parent 3311c22 commit fa0a1c1
Show file tree
Hide file tree
Showing 8 changed files with 304 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,17 @@
*/
package org.apache.pulsar.broker.admin.impl;

import static javax.ws.rs.core.Response.Status.NOT_FOUND;
import static javax.ws.rs.core.Response.Status.SERVICE_UNAVAILABLE;
import static javax.ws.rs.core.Response.Status.TEMPORARY_REDIRECT;
import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.core.Response;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -34,12 +37,18 @@
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.client.admin.Transactions;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.TransactionCoordinatorStatus;
import org.apache.pulsar.common.policies.data.TransactionInBufferStats;
import org.apache.pulsar.common.policies.data.TransactionInPendingAckStats;
import org.apache.pulsar.common.policies.data.TransactionMetadata;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
import org.apache.pulsar.transaction.coordinator.TxnMeta;
import org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException.CoordinatorNotFoundException;
import org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException.TransactionNotFoundException;

@Slf4j
public abstract class TransactionsBase extends AdminResource {
Expand Down Expand Up @@ -172,4 +181,109 @@ protected void internalGetTransactionInBufferStats(AsyncResponse asyncResponse,
"This Broker is not configured with transactionCoordinatorEnabled=true."));
}
}

protected void internalGetTransactionMetadata(AsyncResponse asyncResponse,
boolean authoritative, int coordinatorId, long sequenceID) {
try {
if (pulsar().getConfig().isTransactionCoordinatorEnabled()) {
validateTopicOwnership(TopicName.TRANSACTION_COORDINATOR_ASSIGN.getPartition(coordinatorId),
authoritative);
Transactions transactions = pulsar().getAdminClient().transactions();
TxnMeta txnMeta = pulsar().getTransactionMetadataStoreService()
.getTxnMeta(new TxnID(coordinatorId, sequenceID)).get();
TxnID txnID = txnMeta.id();
TransactionMetadata transactionMetadata = new TransactionMetadata();
transactionMetadata.txnId = txnID.toString();
transactionMetadata.status = txnMeta.status().name();
transactionMetadata.openTimestamp = txnMeta.getOpenTimestamp();
transactionMetadata.timeoutAt = txnMeta.getTimeoutAt();

List<CompletableFuture<TransactionInPendingAckStats>> ackedPartitionsFutures = new ArrayList<>();
Map<String, Map<String, CompletableFuture<TransactionInPendingAckStats>>> ackFutures = new HashMap<>();
txnMeta.ackedPartitions().forEach(transactionSubscription -> {
String topic = transactionSubscription.getTopic();
String subName = transactionSubscription.getSubscription();
CompletableFuture<TransactionInPendingAckStats> future =
transactions.getTransactionInPendingAckStats(txnID, topic, subName);
ackedPartitionsFutures.add(future);
if (ackFutures.containsKey(topic)) {
ackFutures.get(topic)
.put(transactionSubscription.getSubscription(), future);
} else {
Map<String, CompletableFuture<TransactionInPendingAckStats>> pendingAckStatsMap =
new HashMap<>();
pendingAckStatsMap.put(transactionSubscription.getSubscription(), future);
ackFutures.put(topic, pendingAckStatsMap);
}
});

List<CompletableFuture<TransactionInBufferStats>> producedPartitionsFutures = new ArrayList<>();
Map<String, CompletableFuture<TransactionInBufferStats>> produceFutures = new HashMap<>();
txnMeta.producedPartitions().forEach(topic -> {
CompletableFuture<TransactionInBufferStats> future =
transactions.getTransactionInBufferStats(txnID, topic);
producedPartitionsFutures.add(future);
produceFutures.put(topic, future);

});

FutureUtil.waitForAll(ackedPartitionsFutures).whenComplete((v, e) -> {
if (e != null) {
asyncResponse.resume(new RestException(e));
return;
}

FutureUtil.waitForAll(producedPartitionsFutures).whenComplete((x, t) -> {
if (t != null) {
asyncResponse.resume(new RestException(t));
return;
}

Map<String, Map<String, TransactionInPendingAckStats>> ackedPartitions = new HashMap<>();
Map<String, TransactionInBufferStats> producedPartitions = new HashMap<>();

for (String topic : ackFutures.keySet()) {
Map<String, TransactionInPendingAckStats> subs = new HashMap<>();
for (String sub : ackFutures.get(topic).keySet()) {
try {
subs.put(sub, ackFutures.get(topic).get(sub).get());
} catch (Exception exception) {
asyncResponse.resume(new RestException(exception.getCause()));
return;
}
}

ackedPartitions.put(topic, subs);
}

for (String topic : produceFutures.keySet()) {
try {
producedPartitions.put(topic, produceFutures.get(topic).get());
} catch (Exception exception) {
asyncResponse.resume(new RestException(exception.getCause()));
return;
}
}
transactionMetadata.ackedPartitions = ackedPartitions;
transactionMetadata.producedPartitions = producedPartitions;
asyncResponse.resume(transactionMetadata);
});
});
} else {
asyncResponse.resume(new RestException(SERVICE_UNAVAILABLE,
"This Broker is not configured with transactionCoordinatorEnabled=true."));
}
} catch (Exception e) {
if (e instanceof ExecutionException) {
if (e.getCause() instanceof CoordinatorNotFoundException
|| e.getCause() instanceof TransactionNotFoundException) {
asyncResponse.resume(new RestException(NOT_FOUND, e.getCause()));
return;
}
asyncResponse.resume(new RestException(e.getCause()));
} else {
asyncResponse.resume(new RestException(e));
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -103,4 +103,27 @@ public void getTransactionInPendingAckStats(@Suspended final AsyncResponse async
internalGetTransactionInPendingAckStats(asyncResponse, authoritative, mostSigBits,
leastSigBits, topic, subName);
}

@GET
@Path("/transactionMetadata")
@ApiOperation(value = "Get transaction metadata")
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic "
+ "or coordinator or transaction doesn't exist"),
@ApiResponse(code = 503, message = "This Broker is not configured "
+ "with transactionCoordinatorEnabled=true."),
@ApiResponse(code = 307, message = "Topic don't owner by this broker!"),
@ApiResponse(code = 501, message = "Topic is not a persistent topic!"),
@ApiResponse(code = 409, message = "Concurrent modification")})
public void getTransactionMetadata(@Suspended final AsyncResponse asyncResponse,
@QueryParam("authoritative")
@DefaultValue("false") boolean authoritative,
@QueryParam("mostSigBits")
@ApiParam(value = "Most sig bits of this transaction", required = true)
int mostSigBits,
@ApiParam(value = "Least sig bits of this transaction", required = true)
@QueryParam("leastSigBits") long leastSigBits) {
internalGetTransactionMetadata(asyncResponse, authoritative, mostSigBits, leastSigBits);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.pulsar.common.policies.data.TransactionCoordinatorStatus;
import org.apache.pulsar.common.policies.data.TransactionInBufferStats;
import org.apache.pulsar.common.policies.data.TransactionInPendingAckStats;
import org.apache.pulsar.common.policies.data.TransactionMetadata;
import org.apache.pulsar.packages.management.core.MockedPackagesStorageProvider;
import org.awaitility.Awaitility;
import org.testng.annotations.AfterMethod;
Expand Down Expand Up @@ -162,6 +163,77 @@ public void testGetTransactionPendingAckStats() throws Exception {
batchMessageId.getBatchIndex());
}

@Test(timeOut = 20000)
public void testGetTransactionMetadata() throws Exception {
initTransaction(2);
long currentTime = System.currentTimeMillis();
TransactionImpl transaction = (TransactionImpl) getTransaction();
final String topic1 = "persistent://public/default/testGetTransactionMetadata-1";
final String subName1 = "test1";
final String topic2 = "persistent://public/default/testGetTransactionMetadata-2";
final String subName2 = "test2";
final String subName3 = "test3";
admin.topics().createNonPartitionedTopic(topic1);
admin.topics().createNonPartitionedTopic(topic2);

Producer<byte[]> producer1 = pulsarClient.newProducer(Schema.BYTES)
.sendTimeout(0, TimeUnit.SECONDS).topic(topic1).create();

Producer<byte[]> producer2 = pulsarClient.newProducer(Schema.BYTES)
.sendTimeout(0, TimeUnit.SECONDS).topic(topic2).create();

Consumer<byte[]> consumer1 = pulsarClient.newConsumer(Schema.BYTES).topic(topic1)
.subscriptionName(subName1).subscribe();

Consumer<byte[]> consumer2 = pulsarClient.newConsumer(Schema.BYTES).topic(topic2)
.subscriptionName(subName2).subscribe();

Consumer<byte[]> consumer3 = pulsarClient.newConsumer(Schema.BYTES).topic(topic2)
.subscriptionName(subName3).subscribe();

MessageId messageId1 = producer1.send("Hello pulsar!".getBytes());
MessageId messageId2 = producer2.send("Hello pulsar!".getBytes());
MessageId messageId3 = producer1.newMessage(transaction).value("Hello pulsar!".getBytes()).send();
MessageId messageId4 = producer2.newMessage(transaction).value("Hello pulsar!".getBytes()).send();

consumer1.acknowledgeCumulativeAsync(messageId1, transaction).get();
consumer2.acknowledgeCumulativeAsync(messageId2, transaction).get();
consumer3.acknowledgeCumulativeAsync(messageId2, transaction).get();
TxnID txnID = new TxnID(transaction.getTxnIdMostBits(), transaction.getTxnIdLeastBits());
TransactionMetadata transactionMetadata = admin.transactions()
.getTransactionMetadata(new TxnID(transaction.getTxnIdMostBits(),
transaction.getTxnIdLeastBits())).get();

assertEquals(transactionMetadata.txnId, txnID.toString());
assertTrue(transactionMetadata.openTimestamp > currentTime);
assertEquals(transactionMetadata.timeoutAt, 5000L);
assertEquals(transactionMetadata.status, "OPEN");

Map<String, TransactionInBufferStats> producedPartitions = transactionMetadata.producedPartitions;
Map<String, Map<String, TransactionInPendingAckStats>> ackedPartitions = transactionMetadata.ackedPartitions;

PositionImpl position1 = getPositionByMessageId(messageId1);
PositionImpl position2 = getPositionByMessageId(messageId2);
PositionImpl position3 = getPositionByMessageId(messageId3);
PositionImpl position4 = getPositionByMessageId(messageId4);

assertFalse(producedPartitions.get(topic1).aborted);
assertFalse(producedPartitions.get(topic2).aborted);
assertEquals(producedPartitions.get(topic1).startPosition, position3.toString());
assertEquals(producedPartitions.get(topic2).startPosition, position4.toString());

assertEquals(ackedPartitions.get(topic1).size(), 1);
assertEquals(ackedPartitions.get(topic2).size(), 2);
assertEquals(ackedPartitions.get(topic1).get(subName1).cumulativeAckPosition, position1.toString());
assertEquals(ackedPartitions.get(topic2).get(subName2).cumulativeAckPosition, position2.toString());
assertEquals(ackedPartitions.get(topic2).get(subName3).cumulativeAckPosition, position2.toString());

}

private static PositionImpl getPositionByMessageId(MessageId messageId) {
return PositionImpl.get(((MessageIdImpl) messageId).getLedgerId(), ((MessageIdImpl) messageId).getEntryId());
}

private static void verifyCoordinatorStatus(String state,
long sequenceId, long lowWaterMark) {
assertEquals(state, "Ready");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.pulsar.common.policies.data.TransactionCoordinatorStatus;
import org.apache.pulsar.common.policies.data.TransactionInBufferStats;
import org.apache.pulsar.common.policies.data.TransactionInPendingAckStats;
import org.apache.pulsar.common.policies.data.TransactionMetadata;

public interface Transactions {

Expand Down Expand Up @@ -62,4 +63,12 @@ public interface Transactions {
CompletableFuture<TransactionInPendingAckStats> getTransactionInPendingAckStats(TxnID txnID, String topic,
String subName);

/**
* Get transaction metadata.
*
* @param txnID the ID of this transaction
* @return the future metadata of this transaction.
*/
CompletableFuture<TransactionMetadata> getTransactionMetadata(TxnID txnID);

}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.pulsar.common.policies.data.TransactionCoordinatorStatus;
import org.apache.pulsar.common.policies.data.TransactionInBufferStats;
import org.apache.pulsar.common.policies.data.TransactionInPendingAckStats;
import org.apache.pulsar.common.policies.data.TransactionMetadata;

public class TransactionsImpl extends BaseResource implements Transactions {
private final WebTarget adminV3Transactions;
Expand Down Expand Up @@ -122,4 +123,25 @@ public void failed(Throwable throwable) {
return future;
}

@Override
public CompletableFuture<TransactionMetadata> getTransactionMetadata(TxnID txnID) {
WebTarget path = adminV3Transactions.path("transactionMetadata");
path = path.queryParam("mostSigBits", txnID.getMostSigBits());
path = path.queryParam("leastSigBits", txnID.getLeastSigBits());
final CompletableFuture<TransactionMetadata> future = new CompletableFuture<>();
asyncGetRequest(path,
new InvocationCallback<TransactionMetadata>() {
@Override
public void completed(TransactionMetadata status) {
future.complete(status);
}

@Override
public void failed(Throwable throwable) {
future.completeExceptionally(getApiException(throwable.getCause()));
}
});
return future;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -1435,6 +1434,11 @@ void transactions() throws Exception {
cmdTransactions.run(split("transaction-in-pending-ack-stats -m 1 -l 2 -t test -s test"));
verify(transactions).getTransactionInPendingAckStats(
new TxnID(1, 2), "test", "test");

cmdTransactions = new CmdTransactions(() -> admin);
cmdTransactions.run(split("transaction-metadata -m 1 -l 2"));
verify(transactions).getTransactionMetadata(new TxnID(1, 2));

}

String[] split(String s) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,25 @@ void run() throws Exception {
}
}

@Parameters(commandDescription = "Get transaction metadata")
private class GetTransactionMetadata extends CliCommand {
@Parameter(names = {"-m", "--most-sig-bits"}, description = "the most sig bits", required = true)
private int mostSigBits;

@Parameter(names = {"-l", "--least-sig-bits"}, description = "the least sig bits", required = true)
private long leastSigBits;

@Override
void run() throws Exception {
print(getAdmin().transactions().getTransactionMetadata(new TxnID(mostSigBits, leastSigBits)));
}
}

public CmdTransactions(Supplier<PulsarAdmin> admin) {
super("transactions", admin);
jcommander.addCommand("coordinator-status", new GetCoordinatorStatus());
jcommander.addCommand("transaction-in-buffer-stats", new GetTransactionInBufferStats());
jcommander.addCommand("transaction-in-pending-ack-stats", new GetTransactionInPendingAckStats());
jcommander.addCommand("transaction-metadata", new GetTransactionMetadata());
}
}
Loading

0 comments on commit fa0a1c1

Please sign in to comment.