Skip to content

Commit

Permalink
Add support for remove offload policy in the namespace level (apache#…
Browse files Browse the repository at this point in the history
  • Loading branch information
Renkai authored Nov 6, 2020
1 parent d7bac05 commit 24cd557
Show file tree
Hide file tree
Showing 7 changed files with 137 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import javax.ws.rs.core.UriBuilder;

import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.admin.AdminResource;
Expand Down Expand Up @@ -3000,6 +3001,46 @@ protected void internalSetOffloadPolicies(AsyncResponse asyncResponse, OffloadPo
}
}

protected void internalRemoveOffloadPolicies(AsyncResponse asyncResponse) {
validateNamespacePolicyOperation(namespaceName, PolicyName.OFFLOAD, PolicyOperation.WRITE);
validatePoliciesReadOnlyAccess();

try {
Stat nodeStat = new Stat();
final String path = path(POLICIES, namespaceName.toString());
byte[] content = globalZk().getData(path, null, nodeStat);
Policies policies = jsonMapper().readValue(content, Policies.class);

policies.offload_policies = null;
globalZk().setData(path, jsonMapper().writeValueAsBytes(policies), nodeStat.getVersion(),
(rc, path1, ctx, stat) -> {
if (rc == KeeperException.Code.OK.intValue()) {
policiesCache().invalidate(path(POLICIES, namespaceName.toString()));
log.info("[{}] Successfully remove offload configuration: namespace={}", clientAppId(),
namespaceName);
asyncResponse.resume(Response.noContent().build());
} else {
String errorMsg = String.format(
"[%s] Failed to remove offload configuration for namespace %s",
clientAppId(), namespaceName);
if (rc == KeeperException.Code.NONODE.intValue()) {
log.warn("{} : does not exist", errorMsg);
asyncResponse.resume(new RestException(Status.NOT_FOUND, "Namespace does not exist"));
} else if (rc == KeeperException.Code.BADVERSION.intValue()) {
log.warn("{} : concurrent modification", errorMsg);
asyncResponse.resume(new RestException(Status.CONFLICT, "Concurrent modification"));
} else {
asyncResponse.resume(KeeperException.create(KeeperException.Code.get(rc), errorMsg));
}
}
}, null);
} catch (Exception e) {
log.error("[{}] Failed to remove offload configuration for namespace {}", clientAppId(), namespaceName,
e);
asyncResponse.resume(new RestException(e));
}
}

private void validateOffloadPolicies(OffloadPolicies offloadPolicies) {
if (offloadPolicies == null) {
log.warn("[{}] Failed to update offload configuration for namespace {}: offloadPolicies is null",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1322,8 +1322,8 @@ public void setSchemaValidtionEnforced(@PathParam("tenant") String tenant,
@ApiResponse(code = 409, message = "Concurrent modification"),
@ApiResponse(code = 412, message = "OffloadPolicies is empty or driver is not supported or bucket is not valid") })
public void setOffloadPolicies(@PathParam("tenant") String tenant, @PathParam("namespace") String namespace,
@ApiParam(value = "Offload policies for the specified namespace", required = true) OffloadPolicies offload,
@Suspended final AsyncResponse asyncResponse) {
@ApiParam(value = "Offload policies for the specified namespace", required = true) OffloadPolicies offload,
@Suspended final AsyncResponse asyncResponse) {
try {
validateNamespaceName(tenant, namespace);
internalSetOffloadPolicies(asyncResponse, offload);
Expand All @@ -1334,14 +1334,34 @@ public void setOffloadPolicies(@PathParam("tenant") String tenant, @PathParam("n
}
}

@DELETE
@Path("/{tenant}/{namespace}/removeOffloadPolicies")
@ApiOperation(value = " Set offload configuration on a namespace.")
@ApiResponses(value = {
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Namespace does not exist"),
@ApiResponse(code = 409, message = "Concurrent modification"),
@ApiResponse(code = 412, message = "OffloadPolicies is empty or driver is not supported or bucket is not valid")})
public void removeOffloadPolicies(@PathParam("tenant") String tenant, @PathParam("namespace") String namespace,
@Suspended final AsyncResponse asyncResponse) {
try {
validateNamespaceName(tenant, namespace);
internalRemoveOffloadPolicies(asyncResponse);
} catch (WebApplicationException wae) {
asyncResponse.resume(wae);
} catch (Exception e) {
asyncResponse.resume(new RestException(e));
}
}

@GET
@Path("/{tenant}/{namespace}/offloadPolicies")
@ApiOperation(value = "Get offload configuration on a namespace.")
@ApiResponses(value = {
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Namespace does not exist") })
@ApiResponse(code = 404, message = "Namespace does not exist")})
public OffloadPolicies getOffloadPolicies(@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace) {
@PathParam("namespace") String namespace) {
validateNamespaceName(tenant, namespace);
return internalGetOffloadPolicies();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,10 @@ public void testOffloadPolicies() throws Exception {
admin.namespaces().setOffloadPolicies(namespaceName, offload1);
OffloadPolicies offload2 = admin.namespaces().getOffloadPolicies(namespaceName);
assertEquals(offload1, offload2);

admin.namespaces().removeOffloadPolicies(namespaceName);
OffloadPolicies offload3 = admin.namespaces().getOffloadPolicies(namespaceName);
assertNull(offload3);
}

@Test(timeOut = 20000)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3292,6 +3292,20 @@ void setIsAllowAutoUpdateSchema(String namespace, boolean isAllowAutoUpdateSchem
*/
void setOffloadPolicies(String namespace, OffloadPolicies offloadPolicies) throws PulsarAdminException;

/**
* Remove the offload configuration for a namespace.
* <p/>
* Remove the offload configuration in a namespace. This operation requires pulsar tenant access.
* <p/>
*
* @param namespace Namespace name
* @throws NotAuthorizedException Don't have admin permission
* @throws NotFoundException Namespace does not exist
* @throws ConflictException Concurrent modification
* @throws PulsarAdminException Unexpected error
*/
void removeOffloadPolicies(String namespace) throws PulsarAdminException;

/**
* Set the offload configuration for all the topics in a namespace asynchronously.
* <p/>
Expand Down Expand Up @@ -3319,6 +3333,20 @@ void setIsAllowAutoUpdateSchema(String namespace, boolean isAllowAutoUpdateSchem
*/
CompletableFuture<Void> setOffloadPoliciesAsync(String namespace, OffloadPolicies offloadPolicies);

/**
* Remove the offload configuration for a namespace asynchronously.
* <p/>
* Remove the offload configuration in a namespace. This operation requires pulsar tenant access.
* <p/>
*
* @param namespace Namespace name
* @throws NotAuthorizedException Don't have admin permission
* @throws NotFoundException Namespace does not exist
* @throws ConflictException Concurrent modification
* @throws PulsarAdminException Unexpected error
*/
CompletableFuture<Void> removeOffloadPoliciesAsync(String namespace);

/**
* Get the offload configuration for a namespace.
* <p/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2675,13 +2675,35 @@ public void setOffloadPolicies(String namespace, OffloadPolicies offloadPolicies
}
}

@Override
public void removeOffloadPolicies(String namespace) throws PulsarAdminException {
try {
removeOffloadPoliciesAsync(namespace)
.get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
}

@Override
public CompletableFuture<Void> setOffloadPoliciesAsync(String namespace, OffloadPolicies offloadPolicies) {
NamespaceName ns = NamespaceName.get(namespace);
WebTarget path = namespacePath(ns, "offloadPolicies");
return asyncPostRequest(path, Entity.entity(offloadPolicies, MediaType.APPLICATION_JSON));
}

@Override
public CompletableFuture<Void> removeOffloadPoliciesAsync(String namespace) {
NamespaceName ns = NamespaceName.get(namespace);
WebTarget path = namespacePath(ns, "removeOffloadPolicies");
return asyncDeleteRequest(path);
}

@Override
public OffloadPolicies getOffloadPolicies(String namespace) throws PulsarAdminException {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -546,6 +546,9 @@ public void namespaces() throws Exception {
"http://test.endpoint", 32 * 1024 * 1024, 5 * 1024 * 1024,
10L * 1024 * 1024, 10000L));

namespaces.run(split("remove-offload-policies myprop/clust/ns1"));
verify(mockNamespaces).removeOffloadPolicies("myprop/clust/ns1");

namespaces.run(split("get-offload-policies myprop/clust/ns1"));
verify(mockNamespaces).getOffloadPolicies("myprop/clust/ns1");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1704,6 +1704,20 @@ && maxValueCheck("OffloadAfterThreshold", offloadAfterThreshold, Long.MAX_VALUE)
}
}

@Parameters(commandDescription = "Remove the offload policies for a namespace")
private class RemoveOffloadPolicies extends CliCommand {
@Parameter(description = "tenant/namespace", required = true)
private java.util.List<String> params;

@Override
void run() throws PulsarAdminException {
String namespace = validateNamespace(params);

admin.namespaces().removeOffloadPolicies(namespace);
}
}


@Parameters(commandDescription = "Get the offload policies for a namespace")
private class GetOffloadPolicies extends CliCommand {
@Parameter(description = "tenant/namespace\n", required = true)
Expand Down Expand Up @@ -1840,6 +1854,7 @@ public CmdNamespaces(PulsarAdmin admin) {
jcommander.addCommand("set-schema-validation-enforce", new SetSchemaValidationEnforced());

jcommander.addCommand("set-offload-policies", new SetOffloadPolicies());
jcommander.addCommand("remove-offload-policies", new RemoveOffloadPolicies());
jcommander.addCommand("get-offload-policies", new GetOffloadPolicies());
}
}

0 comments on commit 24cd557

Please sign in to comment.