Skip to content

Commit

Permalink
Make it possible to get namespaces owned by broker when TLS is enabled (
Browse files Browse the repository at this point in the history
  • Loading branch information
massakam authored and merlimat committed Mar 6, 2018
1 parent a28c2cf commit 2c5ef2d
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,9 @@ protected void validateTopicName(String property, String cluster, String namespa
*/
protected void validateBrokerName(String broker) throws MalformedURLException {
String brokerUrl = String.format("http://%s", broker);
if (!pulsar().getWebServiceAddress().equals(brokerUrl)) {
String brokerUrlTls = String.format("https://%s", broker);
if (!pulsar().getWebServiceAddress().equals(brokerUrl)
&& !pulsar().getWebServiceAddressTls().equals(brokerUrlTls)) {
String[] parts = broker.split(":");
checkArgument(parts.length == 2, "Invalid broker url %s", broker);
String host = parts[0];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.admin;

import static org.mockito.Mockito.spy;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotEquals;
Expand Down Expand Up @@ -55,6 +56,7 @@
import org.apache.pulsar.client.admin.internal.LookupImpl;
import org.apache.pulsar.client.admin.internal.PersistentTopicsImpl;
import org.apache.pulsar.client.admin.internal.PropertiesImpl;
import org.apache.pulsar.client.api.ClientConfiguration;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.Message;
Expand Down Expand Up @@ -108,10 +110,14 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest {

private static final Logger LOG = LoggerFactory.getLogger(AdminApiTest.class);

private final String TLS_SERVER_CERT_FILE_PATH = "./src/test/resources/certificate/server.crt";
private final String TLS_SERVER_KEY_FILE_PATH = "./src/test/resources/certificate/server.key";

private MockedPulsarService mockPulsarSetup;

private PulsarService otherPulsar;

private PulsarAdmin adminTls;
private PulsarAdmin otheradmin;

private NamespaceBundleFactory bundleFactory;
Expand All @@ -120,10 +126,19 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest {
@Override
public void setup() throws Exception {
conf.setLoadBalancerEnabled(true);
conf.setTlsEnabled(true);
conf.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
conf.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);

super.internalSetup();

bundleFactory = new NamespaceBundleFactory(pulsar, Hashing.crc32());

ClientConfiguration clientConf = new ClientConfiguration();
clientConf.setUseTls(true);
clientConf.setTlsTrustCertsFilePath(TLS_SERVER_CERT_FILE_PATH);
adminTls = spy(new PulsarAdmin(brokerUrlTls, clientConf));

// create otherbroker to test redirect on calls that need
// namespace ownership
mockPulsarSetup = new MockedPulsarService(this.conf);
Expand All @@ -141,6 +156,7 @@ public void setup() throws Exception {
@AfterMethod
@Override
public void cleanup() throws Exception {
adminTls.close();
super.internalCleanup();
mockPulsarSetup.cleanup();
}
Expand Down Expand Up @@ -379,6 +395,12 @@ public void brokers() throws Exception {
}
}

String[] parts = list.get(0).split(":");
Assert.assertEquals(parts.length, 2);
Map<String, NamespaceOwnershipStatus> nsMap2 = adminTls.brokers().getOwnedNamespaces("use",
String.format("%s:%d", parts[0], BROKER_WEBSERVICE_PORT_TLS));
Assert.assertEquals(nsMap2.size(), 1);

admin.namespaces().deleteNamespace("prop-xyz/use/ns1");
admin.clusters().deleteCluster("use");
// "test" cluster is part of config-default cluster and it's znode gets created when PulsarService creates
Expand Down

0 comments on commit 2c5ef2d

Please sign in to comment.