Skip to content

Commit

Permalink
Update bundle-cache on split-bundle and avoid disabling main bundle (a…
Browse files Browse the repository at this point in the history
  • Loading branch information
rdhabalia authored and merlimat committed Oct 13, 2017
1 parent ccd9ff3 commit f551c4f
Show file tree
Hide file tree
Showing 9 changed files with 186 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
*/
package org.apache.pulsar.broker.cache;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES_ROOT;
import static org.apache.pulsar.broker.web.PulsarWebResource.joinPath;
Expand Down Expand Up @@ -90,6 +89,9 @@ public CompletableFuture<Optional<LocalPolicies>> getAsync(String path) {
// create new policies node under Local ZK by coping it from Global ZK
createPolicies(path, true).thenAccept(p -> {
LOG.info("Successfully created local policies for {} -- {}", path, p);
// local-policies have been created but it's not part of policiesCache. so, call
// super.getAsync() which will load it and set the watch on local-policies path
super.getAsync(path);
future.complete(p);
}).exceptionally(ex -> {
future.completeExceptionally(ex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -571,9 +571,9 @@ public CompletableFuture<Void> splitAndOwnBundle(NamespaceBundle bundle) throws
updateNamespaceBundles(nsname, splittedBundles.getLeft(),
(rc, path, zkCtx, stat) -> pulsar.getOrderedExecutor().submit(safeRun(() -> {
if (rc == KeeperException.Code.OK.intValue()) {
// disable old bundle
try {
ownershipCache.disableOwnership(bundle);
// disable old bundle in memory
getOwnershipCache().updateBundleState(bundle, false);
// invalidate cache as zookeeper has new split
// namespace bundle
bundleFactory.invalidateBundleCache(nsname);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,15 @@ public NamespaceBundleFactory(PulsarService pulsar, HashFunction hashFunc) {
return future;
});

// local-policies have been changed which has contains namespace bundles
pulsar.getLocalZkCacheService().policiesCache()
.registerListener((String path, LocalPolicies data, Stat stat) -> {
String[] paths = path.split(LOCAL_POLICIES_ROOT + "/");
if (paths.length == 2) {
invalidateBundleCache(new NamespaceName(paths[1]));
}
});

if (pulsar != null && pulsar.getConfigurationCache() != null) {
pulsar.getLocalZkCacheService().policiesCache().registerListener(this);
}
Expand Down Expand Up @@ -225,8 +234,8 @@ public static void validateFullRange(SortedSet<String> partitions) {
checkArgument(partitions.first().equals(FIRST_BOUNDARY) && partitions.last().equals(LAST_BOUNDARY));
}

public static NamespaceBundleFactory createFactory(HashFunction hashFunc) {
return new NamespaceBundleFactory(null, hashFunc);
public static NamespaceBundleFactory createFactory(PulsarService pulsar, HashFunction hashFunc) {
return new NamespaceBundleFactory(pulsar, hashFunc);
}

public static boolean isFullBundle(String bundleRange) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,11 @@
*/
package org.apache.pulsar.broker.cache;

import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;

import java.util.concurrent.Executors;
Expand All @@ -32,9 +35,11 @@
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceBundleFactory;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.policies.data.LocalPolicies;
import org.apache.pulsar.common.policies.data.ResourceQuota;
import org.apache.pulsar.zookeeper.LocalZooKeeperCache;
import org.apache.pulsar.zookeeper.ZooKeeperCache;
import org.apache.pulsar.zookeeper.ZooKeeperDataCache;
import org.apache.zookeeper.MockZooKeeper;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
Expand All @@ -58,6 +63,13 @@ public void setup() throws Exception {
scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
zkCache = new LocalZooKeeperCache(MockZooKeeper.newInstance(), executor, scheduledExecutor);
localCache = new LocalZooKeeperCacheService(zkCache, null);

// set mock pulsar localzkcache
LocalZooKeeperCacheService localZkCache = mock(LocalZooKeeperCacheService.class);
ZooKeeperDataCache<LocalPolicies> poilciesCache = mock(ZooKeeperDataCache.class);
when(pulsar.getLocalZkCacheService()).thenReturn(localZkCache);
when(localZkCache.policiesCache()).thenReturn(poilciesCache);
doNothing().when(poilciesCache).registerListener(any());
bundleFactory = new NamespaceBundleFactory(pulsar, Hashing.crc32());

doReturn(zkCache).when(pulsar).getLocalZkCache();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.LocalBrokerData;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.cache.LocalZooKeeperCacheService;
import org.apache.pulsar.broker.loadbalance.LoadManager;
import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl;
import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerWrapper;
Expand All @@ -59,10 +61,12 @@
import org.apache.pulsar.common.naming.NamespaceBundleFactory;
import org.apache.pulsar.common.naming.NamespaceBundles;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.policies.data.LocalPolicies;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.policies.data.loadbalancer.LoadReport;
import org.apache.pulsar.zookeeper.ZooKeeperDataCache;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.Stat;
Expand Down Expand Up @@ -124,7 +128,7 @@ public void testSplitAndOwnBundles() throws Exception {
List<NamespaceBundle> bundleList = updatedNsBundles.getBundles();
assertNotNull(bundles);

NamespaceBundleFactory utilityFactory = NamespaceBundleFactory.createFactory(Hashing.crc32());
NamespaceBundleFactory utilityFactory = NamespaceBundleFactory.createFactory(pulsar, Hashing.crc32());

// (1) validate bundleFactory-cache has newly split bundles and removed old parent bundle
Pair<NamespaceBundles, List<NamespaceBundle>> splitBundles = splitBundles(utilityFactory, nsname, bundles,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,13 @@

import static com.google.common.base.Preconditions.checkNotNull;
import static org.apache.pulsar.broker.PulsarService.webAddress;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyObject;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
Expand All @@ -31,7 +35,6 @@

import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

Expand All @@ -48,8 +51,10 @@
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceBundleFactory;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.policies.data.LocalPolicies;
import org.apache.pulsar.zookeeper.LocalZooKeeperCache;
import org.apache.pulsar.zookeeper.ZooKeeperCache;
import org.apache.pulsar.zookeeper.ZooKeeperDataCache;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.NoNodeException;
import org.apache.zookeeper.MockZooKeeper;
Expand Down Expand Up @@ -81,7 +86,12 @@ public void setup() throws Exception {
executor = new OrderedSafeExecutor(1, "test");
scheduledExecutor = Executors.newScheduledThreadPool(2);
zkCache = new LocalZooKeeperCache(MockZooKeeper.newInstance(), executor, scheduledExecutor);
localCache = new LocalZooKeeperCacheService(zkCache, null);
localCache = spy(new LocalZooKeeperCacheService(zkCache, null));
ZooKeeperDataCache<LocalPolicies> poilciesCache = mock(ZooKeeperDataCache.class);
when(pulsar.getLocalZkCacheService()).thenReturn(localCache);
when(localCache.policiesCache()).thenReturn(poilciesCache);
doNothing().when(poilciesCache).registerListener(any());

bundleFactory = new NamespaceBundleFactory(pulsar, Hashing.crc32());
nsService = mock(NamespaceService.class);
brokerService = mock(BrokerService.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;

import javax.naming.AuthenticationException;
import javax.net.ssl.HttpsURLConnection;
Expand Down Expand Up @@ -68,6 +69,7 @@
import org.apache.pulsar.client.api.ProducerConfiguration.MessageRoutingMode;
import org.apache.pulsar.client.impl.auth.AuthenticationTls;
import org.apache.pulsar.common.naming.DestinationName;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.ServiceUnitId;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.PropertyAdmin;
Expand Down Expand Up @@ -738,6 +740,105 @@ public void start() throws PulsarClientException {
}
}

/**
*
* <pre>
* When broker-1's load-manager splits the bundle and update local-policies, broker-2 should get watch of
* local-policies and update bundleCache so, new lookup can be redirected properly.
*
* (1) Start broker-1 and broker-2
* (2) Make sure broker-2 always assign bundle to broker1
* (3) Broker-2 receives topic-1 request, creates local-policies and sets the watch
* (4) Broker-1 will own topic-1
* (5) Split the bundle for topic-1
* (6) Broker-2 should get the watch and update bundle cache
* (7) Make lookup request again to Broker-2 which should succeed.
*
* </pre>
*
* @throws Exception
*/
@Test(timeOut = 5000)
public void testSplitUnloadLookupTest() throws Exception {

log.info("-- Starting {} test --", methodName);

final String namespace = "my-property/use/my-ns";
// (1) Start broker-1
ServiceConfiguration conf2 = new ServiceConfiguration();
conf2.setBrokerServicePort(PortManager.nextFreePort());
conf2.setBrokerServicePortTls(PortManager.nextFreePort());
conf2.setWebServicePort(PortManager.nextFreePort());
conf2.setWebServicePortTls(PortManager.nextFreePort());
conf2.setAdvertisedAddress("localhost");
conf2.setClusterName(conf.getClusterName());
PulsarService pulsar2 = startBroker(conf2);
pulsar.getLoadManager().get().writeLoadReportOnZookeeper();
pulsar2.getLoadManager().get().writeLoadReportOnZookeeper();

pulsar.getLoadManager().get().writeLoadReportOnZookeeper();
pulsar2.getLoadManager().get().writeLoadReportOnZookeeper();

LoadManager loadManager1 = spy(pulsar.getLoadManager().get());
LoadManager loadManager2 = spy(pulsar2.getLoadManager().get());
Field loadManagerField = NamespaceService.class.getDeclaredField("loadManager");
loadManagerField.setAccessible(true);

// (2) Make sure broker-2 always assign bundle to broker1
// mock: redirect request to leader [2]
doReturn(true).when(loadManager2).isCentralized();
loadManagerField.set(pulsar2.getNamespaceService(), new AtomicReference<>(loadManager2));
// mock: return Broker1 as a Least-loaded broker when leader receies request [3]
doReturn(true).when(loadManager1).isCentralized();
SimpleResourceUnit resourceUnit = new SimpleResourceUnit(pulsar.getWebServiceAddress(), null);
doReturn(resourceUnit).when(loadManager1).getLeastLoaded(any(ServiceUnitId.class));
loadManagerField.set(pulsar.getNamespaceService(), new AtomicReference<>(loadManager1));

URI broker2ServiceUrl = new URI("pulsar://localhost:" + conf2.getBrokerServicePort());
PulsarClient pulsarClient2 = PulsarClient.create(broker2ServiceUrl.toString(), new ClientConfiguration());

// (3) Broker-2 receives topic-1 request, creates local-policies and sets the watch
final String topic1 = "persistent://" + namespace + "/topic1";
Consumer consumer1 = pulsarClient2.subscribe(topic1, "my-subscriber-name", new ConsumerConfiguration());

Set<String> serviceUnits1 = pulsar.getNamespaceService().getOwnedServiceUnits().stream()
.map(nb -> nb.toString()).collect(Collectors.toSet());

// (4) Broker-1 will own topic-1
final String unsplitBundle = namespace + "/0x00000000_0xffffffff";
Assert.assertTrue(serviceUnits1.contains(unsplitBundle));
// broker-2 should have this bundle into the cache
DestinationName destination = DestinationName.get(topic1);
NamespaceBundle bundleInBroker2 = pulsar2.getNamespaceService().getBundle(destination);
Assert.assertEquals(bundleInBroker2.toString(), unsplitBundle);

// (5) Split the bundle for topic-1
admin.namespaces().splitNamespaceBundle(namespace, "0x00000000_0xffffffff");

// (6) Broker-2 should get the watch and update bundle cache
final int retry = 5;
for (int i = 0; i < retry; i++) {
if (pulsar2.getNamespaceService().getBundle(destination).equals(bundleInBroker2) && i != retry - 1) {
Thread.sleep(200);
} else {
break;
}
}

// (7) Make lookup request again to Broker-2 which should succeed.
final String topic2 = "persistent://" + namespace + "/topic2";
Consumer consumer2 = pulsarClient2.subscribe(topic2, "my-subscriber-name", new ConsumerConfiguration());

NamespaceBundle bundleInBroker1AfterSplit = pulsar2.getNamespaceService()
.getBundle(DestinationName.get(topic2));
Assert.assertFalse(bundleInBroker1AfterSplit.equals(unsplitBundle));

consumer1.close();
consumer2.close();
pulsarClient2.close();
pulsar2.close();

}
/**** helper classes ****/

public static class MockAuthenticationProvider implements AuthenticationProvider {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,24 +18,32 @@
*/
package org.apache.pulsar.common.naming;

import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;

import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.cache.LocalZooKeeperCacheService;
import org.apache.pulsar.common.naming.DestinationName;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceBundleFactory;
import org.apache.pulsar.common.naming.NamespaceBundles;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.policies.data.LocalPolicies;
import org.apache.pulsar.zookeeper.ZooKeeperDataCache;
import org.testng.annotations.Test;

import com.google.common.collect.BoundType;
import com.google.common.collect.Range;
import com.google.common.hash.Hashing;

public class NamespaceBundleTest {
private final NamespaceBundleFactory factory = NamespaceBundleFactory.createFactory(Hashing.crc32());
private final NamespaceBundleFactory factory = getNamespaceBundleFactory();

@Test
public void testConstructor() {
Expand Down Expand Up @@ -110,6 +118,16 @@ public void testConstructor() {
assertEquals(bundle.getNamespaceObject().toString(), "pulsar/use/ns");
}

private NamespaceBundleFactory getNamespaceBundleFactory() {
PulsarService pulsar = mock(PulsarService.class);
LocalZooKeeperCacheService localZkCache = mock(LocalZooKeeperCacheService.class);
ZooKeeperDataCache<LocalPolicies> poilciesCache = mock(ZooKeeperDataCache.class);
when(pulsar.getLocalZkCacheService()).thenReturn(localZkCache);
when(localZkCache.policiesCache()).thenReturn(poilciesCache);
doNothing().when(poilciesCache).registerListener(any());
return NamespaceBundleFactory.createFactory(pulsar, Hashing.crc32());
}

@Test
public void testGetBundle() throws Exception {
NamespaceBundle bundle = factory.getBundle(new NamespaceName("pulsar/use/ns1"),
Expand Down
Loading

0 comments on commit f551c4f

Please sign in to comment.