Skip to content

Commit

Permalink
[improve][broker] Make some operation auto topic creation in Namespac…
Browse files Browse the repository at this point in the history
…es async. (apache#15621)
  • Loading branch information
shibd authored May 26, 2022
1 parent 34673dd commit f61adc0
Show file tree
Hide file tree
Showing 4 changed files with 174 additions and 85 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -819,56 +819,43 @@ protected void internalSetSubscriptionExpirationTime(Integer expirationTime) {
});
}

protected AutoTopicCreationOverride internalGetAutoTopicCreation() {
validateNamespacePolicyOperation(namespaceName, PolicyName.AUTO_TOPIC_CREATION, PolicyOperation.READ);
Policies policies = getNamespacePolicies(namespaceName);
return policies.autoTopicCreationOverride;
protected CompletableFuture<AutoTopicCreationOverride> internalGetAutoTopicCreationAsync() {
return validateNamespacePolicyOperationAsync(namespaceName, PolicyName.AUTO_TOPIC_CREATION,
PolicyOperation.READ)
.thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
.thenApply(policies -> policies.autoTopicCreationOverride);
}

protected void internalSetAutoTopicCreation(AsyncResponse asyncResponse,
AutoTopicCreationOverride autoTopicCreationOverride) {
final int maxPartitions = pulsar().getConfig().getMaxNumPartitionsPerPartitionedTopic();
validateNamespacePolicyOperation(namespaceName, PolicyName.AUTO_TOPIC_CREATION, PolicyOperation.WRITE);
validatePoliciesReadOnlyAccess();
if (autoTopicCreationOverride != null) {
ValidateResult validateResult = AutoTopicCreationOverrideImpl.validateOverride(autoTopicCreationOverride);
if (!validateResult.isSuccess()) {
throw new RestException(Status.PRECONDITION_FAILED,
"Invalid configuration for autoTopicCreationOverride. the detail is "
+ validateResult.getErrorInfo());
}
if (Objects.equals(autoTopicCreationOverride.getTopicType(), TopicType.PARTITIONED.toString())) {
if (maxPartitions > 0 && autoTopicCreationOverride.getDefaultNumPartitions() > maxPartitions) {
throw new RestException(Status.NOT_ACCEPTABLE,
"Number of partitions should be less than or equal to " + maxPartitions);
}
}
}
// Force to read the data s.t. the watch to the cache content is setup.
namespaceResources().setPoliciesAsync(namespaceName, policies -> {
policies.autoTopicCreationOverride = autoTopicCreationOverride;
return policies;
}).thenApply(r -> {
String autoOverride = (autoTopicCreationOverride != null
&& autoTopicCreationOverride.isAllowAutoTopicCreation()) ? "enabled" : "disabled";
log.info("[{}] Successfully {} autoTopicCreation on namespace {}", clientAppId(),
autoOverride != null ? autoOverride : "removed", namespaceName);
asyncResponse.resume(Response.noContent().build());
return null;
}).exceptionally(e -> {
log.error("[{}] Failed to modify autoTopicCreation status on namespace {}", clientAppId(), namespaceName,
e.getCause());
if (e.getCause() instanceof NotFoundException) {
asyncResponse.resume(new RestException(Status.NOT_FOUND, "Namespace does not exist"));
return null;
}
asyncResponse.resume(new RestException(e.getCause()));
return null;
});
}
protected CompletableFuture<Void> internalSetAutoTopicCreationAsync(
AutoTopicCreationOverride autoTopicCreationOverride) {
return validateNamespacePolicyOperationAsync(namespaceName,
PolicyName.AUTO_TOPIC_CREATION, PolicyOperation.WRITE)
.thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
.thenAccept(__ -> {
int maxPartitions = pulsar().getConfig().getMaxNumPartitionsPerPartitionedTopic();
if (autoTopicCreationOverride != null) {
ValidateResult validateResult =
AutoTopicCreationOverrideImpl.validateOverride(autoTopicCreationOverride);
if (!validateResult.isSuccess()) {
throw new RestException(Status.PRECONDITION_FAILED,
"Invalid configuration for autoTopicCreationOverride. the detail is "
+ validateResult.getErrorInfo());
}
if (Objects.equals(autoTopicCreationOverride.getTopicType(),
TopicType.PARTITIONED.toString())){
if (maxPartitions > 0
&& autoTopicCreationOverride.getDefaultNumPartitions() > maxPartitions) {
throw new RestException(Status.NOT_ACCEPTABLE,
"Number of partitions should be less than or equal to " + maxPartitions);
}

protected void internalRemoveAutoTopicCreation(AsyncResponse asyncResponse) {
internalSetAutoTopicCreation(asyncResponse, null);
}
}
})
.thenCompose(__ -> namespaceResources().setPoliciesAsync(namespaceName, policies -> {
policies.autoTopicCreationOverride = autoTopicCreationOverride;
return policies;
}));
}

protected void internalSetAutoSubscriptionCreation(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -553,11 +553,18 @@ public void modifyDeduplication(@Suspended AsyncResponse asyncResponse, @PathPar
@ApiOperation(value = "Get autoTopicCreation info in a namespace")
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist")})
public AutoTopicCreationOverride getAutoTopicCreation(@PathParam("property") String property,
@PathParam("cluster") String cluster,
@PathParam("namespace") String namespace) {
public void getAutoTopicCreation(@Suspended AsyncResponse asyncResponse,
@PathParam("property") String property,
@PathParam("cluster") String cluster,
@PathParam("namespace") String namespace) {
validateNamespaceName(property, cluster, namespace);
return internalGetAutoTopicCreation();
internalGetAutoTopicCreationAsync()
.thenAccept(asyncResponse::resume)
.exceptionally(ex -> {
log.error("Failed to get autoTopicCreation info for namespace {}", namespaceName, ex);
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
}

@POST
Expand All @@ -572,14 +579,27 @@ public void setAutoTopicCreation(@Suspended final AsyncResponse asyncResponse,
@PathParam("property") String property, @PathParam("cluster") String cluster,
@PathParam("namespace") String namespace,
AutoTopicCreationOverride autoTopicCreationOverride) {
try {
validateNamespaceName(property, cluster, namespace);
internalSetAutoTopicCreation(asyncResponse, autoTopicCreationOverride);
} catch (RestException e) {
asyncResponse.resume(e);
} catch (Exception e) {
asyncResponse.resume(new RestException(e));
}
validateNamespaceName(property, cluster, namespace);
internalSetAutoTopicCreationAsync(autoTopicCreationOverride)
.thenAccept(__ -> {
String autoOverride = (autoTopicCreationOverride != null
&& autoTopicCreationOverride.isAllowAutoTopicCreation()) ? "enabled" : "disabled";
log.info("[{}] Successfully {} autoTopicCreation on namespace {}", clientAppId(),
autoOverride, namespaceName);
asyncResponse.resume(Response.noContent().build());
})
.exceptionally(e -> {
Throwable ex = FutureUtil.unwrapCompletionException(e);
log.error("[{}] Failed to set autoTopicCreation status on namespace {}", clientAppId(),
namespaceName,
ex);
if (ex instanceof NotFoundException) {
asyncResponse.resume(new RestException(Status.NOT_FOUND, "Namespace does not exist"));
} else {
resumeAsyncResponseExceptionally(asyncResponse, ex);
}
return null;
});
}

@DELETE
Expand All @@ -590,14 +610,25 @@ public void setAutoTopicCreation(@Suspended final AsyncResponse asyncResponse,
public void removeAutoTopicCreation(@Suspended final AsyncResponse asyncResponse,
@PathParam("property") String property, @PathParam("cluster") String cluster,
@PathParam("namespace") String namespace) {
try {
validateNamespaceName(property, cluster, namespace);
internalRemoveAutoTopicCreation(asyncResponse);
} catch (RestException e) {
asyncResponse.resume(e);
} catch (Exception e) {
asyncResponse.resume(new RestException(e));
}
internalSetAutoTopicCreationAsync(null)
.thenAccept(__ -> {
log.info("[{}] Successfully remove autoTopicCreation on namespace {}",
clientAppId(), namespaceName);
asyncResponse.resume(Response.noContent().build());
})
.exceptionally(e -> {
Throwable ex = FutureUtil.unwrapCompletionException(e);
log.error("[{}] Failed to remove autoTopicCreation status on namespace {}", clientAppId(),
namespaceName,
ex);
if (ex instanceof NotFoundException) {
asyncResponse.resume(new RestException(Status.NOT_FOUND, "Namespace does not exist"));
} else {
resumeAsyncResponseExceptionally(asyncResponse, ex);
}
return null;
});
}

@POST
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -473,7 +473,8 @@ public void removeDeduplication(@Suspended AsyncResponse asyncResponse, @PathPar
validateNamespaceName(tenant, namespace);
internalModifyDeduplicationAsync(null)
.thenAccept(__ -> asyncResponse.resume(Response.noContent().build()))
.exceptionally(ex -> {
.exceptionally(e -> {
Throwable ex = FutureUtil.unwrapCompletionException(e);
log.error("Failed to remove broker deduplication config for namespace {}", namespaceName, ex);
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
Expand All @@ -485,10 +486,17 @@ public void removeDeduplication(@Suspended AsyncResponse asyncResponse, @PathPar
@ApiOperation(value = "Get autoTopicCreation info in a namespace")
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Tenant or namespace doesn't exist")})
public AutoTopicCreationOverride getAutoTopicCreation(@PathParam("tenant") String tenant,
public void getAutoTopicCreation(@Suspended AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace) {
validateNamespaceName(tenant, namespace);
return internalGetAutoTopicCreation();
internalGetAutoTopicCreationAsync()
.thenAccept(asyncResponse::resume)
.exceptionally(ex -> {
log.error("Failed to get autoTopicCreation info for namespace {}", namespaceName, ex);
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
}

@POST
Expand All @@ -504,14 +512,27 @@ public void setAutoTopicCreation(
@PathParam("tenant") String tenant, @PathParam("namespace") String namespace,
@ApiParam(value = "Settings for automatic topic creation", required = true)
AutoTopicCreationOverride autoTopicCreationOverride) {
try {
validateNamespaceName(tenant, namespace);
internalSetAutoTopicCreation(asyncResponse, autoTopicCreationOverride);
} catch (RestException e) {
asyncResponse.resume(e);
} catch (Exception e) {
asyncResponse.resume(new RestException(e));
}
validateNamespaceName(tenant, namespace);
internalSetAutoTopicCreationAsync(autoTopicCreationOverride)
.thenAccept(__ -> {
String autoOverride = (autoTopicCreationOverride != null
&& autoTopicCreationOverride.isAllowAutoTopicCreation()) ? "enabled" : "disabled";
log.info("[{}] Successfully {} autoTopicCreation on namespace {}", clientAppId(),
autoOverride, namespaceName);
asyncResponse.resume(Response.noContent().build());
})
.exceptionally(e -> {
Throwable ex = FutureUtil.unwrapCompletionException(e);
log.error("[{}] Failed to set autoTopicCreation status on namespace {}", clientAppId(),
namespaceName,
ex);
if (ex instanceof MetadataStoreException.NotFoundException) {
asyncResponse.resume(new RestException(Response.Status.NOT_FOUND, "Namespace does not exist"));
} else {
resumeAsyncResponseExceptionally(asyncResponse, ex);
}
return null;
});
}

@DELETE
Expand All @@ -521,14 +542,25 @@ public void setAutoTopicCreation(
@ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist") })
public void removeAutoTopicCreation(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant, @PathParam("namespace") String namespace) {
try {
validateNamespaceName(tenant, namespace);
internalRemoveAutoTopicCreation(asyncResponse);
} catch (RestException e) {
asyncResponse.resume(e);
} catch (Exception e) {
asyncResponse.resume(new RestException(e));
}
validateNamespaceName(tenant, namespace);
internalSetAutoTopicCreationAsync(null)
.thenAccept(__ -> {
log.info("[{}] Successfully remove autoTopicCreation on namespace {}",
clientAppId(), namespaceName);
asyncResponse.resume(Response.noContent().build());
})
.exceptionally(e -> {
Throwable ex = FutureUtil.unwrapCompletionException(e);
log.error("[{}] Failed to remove autoTopicCreation status on namespace {}", clientAppId(),
namespaceName,
ex);
if (ex instanceof MetadataStoreException.NotFoundException) {
asyncResponse.resume(new RestException(Response.Status.NOT_FOUND, "Namespace does not exist"));
} else {
resumeAsyncResponseExceptionally(asyncResponse, ex);
}
return null;
});
}

@POST
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
import org.apache.pulsar.common.policies.data.BundlesData;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
Expand Down Expand Up @@ -1677,6 +1678,44 @@ public void testRetentionPolicyValidationAsPartOfAllPolicies() throws Exception
assertInvalidRetentionPolicyAsPartOfAllPolicies(policies, 1, -2);
}

@Test
public void testOptionsAutoTopicCreation() throws Exception {
String namespace = "auto_topic_namespace";
AutoTopicCreationOverride autoTopicCreationOverride =
AutoTopicCreationOverride.builder().allowAutoTopicCreation(true).topicType("partitioned")
.defaultNumPartitions(4).build();
try {
asyncRequests(response -> namespaces.setAutoTopicCreation(response, this.testTenant, this.testLocalCluster,
namespace, autoTopicCreationOverride));
fail("should have failed");
} catch (RestException e) {
assertEquals(e.getResponse().getStatus(), Status.NOT_FOUND.getStatusCode());
}

asyncRequests(response -> namespaces.createNamespace(response, this.testTenant, this.testLocalCluster,
namespace, BundlesData.builder().build()));

// 1. set auto topic creation
asyncRequests(response -> namespaces.setAutoTopicCreation(response, this.testTenant, this.testLocalCluster,
namespace, autoTopicCreationOverride));

// 2. assert get auto topic creation
AutoTopicCreationOverride autoTopicCreationOverrideRsp = (AutoTopicCreationOverride) asyncRequests(
response -> namespaces.getAutoTopicCreation(response, this.testTenant, this.testLocalCluster,
namespace));
assertEquals(autoTopicCreationOverride.getTopicType(), autoTopicCreationOverrideRsp.getTopicType());
assertEquals(autoTopicCreationOverride.getDefaultNumPartitions(),
autoTopicCreationOverrideRsp.getDefaultNumPartitions());
assertEquals(autoTopicCreationOverride.isAllowAutoTopicCreation(),
autoTopicCreationOverrideRsp.isAllowAutoTopicCreation());
// 2. remove auto topic creation and assert get null
asyncRequests(response -> namespaces.removeAutoTopicCreation(response, this.testTenant,
this.testLocalCluster, namespace));
assertNull(asyncRequests(
response -> namespaces.getAutoTopicCreation(response, this.testTenant, this.testLocalCluster,
namespace)));
}

@Test
public void testSubscriptionTypesEnabled() throws PulsarAdminException, PulsarClientException {
pulsar.getConfiguration().setAuthorizationEnabled(false);
Expand Down

0 comments on commit f61adc0

Please sign in to comment.