Skip to content

Commit

Permalink
[Transaction] Fix transaction admin api path path param problem. (apa…
Browse files Browse the repository at this point in the history
…che#10748)

## Motivation
now some transaction admin api get method don't use `uri` path pass param.
  • Loading branch information
congbobo184 authored May 31, 2021
1 parent e19f647 commit 6f2a40b
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 67 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.client.admin.Transactions;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.TransactionCoordinatorInternalStats;
import org.apache.pulsar.common.policies.data.TransactionCoordinatorStats;
Expand All @@ -54,6 +56,7 @@
import org.apache.pulsar.common.policies.data.TransactionLogStats;
import org.apache.pulsar.common.policies.data.TransactionMetadata;
import org.apache.pulsar.common.policies.data.TransactionPendingAckInternalStats;
import org.apache.pulsar.common.util.Codec;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore;
Expand Down Expand Up @@ -130,12 +133,11 @@ protected void internalGetCoordinatorStats(AsyncResponse asyncResponse, boolean
}

protected void internalGetTransactionInPendingAckStats(AsyncResponse asyncResponse, boolean authoritative,
long mostSigBits, long leastSigBits, String topic,
String subName) {
long mostSigBits, long leastSigBits, String subName) {
if (pulsar().getConfig().isTransactionCoordinatorEnabled()) {
validateTopicOwnership(TopicName.get(topic), authoritative);
validateTopicOwnership(topicName, authoritative);
CompletableFuture<Optional<Topic>> topicFuture = pulsar().getBrokerService()
.getTopics().get(TopicName.get(topic).toString());
.getTopics().get(topicName.toString());
if (topicFuture != null) {
topicFuture.whenComplete((optionalTopic, e) -> {
if (e != null) {
Expand Down Expand Up @@ -165,12 +167,11 @@ protected void internalGetTransactionInPendingAckStats(AsyncResponse asyncRespon
}

protected void internalGetTransactionInBufferStats(AsyncResponse asyncResponse, boolean authoritative,
long mostSigBits, long leastSigBits,
String topic) {
long mostSigBits, long leastSigBits) {
if (pulsar().getConfig().isTransactionCoordinatorEnabled()) {
validateTopicOwnership(TopicName.get(topic), authoritative);
validateTopicOwnership(topicName, authoritative);
CompletableFuture<Optional<Topic>> topicFuture = pulsar().getBrokerService()
.getTopics().get(TopicName.get(topic).toString());
.getTopics().get(topicName.toString());
if (topicFuture != null) {
topicFuture.whenComplete((optionalTopic, e) -> {
if (e != null) {
Expand Down Expand Up @@ -200,12 +201,11 @@ protected void internalGetTransactionInBufferStats(AsyncResponse asyncResponse,
}
}

protected void internalGetTransactionBufferStats(AsyncResponse asyncResponse,
boolean authoritative, String topic) {
protected void internalGetTransactionBufferStats(AsyncResponse asyncResponse, boolean authoritative) {
if (pulsar().getConfig().isTransactionCoordinatorEnabled()) {
validateTopicOwnership(TopicName.get(topic), authoritative);
validateTopicOwnership(topicName, authoritative);
CompletableFuture<Optional<Topic>> topicFuture = pulsar().getBrokerService()
.getTopics().get(TopicName.get(topic).toString());
.getTopics().get(topicName.toString());
if (topicFuture != null) {
topicFuture.whenComplete((optionalTopic, e) -> {
if (e != null) {
Expand Down Expand Up @@ -233,12 +233,11 @@ protected void internalGetTransactionBufferStats(AsyncResponse asyncResponse,
}
}

protected void internalGetPendingAckStats(AsyncResponse asyncResponse, boolean authoritative,
String topic, String subName) {
protected void internalGetPendingAckStats(AsyncResponse asyncResponse, boolean authoritative, String subName) {
if (pulsar().getConfig().isTransactionCoordinatorEnabled()) {
validateTopicOwnership(TopicName.get(topic), authoritative);
validateTopicOwnership(topicName, authoritative);
CompletableFuture<Optional<Topic>> topicFuture = pulsar().getBrokerService()
.getTopics().get(TopicName.get(topic).toString());
.getTopics().get(topicName.toString());
if (topicFuture != null) {
topicFuture.whenComplete((optionalTopic, e) -> {
if (e != null) {
Expand Down Expand Up @@ -573,4 +572,16 @@ protected void internalGetPendingAckInternalStats(AsyncResponse asyncResponse, b
asyncResponse.resume(new RestException(e.getCause()));
}
}

protected void validateTopicName(String property, String namespace, String encodedTopic) {
String topic = Codec.decode(encodedTopic);
try {
this.namespaceName = NamespaceName.get(property, namespace);
this.topicName = TopicName.get(TopicDomain.persistent.toString(), namespaceName, topic);
} catch (IllegalArgumentException e) {
log.warn("[{}] Failed to validate topic name {}://{}/{}/{}", clientAppId(), domain(), property, namespace,
topic, e);
throw new RestException(Response.Status.PRECONDITION_FAILED, "Topic name is not valid");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;
import io.swagger.annotations.ApiResponse;
import io.swagger.annotations.ApiResponses;
import javax.ws.rs.Consumes;
Expand Down Expand Up @@ -60,7 +59,7 @@ public void getCoordinatorStats(@Suspended final AsyncResponse asyncResponse,
}

@GET
@Path("/transactionInBufferStats")
@Path("/transactionInBufferStats/{tenant}/{namespace}/{topic}/{mostSigBits}/{leastSigBits}")
@ApiOperation(value = "Get transaction state in transaction buffer.")
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist"),
Expand All @@ -72,18 +71,18 @@ public void getCoordinatorStats(@Suspended final AsyncResponse asyncResponse,
public void getTransactionInBufferStats(@Suspended final AsyncResponse asyncResponse,
@QueryParam("authoritative")
@DefaultValue("false") boolean authoritative,
@QueryParam("mostSigBits")
@ApiParam(value = "Most sig bits of this transaction", required = true)
long mostSigBits,
@ApiParam(value = "Least sig bits of this transaction", required = true)
@QueryParam("leastSigBits") long leastSigBits,
@ApiParam(value = "Topic", required = true)
@QueryParam("topic") String topic) {
internalGetTransactionInBufferStats(asyncResponse, authoritative, mostSigBits, leastSigBits, topic);
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic,
@PathParam("mostSigBits") String mostSigBits,
@PathParam("leastSigBits") String leastSigBits) {
validateTopicName(tenant, namespace, encodedTopic);
internalGetTransactionInBufferStats(asyncResponse, authoritative,
Long.parseLong(mostSigBits), Long.parseLong(leastSigBits));
}

@GET
@Path("/transactionInPendingAckStats")
@Path("/transactionInPendingAckStats/{tenant}/{namespace}/{topic}/{subName}/{mostSigBits}/{leastSigBits}")
@ApiOperation(value = "Get transaction state in pending ack.")
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist"),
Expand All @@ -95,21 +94,19 @@ public void getTransactionInBufferStats(@Suspended final AsyncResponse asyncResp
public void getTransactionInPendingAckStats(@Suspended final AsyncResponse asyncResponse,
@QueryParam("authoritative")
@DefaultValue("false") boolean authoritative,
@QueryParam("mostSigBits")
@ApiParam(value = "Most sig bits of this transaction", required = true)
long mostSigBits,
@ApiParam(value = "Least sig bits of this transaction", required = true)
@QueryParam("leastSigBits") long leastSigBits,
@ApiParam(value = "Topic name", required = true)
@QueryParam("topic") String topic,
@ApiParam(value = "Subscription name", required = true)
@QueryParam("subName") String subName) {
internalGetTransactionInPendingAckStats(asyncResponse, authoritative, mostSigBits,
leastSigBits, topic, subName);
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic,
@PathParam("mostSigBits") String mostSigBits,
@PathParam("leastSigBits") String leastSigBits,
@PathParam("subName") String subName) {
validateTopicName(tenant, namespace, encodedTopic);
internalGetTransactionInPendingAckStats(asyncResponse, authoritative, Long.parseLong(mostSigBits),
Long.parseLong(leastSigBits), subName);
}

@GET
@Path("/transactionBufferStats")
@Path("/transactionBufferStats/{tenant}/{namespace}/{topic}")
@ApiOperation(value = "Get transaction buffer stats in topic.")
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist"),
Expand All @@ -121,13 +118,15 @@ public void getTransactionInPendingAckStats(@Suspended final AsyncResponse async
public void getTransactionBufferStats(@Suspended final AsyncResponse asyncResponse,
@QueryParam("authoritative")
@DefaultValue("false") boolean authoritative,
@ApiParam(value = "Topic name", required = true)
@QueryParam("topic") String topic) {
internalGetTransactionBufferStats(asyncResponse, authoritative, topic);
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic) {
validateTopicName(tenant, namespace, encodedTopic);
internalGetTransactionBufferStats(asyncResponse, authoritative);
}

@GET
@Path("/pendingAckStats")
@Path("/pendingAckStats/{tenant}/{namespace}/{topic}/{subName}")
@ApiOperation(value = "Get transaction pending ack stats in topic.")
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic or subName doesn't exist"),
Expand All @@ -139,15 +138,16 @@ public void getTransactionBufferStats(@Suspended final AsyncResponse asyncRespon
public void getPendingAckStats(@Suspended final AsyncResponse asyncResponse,
@QueryParam("authoritative")
@DefaultValue("false") boolean authoritative,
@ApiParam(value = "Topic name", required = true)
@QueryParam("topic") String topic,
@ApiParam(value = "Subscription name", required = true)
@QueryParam("subName") String subName) {
internalGetPendingAckStats(asyncResponse, authoritative, topic, subName);
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic,
@PathParam("subName") String subName) {
validateTopicName(tenant, namespace, encodedTopic);
internalGetPendingAckStats(asyncResponse, authoritative, subName);
}

@GET
@Path("/transactionMetadata")
@Path("/transactionMetadata/{mostSigBits}/{leastSigBits}")
@ApiOperation(value = "Get transaction metadata")
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic "
Expand All @@ -160,12 +160,10 @@ public void getPendingAckStats(@Suspended final AsyncResponse asyncResponse,
public void getTransactionMetadata(@Suspended final AsyncResponse asyncResponse,
@QueryParam("authoritative")
@DefaultValue("false") boolean authoritative,
@QueryParam("mostSigBits")
@ApiParam(value = "Most sig bits of this transaction", required = true)
int mostSigBits,
@ApiParam(value = "Least sig bits of this transaction", required = true)
@QueryParam("leastSigBits") long leastSigBits) {
internalGetTransactionMetadata(asyncResponse, authoritative, mostSigBits, leastSigBits);
@PathParam("mostSigBits") String mostSigBits,
@PathParam("leastSigBits") String leastSigBits) {
internalGetTransactionMetadata(asyncResponse, authoritative, Integer.parseInt(mostSigBits),
Long.parseLong(leastSigBits));
}

@GET
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,10 +117,11 @@ public Map<Integer, TransactionCoordinatorStats> getCoordinatorStats() throws Pu

@Override
public CompletableFuture<TransactionInBufferStats> getTransactionInBufferStatsAsync(TxnID txnID, String topic) {
TopicName topicName = TopicName.get(topic);
WebTarget path = adminV3Transactions.path("transactionInBufferStats");
path = path.queryParam("mostSigBits", txnID.getMostSigBits());
path = path.queryParam("leastSigBits", txnID.getLeastSigBits());
path = path.queryParam("topic", topic);
path = path.path(topicName.getRestPath(false));
path = path.path(txnID.getMostSigBits() + "");
path = path.path(txnID.getLeastSigBits() + "");
final CompletableFuture<TransactionInBufferStats> future = new CompletableFuture<>();
asyncGetRequest(path,
new InvocationCallback<TransactionInBufferStats>() {
Expand Down Expand Up @@ -156,10 +157,10 @@ public CompletableFuture<TransactionInPendingAckStats> getTransactionInPendingAc
String topic,
String subName) {
WebTarget path = adminV3Transactions.path("transactionInPendingAckStats");
path = path.queryParam("mostSigBits", txnID.getMostSigBits());
path = path.queryParam("leastSigBits", txnID.getLeastSigBits());
path = path.queryParam("topic", topic);
path = path.queryParam("subName", subName);
path = path.path(TopicName.get(topic).getRestPath(false));
path = path.path(subName);
path = path.path(txnID.getMostSigBits() + "");
path = path.path(txnID.getLeastSigBits() + "");
final CompletableFuture<TransactionInPendingAckStats> future = new CompletableFuture<>();
asyncGetRequest(path,
new InvocationCallback<TransactionInPendingAckStats>() {
Expand Down Expand Up @@ -195,8 +196,8 @@ public TransactionInPendingAckStats getTransactionInPendingAckStats(TxnID txnID,
@Override
public CompletableFuture<TransactionMetadata> getTransactionMetadataAsync(TxnID txnID) {
WebTarget path = adminV3Transactions.path("transactionMetadata");
path = path.queryParam("mostSigBits", txnID.getMostSigBits());
path = path.queryParam("leastSigBits", txnID.getLeastSigBits());
path = path.path(txnID.getMostSigBits() + "");
path = path.path(txnID.getLeastSigBits() + "");
final CompletableFuture<TransactionMetadata> future = new CompletableFuture<>();
asyncGetRequest(path,
new InvocationCallback<TransactionMetadata>() {
Expand Down Expand Up @@ -231,7 +232,7 @@ public TransactionMetadata getTransactionMetadata(TxnID txnID) throws PulsarAdmi
@Override
public CompletableFuture<TransactionBufferStats> getTransactionBufferStatsAsync(String topic) {
WebTarget path = adminV3Transactions.path("transactionBufferStats");
path = path.queryParam("topic", topic);
path = path.path(TopicName.get(topic).getRestPath(false));
final CompletableFuture<TransactionBufferStats> future = new CompletableFuture<>();
asyncGetRequest(path,
new InvocationCallback<TransactionBufferStats>() {
Expand Down Expand Up @@ -265,8 +266,8 @@ public TransactionBufferStats getTransactionBufferStats(String topic) throws Pul
@Override
public CompletableFuture<TransactionPendingAckStats> getPendingAckStatsAsync(String topic, String subName) {
WebTarget path = adminV3Transactions.path("pendingAckStats");
path = path.queryParam("topic", topic);
path = path.queryParam("subName", subName);
path = path.path(TopicName.get(topic).getRestPath(false));
path = path.path(subName);
final CompletableFuture<TransactionPendingAckStats> future = new CompletableFuture<>();
asyncGetRequest(path,
new InvocationCallback<TransactionPendingAckStats>() {
Expand Down

0 comments on commit 6f2a40b

Please sign in to comment.