Skip to content

Commit

Permalink
[improve][broker] Make some operation messageTTL methods in Namespace…
Browse files Browse the repository at this point in the history
…s async. (apache#15577)
  • Loading branch information
shibd authored May 18, 2022
1 parent ccd99fd commit 7c50674
Show file tree
Hide file tree
Showing 4 changed files with 106 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -791,16 +791,18 @@ protected void internalSetNamespaceReplicationClusters(List<String> clusterIds)
});
}

protected void internalSetNamespaceMessageTTL(Integer messageTTL) {
validateNamespacePolicyOperation(namespaceName, PolicyName.TTL, PolicyOperation.WRITE);
validatePoliciesReadOnlyAccess();
if (messageTTL != null && messageTTL < 0) {
throw new RestException(Status.PRECONDITION_FAILED, "Invalid value for message TTL");
}
updatePolicies(namespaceName, policies -> {
policies.message_ttl_in_seconds = messageTTL;
return policies;
});
protected CompletableFuture<Void> internalSetNamespaceMessageTTLAsync(Integer messageTTL) {
return validateNamespacePolicyOperationAsync(namespaceName, PolicyName.TTL, PolicyOperation.WRITE)
.thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
.thenAccept(__ -> {
if (messageTTL != null && messageTTL < 0) {
throw new RestException(Status.PRECONDITION_FAILED,
"Invalid value for message TTL, message TTL must >= 0");
}
}).thenCompose(__ -> updatePoliciesAsync(namespaceName, policies -> {
policies.message_ttl_in_seconds = messageTTL;
return policies;
}));
}

protected void internalSetSubscriptionExpirationTime(Integer expirationTime) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -390,13 +390,20 @@ public void setNamespaceReplicationClusters(@PathParam("property") String proper
@ApiOperation(hidden = true, value = "Get the message TTL for the namespace")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Property or cluster or namespace doesn't exist") })
public Integer getNamespaceMessageTTL(@PathParam("property") String property, @PathParam("cluster") String cluster,
@PathParam("namespace") String namespace) {
public void getNamespaceMessageTTL(@Suspended AsyncResponse asyncResponse,
@PathParam("property") String property,
@PathParam("cluster") String cluster,
@PathParam("namespace") String namespace) {
validateNamespaceName(property, cluster, namespace);
validateNamespacePolicyOperation(NamespaceName.get(property, namespace), PolicyName.TTL, PolicyOperation.READ);

Policies policies = getNamespacePolicies(namespaceName);
return policies.message_ttl_in_seconds;
validateNamespacePolicyOperationAsync(NamespaceName.get(property, namespace), PolicyName.TTL,
PolicyOperation.READ)
.thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
.thenAccept(policies -> asyncResponse.resume(policies.message_ttl_in_seconds))
.exceptionally(ex -> {
log.error("Failed to get namespace message TTL for namespace {}", namespaceName, ex);
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
}

@POST
Expand All @@ -405,22 +412,37 @@ public Integer getNamespaceMessageTTL(@PathParam("property") String property, @P
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Property or cluster or namespace doesn't exist"),
@ApiResponse(code = 412, message = "Invalid TTL") })
public void setNamespaceMessageTTL(@PathParam("property") String property, @PathParam("cluster") String cluster,
@PathParam("namespace") String namespace, int messageTTL) {
public void setNamespaceMessageTTL(@Suspended AsyncResponse asyncResponse, @PathParam("property") String property,
@PathParam("cluster") String cluster, @PathParam("namespace") String namespace,
int messageTTL) {
validateNamespaceName(property, cluster, namespace);
internalSetNamespaceMessageTTL(messageTTL);
internalSetNamespaceMessageTTLAsync(messageTTL)
.thenAccept(__ -> asyncResponse.resume(Response.noContent().build()))
.exceptionally(ex -> {
log.error("Failed to set namespace message TTL for namespace {}", namespaceName, ex);
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
}

@DELETE
@Path("/{property}/{cluster}/{namespace}/messageTTL")
@ApiOperation(value = "Set message TTL in seconds for namespace")
@ApiOperation(value = "Remove message TTL in seconds for namespace")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist"),
@ApiResponse(code = 412, message = "Invalid TTL") })
public void removeNamespaceMessageTTL(@PathParam("property") String property, @PathParam("cluster") String cluster,
@PathParam("namespace") String namespace) {
public void removeNamespaceMessageTTL(@Suspended AsyncResponse asyncResponse,
@PathParam("property") String property,
@PathParam("cluster") String cluster,
@PathParam("namespace") String namespace) {
validateNamespaceName(property, cluster, namespace);
internalSetNamespaceMessageTTL(null);
internalSetNamespaceMessageTTLAsync(null)
.thenAccept(__ -> asyncResponse.resume(Response.noContent().build()))
.exceptionally(ex -> {
log.error("Failed to remove namespace message TTL for namespace {}", namespaceName, ex);
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
}

@GET
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -333,13 +333,18 @@ public void setNamespaceReplicationClusters(@PathParam("tenant") String tenant,
@ApiOperation(value = "Get the message TTL for the namespace")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist") })
public Integer getNamespaceMessageTTL(@PathParam("tenant") String tenant,
public void getNamespaceMessageTTL(@Suspended AsyncResponse asyncResponse, @PathParam("tenant") String tenant,
@PathParam("namespace") String namespace) {
validateNamespaceName(tenant, namespace);
validateNamespacePolicyOperation(NamespaceName.get(tenant, namespace), PolicyName.TTL, PolicyOperation.READ);

Policies policies = getNamespacePolicies(namespaceName);
return policies.message_ttl_in_seconds;
validateNamespacePolicyOperationAsync(NamespaceName.get(tenant, namespace), PolicyName.TTL,
PolicyOperation.READ)
.thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
.thenAccept(policies -> asyncResponse.resume(policies.message_ttl_in_seconds))
.exceptionally(ex -> {
log.error("Failed to get namespace message TTL for namespace {}", namespaceName, ex);
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
}

@POST
Expand All @@ -348,22 +353,37 @@ public Integer getNamespaceMessageTTL(@PathParam("tenant") String tenant,
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist"),
@ApiResponse(code = 412, message = "Invalid TTL") })
public void setNamespaceMessageTTL(@PathParam("tenant") String tenant, @PathParam("namespace") String namespace,
@ApiParam(value = "TTL in seconds for the specified namespace", required = true) int messageTTL) {
public void setNamespaceMessageTTL(@Suspended AsyncResponse asyncResponse, @PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@ApiParam(value = "TTL in seconds for the specified namespace", required = true)
int messageTTL) {
validateNamespaceName(tenant, namespace);
internalSetNamespaceMessageTTL(messageTTL);
internalSetNamespaceMessageTTLAsync(messageTTL)
.thenAccept(__ -> asyncResponse.resume(Response.noContent().build()))
.exceptionally(ex -> {
log.error("Failed to set namespace message TTL for namespace {}", namespaceName, ex);
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
}

@DELETE
@Path("/{tenant}/{namespace}/messageTTL")
@ApiOperation(value = "Set message TTL in seconds for namespace")
@ApiOperation(value = "Remove message TTL in seconds for namespace")
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist"),
@ApiResponse(code = 412, message = "Invalid TTL")})
public void removeNamespaceMessageTTL(@PathParam("tenant") String tenant,
public void removeNamespaceMessageTTL(@Suspended AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace) {
validateNamespaceName(tenant, namespace);
internalSetNamespaceMessageTTL(null);
internalSetNamespaceMessageTTLAsync(null)
.thenAccept(__ -> asyncResponse.resume(Response.noContent().build()))
.exceptionally(ex -> {
log.error("Failed to remove namespace message TTL for namespace {}", namespaceName, ex);
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
}

@GET
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import com.google.common.collect.Lists;
Expand Down Expand Up @@ -1325,6 +1326,33 @@ public void close() {
}
}

@Test
public void testOperationNamespaceMessageTTL() throws Exception {
String namespace = "ttlnamespace";

asyncRequests(response -> namespaces.createNamespace(response, this.testTenant, this.testLocalCluster,
namespace, BundlesData.builder().build()));

asyncRequests(response -> namespaces.setNamespaceMessageTTL(response, this.testTenant, this.testLocalCluster,
namespace, 100));

int namespaceMessageTTL = (Integer) asyncRequests(response -> namespaces.getNamespaceMessageTTL(response, this.testTenant, this.testLocalCluster,
namespace));
assertEquals(100, namespaceMessageTTL);

asyncRequests(response -> namespaces.removeNamespaceMessageTTL(response, this.testTenant, this.testLocalCluster, namespace));
assertNull(asyncRequests(response -> namespaces.getNamespaceMessageTTL(response, this.testTenant, this.testLocalCluster,
namespace)));

try {
asyncRequests(response -> namespaces.setNamespaceMessageTTL(response, this.testTenant, this.testLocalCluster,
namespace, -1));
fail("should have failed");
} catch (RestException e) {
assertEquals(e.getResponse().getStatus(), Status.PRECONDITION_FAILED.getStatusCode());
}
}

@Test
public void testSetOffloadThreshold() throws Exception {
TopicName topicName = TopicName.get("persistent", this.testTenant, "offload", "offload-topic");
Expand Down

0 comments on commit 7c50674

Please sign in to comment.