Skip to content

Commit

Permalink
[Issue 14013][broker]make offloadStatus method async (apache#14029)
Browse files Browse the repository at this point in the history
  • Loading branch information
HQebupt authored Feb 8, 2022
1 parent ee36db2 commit d58b7e9
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3772,12 +3772,28 @@ protected void internalTriggerOffload(AsyncResponse asyncResponse,
});
}

protected OffloadProcessStatus internalOffloadStatus(boolean authoritative) {
validateTopicOwnership(topicName, authoritative);
validateTopicOperation(topicName, TopicOperation.OFFLOAD);

PersistentTopic topic = (PersistentTopic) getTopicReference(topicName);
return topic.offloadStatus();
protected void internalOffloadStatus(AsyncResponse asyncResponse, boolean authoritative) {
validateTopicOwnershipAsync(topicName, authoritative)
.thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.OFFLOAD))
.thenCompose(__ -> getTopicReferenceAsync(topicName))
.thenAccept(topic -> {
OffloadProcessStatus offloadProcessStatus = ((PersistentTopic) topic).offloadStatus();
asyncResponse.resume(offloadProcessStatus);
}).exceptionally(ex -> {
Throwable cause = FutureUtil.unwrapCompletionException(ex);
if (cause instanceof WebApplicationException
&& ((WebApplicationException) cause).getResponse().getStatus()
== Status.TEMPORARY_REDIRECT.getStatusCode()) {
if (log.isDebugEnabled()) {
log.debug("[{}] Failed to offload status on topic {},"
+ " redirecting to other brokers.", clientAppId(), topicName, cause);
}
} else {
log.error("[{}] Failed to offload status on topic {}", clientAppId(), topicName, cause);
}
resumeAsyncResponseExceptionally(asyncResponse, cause);
return null;
});
}

public static CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadata(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@
import org.apache.pulsar.broker.admin.impl.PersistentTopicsBase;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.client.admin.LongRunningProcessStatus;
import org.apache.pulsar.client.admin.OffloadProcessStatus;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.ResetCursorData;
Expand Down Expand Up @@ -815,14 +814,20 @@ public void triggerOffload(@Suspended final AsyncResponse asyncResponse,
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 405, message = "Operation not allowed on persistent topic"),
@ApiResponse(code = 404, message = "Topic does not exist")})
public OffloadProcessStatus offloadStatus(@PathParam("tenant") String tenant,
@PathParam("cluster") String cluster,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic,
@QueryParam("authoritative") @DefaultValue("false")
boolean authoritative) {
validateTopicName(tenant, cluster, namespace, encodedTopic);
return internalOffloadStatus(authoritative);
public void offloadStatus(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("cluster") String cluster,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
try {
validateTopicName(tenant, cluster, namespace, encodedTopic);
internalOffloadStatus(asyncResponse, authoritative);
} catch (WebApplicationException wae) {
asyncResponse.resume(wae);
} catch (Exception e) {
asyncResponse.resume(new RestException(e));
}
}

@GET
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@
import org.apache.pulsar.broker.admin.impl.PersistentTopicsBase;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.client.admin.LongRunningProcessStatus;
import org.apache.pulsar.client.admin.OffloadProcessStatus;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.MessageIdImpl;
Expand Down Expand Up @@ -2817,7 +2816,8 @@ public void triggerOffload(
@ApiResponse(code = 412, message = "Topic name is not valid"),
@ApiResponse(code = 500, message = "Internal server error"),
@ApiResponse(code = 503, message = "Failed to validate global cluster configuration")})
public OffloadProcessStatus offloadStatus(
public void offloadStatus(
@Suspended final AsyncResponse asyncResponse,
@ApiParam(value = "Specify the tenant", required = true)
@PathParam("tenant") String tenant,
@ApiParam(value = "Specify the namespace", required = true)
Expand All @@ -2826,8 +2826,14 @@ public OffloadProcessStatus offloadStatus(
@PathParam("topic") @Encoded String encodedTopic,
@ApiParam(value = "Is authentication required to perform this operation")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
return internalOffloadStatus(authoritative);
try {
validateTopicName(tenant, namespace, encodedTopic);
internalOffloadStatus(asyncResponse, authoritative);
} catch (WebApplicationException wae) {
asyncResponse.resume(wae);
} catch (Exception e) {
asyncResponse.resume(new RestException(e));
}
}

@GET
Expand Down

0 comments on commit d58b7e9

Please sign in to comment.