Skip to content

Commit

Permalink
Namespace-isolation policy should consider secondary brokers (apache#877
Browse files Browse the repository at this point in the history
)

* Namespace-isolation policy should consider secondary brokers

* fix: secondaryCache renaming

* fix test: set broker-host name in isolation policies and not broker-url
  • Loading branch information
rdhabalia authored Nov 3, 2017
1 parent 9f63fa7 commit a0c98eb
Show file tree
Hide file tree
Showing 4 changed files with 146 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public class LoadManagerShared {
private static final Set<String> primariesCache = new HashSet<>();

// Cache for shard brokers according to policies.
private static final Set<String> sharedCache = new HashSet<>();
private static final Set<String> secondaryCache = new HashSet<>();

// update LoadReport at most every 5 seconds
public static final long LOAD_REPORT_UPDATE_MIMIMUM_INTERVAL = TimeUnit.SECONDS.toMillis(5);
Expand All @@ -68,7 +68,7 @@ public static synchronized void applyPolicies(final ServiceUnitId serviceUnit,
final Set<String> availableBrokers,
final BrokerTopicLoadingPredicate brokerTopicLoadingPredicate) {
primariesCache.clear();
sharedCache.clear();
secondaryCache.clear();
NamespaceName namespace = serviceUnit.getNamespaceObject();
boolean isIsolationPoliciesPresent = policies.IsIsolationPoliciesPresent(namespace);
boolean isNonPersistentTopic = (serviceUnit instanceof NamespaceBundle)
Expand Down Expand Up @@ -96,8 +96,8 @@ public static synchronized void applyPolicies(final ServiceUnitId serviceUnit,
log.debug("Added Primary Broker - [{}] as possible Candidates for"
+ " namespace - [{}] with policies", brokerUrl.getHost(), namespace.toString());
}
} else if (policies.isSharedBroker(brokerUrl.getHost())) {
sharedCache.add(broker);
} else if (policies.isSecondaryBroker(namespace, brokerUrl.getHost())) {
secondaryCache.add(broker);
if (log.isDebugEnabled()) {
log.debug(
"Added Shared Broker - [{}] as possible "
Expand Down Expand Up @@ -127,7 +127,7 @@ public static synchronized void applyPolicies(final ServiceUnitId serviceUnit,
brokerUrl.getHost(), namespace.toString());
}
} else if (policies.isSharedBroker(brokerUrl.getHost())) {
sharedCache.add(broker);
secondaryCache.add(broker);
if (log.isDebugEnabled()) {
log.debug("Added Shared Broker - [{}] as possible Candidates for namespace - [{}]",
brokerUrl.getHost(), namespace.toString());
Expand All @@ -141,15 +141,15 @@ public static synchronized void applyPolicies(final ServiceUnitId serviceUnit,
log.debug(
"Not enough of primaries [{}] available for namespace - [{}], "
+ "adding shared [{}] as possible candidate owners",
primariesCache.size(), namespace.toString(), sharedCache.size());
brokerCandidateCache.addAll(sharedCache);
primariesCache.size(), namespace.toString(), secondaryCache.size());
brokerCandidateCache.addAll(secondaryCache);
}
} else {
log.debug(
"Policies not present for namespace - [{}] so only "
+ "considering shared [{}] brokers for possible owner",
namespace.toString(), sharedCache.size());
brokerCandidateCache.addAll(sharedCache);
namespace.toString(), secondaryCache.size());
brokerCandidateCache.addAll(secondaryCache);
}
}

Expand Down Expand Up @@ -273,7 +273,7 @@ public static void removeMostServicingBrokersForNamespace(final String assignedB
}
}

interface BrokerTopicLoadingPredicate {
public interface BrokerTopicLoadingPredicate {
boolean isEnablePersistentTopics(String brokerUrl);

boolean isEnableNonPersistentTopics(String brokerUrl);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public class SimpleResourceAllocationPolicies {
private final ZooKeeperDataCache<NamespaceIsolationPolicies> namespaceIsolationPolicies;
private final PulsarService pulsar;

SimpleResourceAllocationPolicies(PulsarService pulsar) {
public SimpleResourceAllocationPolicies(PulsarService pulsar) {
this.namespaceIsolationPolicies = pulsar.getConfigurationCache().namespaceIsolationPoliciesCache();
this.pulsar = pulsar;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import static org.testng.Assert.fail;

import java.lang.reflect.Field;
import java.net.InetAddress;
import java.net.URL;
import java.util.ArrayList;
import java.util.HashMap;
Expand Down Expand Up @@ -127,6 +128,7 @@ void setup() throws Exception {
"{\"loadBalancerStrategy\":\"leastLoadedServer\"}".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);

final String localhost = InetAddress.getLocalHost().getHostName();
// start brokers
for (int i = 0; i < BROKER_COUNT; i++) {
brokerWebServicePorts[i] = PortManager.nextFreePort();
Expand All @@ -139,6 +141,7 @@ void setup() throws Exception {
config.setZookeeperServers("127.0.0.1" + ":" + ZOOKEEPER_PORT);
config.setBrokerServicePort(brokerNativeBrokerPorts[i]);
config.setLoadManagerClassName(SimpleLoadManagerImpl.class.getName());
config.setAdvertisedAddress(localhost+i);;

pulsarServices[i] = new PulsarService(config);
pulsarServices[i].start();
Expand Down Expand Up @@ -739,7 +742,7 @@ private void createNamespacePolicies(PulsarService pulsar) throws Exception {
policyData.namespaces.add("pulsar/use/primary-ns.*");
policyData.primary = new ArrayList<String>();
for (int i = 0; i < BROKER_COUNT; i++) {
policyData.primary.add(lookupAddresses[i]);
policyData.primary.add(pulsarServices[i].getAdvertisedAddress());
}
policyData.secondary = new ArrayList<String>();
policyData.auto_failover_policy = new AutoFailoverPolicyData();
Expand All @@ -754,10 +757,10 @@ private void createNamespacePolicies(PulsarService pulsar) throws Exception {
policyData.namespaces = new ArrayList<String>();
policyData.namespaces.add("pulsar/use/secondary-ns.*");
policyData.primary = new ArrayList<String>();
policyData.primary.add(lookupAddresses[0]);
policyData.primary.add(pulsarServices[0].getAdvertisedAddress());
policyData.secondary = new ArrayList<String>();
for (int i = 1; i < BROKER_COUNT; i++) {
policyData.secondary.add(lookupAddresses[i]);
policyData.secondary.add(pulsarServices[i].getAdvertisedAddress());
}
policyData.auto_failover_policy = new AutoFailoverPolicyData();
policyData.auto_failover_policy.policy_type = AutoFailoverPolicyType.min_available;
Expand All @@ -771,10 +774,10 @@ private void createNamespacePolicies(PulsarService pulsar) throws Exception {
policyData.namespaces = new ArrayList<String>();
policyData.namespaces.add("pulsar/use/shared-ns.*");
policyData.primary = new ArrayList<String>();
policyData.primary.add(lookupAddresses[0]);
policyData.primary.add(pulsarServices[0].getAdvertisedAddress());
policyData.secondary = new ArrayList<String>();
for (int i = 1; i < BROKER_COUNT; i++) {
policyData.secondary.add(lookupAddresses[i]);
policyData.secondary.add(pulsarServices[i].getAdvertisedAddress());
}
policyData.auto_failover_policy = new AutoFailoverPolicyData();
policyData.auto_failover_policy.policy_type = AutoFailoverPolicyType.min_available;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,11 @@
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.TimeAverageMessageData;
import org.apache.pulsar.broker.loadbalance.LoadData;
import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared;
import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl;
import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerWrapper;
import org.apache.pulsar.broker.loadbalance.impl.SimpleResourceAllocationPolicies;
import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared.BrokerTopicLoadingPredicate;
import org.apache.pulsar.client.admin.Namespaces;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.Authentication;
Expand All @@ -58,6 +61,11 @@
import org.apache.pulsar.common.naming.NamespaceBundles;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.ServiceUnitId;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.NamespaceIsolationData;
import org.apache.pulsar.common.policies.data.PropertyAdmin;
import org.apache.pulsar.common.policies.impl.NamespaceIsolationPolicies;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
import org.apache.pulsar.policies.data.loadbalancer.ResourceUsage;
Expand All @@ -72,8 +80,11 @@
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.BoundType;
import com.google.common.collect.Lists;
import com.google.common.collect.Range;
import com.google.common.collect.Sets;
import com.google.common.hash.Hashing;

public class ModularLoadManagerImplTest {
Expand Down Expand Up @@ -425,4 +436,120 @@ public void testBrokerStopCacheUpdate() throws Exception {
Set<String> avaialbeBrokers = loadManager.getAvailableBrokers();
assertEquals(avaialbeBrokers.size(), 1);
}

/**
* It verifies namespace-isolation policies with primary and secondary brokers.
*
* usecase:
*
* <pre>
* 1. Namespace: primary=broker1, secondary=broker2, shared=broker3, min_limit = 1
* a. available-brokers: broker1, broker2, broker3 => result: broker1
* b. available-brokers: broker2, broker3 => result: broker2
* c. available-brokers: broker3 => result: NULL
* 2. Namespace: primary=broker1, secondary=broker2, shared=broker3, min_limit = 2
* a. available-brokers: broker1, broker2, broker3 => result: broker1, broker2
* b. available-brokers: broker2, broker3 => result: broker2
* c. available-brokers: broker3 => result: NULL
* </pre>
*
* @throws Exception
*/
@Test
public void testNamespaceIsolationPoliciesForPrimaryAndSecondaryBrokers() throws Exception {

final String property = "my-property";
final String cluster = "use";
final String namespace = "my-ns";
final String broker1Address = pulsar1.getAdvertisedAddress() + "0";
final String broker2Address = pulsar2.getAdvertisedAddress() + "1";
final String sharedBroker = "broker3";
admin1.clusters().createCluster(cluster, new ClusterData("http://" + pulsar1.getAdvertisedAddress()));
admin1.properties().createProperty(property,
new PropertyAdmin(Lists.newArrayList("appid1", "appid2"), Sets.newHashSet(cluster)));
admin1.namespaces().createNamespace(property + "/" + cluster + "/" + namespace);

// set a new policy
String newPolicyJsonTemplate = "{\"namespaces\":[\"%s/%s/%s.*\"],\"primary\":[\"%s\"],"
+ "\"secondary\":[\"%s\"],\"auto_failover_policy\":{\"policy_type\":\"min_available\",\"parameters\":{\"min_limit\":%s,\"usage_threshold\":80}}}";
String newPolicyJson = String.format(newPolicyJsonTemplate, property, cluster, namespace, broker1Address,
broker2Address, 1);
String newPolicyName = "my-ns-isolation-policies";
ObjectMapper jsonMapper = ObjectMapperFactory.create();
NamespaceIsolationData nsPolicyData = jsonMapper.readValue(newPolicyJson.getBytes(),
NamespaceIsolationData.class);
admin1.clusters().createNamespaceIsolationPolicy("use", newPolicyName, nsPolicyData);

SimpleResourceAllocationPolicies simpleResourceAllocationPolicies = new SimpleResourceAllocationPolicies(
pulsar1);
ServiceUnitId serviceUnit = LoadBalancerTestingUtils.makeBundles(nsFactory, property, cluster, namespace, 1)[0];
BrokerTopicLoadingPredicate brokerTopicLoadingPredicate = new BrokerTopicLoadingPredicate() {
@Override
public boolean isEnablePersistentTopics(String brokerUrl) {
return true;
}

@Override
public boolean isEnableNonPersistentTopics(String brokerUrl) {
return true;
}
};

// (1) now we have isolation policy : primary=broker1, secondary=broker2, minLimit=1

// test1: shared=1, primary=1, secondary=1 => It should return 1 primary broker only
Set<String> brokerCandidateCache = Sets.newHashSet();
Set<String> availableBrokers = Sets.newHashSet(sharedBroker, broker1Address, broker2Address);
LoadManagerShared.applyPolicies(serviceUnit, simpleResourceAllocationPolicies, brokerCandidateCache,
availableBrokers, brokerTopicLoadingPredicate);
assertEquals(brokerCandidateCache.size(), 1);
assertTrue(brokerCandidateCache.contains(broker1Address));

// test2: shared=1, primary=0, secondary=1 => It should return 1 secondary broker only
brokerCandidateCache = Sets.newHashSet();
availableBrokers = Sets.newHashSet(sharedBroker, broker2Address);
LoadManagerShared.applyPolicies(serviceUnit, simpleResourceAllocationPolicies, brokerCandidateCache,
availableBrokers, brokerTopicLoadingPredicate);
assertEquals(brokerCandidateCache.size(), 1);
assertTrue(brokerCandidateCache.contains(broker2Address));

// test3: shared=1, primary=0, secondary=0 => It should return 0 broker
brokerCandidateCache = Sets.newHashSet();
availableBrokers = Sets.newHashSet(sharedBroker);
LoadManagerShared.applyPolicies(serviceUnit, simpleResourceAllocationPolicies, brokerCandidateCache,
availableBrokers, brokerTopicLoadingPredicate);
assertEquals(brokerCandidateCache.size(), 0);

// (2) now we will have isolation policy : primary=broker1, secondary=broker2, minLimit=2

newPolicyJson = String.format(newPolicyJsonTemplate, property, cluster, namespace, broker1Address,
broker2Address, 2);
nsPolicyData = jsonMapper.readValue(newPolicyJson.getBytes(), NamespaceIsolationData.class);
admin1.clusters().createNamespaceIsolationPolicy("use", newPolicyName, nsPolicyData);

// test1: shared=1, primary=1, secondary=1 => It should return primary + secondary
brokerCandidateCache = Sets.newHashSet();
availableBrokers = Sets.newHashSet(sharedBroker, broker1Address, broker2Address);
LoadManagerShared.applyPolicies(serviceUnit, simpleResourceAllocationPolicies, brokerCandidateCache,
availableBrokers, brokerTopicLoadingPredicate);
assertEquals(brokerCandidateCache.size(), 2);
assertTrue(brokerCandidateCache.contains(broker1Address));
assertTrue(brokerCandidateCache.contains(broker2Address));

// test2: shared=1, secondary=1 => It should return secondary
brokerCandidateCache = Sets.newHashSet();
availableBrokers = Sets.newHashSet(sharedBroker, broker2Address);
LoadManagerShared.applyPolicies(serviceUnit, simpleResourceAllocationPolicies, brokerCandidateCache,
availableBrokers, brokerTopicLoadingPredicate);
assertEquals(brokerCandidateCache.size(), 1);
assertTrue(brokerCandidateCache.contains(broker2Address));

// test3: shared=1, => It should return 0 broker
brokerCandidateCache = Sets.newHashSet();
availableBrokers = Sets.newHashSet(sharedBroker);
LoadManagerShared.applyPolicies(serviceUnit, simpleResourceAllocationPolicies, brokerCandidateCache,
availableBrokers, brokerTopicLoadingPredicate);
assertEquals(brokerCandidateCache.size(), 0);

}
}

0 comments on commit a0c98eb

Please sign in to comment.