From 132ba4ad8e3808923b2c8b0a9fa5db5a8d413542 Mon Sep 17 00:00:00 2001 From: Baodi Shi Date: Wed, 18 May 2022 10:05:47 +0800 Subject: [PATCH] [improve][broker] Make some operation deduplication methods in Namespaces async. (#15608) --- .../broker/admin/impl/NamespacesBase.java | 21 ++++++------ .../pulsar/broker/admin/v1/Namespaces.java | 13 ++++++-- .../pulsar/broker/admin/v2/Namespaces.java | 33 +++++++++++++++---- 3 files changed, 48 insertions(+), 19 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java index 39a4eb1306083..7f842ac93fa85 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java @@ -906,13 +906,13 @@ protected void internalRemoveAutoSubscriptionCreation(AsyncResponse asyncRespons internalSetAutoSubscriptionCreation(asyncResponse, null); } - protected void internalModifyDeduplication(Boolean enableDeduplication) { - validateNamespacePolicyOperation(namespaceName, PolicyName.DEDUPLICATION, PolicyOperation.WRITE); - validatePoliciesReadOnlyAccess(); - updatePolicies(namespaceName, policies -> { - policies.deduplicationEnabled = enableDeduplication; - return policies; - }); + protected CompletableFuture internalModifyDeduplicationAsync(Boolean enableDeduplication) { + return validateNamespacePolicyOperationAsync(namespaceName, PolicyName.DEDUPLICATION, PolicyOperation.WRITE) + .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync()) + .thenCompose(__ -> updatePoliciesAsync(namespaceName, policies -> { + policies.deduplicationEnabled = enableDeduplication; + return policies; + })); } @SuppressWarnings("deprecation") @@ -2146,9 +2146,10 @@ protected void internalSetMaxProducersPerTopic(Integer maxProducersPerTopic) { } } - protected Boolean internalGetDeduplication() { - validateNamespacePolicyOperation(namespaceName, PolicyName.DEDUPLICATION, PolicyOperation.READ); - return getNamespacePolicies(namespaceName).deduplicationEnabled; + protected CompletableFuture internalGetDeduplicationAsync() { + return validateNamespacePolicyOperationAsync(namespaceName, PolicyName.DEDUPLICATION, PolicyOperation.READ) + .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName)) + .thenApply(policies -> policies.deduplicationEnabled); } protected Integer internalGetMaxConsumersPerTopic() { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java index 0ccc505d5e84b..8fe1e0525c62c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java @@ -513,10 +513,17 @@ public void removeNamespaceAntiAffinityGroup(@PathParam("property") String prope @ApiOperation(hidden = true, value = "Enable or disable broker side deduplication for all topics in a namespace") @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Property or cluster or namespace doesn't exist") }) - public void modifyDeduplication(@PathParam("property") String property, @PathParam("cluster") String cluster, - @PathParam("namespace") String namespace, boolean enableDeduplication) { + public void modifyDeduplication(@Suspended AsyncResponse asyncResponse, @PathParam("property") String property, + @PathParam("cluster") String cluster, @PathParam("namespace") String namespace, + boolean enableDeduplication) { validateNamespaceName(property, cluster, namespace); - internalModifyDeduplication(enableDeduplication); + internalModifyDeduplicationAsync(enableDeduplication) + .thenAccept(__ -> asyncResponse.resume(Response.noContent().build())) + .exceptionally(ex -> { + log.error("Failed to modify broker deduplication config for namespace {}", namespaceName, ex); + resumeAsyncResponseExceptionally(asyncResponse, ex); + return null; + }); } @GET diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java index 2769b5b3bc343..a2ef19ee462ea 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java @@ -411,9 +411,16 @@ public void removeSubscriptionExpirationTime(@PathParam("tenant") String tenant, @ApiOperation(value = "Get broker side deduplication for all topics in a namespace") @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist") }) - public Boolean getDeduplication(@PathParam("tenant") String tenant, @PathParam("namespace") String namespace) { + public void getDeduplication(@Suspended AsyncResponse asyncResponse, @PathParam("tenant") String tenant, + @PathParam("namespace") String namespace) { validateNamespaceName(tenant, namespace); - return internalGetDeduplication(); + internalGetDeduplicationAsync() + .thenAccept(deduplication -> asyncResponse.resume(deduplication)) + .exceptionally(ex -> { + log.error("Failed to get broker deduplication config for namespace {}", namespace, ex); + resumeAsyncResponseExceptionally(asyncResponse, ex); + return null; + }); } @POST @@ -421,12 +428,19 @@ public Boolean getDeduplication(@PathParam("tenant") String tenant, @PathParam(" @ApiOperation(value = "Enable or disable broker side deduplication for all topics in a namespace") @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist") }) - public void modifyDeduplication(@PathParam("tenant") String tenant, @PathParam("namespace") String namespace, + public void modifyDeduplication(@Suspended AsyncResponse asyncResponse, @PathParam("tenant") String tenant, + @PathParam("namespace") String namespace, @ApiParam(value = "Flag for disabling or enabling broker side deduplication " + "for all topics in the specified namespace", required = true) boolean enableDeduplication) { validateNamespaceName(tenant, namespace); - internalModifyDeduplication(enableDeduplication); + internalModifyDeduplicationAsync(enableDeduplication) + .thenAccept(__ -> asyncResponse.resume(Response.noContent().build())) + .exceptionally(ex -> { + log.error("Failed to modify broker deduplication config for namespace {}", namespaceName, ex); + resumeAsyncResponseExceptionally(asyncResponse, ex); + return null; + }); } @DELETE @@ -434,9 +448,16 @@ public void modifyDeduplication(@PathParam("tenant") String tenant, @PathParam(" @ApiOperation(value = "Remove broker side deduplication for all topics in a namespace") @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist") }) - public void removeDeduplication(@PathParam("tenant") String tenant, @PathParam("namespace") String namespace) { + public void removeDeduplication(@Suspended AsyncResponse asyncResponse, @PathParam("tenant") String tenant, + @PathParam("namespace") String namespace) { validateNamespaceName(tenant, namespace); - internalModifyDeduplication(null); + internalModifyDeduplicationAsync(null) + .thenAccept(__ -> asyncResponse.resume(Response.noContent().build())) + .exceptionally(ex -> { + log.error("Failed to remove broker deduplication config for namespace {}", namespaceName, ex); + resumeAsyncResponseExceptionally(asyncResponse, ex); + return null; + }); } @GET