Skip to content

Commit

Permalink
[pulsar-broker] add cli/admin api to delete bookie-affinity group (ap…
Browse files Browse the repository at this point in the history
…ache#4471)

### Motivation

This PR is on top of apache#4458. It adds support to delete bookie-affinity group using cli/admin api.
  • Loading branch information
rdhabalia authored and sijie committed Jun 14, 2019
1 parent 26e69ee commit 6636c79
Show file tree
Hide file tree
Showing 8 changed files with 146 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -637,6 +637,10 @@ protected void internalSetBookieAffinityGroup(BookieAffinityGroupData bookieAffi
}
}

protected void internalDeleteBookieAffinityGroup() {
internalSetBookieAffinityGroup(null);
}

protected BookieAffinityGroupData internalGetBookieAffinityGroup() {
validateSuperUserAccess();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -575,7 +575,19 @@ public BookieAffinityGroupData getBookieAffinityGroup(@PathParam("property") Str
validateNamespaceName(property, cluster, namespace);
return internalGetBookieAffinityGroup();
}


@DELETE
@Path("/{property}/{cluster}/{namespace}/persistence/bookieAffinity")
@ApiOperation(hidden = true, value = "Delete the bookie-affinity-group from namespace-local policy.")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Namespace does not exist"),
@ApiResponse(code = 409, message = "Concurrent modification") })
public void deleteBookieAffinityGroup(@PathParam("property") String property, @PathParam("cluster") String cluster,
@PathParam("namespace") String namespace) {
validateNamespaceName(property, cluster, namespace);
internalDeleteBookieAffinityGroup();
}

@GET
@Path("/{property}/{cluster}/{namespace}/persistence")
@ApiOperation(hidden = true, value = "Get the persistence configuration for a namespace.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -506,7 +506,7 @@ public void setBookieAffinityGroup(@PathParam("tenant") String tenant, @PathPara

@GET
@Path("/{property}/{namespace}/persistence/bookieAffinity")
@ApiOperation(hidden = true, value = "Get the bookie-affinity-group from namespace-local policy.")
@ApiOperation(value = "Get the bookie-affinity-group from namespace-local policy.")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Namespace does not exist"),
@ApiResponse(code = 409, message = "Concurrent modification") })
Expand All @@ -515,7 +515,19 @@ public BookieAffinityGroupData getBookieAffinityGroup(@PathParam("property") Str
validateNamespaceName(property, namespace);
return internalGetBookieAffinityGroup();
}


@DELETE
@Path("/{property}/{namespace}/persistence/bookieAffinity")
@ApiOperation(value = "Delete the bookie-affinity-group from namespace-local policy.")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Namespace does not exist"),
@ApiResponse(code = 409, message = "Concurrent modification") })
public void deleteBookieAffinityGroup(@PathParam("property") String property,
@PathParam("namespace") String namespace) {
validateNamespaceName(property, namespace);
internalDeleteBookieAffinityGroup();
}

@GET
@Path("/{tenant}/{namespace}/persistence")
@ApiOperation(value = "Get the persistence configuration for a namespace.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.admin.PulsarAdminException.NotFoundException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
Expand Down Expand Up @@ -249,7 +250,6 @@ public void testBookieIsolation() throws Exception {
*/
@Test
public void testBookieIsilationWithSecondaryGroup() throws Exception {

final String tenant1 = "tenant1";
final String cluster = "use";
final String ns1 = String.format("%s/%s/%s", tenant1, cluster, "ns1");
Expand Down Expand Up @@ -374,6 +374,85 @@ public void testBookieIsilationWithSecondaryGroup() throws Exception {
}
}

@Test
public void testDeleteIsolationGroup() throws Exception {

final String tenant1 = "tenant1";
final String cluster = "use";
final String ns2 = String.format("%s/%s/%s", tenant1, cluster, "ns2");
final String ns3 = String.format("%s/%s/%s", tenant1, cluster, "ns3");

final String brokerBookkeeperClientIsolationGroups = "default-group";
final String tenantNamespaceIsolationGroupsPrimary = "tenant1-isolation-primary";
final String tenantNamespaceIsolationGroupsSecondary = "tenant1-isolation=secondary";

BookieServer[] bookies = bkEnsemble.getBookies();
ZooKeeper zkClient = bkEnsemble.getZkClient();

Set<BookieSocketAddress> defaultBookies = Sets.newHashSet(bookies[0].getLocalAddress(),
bookies[1].getLocalAddress());
Set<BookieSocketAddress> isolatedBookies = Sets.newHashSet(bookies[2].getLocalAddress(),
bookies[3].getLocalAddress());

setDefaultIsolationGroup(brokerBookkeeperClientIsolationGroups, zkClient, defaultBookies);
// primary group empty
setDefaultIsolationGroup(tenantNamespaceIsolationGroupsPrimary, zkClient, Sets.newHashSet());
setDefaultIsolationGroup(tenantNamespaceIsolationGroupsSecondary, zkClient, isolatedBookies);

ServiceConfiguration config = new ServiceConfiguration();
config.setLoadManagerClassName(ModularLoadManagerImpl.class.getName());
config.setClusterName(cluster);
config.setWebServicePort(Optional.of(PRIMARY_BROKER_WEBSERVICE_PORT));
config.setZookeeperServers("127.0.0.1" + ":" + ZOOKEEPER_PORT);
config.setBrokerServicePort(Optional.of(PRIMARY_BROKER_PORT));
config.setAdvertisedAddress("localhost");
config.setBookkeeperClientIsolationGroups(brokerBookkeeperClientIsolationGroups);

config.setManagedLedgerDefaultEnsembleSize(2);
config.setManagedLedgerDefaultWriteQuorum(2);
config.setManagedLedgerDefaultAckQuorum(2);

config.setManagedLedgerMinLedgerRolloverTimeMinutes(0);
pulsarService = new PulsarService(config);
pulsarService.start();

URL brokerUrl = new URL("http://127.0.0.1" + ":" + PRIMARY_BROKER_WEBSERVICE_PORT);
PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(brokerUrl.toString()).build();

ClusterData clusterData = new ClusterData(pulsarService.getWebServiceAddress());
admin.clusters().createCluster(cluster, clusterData);
TenantInfo tenantInfo = new TenantInfo(null, Sets.newHashSet(cluster));
admin.tenants().createTenant(tenant1, tenantInfo);
admin.namespaces().createNamespace(ns2);
admin.namespaces().createNamespace(ns3);

// (1) set affinity-group
admin.namespaces().setBookieAffinityGroup(ns2, new BookieAffinityGroupData(
tenantNamespaceIsolationGroupsPrimary, tenantNamespaceIsolationGroupsSecondary));
admin.namespaces().setBookieAffinityGroup(ns3, new BookieAffinityGroupData(
tenantNamespaceIsolationGroupsPrimary, tenantNamespaceIsolationGroupsSecondary));

// (2) get affinity-group
assertEquals(admin.namespaces().getBookieAffinityGroup(ns2), new BookieAffinityGroupData(
tenantNamespaceIsolationGroupsPrimary, tenantNamespaceIsolationGroupsSecondary));
assertEquals(admin.namespaces().getBookieAffinityGroup(ns3), new BookieAffinityGroupData(
tenantNamespaceIsolationGroupsPrimary, tenantNamespaceIsolationGroupsSecondary));

// (3) delete affinity-group
admin.namespaces().deleteBookieAffinityGroup(ns2);

try {
admin.namespaces().getBookieAffinityGroup(ns2);
fail("should have fail due to affinity-group not present");
} catch (NotFoundException e) {
// Ok
}

assertEquals(admin.namespaces().getBookieAffinityGroup(ns3), new BookieAffinityGroupData(
tenantNamespaceIsolationGroupsPrimary, tenantNamespaceIsolationGroupsSecondary));

}

private void assertAffinityBookies(LedgerManager ledgerManager, List<LedgerInfo> ledgers1,
Set<BookieSocketAddress> defaultBookies) throws Exception {
for (LedgerInfo lInfo : ledgers1) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -775,6 +775,13 @@ List<String> getAntiAffinityNamespaces(String tenant, String cluster, String nam
void setBookieAffinityGroup(String namespace, BookieAffinityGroupData bookieAffinityGroup)
throws PulsarAdminException;

/**
* Delete bookie affinity group configured for a namespace.
*
* @param namespace
* @throws PulsarAdminException
*/
void deleteBookieAffinityGroup(String namespace) throws PulsarAdminException;

/**
* Get bookie affinity group configured for a namespace.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,17 @@ public void setBookieAffinityGroup(String namespace, BookieAffinityGroupData boo
}
}

@Override
public void deleteBookieAffinityGroup(String namespace) throws PulsarAdminException {
try {
NamespaceName ns = NamespaceName.get(namespace);
WebTarget path = namespacePath(ns, "persistence", "bookieAffinity");
request(path).delete(ErrorData.class);
} catch (Exception e) {
throw getApiException(e);
}
}

@Override
public BookieAffinityGroupData getBookieAffinityGroup(String namespace) throws PulsarAdminException {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,9 @@ void namespaces() throws Exception {
namespaces.run(split("get-bookie-affinity-group myprop/clust/ns1"));
verify(mockNamespaces).getBookieAffinityGroup("myprop/clust/ns1");

namespaces.run(split("delete-bookie-affinity-group myprop/clust/ns1"));
verify(mockNamespaces).deleteBookieAffinityGroup("myprop/clust/ns1");

namespaces.run(split("unload myprop/clust/ns1"));
verify(mockNamespaces).unload("myprop/clust/ns1");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -442,7 +442,19 @@ void run() throws PulsarAdminException {
new BookieAffinityGroupData(bookieAffinityGroupNamePrimary, bookieAffinityGroupNameSecondary));
}
}


@Parameters(commandDescription = "Set the bookie-affinity group name")
private class DeleteBookieAffinityGroup extends CliCommand {
@Parameter(description = "tenant/namespace", required = true)
private java.util.List<String> params;

@Override
void run() throws PulsarAdminException {
String namespace = validateNamespace(params);
admin.namespaces().deleteBookieAffinityGroup(namespace);
}
}

@Parameters(commandDescription = "Get the bookie-affinity group name")
private class GetBookieAffinityGroup extends CliCommand {
@Parameter(description = "tenant/namespace\n", required = true)
Expand Down Expand Up @@ -1162,6 +1174,7 @@ public CmdNamespaces(PulsarAdmin admin) {

jcommander.addCommand("set-bookie-affinity-group", new SetBookieAffinityGroup());
jcommander.addCommand("get-bookie-affinity-group", new GetBookieAffinityGroup());
jcommander.addCommand("delete-bookie-affinity-group", new DeleteBookieAffinityGroup());

jcommander.addCommand("unload", new Unload());

Expand Down

0 comments on commit 6636c79

Please sign in to comment.