Skip to content

Commit

Permalink
Delete local-policies and invalidate cache when namespace is deleted (a…
Browse files Browse the repository at this point in the history
  • Loading branch information
rdhabalia authored and merlimat committed Apr 13, 2017
1 parent bd5a622 commit 19399e0
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@

import com.google.common.collect.Lists;
import com.yahoo.pulsar.broker.PulsarServerException;
import static com.yahoo.pulsar.broker.cache.LocalZooKeeperCacheService.LOCAL_POLICIES_ROOT;
import com.yahoo.pulsar.broker.service.BrokerServiceException.SubscriptionBusyException;
import com.yahoo.pulsar.broker.service.persistent.PersistentReplicator;
import com.yahoo.pulsar.broker.service.persistent.PersistentSubscription;
Expand Down Expand Up @@ -318,8 +319,12 @@ public void deleteNamespace(@PathParam("property") String property, @PathParam("
}

// we have successfully removed all the ownership for the namespace, the policies znode can be deleted now
globalZk().delete(path("policies", property, cluster, namespace), -1);
policiesCache().invalidate(path("policies", property, cluster, namespace));
final String globalZkPolicyPath = path("policies", property, cluster, namespace);
final String lcaolZkPolicyPath = joinPath(LOCAL_POLICIES_ROOT, property, cluster, namespace);
globalZk().delete(globalZkPolicyPath, -1);
localZk().delete(lcaolZkPolicyPath, -1);
policiesCache().invalidate(globalZkPolicyPath);
localCacheService().policiesCache().invalidate(lcaolZkPolicyPath);
} catch (PulsarAdminException cae) {
throw new RestException(cae);
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -668,14 +668,17 @@ private boolean isDestinationOwned(DestinationName fqdn) throws Exception {

public void removeOwnedServiceUnit(NamespaceName nsName) throws Exception {
ownershipCache.removeOwnership(getFullBundle(nsName)).get();
bundleFactory.invalidateBundleCache(nsName);
}

public void removeOwnedServiceUnit(NamespaceBundle nsBundle) throws Exception {
ownershipCache.removeOwnership(nsBundle).get();
bundleFactory.invalidateBundleCache(nsBundle.getNamespaceObject());
}

public void removeOwnedServiceUnits(NamespaceName nsName, BundlesData bundleData) throws Exception {
ownershipCache.removeOwnership(bundleFactory.getBundles(nsName, bundleData)).get();
bundleFactory.invalidateBundleCache(nsName);
}

public NamespaceBundleFactory getNamespaceBundleFactory() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,13 @@
*/
package com.yahoo.pulsar.broker.admin;

import static com.yahoo.pulsar.broker.service.BrokerService.BROKER_SERVICE_CONFIGURATION_PATH;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;

import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.net.URL;
import java.util.ArrayList;
import java.util.EnumSet;
Expand All @@ -39,9 +37,6 @@
import javax.ws.rs.client.WebTarget;

import org.apache.bookkeeper.test.PortManager;
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
Expand All @@ -62,7 +57,6 @@
import com.yahoo.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import com.yahoo.pulsar.broker.namespace.NamespaceEphemeralData;
import com.yahoo.pulsar.broker.namespace.NamespaceService;
import com.yahoo.pulsar.broker.service.BrokerService;
import com.yahoo.pulsar.client.admin.PulsarAdmin;
import com.yahoo.pulsar.client.admin.PulsarAdminException;
import com.yahoo.pulsar.client.admin.PulsarAdminException.ConflictException;
Expand Down Expand Up @@ -1703,4 +1697,34 @@ public void failed(Throwable e) {
assertEquals(uriStats.get().subscriptions.size(), 1);
}

/**
* Verifies that deleteNamespace cleans up policies(global,local), bundle cache and bundle ownership
*
* @throws Exception
*/
@Test
public void testDeleteNamespace() throws Exception {

final String namespace = "prop-xyz/use/deleteNs";
admin.namespaces().createNamespace(namespace, 100);
assertEquals(admin.namespaces().getPolicies(namespace).bundles.numBundles, 100);

// (1) Force topic creation and namespace being loaded
final String topicName = "persistent://" + namespace + "/my-topic";
DestinationName destination = DestinationName.get(topicName);

Producer producer = pulsarClient.createProducer(topicName);
producer.close();
NamespaceBundle bundle1 = pulsar.getNamespaceService().getBundle(destination);
// (2) Delete topic
admin.persistentTopics().delete(topicName);
// (3) Delete ns
admin.namespaces().deleteNamespace(namespace);
// (4) check bundle
NamespaceBundle bundle2 = pulsar.getNamespaceService().getBundle(destination);
assertNotEquals(bundle1.getBundleRange(), bundle2.getBundleRange());
// returns full bundle if policies not present
assertEquals("0x00000000_0xffffffff", bundle2.getBundleRange());

}
}

0 comments on commit 19399e0

Please sign in to comment.