Skip to content

Commit

Permalink
[Transaction] Transaction admin api transaction in buffer stats (apac…
Browse files Browse the repository at this point in the history
…he#10642)

## Motivation
Transaction add admin api `getTransactionInBufferStats`

## implement
```
@DaTa
public class TransactionInBufferStats {

    /** The start position of this transaction in transaction buffer. */
    public String startPosition;

    /** The flag of this transaction have been aborted. */
    public boolean aborted;
}
```
This is transaction buffer metrics.
  • Loading branch information
congbobo184 authored May 24, 2021
1 parent 67e703e commit 3e42503
Show file tree
Hide file tree
Showing 15 changed files with 265 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,27 @@
*/
package org.apache.pulsar.broker.admin.impl;

import static javax.ws.rs.core.Response.Status.INTERNAL_SERVER_ERROR;
import static javax.ws.rs.core.Response.Status.SERVICE_UNAVAILABLE;
import static javax.ws.rs.core.Response.Status.TEMPORARY_REDIRECT;
import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.core.Response;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.TransactionCoordinatorStatus;
import org.apache.pulsar.common.policies.data.TransactionInBufferStats;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;

Expand Down Expand Up @@ -64,23 +72,23 @@ protected void internalGetCoordinatorStatus(AsyncResponse asyncResponse, boolean
return;
}
}
List<TransactionCoordinatorStatus> metadataStoreInfoList = new ArrayList<>();
Map<Integer, TransactionCoordinatorStatus> status = new HashMap<>();
FutureUtil.waitForAll(transactionMetadataStoreInfoFutures).whenComplete((result, e) -> {
if (e != null) {
asyncResponse.resume(new RestException(e));
return;
}

for (CompletableFuture<TransactionCoordinatorStatus> transactionMetadataStoreInfoFuture
: transactionMetadataStoreInfoFutures) {
for (int i = 0; i < transactionMetadataStoreInfoFutures.size(); i++) {
try {
metadataStoreInfoList.add(transactionMetadataStoreInfoFuture.get());
status.put(i, transactionMetadataStoreInfoFutures.get(i).get());
} catch (Exception exception) {
asyncResponse.resume(new RestException(exception.getCause()));
return;
}
}
asyncResponse.resume(metadataStoreInfoList);

asyncResponse.resume(status);
});
}).exceptionally(ex -> {
log.error("[{}] Failed to get transaction coordinator state.", clientAppId(), ex);
Expand All @@ -93,4 +101,40 @@ protected void internalGetCoordinatorStatus(AsyncResponse asyncResponse, boolean
"This Broker is not configured with transactionCoordinatorEnabled=true."));
}
}

protected void internalGetTransactionInBufferStats(AsyncResponse asyncResponse, boolean authoritative,
long mostSigBits, long leastSigBits,
String topic) {
if (pulsar().getConfig().isTransactionCoordinatorEnabled()) {
validateTopicOwnership(TopicName.get(topic), authoritative);
CompletableFuture<Optional<Topic>> topicFuture = pulsar().getBrokerService()
.getTopics().get(TopicName.get(topic).toString());
if (topicFuture != null) {
topicFuture.whenComplete((optionalTopic, e) -> {
if (e != null) {
asyncResponse.resume(new RestException(e));
return;
}
if (!optionalTopic.isPresent()) {
asyncResponse.resume(new RestException(INTERNAL_SERVER_ERROR,
"Topic don't owner by this broker!"));
return;
}
Topic topicObject = optionalTopic.get();
if (topicObject instanceof PersistentTopic) {
TransactionInBufferStats transactionInBufferStats = ((PersistentTopic) topicObject)
.getTransactionInBufferStats(new TxnID(mostSigBits, leastSigBits));
asyncResponse.resume(transactionInBufferStats);
} else {
asyncResponse.resume(new RestException(NOT_IMPLEMENTED, "Topic is not a persistent topic!"));
}
});
} else {
asyncResponse.resume(new RestException(TEMPORARY_REDIRECT, "Topic don't owner by this broker!"));
}
} else {
asyncResponse.resume(new RestException(SERVICE_UNAVAILABLE,
"This Broker is not configured with transactionCoordinatorEnabled=true."));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;
import io.swagger.annotations.ApiResponse;
import io.swagger.annotations.ApiResponses;
import javax.ws.rs.Consumes;
Expand Down Expand Up @@ -53,4 +54,27 @@ public void getCoordinatorStatus(@Suspended final AsyncResponse asyncResponse,
@QueryParam("coordinatorId") Integer coordinatorId) {
internalGetCoordinatorStatus(asyncResponse, authoritative, coordinatorId);
}

@GET
@Path("/transactionInBufferStats")
@ApiOperation(value = "Get transaction state in transaction buffer.")
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist"),
@ApiResponse(code = 503, message = "This Broker is not configured "
+ "with transactionCoordinatorEnabled=true."),
@ApiResponse(code = 307, message = "Topic don't owner by this broker!"),
@ApiResponse(code = 501, message = "Topic is not a persistent topic!"),
@ApiResponse(code = 409, message = "Concurrent modification")})
public void getTransactionInBufferStats(@Suspended final AsyncResponse asyncResponse,
@QueryParam("authoritative")
@DefaultValue("false") boolean authoritative,
@QueryParam("mostSigBits")
@ApiParam(value = "Most sig bits of this transaction", required = true)
long mostSigBits,
@ApiParam(value = "Least sig bits of this transaction", required = true)
@QueryParam("leastSigBits") long leastSigBits,
@ApiParam(value = "Topic", required = true)
@QueryParam("topic") String topic) {
internalGetTransactionInBufferStats(asyncResponse, authoritative, mostSigBits, leastSigBits, topic);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@
import org.apache.pulsar.common.policies.data.SubscriptionStats;
import org.apache.pulsar.common.policies.data.TopicPolicies;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.policies.data.TransactionInBufferStats;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.schema.SchemaData;
import org.apache.pulsar.common.protocol.schema.SchemaVersion;
Expand Down Expand Up @@ -3065,6 +3066,10 @@ public boolean isTxnAborted(TxnID txnID) {
return this.transactionBuffer.isTxnAborted(txnID);
}

public TransactionInBufferStats getTransactionInBufferStats(TxnID txnID) {
return this.transactionBuffer.getTransactionInBufferStats(txnID);
}

@Override
protected boolean isTerminated() {
return ledger.isTerminated();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.common.policies.data.TransactionInBufferStats;

/**
* A class represent a transaction buffer. The transaction buffer
Expand Down Expand Up @@ -147,4 +148,10 @@ public interface TransactionBuffer {
* @return the stable position.
*/
PositionImpl getMaxReadPosition();

/**
* Get transaction in buffer stats.
* @return the transaction in buffer stats.
*/
TransactionInBufferStats getTransactionInBufferStats(TxnID txnID);
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.pulsar.broker.transaction.buffer.exceptions.TransactionSealedException;
import org.apache.pulsar.broker.transaction.buffer.exceptions.TransactionStatusException;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.common.policies.data.TransactionInBufferStats;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.transaction.coordinator.proto.TxnStatus;

Expand Down Expand Up @@ -364,4 +365,9 @@ public PositionImpl getMaxReadPosition() {
return PositionImpl.latest;
}

@Override
public TransactionInBufferStats getTransactionInBufferStats(TxnID txnID) {
return null;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.TransactionInBufferStats;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.Markers;
import org.jctools.queues.MessagePassingQueue;
Expand Down Expand Up @@ -385,6 +386,16 @@ public PositionImpl getMaxReadPosition() {
}
}

@Override
public TransactionInBufferStats getTransactionInBufferStats(TxnID txnID) {
TransactionInBufferStats transactionInBufferStats = new TransactionInBufferStats();
transactionInBufferStats.aborted = isTxnAborted(txnID);
if (ongoingTxns.containsKey(txnID)) {
transactionInBufferStats.startPosition = ongoingTxns.get(txnID).toString();
}
return transactionInBufferStats;
}

@Override
public void run(Timeout timeout) {
if (checkIfReady()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.pulsar.broker.transaction.buffer.TransactionBufferReader;
import org.apache.pulsar.broker.transaction.buffer.TransactionMeta;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.common.policies.data.TransactionInBufferStats;
import org.apache.pulsar.common.util.FutureUtil;

/**
Expand Down Expand Up @@ -85,4 +86,9 @@ public void syncMaxReadPositionForNormalPublish(PositionImpl position) {
public PositionImpl getMaxReadPosition() {
return PositionImpl.latest;
}

@Override
public TransactionInBufferStats getTransactionInBufferStats(TxnID txnID) {
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,23 +19,32 @@
package org.apache.pulsar.broker.admin.v3;

import com.google.common.collect.Sets;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.transaction.Transaction;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.transaction.TransactionImpl;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.policies.data.TransactionCoordinatorStatus;
import org.apache.pulsar.common.policies.data.TransactionInBufferStats;
import org.apache.pulsar.packages.management.core.MockedPackagesStorageProvider;
import org.awaitility.Awaitility;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;

public class AdminApiTransactionTest extends MockedPulsarServiceBaseTest {

Expand Down Expand Up @@ -68,32 +77,52 @@ public void testGetTransactionCoordinatorStatus() throws Exception {
getTransaction().abort().get();
TransactionCoordinatorStatus transactionCoordinatorStatus =
admin.transactions().getCoordinatorStatusById(1).get();
verifyCoordinatorStatus(1L, transactionCoordinatorStatus.coordinatorId,
transactionCoordinatorStatus.state,
transactionCoordinatorStatus.sequenceId, transactionCoordinatorStatus.lowWaterMark);
verifyCoordinatorStatus(transactionCoordinatorStatus.state,
transactionCoordinatorStatus.leastSigBits, transactionCoordinatorStatus.lowWaterMark);

transactionCoordinatorStatus = admin.transactions().getCoordinatorStatusById(0).get();
verifyCoordinatorStatus(0L, transactionCoordinatorStatus.coordinatorId,
transactionCoordinatorStatus.state,
transactionCoordinatorStatus.sequenceId, transactionCoordinatorStatus.lowWaterMark);
List<TransactionCoordinatorStatus> list = admin.transactions().getCoordinatorStatusList().get();
verifyCoordinatorStatus(transactionCoordinatorStatus.state,
transactionCoordinatorStatus.leastSigBits, transactionCoordinatorStatus.lowWaterMark);
Map<Integer, TransactionCoordinatorStatus> status = admin.transactions().getCoordinatorStatus().get();

assertEquals(status.size(), 2);

transactionCoordinatorStatus = status.get(0);
verifyCoordinatorStatus(transactionCoordinatorStatus.state,
transactionCoordinatorStatus.leastSigBits, transactionCoordinatorStatus.lowWaterMark);

assertEquals(list.size(), 2);
transactionCoordinatorStatus = status.get(1);
verifyCoordinatorStatus(transactionCoordinatorStatus.state,
transactionCoordinatorStatus.leastSigBits, transactionCoordinatorStatus.lowWaterMark);
}

@Test(timeOut = 20000)
public void testGetTransactionInBufferStats() throws Exception {
initTransaction(2);
TransactionImpl transaction = (TransactionImpl) getTransaction();
final String topic = "persistent://public/default/testGetTransactionInBufferStats";
admin.topics().createNonPartitionedTopic(topic);
Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES).topic(topic).sendTimeout(0, TimeUnit.SECONDS).create();
MessageId messageId = producer.newMessage(transaction).value("Hello pulsar!".getBytes()).send();
TransactionInBufferStats transactionInBufferStats = admin.transactions()
.getTransactionInBufferStats(new TxnID(transaction.getTxnIdMostBits(),
transaction.getTxnIdLeastBits()), topic).get();
PositionImpl position =
PositionImpl.get(((MessageIdImpl) messageId).getLedgerId(), ((MessageIdImpl) messageId).getEntryId());
assertEquals(transactionInBufferStats.startPosition, position.toString());
assertFalse(transactionInBufferStats.aborted);

transactionCoordinatorStatus = list.get(0);
verifyCoordinatorStatus(0L, transactionCoordinatorStatus.coordinatorId,
transactionCoordinatorStatus.state,
transactionCoordinatorStatus.sequenceId, transactionCoordinatorStatus.lowWaterMark);
transaction.abort().get();

transactionCoordinatorStatus = list.get(1);
verifyCoordinatorStatus(1L, transactionCoordinatorStatus.coordinatorId,
transactionCoordinatorStatus.state,
transactionCoordinatorStatus.sequenceId, transactionCoordinatorStatus.lowWaterMark);
transactionInBufferStats = admin.transactions()
.getTransactionInBufferStats(new TxnID(transaction.getTxnIdMostBits(),
transaction.getTxnIdLeastBits()), topic).get();
assertNull(transactionInBufferStats.startPosition);
assertTrue(transactionInBufferStats.aborted);
}

private static void verifyCoordinatorStatus(long expectedCoordinatorId, long coordinatorId, String state,
private static void verifyCoordinatorStatus(String state,
long sequenceId, long lowWaterMark) {
assertEquals(coordinatorId, expectedCoordinatorId);
assertEquals(state, "Ready");
assertEquals(sequenceId, 0);
assertEquals(lowWaterMark, 0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@
*/
package org.apache.pulsar.client.admin;

import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.common.policies.data.TransactionCoordinatorStatus;
import org.apache.pulsar.common.policies.data.TransactionInBufferStats;

public interface Transactions {

Expand All @@ -35,8 +37,17 @@ public interface Transactions {
/**
* Get transaction metadataStore status.
*
* @return the list future of transaction metadata store status.
* @return the map future of transaction metadata store status.
*/
CompletableFuture<Map<Integer, TransactionCoordinatorStatus>> getCoordinatorStatus();

/**
* Get transaction in buffer stats.
*
* @param txnID the txnId
* @param topic the produce topic
* @return the future stats of transaction in buffer.
*/
CompletableFuture<List<TransactionCoordinatorStatus>> getCoordinatorStatusList();
CompletableFuture<TransactionInBufferStats> getTransactionInBufferStats(TxnID txnID, String topic);

}
Loading

0 comments on commit 3e42503

Please sign in to comment.