diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java index 493a4ba307343..2de4c12d8f7c8 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java @@ -18,14 +18,17 @@ */ package org.apache.pulsar.broker.admin.impl; +import static javax.ws.rs.core.Response.Status.NOT_FOUND; import static javax.ws.rs.core.Response.Status.SERVICE_UNAVAILABLE; import static javax.ws.rs.core.Response.Status.TEMPORARY_REDIRECT; import com.google.common.collect.Lists; +import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import javax.ws.rs.container.AsyncResponse; import javax.ws.rs.core.Response; import lombok.extern.slf4j.Slf4j; @@ -34,12 +37,18 @@ import org.apache.pulsar.broker.service.Topic; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.broker.web.RestException; +import org.apache.pulsar.client.admin.Transactions; import org.apache.pulsar.client.api.transaction.TxnID; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.TransactionCoordinatorStatus; import org.apache.pulsar.common.policies.data.TransactionInBufferStats; +import org.apache.pulsar.common.policies.data.TransactionInPendingAckStats; +import org.apache.pulsar.common.policies.data.TransactionMetadata; import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID; +import org.apache.pulsar.transaction.coordinator.TxnMeta; +import org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException.CoordinatorNotFoundException; +import org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException.TransactionNotFoundException; @Slf4j public abstract class TransactionsBase extends AdminResource { @@ -172,4 +181,109 @@ protected void internalGetTransactionInBufferStats(AsyncResponse asyncResponse, "This Broker is not configured with transactionCoordinatorEnabled=true.")); } } + + protected void internalGetTransactionMetadata(AsyncResponse asyncResponse, + boolean authoritative, int coordinatorId, long sequenceID) { + try { + if (pulsar().getConfig().isTransactionCoordinatorEnabled()) { + validateTopicOwnership(TopicName.TRANSACTION_COORDINATOR_ASSIGN.getPartition(coordinatorId), + authoritative); + Transactions transactions = pulsar().getAdminClient().transactions(); + TxnMeta txnMeta = pulsar().getTransactionMetadataStoreService() + .getTxnMeta(new TxnID(coordinatorId, sequenceID)).get(); + TxnID txnID = txnMeta.id(); + TransactionMetadata transactionMetadata = new TransactionMetadata(); + transactionMetadata.txnId = txnID.toString(); + transactionMetadata.status = txnMeta.status().name(); + transactionMetadata.openTimestamp = txnMeta.getOpenTimestamp(); + transactionMetadata.timeoutAt = txnMeta.getTimeoutAt(); + + List> ackedPartitionsFutures = new ArrayList<>(); + Map>> ackFutures = new HashMap<>(); + txnMeta.ackedPartitions().forEach(transactionSubscription -> { + String topic = transactionSubscription.getTopic(); + String subName = transactionSubscription.getSubscription(); + CompletableFuture future = + transactions.getTransactionInPendingAckStats(txnID, topic, subName); + ackedPartitionsFutures.add(future); + if (ackFutures.containsKey(topic)) { + ackFutures.get(topic) + .put(transactionSubscription.getSubscription(), future); + } else { + Map> pendingAckStatsMap = + new HashMap<>(); + pendingAckStatsMap.put(transactionSubscription.getSubscription(), future); + ackFutures.put(topic, pendingAckStatsMap); + } + }); + + List> producedPartitionsFutures = new ArrayList<>(); + Map> produceFutures = new HashMap<>(); + txnMeta.producedPartitions().forEach(topic -> { + CompletableFuture future = + transactions.getTransactionInBufferStats(txnID, topic); + producedPartitionsFutures.add(future); + produceFutures.put(topic, future); + + }); + + FutureUtil.waitForAll(ackedPartitionsFutures).whenComplete((v, e) -> { + if (e != null) { + asyncResponse.resume(new RestException(e)); + return; + } + + FutureUtil.waitForAll(producedPartitionsFutures).whenComplete((x, t) -> { + if (t != null) { + asyncResponse.resume(new RestException(t)); + return; + } + + Map> ackedPartitions = new HashMap<>(); + Map producedPartitions = new HashMap<>(); + + for (String topic : ackFutures.keySet()) { + Map subs = new HashMap<>(); + for (String sub : ackFutures.get(topic).keySet()) { + try { + subs.put(sub, ackFutures.get(topic).get(sub).get()); + } catch (Exception exception) { + asyncResponse.resume(new RestException(exception.getCause())); + return; + } + } + + ackedPartitions.put(topic, subs); + } + + for (String topic : produceFutures.keySet()) { + try { + producedPartitions.put(topic, produceFutures.get(topic).get()); + } catch (Exception exception) { + asyncResponse.resume(new RestException(exception.getCause())); + return; + } + } + transactionMetadata.ackedPartitions = ackedPartitions; + transactionMetadata.producedPartitions = producedPartitions; + asyncResponse.resume(transactionMetadata); + }); + }); + } else { + asyncResponse.resume(new RestException(SERVICE_UNAVAILABLE, + "This Broker is not configured with transactionCoordinatorEnabled=true.")); + } + } catch (Exception e) { + if (e instanceof ExecutionException) { + if (e.getCause() instanceof CoordinatorNotFoundException + || e.getCause() instanceof TransactionNotFoundException) { + asyncResponse.resume(new RestException(NOT_FOUND, e.getCause())); + return; + } + asyncResponse.resume(new RestException(e.getCause())); + } else { + asyncResponse.resume(new RestException(e)); + } + } + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Transactions.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Transactions.java index 7addbcbd7ad86..a926c89d9ae80 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Transactions.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Transactions.java @@ -103,4 +103,27 @@ public void getTransactionInPendingAckStats(@Suspended final AsyncResponse async internalGetTransactionInPendingAckStats(asyncResponse, authoritative, mostSigBits, leastSigBits, topic, subName); } + + @GET + @Path("/transactionMetadata") + @ApiOperation(value = "Get transaction metadata") + @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"), + @ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic " + + "or coordinator or transaction doesn't exist"), + @ApiResponse(code = 503, message = "This Broker is not configured " + + "with transactionCoordinatorEnabled=true."), + @ApiResponse(code = 307, message = "Topic don't owner by this broker!"), + @ApiResponse(code = 501, message = "Topic is not a persistent topic!"), + @ApiResponse(code = 409, message = "Concurrent modification")}) + public void getTransactionMetadata(@Suspended final AsyncResponse asyncResponse, + @QueryParam("authoritative") + @DefaultValue("false") boolean authoritative, + @QueryParam("mostSigBits") + @ApiParam(value = "Most sig bits of this transaction", required = true) + int mostSigBits, + @ApiParam(value = "Least sig bits of this transaction", required = true) + @QueryParam("leastSigBits") long leastSigBits) { + internalGetTransactionMetadata(asyncResponse, authoritative, mostSigBits, leastSigBits); + } + } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java index bbef370100198..16b6f178d0676 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java @@ -38,6 +38,7 @@ import org.apache.pulsar.common.policies.data.TransactionCoordinatorStatus; import org.apache.pulsar.common.policies.data.TransactionInBufferStats; import org.apache.pulsar.common.policies.data.TransactionInPendingAckStats; +import org.apache.pulsar.common.policies.data.TransactionMetadata; import org.apache.pulsar.packages.management.core.MockedPackagesStorageProvider; import org.awaitility.Awaitility; import org.testng.annotations.AfterMethod; @@ -162,6 +163,77 @@ public void testGetTransactionPendingAckStats() throws Exception { batchMessageId.getBatchIndex()); } + @Test(timeOut = 20000) + public void testGetTransactionMetadata() throws Exception { + initTransaction(2); + long currentTime = System.currentTimeMillis(); + TransactionImpl transaction = (TransactionImpl) getTransaction(); + final String topic1 = "persistent://public/default/testGetTransactionMetadata-1"; + final String subName1 = "test1"; + final String topic2 = "persistent://public/default/testGetTransactionMetadata-2"; + final String subName2 = "test2"; + final String subName3 = "test3"; + admin.topics().createNonPartitionedTopic(topic1); + admin.topics().createNonPartitionedTopic(topic2); + + Producer producer1 = pulsarClient.newProducer(Schema.BYTES) + .sendTimeout(0, TimeUnit.SECONDS).topic(topic1).create(); + + Producer producer2 = pulsarClient.newProducer(Schema.BYTES) + .sendTimeout(0, TimeUnit.SECONDS).topic(topic2).create(); + + Consumer consumer1 = pulsarClient.newConsumer(Schema.BYTES).topic(topic1) + .subscriptionName(subName1).subscribe(); + + Consumer consumer2 = pulsarClient.newConsumer(Schema.BYTES).topic(topic2) + .subscriptionName(subName2).subscribe(); + + Consumer consumer3 = pulsarClient.newConsumer(Schema.BYTES).topic(topic2) + .subscriptionName(subName3).subscribe(); + + MessageId messageId1 = producer1.send("Hello pulsar!".getBytes()); + MessageId messageId2 = producer2.send("Hello pulsar!".getBytes()); + MessageId messageId3 = producer1.newMessage(transaction).value("Hello pulsar!".getBytes()).send(); + MessageId messageId4 = producer2.newMessage(transaction).value("Hello pulsar!".getBytes()).send(); + + consumer1.acknowledgeCumulativeAsync(messageId1, transaction).get(); + consumer2.acknowledgeCumulativeAsync(messageId2, transaction).get(); + consumer3.acknowledgeCumulativeAsync(messageId2, transaction).get(); + TxnID txnID = new TxnID(transaction.getTxnIdMostBits(), transaction.getTxnIdLeastBits()); + TransactionMetadata transactionMetadata = admin.transactions() + .getTransactionMetadata(new TxnID(transaction.getTxnIdMostBits(), + transaction.getTxnIdLeastBits())).get(); + + assertEquals(transactionMetadata.txnId, txnID.toString()); + assertTrue(transactionMetadata.openTimestamp > currentTime); + assertEquals(transactionMetadata.timeoutAt, 5000L); + assertEquals(transactionMetadata.status, "OPEN"); + + Map producedPartitions = transactionMetadata.producedPartitions; + Map> ackedPartitions = transactionMetadata.ackedPartitions; + + PositionImpl position1 = getPositionByMessageId(messageId1); + PositionImpl position2 = getPositionByMessageId(messageId2); + PositionImpl position3 = getPositionByMessageId(messageId3); + PositionImpl position4 = getPositionByMessageId(messageId4); + + assertFalse(producedPartitions.get(topic1).aborted); + assertFalse(producedPartitions.get(topic2).aborted); + assertEquals(producedPartitions.get(topic1).startPosition, position3.toString()); + assertEquals(producedPartitions.get(topic2).startPosition, position4.toString()); + + assertEquals(ackedPartitions.get(topic1).size(), 1); + assertEquals(ackedPartitions.get(topic2).size(), 2); + assertEquals(ackedPartitions.get(topic1).get(subName1).cumulativeAckPosition, position1.toString()); + assertEquals(ackedPartitions.get(topic2).get(subName2).cumulativeAckPosition, position2.toString()); + assertEquals(ackedPartitions.get(topic2).get(subName3).cumulativeAckPosition, position2.toString()); + + } + + private static PositionImpl getPositionByMessageId(MessageId messageId) { + return PositionImpl.get(((MessageIdImpl) messageId).getLedgerId(), ((MessageIdImpl) messageId).getEntryId()); + } + private static void verifyCoordinatorStatus(String state, long sequenceId, long lowWaterMark) { assertEquals(state, "Ready"); diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Transactions.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Transactions.java index d3d8f2fc51361..0ee19467ac24f 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Transactions.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Transactions.java @@ -24,6 +24,7 @@ import org.apache.pulsar.common.policies.data.TransactionCoordinatorStatus; import org.apache.pulsar.common.policies.data.TransactionInBufferStats; import org.apache.pulsar.common.policies.data.TransactionInPendingAckStats; +import org.apache.pulsar.common.policies.data.TransactionMetadata; public interface Transactions { @@ -62,4 +63,12 @@ public interface Transactions { CompletableFuture getTransactionInPendingAckStats(TxnID txnID, String topic, String subName); + /** + * Get transaction metadata. + * + * @param txnID the ID of this transaction + * @return the future metadata of this transaction. + */ + CompletableFuture getTransactionMetadata(TxnID txnID); + } diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TransactionsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TransactionsImpl.java index e66089cf929b1..d55b3c8e09047 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TransactionsImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TransactionsImpl.java @@ -28,6 +28,7 @@ import org.apache.pulsar.common.policies.data.TransactionCoordinatorStatus; import org.apache.pulsar.common.policies.data.TransactionInBufferStats; import org.apache.pulsar.common.policies.data.TransactionInPendingAckStats; +import org.apache.pulsar.common.policies.data.TransactionMetadata; public class TransactionsImpl extends BaseResource implements Transactions { private final WebTarget adminV3Transactions; @@ -122,4 +123,25 @@ public void failed(Throwable throwable) { return future; } + @Override + public CompletableFuture getTransactionMetadata(TxnID txnID) { + WebTarget path = adminV3Transactions.path("transactionMetadata"); + path = path.queryParam("mostSigBits", txnID.getMostSigBits()); + path = path.queryParam("leastSigBits", txnID.getLeastSigBits()); + final CompletableFuture future = new CompletableFuture<>(); + asyncGetRequest(path, + new InvocationCallback() { + @Override + public void completed(TransactionMetadata status) { + future.complete(status); + } + + @Override + public void failed(Throwable throwable) { + future.completeExceptionally(getApiException(throwable.getCause())); + } + }); + return future; + } + } diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java index 8ce5b360cdb31..b13c884bbe8dc 100644 --- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java +++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java @@ -35,7 +35,6 @@ import java.util.ArrayList; import java.util.Collections; import java.util.EnumSet; -import java.util.List; import java.util.Map; import java.util.Properties; import java.util.concurrent.CompletableFuture; @@ -1435,6 +1434,11 @@ void transactions() throws Exception { cmdTransactions.run(split("transaction-in-pending-ack-stats -m 1 -l 2 -t test -s test")); verify(transactions).getTransactionInPendingAckStats( new TxnID(1, 2), "test", "test"); + + cmdTransactions = new CmdTransactions(() -> admin); + cmdTransactions.run(split("transaction-metadata -m 1 -l 2")); + verify(transactions).getTransactionMetadata(new TxnID(1, 2)); + } String[] split(String s) { diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTransactions.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTransactions.java index 1b0cd5c9cc4bb..763cf84cc9d91 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTransactions.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTransactions.java @@ -81,10 +81,25 @@ void run() throws Exception { } } + @Parameters(commandDescription = "Get transaction metadata") + private class GetTransactionMetadata extends CliCommand { + @Parameter(names = {"-m", "--most-sig-bits"}, description = "the most sig bits", required = true) + private int mostSigBits; + + @Parameter(names = {"-l", "--least-sig-bits"}, description = "the least sig bits", required = true) + private long leastSigBits; + + @Override + void run() throws Exception { + print(getAdmin().transactions().getTransactionMetadata(new TxnID(mostSigBits, leastSigBits))); + } + } + public CmdTransactions(Supplier admin) { super("transactions", admin); jcommander.addCommand("coordinator-status", new GetCoordinatorStatus()); jcommander.addCommand("transaction-in-buffer-stats", new GetTransactionInBufferStats()); jcommander.addCommand("transaction-in-pending-ack-stats", new GetTransactionInPendingAckStats()); + jcommander.addCommand("transaction-metadata", new GetTransactionMetadata()); } } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TransactionMetadata.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TransactionMetadata.java new file mode 100644 index 0000000000000..fd2d27fcc440e --- /dev/null +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TransactionMetadata.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.policies.data; + +import lombok.Data; +import java.util.Map; + +@Data +public class TransactionMetadata { + + /** The txnId of this transaction. */ + public String txnId; + + /** The status of this transaction. */ + public String status; + + /** The open time of this transaction. */ + public long openTimestamp; + + /** The timeout of this transaction. */ + public long timeoutAt; + + /** The producedPartitions of this transaction. */ + public Map producedPartitions; + + /** The ackedPartitions of this transaction. */ + public Map> ackedPartitions; +}