Skip to content

Commit

Permalink
Avoid creating failure-domain for cluster which doesn't exist (apache…
Browse files Browse the repository at this point in the history
…#1474)

* Avoid creating failure-domain for cluster which doesn't exist

* Fix admin api test
  • Loading branch information
rdhabalia authored and merlimat committed Mar 30, 2018
1 parent ce61498 commit 7fa7202
Show file tree
Hide file tree
Showing 16 changed files with 32 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.cache;

import java.nio.file.Paths;
import java.util.Map;

import org.apache.bookkeeper.util.ZkUtils;
Expand Down Expand Up @@ -124,7 +125,8 @@ public FailureDomain deserialize(String path, byte[] content) throws Exception {

private void createFailureDomainRoot(ZooKeeper zk, String path) {
try {
if (zk.exists(path, false) == null) {
final String clusterZnodePath = Paths.get(path).getParent().toString();
if (zk.exists(clusterZnodePath, false) != null && zk.exists(path, false) == null) {
try {
byte[] data = "".getBytes();
ZkUtils.createFullPathOptimistic(zk, path, data, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ private void createProperty(PulsarAdmin pulsarAdmin)
throws PulsarClientException, MalformedURLException, PulsarAdminException {
ClusterData clusterData = new ClusterData();
clusterData.setServiceUrl(pulsarAdmin.getServiceUrl().toString());
pulsarAdmins[0].clusters().updateCluster("my-cluster", clusterData);
pulsarAdmins[0].clusters().createCluster("my-cluster", clusterData);
Set<String> allowedClusters = new HashSet<>();
allowedClusters.add("my-cluster");
PropertyAdmin adminConfig = new PropertyAdmin();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,33 +193,33 @@ public void clusters() throws Exception {
new ClusterData("http://broker.messaging.use.example.com" + ":" + BROKER_WEBSERVICE_PORT));
// "test" cluster is part of config-default cluster and it's znode gets created when PulsarService creates
// failure-domain znode of this default cluster
assertEquals(admin.clusters().getClusters(), Lists.newArrayList("test", "use", "usw"));
assertEquals(admin.clusters().getClusters(), Lists.newArrayList("use", "usw"));

assertEquals(admin.clusters().getCluster("use"),
new ClusterData("http://127.0.0.1" + ":" + BROKER_WEBSERVICE_PORT));

admin.clusters().updateCluster("usw",
new ClusterData("http://new-broker.messaging.usw.example.com" + ":" + BROKER_WEBSERVICE_PORT));
assertEquals(admin.clusters().getClusters(), Lists.newArrayList("test", "use", "usw"));
assertEquals(admin.clusters().getClusters(), Lists.newArrayList("use", "usw"));
assertEquals(admin.clusters().getCluster("usw"),
new ClusterData("http://new-broker.messaging.usw.example.com" + ":" + BROKER_WEBSERVICE_PORT));

admin.clusters().updateCluster("usw",
new ClusterData("http://new-broker.messaging.usw.example.com" + ":" + BROKER_WEBSERVICE_PORT,
"https://new-broker.messaging.usw.example.com" + ":" + BROKER_WEBSERVICE_PORT_TLS));
assertEquals(admin.clusters().getClusters(), Lists.newArrayList("test", "use", "usw"));
assertEquals(admin.clusters().getClusters(), Lists.newArrayList("use", "usw"));
assertEquals(admin.clusters().getCluster("usw"),
new ClusterData("http://new-broker.messaging.usw.example.com" + ":" + BROKER_WEBSERVICE_PORT,
"https://new-broker.messaging.usw.example.com" + ":" + BROKER_WEBSERVICE_PORT_TLS));

admin.clusters().deleteCluster("usw");
Thread.sleep(300);

assertEquals(admin.clusters().getClusters(), Lists.newArrayList("test", "use"));
assertEquals(admin.clusters().getClusters(), Lists.newArrayList("use"));

admin.namespaces().deleteNamespace("prop-xyz/use/ns1");
admin.clusters().deleteCluster("use");
assertEquals(admin.clusters().getClusters(), Lists.newArrayList("test"));
assertEquals(admin.clusters().getClusters(), Lists.newArrayList());

// Check name validation
try {
Expand Down Expand Up @@ -409,9 +409,7 @@ public void brokers() throws Exception {

admin.namespaces().deleteNamespace("prop-xyz/use/ns1");
admin.clusters().deleteCluster("use");
// "test" cluster is part of config-default cluster and it's znode gets created when PulsarService creates
// failure-domain znode of this default cluster
assertEquals(admin.clusters().getClusters(), Lists.newArrayList("test"));
assertEquals(admin.clusters().getClusters(), Lists.newArrayList());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -636,7 +636,7 @@ public void testReplicationPeerCluster() throws Exception {
public void clusterFailureDomain() throws PulsarAdminException {

final String cluster = pulsar.getConfiguration().getClusterName();
admin.clusters().updateCluster(cluster,
admin.clusters().createCluster(cluster,
new ClusterData(pulsar.getWebServiceAddress(), pulsar.getWebServiceAddressTls()));
// create
FailureDomain domain = new FailureDomain();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,10 +204,10 @@ void internalConfiguration() throws Exception {

@Test
void clusters() throws Exception {
assertEquals(clusters.getClusters(), Lists.newArrayList(configClusterName));
assertEquals(clusters.getClusters(), Lists.newArrayList());
verify(clusters, never()).validateSuperUserAccess();

clusters.updateCluster("use", new ClusterData("http://broker.messaging.use.example.com"));
clusters.createCluster("use", new ClusterData("http://broker.messaging.use.example.com"));
verify(clusters, times(1)).validateSuperUserAccess();
// ensure to read from ZooKeeper directly
clusters.clustersListCache().clear();
Expand Down Expand Up @@ -465,7 +465,7 @@ void properties() throws Exception {
assertEquals(properties.getProperties(), Lists.newArrayList());

// Create a namespace to test deleting a non-empty property
clusters.updateCluster("use", new ClusterData());
clusters.createCluster("use", new ClusterData());
newPropertyAdmin = new PropertyAdmin(Lists.newArrayList("role1", "other-role"), Sets.newHashSet("use"));
properties.createProperty("my-property", newPropertyAdmin);

Expand All @@ -492,7 +492,7 @@ void properties() throws Exception {

@Test
void brokers() throws Exception {
clusters.updateCluster("use", new ClusterData("http://broker.messaging.use.example.com",
clusters.createCluster("use", new ClusterData("http://broker.messaging.use.example.com",
"https://broker.messaging.use.example.com:4443"));

URI requestUri = new URI(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ public void setup() throws Exception {
doNothing().when(namespaces).validateAdminAccessOnProperty("other-property");
doNothing().when(namespaces).validateAdminAccessOnProperty("new-property");

admin.clusters().updateCluster("use", new ClusterData("http://broker-use.com:" + BROKER_WEBSERVICE_PORT));
admin.clusters().createCluster("use", new ClusterData("http://broker-use.com:" + BROKER_WEBSERVICE_PORT));
admin.clusters().createCluster("usw", new ClusterData("http://broker-usw.com:" + BROKER_WEBSERVICE_PORT));
admin.clusters().createCluster("usc", new ClusterData("http://broker-usc.com:" + BROKER_WEBSERVICE_PORT));
admin.properties().createProperty(this.testProperty,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ void simple() throws Exception {

assertEquals(auth.canLookup(TopicName.get("persistent://p1/c1/ns1/ds1"), "my-role", null), false);

admin.clusters().updateCluster("c1", new ClusterData());
admin.clusters().createCluster("c1", new ClusterData());
admin.properties().createProperty("p1", new PropertyAdmin(Lists.newArrayList("role1"), Sets.newHashSet("c1")));
waitForChange();
admin.namespaces().createNamespace("p1/c1/ns1");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -498,7 +498,7 @@ public void testNamespaceIsolationPoliciesForPrimaryAndSecondaryBrokers() throws
final String broker1Address = pulsar1.getAdvertisedAddress() + "0";
final String broker2Address = pulsar2.getAdvertisedAddress() + "1";
final String sharedBroker = "broker3";
admin1.clusters().updateCluster(cluster, new ClusterData("http://" + pulsar1.getAdvertisedAddress()));
admin1.clusters().createCluster(cluster, new ClusterData("http://" + pulsar1.getAdvertisedAddress()));
admin1.properties().createProperty(property,
new PropertyAdmin(Lists.newArrayList("appid1", "appid2"), Sets.newHashSet(cluster)));
admin1.namespaces().createNamespace(property + "/" + cluster + "/" + namespace);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ void setup() throws Exception {
adminUrl = new URL("http://127.0.0.1" + ":" + BROKER_WEBSERVICE_PORT);
admin = new PulsarAdmin(adminUrl, (Authentication) null);

admin.clusters().updateCluster("usc", new ClusterData(adminUrl.toString()));
admin.clusters().createCluster("usc", new ClusterData(adminUrl.toString()));
admin.properties().createProperty("prop",
new PropertyAdmin(Lists.newArrayList("appid1"), Sets.newHashSet("usc")));
admin.namespaces().createNamespace("prop/usc/ns-quota");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ void setup() throws Exception {
adminUrl = new URL("http://127.0.0.1" + ":" + BROKER_WEBSERVICE_PORT);
admin = new PulsarAdmin(adminUrl, (Authentication) null);

admin.clusters().updateCluster("usc", new ClusterData(adminUrl.toString()));
admin.clusters().createCluster("usc", new ClusterData(adminUrl.toString()));
admin.properties().createProperty("prop",
new PropertyAdmin(Lists.newArrayList("appid1"), Sets.newHashSet("usc")));
} catch (Throwable t) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,11 +207,11 @@ void setup() throws Exception {
admin3 = new PulsarAdmin(url3, (Authentication) null);

// Provision the global namespace
admin1.clusters().updateCluster("r1", new ClusterData(url1.toString(), urlTls1.toString(),
admin1.clusters().createCluster("r1", new ClusterData(url1.toString(), urlTls1.toString(),
pulsar1.getBrokerServiceUrl(), pulsar1.getBrokerServiceUrlTls()));
admin1.clusters().updateCluster("r2", new ClusterData(url2.toString(), urlTls2.toString(),
admin1.clusters().createCluster("r2", new ClusterData(url2.toString(), urlTls2.toString(),
pulsar2.getBrokerServiceUrl(), pulsar2.getBrokerServiceUrlTls()));
admin1.clusters().updateCluster("r3", new ClusterData(url3.toString(), urlTls3.toString(),
admin1.clusters().createCluster("r3", new ClusterData(url3.toString(), urlTls3.toString(),
pulsar3.getBrokerServiceUrl(), pulsar3.getBrokerServiceUrlTls()));

admin1.clusters().createCluster("global", new ClusterData("http://global:8080", "https://global:8443"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ public void testAnonymousSyncProducerAndConsumer(int batchMessageDelayMs) throws
authTls.configure(authParams);
internalSetup(authTls);

admin.clusters().updateCluster("use", new ClusterData(brokerUrl.toString(), brokerUrlTls.toString(),
admin.clusters().createCluster("use", new ClusterData(brokerUrl.toString(), brokerUrlTls.toString(),
"pulsar://localhost:" + BROKER_PORT, "pulsar+ssl://localhost:" + BROKER_PORT_TLS));
admin.properties().createProperty("my-property",
new PropertyAdmin(Lists.newArrayList("anonymousUser"), Sets.newHashSet("use")));
Expand Down Expand Up @@ -276,7 +276,7 @@ public void testAuthenticationFilterNegative() throws Exception {
// this will cause NPE and it should throw 500
doReturn(null).when(pulsar).getGlobalZkCache();
try {
admin.clusters().updateCluster(cluster, clusterData);
admin.clusters().createCluster(cluster, clusterData);
} catch (PulsarAdminException e) {
Assert.assertTrue(e.getCause() instanceof InternalServerErrorException);
}
Expand All @@ -301,7 +301,7 @@ public void testInternalServerExceptionOnLookup() throws Exception {
authTls.configure(authParams);
internalSetup(authTls);

admin.clusters().updateCluster("use", new ClusterData(brokerUrl.toString(), brokerUrlTls.toString(),
admin.clusters().createCluster("use", new ClusterData(brokerUrl.toString(), brokerUrlTls.toString(),
"pulsar://localhost:" + BROKER_PORT, "pulsar+ssl://localhost:" + BROKER_PORT_TLS));
admin.properties().createProperty("my-property",
new PropertyAdmin(Lists.newArrayList("appid1", "appid2"), Sets.newHashSet("use")));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -941,11 +941,11 @@ void setupReplicationCluster() throws Exception {
admin3 = new PulsarAdmin(url3, (Authentication) null);

// Provision the global namespace
admin1.clusters().updateCluster("r1", new ClusterData(url1.toString(), null, pulsar1.getBrokerServiceUrl(),
admin1.clusters().createCluster("r1", new ClusterData(url1.toString(), null, pulsar1.getBrokerServiceUrl(),
pulsar1.getBrokerServiceUrlTls()));
admin1.clusters().updateCluster("r2", new ClusterData(url2.toString(), null, pulsar2.getBrokerServiceUrl(),
admin1.clusters().createCluster("r2", new ClusterData(url2.toString(), null, pulsar2.getBrokerServiceUrl(),
pulsar1.getBrokerServiceUrlTls()));
admin1.clusters().updateCluster("r3", new ClusterData(url3.toString(), null, pulsar3.getBrokerServiceUrl(),
admin1.clusters().createCluster("r3", new ClusterData(url3.toString(), null, pulsar3.getBrokerServiceUrl(),
pulsar1.getBrokerServiceUrlTls()));

admin1.clusters().createCluster("global", new ClusterData("http://global:8080"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ protected void internalSetUpForNamespace() throws Exception {
authParams.put("tlsKeyFile", TLS_CLIENT_KEY_FILE_PATH);
clientConf.setAuthentication(AuthenticationTls.class.getName(), authParams);
admin = spy(new PulsarAdmin(brokerUrlTls, clientConf));
admin.clusters().updateCluster(clusterName, new ClusterData(brokerUrl.toString(), brokerUrlTls.toString(),
admin.clusters().createCluster(clusterName, new ClusterData(brokerUrl.toString(), brokerUrlTls.toString(),
"pulsar://localhost:" + BROKER_PORT, "pulsar+ssl://localhost:" + BROKER_PORT_TLS));
admin.properties().createProperty("my-property",
new PropertyAdmin(Lists.newArrayList("appid1", "appid2"), Sets.newHashSet("use")));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public void test() throws Exception {

assertEquals(auth.canLookup(TopicName.get("persistent://p1/c1/ns1/ds1"), "my-role", null), false);

admin.clusters().updateCluster(configClusterName, new ClusterData());
admin.clusters().createCluster(configClusterName, new ClusterData());
admin.properties().createProperty("p1", new PropertyAdmin(Lists.newArrayList("role1"), Sets.newHashSet("c1")));
waitForChange();
admin.namespaces().createNamespace("p1/c1/ns1");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ public void testTlsSyncProducerAndConsumer() throws Exception {
// create a client which connects to proxy over tls and pass authData
PulsarClient proxyClient = createPulsarClient(authTls, proxyServiceUrl);

admin.clusters().updateCluster(configClusterName, new ClusterData(brokerUrl.toString(), brokerUrlTls.toString(),
admin.clusters().createCluster(configClusterName, new ClusterData(brokerUrl.toString(), brokerUrlTls.toString(),
"pulsar://localhost:" + BROKER_PORT, "pulsar+ssl://localhost:" + BROKER_PORT_TLS));
admin.properties().createProperty("my-property",
new PropertyAdmin(Lists.newArrayList("appid1", "appid2"), Sets.newHashSet("use")));
Expand Down

0 comments on commit 7fa7202

Please sign in to comment.