Skip to content

Commit

Permalink
Fix mis-use of local/global zk and provide different zk for global/lo…
Browse files Browse the repository at this point in the history
…cal in MockedPulsarServiceBaseTest (apache#9222)

Fixes apache#9207 

### Motivation
This PR is a continue for apache#9193 , since apache#9193  is only a partial fix for mis-use of local/global zk. 
This PR fix others places of mis-use and provide different zk for global/local in `MockedPulsarServiceBaseTest`
  • Loading branch information
aloyszhang authored Jan 20, 2021
1 parent 80ab00c commit 29f0e1b
Show file tree
Hide file tree
Showing 32 changed files with 111 additions and 76 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,10 @@ protected void zkCreate(String path, byte[] content) throws Exception {
globalZk().create(path, content, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}

protected void localZKCreate(String path, byte[] content) throws Exception {
localZk().create(path, content, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}

protected void zkCreateOptimistic(String path, byte[] content) throws Exception {
ZkUtils.createFullPathOptimistic(globalZk(), path, content, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ public void updateBookieRackInfo(@PathParam("bookie") String bookieAddress, @Que
// Creates the z-node with racks info
BookiesRackConfiguration racks = new BookiesRackConfiguration();
racks.updateBookie(group, bookieAddress, bookieInfo);
zkCreate(ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, jsonMapper().writeValueAsBytes(racks));
localZKCreate(ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, jsonMapper().writeValueAsBytes(racks));
log.info("Created rack mapping info and added {}", bookieAddress);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1147,7 +1147,7 @@ public CompletableFuture<List<String>> getPartitions(NamespaceName namespaceName
LOG.debug("Getting children from partitioned-topics now: {}", path);
}

return pulsar.getLocalZkCache().getChildrenAsync(path, null).thenCompose(topics -> {
return pulsar.getGlobalZkCache().getChildrenAsync(path, null).thenCompose(topics -> {
CompletableFuture<List<String>> result = new CompletableFuture<>();
List<String> resultPartitions = Collections.synchronizedList(Lists.newArrayList());
if (CollectionUtils.isNotEmpty(topics)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2547,7 +2547,7 @@ private <T> boolean checkMaxTopicsPerNamespace(TopicName topicName, int numParti
if (maxTopicsPerNamespace > 0) {
String partitionedTopicPath = PulsarWebResource.joinPath(MANAGED_LEDGER_PATH_ZNODE,
topicName.getNamespace(), topicName.getDomain().value());
List<String> topics = pulsar().getGlobalZkCache().getZooKeeper()
List<String> topics = pulsar().getLocalZkCache().getZooKeeper()
.getChildren(partitionedTopicPath, false);
if (topics.size() + numPartitions > maxTopicsPerNamespace) {
log.error("Failed to create persistent topic {}, "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ public void setup() throws Exception {

clusters = spy(new Clusters());
clusters.setPulsar(pulsar);
doReturn(mockZooKeeper).when(clusters).globalZk();
doReturn(mockZooKeeperGlobal).when(clusters).globalZk();
doReturn(configurationCache.clustersCache()).when(clusters).clustersCache();
doReturn(configurationCache.clustersListCache()).when(clusters).clustersListCache();
doReturn(configurationCache.namespaceIsolationPoliciesCache()).when(clusters).namespaceIsolationPoliciesCache();
Expand All @@ -139,15 +139,15 @@ public void setup() throws Exception {
properties = spy(new Properties());
properties.setServletContext(new MockServletContext());
properties.setPulsar(pulsar);
doReturn(mockZooKeeper).when(properties).globalZk();
doReturn(mockZooKeeperGlobal).when(properties).globalZk();
doReturn(configurationCache.propertiesCache()).when(properties).tenantsCache();
doReturn("test").when(properties).clientAppId();
doNothing().when(properties).validateSuperUserAccess();

namespaces = spy(new Namespaces());
namespaces.setServletContext(new MockServletContext());
namespaces.setPulsar(pulsar);
doReturn(mockZooKeeper).when(namespaces).globalZk();
doReturn(mockZooKeeperGlobal).when(namespaces).globalZk();
doReturn(mockZooKeeper).when(namespaces).localZk();
doReturn(configurationCache.propertiesCache()).when(namespaces).tenantsCache();
doReturn(configurationCache.policiesCache()).when(namespaces).policiesCache();
Expand All @@ -160,7 +160,7 @@ public void setup() throws Exception {
brokers = spy(new Brokers());
brokers.setServletContext(new MockServletContext());
brokers.setPulsar(pulsar);
doReturn(mockZooKeeper).when(brokers).globalZk();
doReturn(mockZooKeeperGlobal).when(brokers).globalZk();
doReturn(mockZooKeeper).when(brokers).localZk();
doReturn(configurationCache.clustersListCache()).when(brokers).clustersListCache();
doReturn("test").when(brokers).clientAppId();
Expand All @@ -172,7 +172,7 @@ public void setup() throws Exception {
persistentTopics = spy(new PersistentTopics());
persistentTopics.setServletContext(new MockServletContext());
persistentTopics.setPulsar(pulsar);
doReturn(mockZooKeeper).when(persistentTopics).globalZk();
doReturn(mockZooKeeperGlobal).when(persistentTopics).globalZk();
doReturn(mockZooKeeper).when(persistentTopics).localZk();
doReturn(configurationCache.propertiesCache()).when(persistentTopics).tenantsCache();
doReturn(configurationCache.policiesCache()).when(persistentTopics).policiesCache();
Expand All @@ -186,15 +186,15 @@ public void setup() throws Exception {
resourceQuotas = spy(new ResourceQuotas());
resourceQuotas.setServletContext(new MockServletContext());
resourceQuotas.setPulsar(pulsar);
doReturn(mockZooKeeper).when(resourceQuotas).globalZk();
doReturn(mockZooKeeperGlobal).when(resourceQuotas).globalZk();
doReturn(mockZooKeeper).when(resourceQuotas).localZk();
doReturn(configurationCache.propertiesCache()).when(resourceQuotas).tenantsCache();
doReturn(configurationCache.policiesCache()).when(resourceQuotas).policiesCache();

brokerStats = spy(new BrokerStats());
brokerStats.setServletContext(new MockServletContext());
brokerStats.setPulsar(pulsar);
doReturn(mockZooKeeper).when(brokerStats).globalZk();
doReturn(mockZooKeeperGlobal).when(brokerStats).globalZk();
doReturn(mockZooKeeper).when(brokerStats).localZk();
doReturn(configurationCache.propertiesCache()).when(brokerStats).tenantsCache();
doReturn(configurationCache.policiesCache()).when(brokerStats).policiesCache();
Expand All @@ -207,7 +207,7 @@ public void setup() throws Exception {
schemasResource = spy(new SchemasResource(mockClock));
schemasResource.setServletContext(new MockServletContext());
schemasResource.setPulsar(pulsar);
doReturn(mockZooKeeper).when(schemasResource).globalZk();
doReturn(mockZooKeeperGlobal).when(schemasResource).globalZk();
doReturn(mockZooKeeper).when(schemasResource).localZk();
doReturn(configurationCache.propertiesCache()).when(schemasResource).tenantsCache();
doReturn(configurationCache.policiesCache()).when(schemasResource).policiesCache();
Expand Down Expand Up @@ -324,7 +324,7 @@ public void clusters() throws Exception {
}

// Test zk failures
mockZooKeeper.failConditional(Code.SESSIONEXPIRED, (op, path) -> {
mockZooKeeperGlobal.failConditional(Code.SESSIONEXPIRED, (op, path) -> {
return op == MockZooKeeper.Op.GET_CHILDREN
&& path.equals("/admin/clusters");
});
Expand All @@ -336,7 +336,7 @@ public void clusters() throws Exception {
assertEquals(e.getResponse().getStatus(), Status.INTERNAL_SERVER_ERROR.getStatusCode());
}

mockZooKeeper.failConditional(Code.SESSIONEXPIRED, (op, path) -> {
mockZooKeeperGlobal.failConditional(Code.SESSIONEXPIRED, (op, path) -> {
return op == MockZooKeeper.Op.CREATE
&& path.equals("/admin/clusters/test");
});
Expand All @@ -347,7 +347,7 @@ public void clusters() throws Exception {
assertEquals(e.getResponse().getStatus(), Status.INTERNAL_SERVER_ERROR.getStatusCode());
}

mockZooKeeper.failConditional(Code.SESSIONEXPIRED, (op, path) -> {
mockZooKeeperGlobal.failConditional(Code.SESSIONEXPIRED, (op, path) -> {
return op == MockZooKeeper.Op.GET
&& path.equals("/admin/clusters/test");
});
Expand All @@ -358,7 +358,7 @@ public void clusters() throws Exception {
assertEquals(e.getResponse().getStatus(), Status.INTERNAL_SERVER_ERROR.getStatusCode());
}

mockZooKeeper.failConditional(Code.SESSIONEXPIRED, (op, path) -> {
mockZooKeeperGlobal.failConditional(Code.SESSIONEXPIRED, (op, path) -> {
return op == MockZooKeeper.Op.GET
&& path.equals("/admin/clusters/test");
});
Expand All @@ -370,7 +370,7 @@ public void clusters() throws Exception {
assertEquals(e.getResponse().getStatus(), Status.INTERNAL_SERVER_ERROR.getStatusCode());
}

mockZooKeeper.failConditional(Code.SESSIONEXPIRED, (op, path) -> {
mockZooKeeperGlobal.failConditional(Code.SESSIONEXPIRED, (op, path) -> {
return op == MockZooKeeper.Op.GET_CHILDREN
&& path.equals("/admin/policies");
});
Expand All @@ -382,7 +382,7 @@ public void clusters() throws Exception {
assertEquals(e.getResponse().getStatus(), Status.INTERNAL_SERVER_ERROR.getStatusCode());
}

mockZooKeeper.failConditional(Code.SESSIONEXPIRED, (op, path) -> {
mockZooKeeperGlobal.failConditional(Code.SESSIONEXPIRED, (op, path) -> {
return op == MockZooKeeper.Op.GET
&& path.equals("/admin/clusters/use/namespaceIsolationPolicies");
});
Expand Down Expand Up @@ -465,7 +465,7 @@ public void properties() throws Exception {
}

// Test zk failures
mockZooKeeper.failConditional(Code.SESSIONEXPIRED, (op, path) -> {
mockZooKeeperGlobal.failConditional(Code.SESSIONEXPIRED, (op, path) -> {
return op == MockZooKeeper.Op.GET_CHILDREN
&& path.equals("/admin/policies");
});
Expand All @@ -476,7 +476,7 @@ public void properties() throws Exception {
assertEquals(e.getResponse().getStatus(), Status.INTERNAL_SERVER_ERROR.getStatusCode());
}

mockZooKeeper.failConditional(Code.SESSIONEXPIRED, (op, path) -> {
mockZooKeeperGlobal.failConditional(Code.SESSIONEXPIRED, (op, path) -> {
return op == MockZooKeeper.Op.GET
&& path.equals("/admin/policies/my-tenant");
});
Expand All @@ -487,7 +487,7 @@ public void properties() throws Exception {
assertEquals(e.getResponse().getStatus(), Status.INTERNAL_SERVER_ERROR.getStatusCode());
}

mockZooKeeper.failConditional(Code.SESSIONEXPIRED, (op, path) -> {
mockZooKeeperGlobal.failConditional(Code.SESSIONEXPIRED, (op, path) -> {
return op == MockZooKeeper.Op.GET
&& path.equals("/admin/policies/my-tenant");
});
Expand All @@ -498,7 +498,7 @@ public void properties() throws Exception {
assertEquals(e.getResponse().getStatus(), Status.INTERNAL_SERVER_ERROR.getStatusCode());
}

mockZooKeeper.failConditional(Code.SESSIONEXPIRED, (op, path) -> {
mockZooKeeperGlobal.failConditional(Code.SESSIONEXPIRED, (op, path) -> {
return op == MockZooKeeper.Op.CREATE
&& path.equals("/admin/policies/test");
});
Expand All @@ -509,7 +509,7 @@ public void properties() throws Exception {
assertEquals(e.getResponse().getStatus(), Status.INTERNAL_SERVER_ERROR.getStatusCode());
}

mockZooKeeper.failConditional(Code.SESSIONEXPIRED, (op, path) -> {
mockZooKeeperGlobal.failConditional(Code.SESSIONEXPIRED, (op, path) -> {
return op == MockZooKeeper.Op.GET_CHILDREN
&& path.equals("/admin/policies/my-tenant");
});
Expand All @@ -522,7 +522,7 @@ public void properties() throws Exception {

properties.createTenant("error-property", tenantInfo);

mockZooKeeper.failConditional(Code.SESSIONEXPIRED, (op, path) -> {
mockZooKeeperGlobal.failConditional(Code.SESSIONEXPIRED, (op, path) -> {
return op == MockZooKeeper.Op.DELETE
&& path.equals("/admin/policies/error-property");
});
Expand Down Expand Up @@ -654,7 +654,7 @@ public void resourceQuotas() throws Exception {
// create policies
TenantInfo admin = new TenantInfo();
admin.getAllowedClusters().add(cluster);
mockZooKeeper.create(PulsarWebResource.path(POLICIES, property),
mockZooKeeperGlobal.create(PulsarWebResource.path(POLICIES, property),
ObjectMapperFactory.getThreadLocal().writeValueAsBytes(admin), Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);

Expand Down Expand Up @@ -713,7 +713,7 @@ public void persistentTopics() throws Exception {
// create policies
TenantInfo admin = new TenantInfo();
admin.getAllowedClusters().add(cluster);
ZkUtils.createFullPathOptimistic(mockZooKeeper, PulsarWebResource.path(POLICIES, property, cluster, namespace),
ZkUtils.createFullPathOptimistic(mockZooKeeperGlobal, PulsarWebResource.path(POLICIES, property, cluster, namespace),
ObjectMapperFactory.getThreadLocal().writeValueAsBytes(new Policies()), ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ public void testPersistentList() throws Exception {
conf.setAdvertisedAddress("localhost");
conf.setClusterName(this.conf.getClusterName());
conf.setZookeeperServers("localhost:2181");
conf.setConfigurationStoreServers("localhost:3181");
buildConf(conf);

@Cleanup
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ public void setup() throws Exception {
namespaces = spy(new Namespaces());
namespaces.setServletContext(new MockServletContext());
namespaces.setPulsar(pulsar);
doReturn(mockZooKeeper).when(namespaces).globalZk();
doReturn(mockZooKeeperGlobal).when(namespaces).globalZk();
doReturn(mockZooKeeper).when(namespaces).localZk();
doReturn(pulsar.getConfigurationCache().propertiesCache()).when(namespaces).tenantsCache();
doReturn(pulsar.getConfigurationCache().policiesCache()).when(namespaces).policiesCache();
Expand Down Expand Up @@ -228,7 +228,7 @@ public void testCreateNamespaces() throws Exception {
assertEquals(e.getResponse().getStatus(), Status.PRECONDITION_FAILED.getStatusCode());
}

mockZooKeeper.failConditional(Code.SESSIONEXPIRED, (op, path) -> {
mockZooKeeperGlobal.failConditional(Code.SESSIONEXPIRED, (op, path) -> {
return op == MockZooKeeper.Op.CREATE
&& path.equals("/admin/policies/my-tenant/use/my-namespace-3");
});
Expand Down Expand Up @@ -275,7 +275,7 @@ public void testGetNamespaces() throws Exception {
}

// ZK Errors
mockZooKeeper.failConditional(Code.SESSIONEXPIRED, (op, path) -> {
mockZooKeeperGlobal.failConditional(Code.SESSIONEXPIRED, (op, path) -> {
return op == MockZooKeeper.Op.GET_CHILDREN
&& path.equals("/admin/policies/my-tenant");
});
Expand All @@ -286,7 +286,7 @@ public void testGetNamespaces() throws Exception {
// Ok
}

mockZooKeeper.failConditional(Code.SESSIONEXPIRED, (op, path) -> {
mockZooKeeperGlobal.failConditional(Code.SESSIONEXPIRED, (op, path) -> {
return op == MockZooKeeper.Op.GET_CHILDREN
&& path.equals("/admin/policies/my-tenant/use");
});
Expand Down Expand Up @@ -512,7 +512,7 @@ public void testGlobalNamespaceReplicationConfiguration() throws Exception {

// Sometimes watcher event consumes scheduled exception, so set to always fail to ensure exception is
// thrown for api call.
mockZooKeeper.setAlwaysFail(Code.SESSIONEXPIRED);
mockZooKeeperGlobal.setAlwaysFail(Code.SESSIONEXPIRED);
pulsar.getConfigurationCache().policiesCache().invalidate(AdminResource.path(POLICIES, this.testTenant,
"global", this.testGlobalNamespaces.get(0).getLocalName()));
try {
Expand All @@ -522,10 +522,10 @@ public void testGlobalNamespaceReplicationConfiguration() throws Exception {
} catch (RestException e) {
assertEquals(e.getResponse().getStatus(), Status.INTERNAL_SERVER_ERROR.getStatusCode());
} finally {
mockZooKeeper.unsetAlwaysFail();
mockZooKeeperGlobal.unsetAlwaysFail();
}

mockZooKeeper.failConditional(Code.BADVERSION, (op, path) -> {
mockZooKeeperGlobal.failConditional(Code.BADVERSION, (op, path) -> {
return op == MockZooKeeper.Op.SET
&& path.equals("/admin/policies/my-tenant/global/test-global-ns1");
});
Expand Down Expand Up @@ -553,7 +553,7 @@ public void testGlobalNamespaceReplicationConfiguration() throws Exception {
assertEquals(e.getResponse().getStatus(), Status.NOT_FOUND.getStatusCode());
}

mockZooKeeper.failConditional(Code.SESSIONEXPIRED, (op, path) -> {
mockZooKeeperGlobal.failConditional(Code.SESSIONEXPIRED, (op, path) -> {
return op == MockZooKeeper.Op.GET
&& path.equals("/admin/policies/my-tenant/global/test-global-ns1");
});
Expand Down Expand Up @@ -694,7 +694,7 @@ public void testDeleteNamespaces() throws Exception {
// delete the topic from ZK
mockZooKeeper.delete("/managed-ledgers/" + topicName.getPersistenceNamingEncoding(), -1);

ZkUtils.createFullPathOptimistic(mockZooKeeper,
ZkUtils.createFullPathOptimistic(mockZooKeeperGlobal,
"/admin/partitioned-topics/" + topicName.getPersistenceNamingEncoding(),
new byte[0], null, null);

Expand All @@ -705,7 +705,7 @@ public void testDeleteNamespaces() throws Exception {
verify(response, timeout(5000).times(1)).resume(errorCaptor.capture());
assertEquals(errorCaptor.getValue().getResponse().getStatus(), Status.CONFLICT.getStatusCode());

mockZooKeeper.delete("/admin/partitioned-topics/" + topicName.getPersistenceNamingEncoding(), -1);
mockZooKeeperGlobal.delete("/admin/partitioned-topics/" + topicName.getPersistenceNamingEncoding(), -1);

testNs = this.testGlobalNamespaces.get(0);
// setup ownership to localhost
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ protected void setup() throws Exception {
persistentTopics = spy(new PersistentTopics());
persistentTopics.setServletContext(new MockServletContext());
persistentTopics.setPulsar(pulsar);
doReturn(mockZooKeeper).when(persistentTopics).globalZk();
doReturn(mockZooKeeperGlobal).when(persistentTopics).globalZk();
doReturn(mockZooKeeper).when(persistentTopics).localZk();
doReturn(pulsar.getConfigurationCache().propertiesCache()).when(persistentTopics).tenantsCache();
doReturn(pulsar.getConfigurationCache().policiesCache()).when(persistentTopics).policiesCache();
Expand All @@ -116,7 +116,7 @@ protected void setup() throws Exception {
nonPersistentTopic = spy(new NonPersistentTopics());
nonPersistentTopic.setServletContext(new MockServletContext());
nonPersistentTopic.setPulsar(pulsar);
doReturn(mockZooKeeper).when(nonPersistentTopic).globalZk();
doReturn(mockZooKeeperGlobal).when(nonPersistentTopic).globalZk();
doReturn(mockZooKeeper).when(nonPersistentTopic).localZk();
doReturn(pulsar.getConfigurationCache().propertiesCache()).when(nonPersistentTopic).tenantsCache();
doReturn(pulsar.getConfigurationCache().policiesCache()).when(nonPersistentTopic).policiesCache();
Expand Down
Loading

0 comments on commit 29f0e1b

Please sign in to comment.