Skip to content

Commit

Permalink
make namespaces policy update take effect on time (apache#8976)
Browse files Browse the repository at this point in the history
### Motivation

Currently, a change to a namespace isolation policy takes effect only when the load manager next re-assigns the affected bundles to brokers.
This change makes an isolation policy update take effect immediately.

### Modifications

- Make the setNamespaceIsolationPolicy method asynchronous.
- Add a configuration parameter to enable this feature: enableNamespaceIsolationUpdateOnTime.
- Add a test to cover this feature.

### Verifying this change
tests passed
  • Loading branch information
jiazhai authored Dec 21, 2020
1 parent a292b0a commit 3604c67
Show file tree
Hide file tree
Showing 8 changed files with 199 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -882,6 +882,13 @@ public class ServiceConfiguration implements PulsarConfiguration {
doc = "List of interceptors for entry metadata.")
private Set<String> brokerEntryMetadataInterceptors = new HashSet<>();

// Off by default: when enabled, setting a namespace isolation policy also unloads the matching namespaces.
@FieldContext(
    category = CATEGORY_SERVER,
    doc = "Enable namespaceIsolation policy updates to take effect on time or not,"
        + " if set to true, then the related namespaces will be unloaded after the policy is reset"
        + " to make it take effect."
)
private boolean enableNamespaceIsolationUpdateOnTime = false;

/***** --- TLS --- ****/
@FieldContext(
category = CATEGORY_TLS,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,19 +37,25 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import javax.ws.rs.DELETE;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.Suspended;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.cache.ConfigurationCacheService;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.client.admin.Namespaces;
import org.apache.pulsar.common.naming.Constants;
import org.apache.pulsar.common.naming.NamedEntity;
import org.apache.pulsar.common.policies.data.BrokerNamespaceIsolationData;
Expand All @@ -58,6 +64,7 @@
import org.apache.pulsar.common.policies.data.NamespaceIsolationData;
import org.apache.pulsar.common.policies.impl.NamespaceIsolationPolicies;
import org.apache.pulsar.common.policies.impl.NamespaceIsolationPolicyImpl;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
Expand Down Expand Up @@ -675,6 +682,7 @@ public BrokerNamespaceIsolationData getBrokerWithNamespaceIsolationPolicy(
@ApiResponse(code = 500, message = "Internal server error.")
})
public void setNamespaceIsolationPolicy(
@Suspended final AsyncResponse asyncResponse,
@ApiParam(
value = "The cluster name",
required = true
Expand All @@ -690,14 +698,16 @@ public void setNamespaceIsolationPolicy(
required = true
)
NamespaceIsolationData policyData
) throws Exception {
) {
validateSuperUserAccess();
validateClusterExists(cluster);
validatePoliciesReadOnlyAccess();

String jsonInput = null;
try {
// validate the policy data before creating the node
policyData.validate();
jsonInput = ObjectMapperFactory.create().writeValueAsString(policyData);

String nsIsolationPolicyPath = path("clusters", cluster, NAMESPACE_ISOLATION_POLICIES);
NamespaceIsolationPolicies nsIsolationPolicies = namespaceIsolationPoliciesCache()
Expand All @@ -715,24 +725,113 @@ public void setNamespaceIsolationPolicy(
-1);
// make sure that the cache content will be refreshed for the next read access
namespaceIsolationPoliciesCache().invalidate(nsIsolationPolicyPath);

// whether or not make the isolation update on time.
if (pulsar().getConfiguration().isEnableNamespaceIsolationUpdateOnTime()) {
filterAndUnloadMatchedNameSpaces(asyncResponse, policyData);
} else {
asyncResponse.resume(Response.noContent().build());
return;
}
} catch (IllegalArgumentException iae) {
log.info("[{}] Failed to update clusters/{}/namespaceIsolationPolicies/{}. Input data is invalid",
clientAppId(), cluster, policyName, iae);
String jsonInput = ObjectMapperFactory.create().writeValueAsString(policyData);
throw new RestException(Status.BAD_REQUEST,
"Invalid format of input policy data. policy: " + policyName + "; data: " + jsonInput);
asyncResponse.resume(new RestException(Status.BAD_REQUEST,
"Invalid format of input policy data. policy: " + policyName + "; data: " + jsonInput));
} catch (KeeperException.NoNodeException nne) {
log.warn("[{}] Failed to update clusters/{}/namespaceIsolationPolicies: Does not exist", clientAppId(),
cluster);
throw new RestException(Status.NOT_FOUND,
"NamespaceIsolationPolicies for cluster " + cluster + " does not exist");
asyncResponse.resume(new RestException(Status.NOT_FOUND,
"NamespaceIsolationPolicies for cluster " + cluster + " does not exist"));
} catch (Exception e) {
log.error("[{}] Failed to update clusters/{}/namespaceIsolationPolicies/{}", clientAppId(), cluster,
policyName, e);
throw new RestException(e);
asyncResponse.resume(new RestException(e));
}
}

/**
 * Collects every namespace whose name matches one of the policy's namespace regexes and hands the
 * resulting list to {@link #unloadMatchedNamespacesList}, which unloads them and resumes the response.
 *
 * <p>Tenants and their namespaces are fetched asynchronously through the admin client. A failure to
 * list the namespaces of a single tenant is logged and that tenant is skipped (best effort); a failure
 * to list the tenants, or an unexpected error while filtering, resumes the response with an error so
 * that the suspended request is never left unanswered.
 *
 * @param asyncResponse suspended REST response to resume once the work is done or has failed
 * @param policyData    isolation policy whose {@code namespaces} entries are used as regexes
 * @throws Exception if the admin client cannot be obtained
 */
private void filterAndUnloadMatchedNameSpaces(AsyncResponse asyncResponse,
                                              NamespaceIsolationData policyData) throws Exception {
    Namespaces namespaces = pulsar().getAdminClient().namespaces();

    pulsar().getAdminClient().tenants().getTenantsAsync()
        .whenComplete((tenants, ex) -> {
            if (ex != null) {
                log.error("[{}] Failed to get tenants when setNamespaceIsolationPolicy.", clientAppId(), ex);
                // The request is suspended: it must be resumed on this path too, or the caller hangs.
                asyncResponse.resume(new RestException(ex));
                return;
            }

            // One future per tenant, each yielding the namespaces of that tenant that match the policy.
            // Keeping results per future avoids sharing a mutable list between callback threads.
            List<CompletableFuture<List<String>>> matchedPerTenant = Lists.newArrayList();
            for (String tenant : tenants) {
                CompletableFuture<List<String>> matched = namespaces.getNamespacesAsync(tenant)
                    .handle((nss, e) -> {
                        if (e != null) {
                            log.error("[{}] Failed to get namespaces for tenant {} when setNamespaceIsolationPolicy.",
                                clientAppId(), tenant, e);
                            // Best effort: skip this tenant but keep processing the others.
                            return Lists.<String>newArrayList();
                        }
                        return nss.stream()
                            .filter(namespaceName -> policyData.namespaces.stream()
                                .anyMatch(nsnameRegex -> namespaceName.matches(nsnameRegex)))
                            .collect(Collectors.toList());
                    });
                matchedPerTenant.add(matched);
            }

            // Completes immediately when there are no tenants, so the response is still resumed.
            CompletableFuture.allOf(matchedPerTenant.toArray(new CompletableFuture<?>[0]))
                .whenComplete((ignore, e) -> {
                    if (e != null) {
                        // Only reachable if the filtering itself threw (e.g. an invalid regex).
                        log.error("[{}] Failed to filter namespaces when setNamespaceIsolationPolicy.",
                            clientAppId(), e);
                        asyncResponse.resume(new RestException(e));
                        return;
                    }

                    List<String> nssToUnload = Lists.newArrayList();
                    // All futures are complete here, so join() does not block.
                    matchedPerTenant.forEach(future -> nssToUnload.addAll(future.join()));
                    unloadMatchedNamespacesList(asyncResponse, nssToUnload, namespaces);
                });
        });
}

/**
 * Unloads the given namespaces and then resumes the suspended response.
 *
 * <p>With an empty list the response is resumed with "no content" straight away. Otherwise every
 * namespace is unloaded asynchronously; once all unloads finish, a forced load report write is
 * attempted (a failure there is only logged) and the response is resumed with "no content". If any
 * unload fails, the response is resumed with a {@link RestException} wrapping the failure.
 *
 * @param asyncResponse suspended REST response to resume
 * @param nssToUnload   names of the namespaces to unload
 * @param namespaces    admin client handle used to issue the unload calls
 */
private void unloadMatchedNamespacesList(AsyncResponse asyncResponse,
                                         List<String> nssToUnload,
                                         Namespaces namespaces) {
    if (nssToUnload.isEmpty()) {
        asyncResponse.resume(Response.noContent().build());
        return;
    }

    List<CompletableFuture<Void>> pendingUnloads = nssToUnload.stream()
        .map(ns -> namespaces.unloadAsync(ns))
        .collect(Collectors.toList());

    FutureUtil.waitForAll(pendingUnloads).whenComplete((ignored, unloadError) -> {
        if (unloadError == null) {
            try {
                // Force the load report out so the load manager sees the change quickly.
                pulsar().getLoadManager().get().writeLoadReportOnZookeeper(true);
            } catch (Exception reportError) {
                log.warn("[{}] Failed to writeLoadReportOnZookeeper.", clientAppId(), reportError);
            }
            asyncResponse.resume(Response.noContent().build());
        } else {
            log.error("[{}] Failed to unload namespace while setNamespaceIsolationPolicy.",
                clientAppId(), unloadError);
            asyncResponse.resume(new RestException(unloadError));
        }
    });
}

private boolean createZnodeIfNotExist(String path, Optional<Object> value)
throws KeeperException, InterruptedException {
// create persistent node on ZooKeeper
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,14 @@ public interface LoadManager {
*/
void writeLoadReportOnZookeeper() throws Exception;

/**
* Publish the current load report on ZK, forced or not.
*
* <p>This default implementation ignores {@code force} and simply calls
* {@link #writeLoadReportOnZookeeper()}; an implementation has to override this
* method for the flag to have any effect.
*
* @param force whether the caller asks for the report to be written unconditionally
*              (unused by this default implementation)
* @throws Exception if {@link #writeLoadReportOnZookeeper()} throws
*/
default void writeLoadReportOnZookeeper(boolean force) throws Exception {
writeLoadReportOnZookeeper();
}

/**
* Update namespace bundle resource quota on ZK.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,13 @@ public interface ModularLoadManager {
*/
void writeBrokerDataOnZooKeeper();

/**
* As any broker, write the local broker data to ZooKeeper, forced or not.
*
* <p>This default implementation ignores {@code force} and simply calls
* {@link #writeBrokerDataOnZooKeeper()}; an implementation has to override this
* method for the flag to have any effect.
*
* @param force whether the caller asks for the data to be written unconditionally
*              (unused by this default implementation)
*/
default void writeBrokerDataOnZooKeeper(boolean force) {
writeBrokerDataOnZooKeeper();
}

/**
* As the leader broker, write bundle data aggregated from all brokers to ZooKeeper.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -992,16 +992,21 @@ private void updateLoadBalancingMetrics(final SystemResourceUsage systemResource
*/
@Override
public void writeBrokerDataOnZooKeeper() {
// Non-forced write: delegate to the boolean overload with force = false.
writeBrokerDataOnZooKeeper(false);
}

@Override
public void writeBrokerDataOnZooKeeper(boolean force) {
try {
updateLocalBrokerData();
if (needBrokerDataUpdate()) {
if (needBrokerDataUpdate() || force) {
localData.setLastUpdate(System.currentTimeMillis());

try {
zkClient.setData(brokerZnodePath, localData.getJsonBytes(), -1);
} catch (KeeperException.NoNodeException e) {
ZkUtils.createFullPathOptimistic(zkClient, brokerZnodePath, localData.getJsonBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
}

// Clear deltas.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,11 @@ public void writeLoadReportOnZookeeper() {
loadManager.writeBrokerDataOnZooKeeper();
}

@Override
public void writeLoadReportOnZookeeper(boolean force) {
// Pass the force flag straight through to loadManager's broker-data write.
loadManager.writeBrokerDataOnZooKeeper(force);
}

@Override
public void writeResourceQuotasToZooKeeper() {
loadManager.writeBundleDataOnZooKeeper();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ public class AdminApiTest2 extends MockedPulsarServiceBaseTest {
@Override
public void setup() throws Exception {
conf.setLoadBalancerEnabled(true);
conf.setEnableNamespaceIsolationUpdateOnTime(true);
super.internalSetup();

// create otherbroker to test redirect on calls that need
Expand Down Expand Up @@ -908,6 +909,62 @@ public void brokerNamespaceIsolationPolicies() throws Exception {
assertFalse(isolationData.isPrimary);
}

// Create one namespace and check that an isolation policy update is applied immediately:
// 0. without an isolation policy configured, lookup succeeds.
// 1. with an isolation policy whose primary brokers match this broker, lookup succeeds.
// 2. after updating the policy so that no broker matches, lookup must fail.
@Test
public void brokerNamespaceIsolationPoliciesUpdateOnTime() throws Exception {
    String brokerName = pulsar.getAdvertisedAddress();
    String ns1Name = "prop-xyz/test_ns1_iso_" + System.currentTimeMillis();
    admin.namespaces().createNamespace(ns1Name, Sets.newHashSet("test"));

    // 0. without an isolation policy configured, lookup succeeds.
    String brokerUrl = admin.lookups().lookupTopic(ns1Name + "/topic1");
    assertTrue(brokerUrl.contains(brokerName));
    log.info("0 get lookup url {}", brokerUrl);

    // Create a policy that pins the namespace to this broker.
    String policyName1 = "policy-1";
    String cluster = pulsar.getConfiguration().getClusterName();
    NamespaceIsolationData nsPolicyData1 = new NamespaceIsolationData();
    nsPolicyData1.namespaces = new ArrayList<String>();
    nsPolicyData1.namespaces.add(ns1Name);
    nsPolicyData1.primary = new ArrayList<String>();
    nsPolicyData1.primary.add(brokerName + ".*");
    nsPolicyData1.auto_failover_policy = new AutoFailoverPolicyData();
    nsPolicyData1.auto_failover_policy.policy_type = AutoFailoverPolicyType.min_available;
    nsPolicyData1.auto_failover_policy.parameters = new HashMap<String, String>();
    nsPolicyData1.auto_failover_policy.parameters.put("min_limit", "1");
    nsPolicyData1.auto_failover_policy.parameters.put("usage_threshold", "100");
    admin.clusters().createNamespaceIsolationPolicyAsync(cluster, policyName1, nsPolicyData1).get();

    // 1. with a matching isolation broker configured, lookup succeeds.
    brokerUrl = admin.lookups().lookupTopic(ns1Name + "/topic2");
    assertTrue(brokerUrl.contains(brokerName));
    log.info(" 1 get lookup url {}", brokerUrl);

    // 2. update the isolation policy so that no broker matches; lookup must now fail.
    nsPolicyData1.primary = new ArrayList<String>();
    nsPolicyData1.primary.add(brokerName + "not_match");
    admin.clusters().updateNamespaceIsolationPolicyAsync(cluster, policyName1, nsPolicyData1).get();

    try {
        admin.lookups().lookupTopic(ns1Name + "/topic3");
        // AssertionError is not an Exception, so it escapes the catch below and fails the test.
        throw new AssertionError("lookup of topic3 should fail: no broker matches the updated policy");
    } catch (Exception e) {
        // expected lookup fail, because no brokers matched the policy.
        log.info(" 2 expected fail lookup");
    }

    try {
        // topic1 was looked up before the update; it must fail too once the namespace is unloaded.
        admin.lookups().lookupTopic(ns1Name + "/topic1");
        throw new AssertionError("lookup of topic1 should fail: no broker matches the updated policy");
    } catch (Exception e) {
        // expected lookup fail, because no brokers matched the policy.
        log.info(" 22 expected fail lookup");
    }
}

@Test
public void clustersList() throws PulsarAdminException {
final String cluster = pulsar.getConfiguration().getClusterName();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,8 @@ public void clusters() throws Exception {
policyData.auto_failover_policy.parameters = new HashMap<String, String>();
policyData.auto_failover_policy.parameters.put("min_limit", "1");
policyData.auto_failover_policy.parameters.put("usage_threshold", "90");
clusters.setNamespaceIsolationPolicy("use", "policy1", policyData);
AsyncResponse response = mock(AsyncResponse.class);
clusters.setNamespaceIsolationPolicy(response,"use", "policy1", policyData);
clusters.getNamespaceIsolationPolicies("use");

try {
Expand Down

0 comments on commit 3604c67

Please sign in to comment.