Skip to content

Commit

Permalink
[improve][broker] Make some operation deduplication methods in Namesp…
Browse files Browse the repository at this point in the history
…aces async. (apache#15608)
  • Loading branch information
shibd authored May 18, 2022
1 parent 9ea5fef commit 132ba4a
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -906,13 +906,13 @@ protected void internalRemoveAutoSubscriptionCreation(AsyncResponse asyncRespons
internalSetAutoSubscriptionCreation(asyncResponse, null);
}

protected void internalModifyDeduplication(Boolean enableDeduplication) {
validateNamespacePolicyOperation(namespaceName, PolicyName.DEDUPLICATION, PolicyOperation.WRITE);
validatePoliciesReadOnlyAccess();
updatePolicies(namespaceName, policies -> {
policies.deduplicationEnabled = enableDeduplication;
return policies;
});
protected CompletableFuture<Void> internalModifyDeduplicationAsync(Boolean enableDeduplication) {
return validateNamespacePolicyOperationAsync(namespaceName, PolicyName.DEDUPLICATION, PolicyOperation.WRITE)
.thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
.thenCompose(__ -> updatePoliciesAsync(namespaceName, policies -> {
policies.deduplicationEnabled = enableDeduplication;
return policies;
}));
}

@SuppressWarnings("deprecation")
Expand Down Expand Up @@ -2146,9 +2146,10 @@ protected void internalSetMaxProducersPerTopic(Integer maxProducersPerTopic) {
}
}

protected Boolean internalGetDeduplication() {
validateNamespacePolicyOperation(namespaceName, PolicyName.DEDUPLICATION, PolicyOperation.READ);
return getNamespacePolicies(namespaceName).deduplicationEnabled;
protected CompletableFuture<Boolean> internalGetDeduplicationAsync() {
return validateNamespacePolicyOperationAsync(namespaceName, PolicyName.DEDUPLICATION, PolicyOperation.READ)
.thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
.thenApply(policies -> policies.deduplicationEnabled);
}

protected Integer internalGetMaxConsumersPerTopic() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -513,10 +513,17 @@ public void removeNamespaceAntiAffinityGroup(@PathParam("property") String prope
@ApiOperation(hidden = true, value = "Enable or disable broker side deduplication for all topics in a namespace")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Property or cluster or namespace doesn't exist") })
public void modifyDeduplication(@PathParam("property") String property, @PathParam("cluster") String cluster,
@PathParam("namespace") String namespace, boolean enableDeduplication) {
public void modifyDeduplication(@Suspended AsyncResponse asyncResponse, @PathParam("property") String property,
@PathParam("cluster") String cluster, @PathParam("namespace") String namespace,
boolean enableDeduplication) {
validateNamespaceName(property, cluster, namespace);
internalModifyDeduplication(enableDeduplication);
internalModifyDeduplicationAsync(enableDeduplication)
.thenAccept(__ -> asyncResponse.resume(Response.noContent().build()))
.exceptionally(ex -> {
log.error("Failed to modify broker deduplication config for namespace {}", namespaceName, ex);
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
}

@GET
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -411,32 +411,53 @@ public void removeSubscriptionExpirationTime(@PathParam("tenant") String tenant,
@ApiOperation(value = "Get broker side deduplication for all topics in a namespace")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist") })
public Boolean getDeduplication(@PathParam("tenant") String tenant, @PathParam("namespace") String namespace) {
public void getDeduplication(@Suspended AsyncResponse asyncResponse, @PathParam("tenant") String tenant,
@PathParam("namespace") String namespace) {
validateNamespaceName(tenant, namespace);
return internalGetDeduplication();
internalGetDeduplicationAsync()
.thenAccept(deduplication -> asyncResponse.resume(deduplication))
.exceptionally(ex -> {
log.error("Failed to get broker deduplication config for namespace {}", namespace, ex);
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
}

@POST
@Path("/{tenant}/{namespace}/deduplication")
@ApiOperation(value = "Enable or disable broker side deduplication for all topics in a namespace")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist") })
public void modifyDeduplication(@PathParam("tenant") String tenant, @PathParam("namespace") String namespace,
public void modifyDeduplication(@Suspended AsyncResponse asyncResponse, @PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@ApiParam(value = "Flag for disabling or enabling broker side deduplication "
+ "for all topics in the specified namespace", required = true)
boolean enableDeduplication) {
validateNamespaceName(tenant, namespace);
internalModifyDeduplication(enableDeduplication);
internalModifyDeduplicationAsync(enableDeduplication)
.thenAccept(__ -> asyncResponse.resume(Response.noContent().build()))
.exceptionally(ex -> {
log.error("Failed to modify broker deduplication config for namespace {}", namespaceName, ex);
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
}

@DELETE
@Path("/{tenant}/{namespace}/deduplication")
@ApiOperation(value = "Remove broker side deduplication for all topics in a namespace")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist") })
public void removeDeduplication(@PathParam("tenant") String tenant, @PathParam("namespace") String namespace) {
public void removeDeduplication(@Suspended AsyncResponse asyncResponse, @PathParam("tenant") String tenant,
@PathParam("namespace") String namespace) {
validateNamespaceName(tenant, namespace);
internalModifyDeduplication(null);
internalModifyDeduplicationAsync(null)
.thenAccept(__ -> asyncResponse.resume(Response.noContent().build()))
.exceptionally(ex -> {
log.error("Failed to remove broker deduplication config for namespace {}", namespaceName, ex);
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
}

@GET
Expand Down

0 comments on commit 132ba4a

Please sign in to comment.