Skip to content

Commit

Permalink
Fix ProxyPublishConsumeTlsTest (apache#496)
Browse files Browse the repository at this point in the history
  • Loading branch information
nkurihar authored and merlimat committed Sep 29, 2017
1 parent 6010db0 commit fe28023
Show file tree
Hide file tree
Showing 4 changed files with 114 additions and 91 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,10 @@ protected final void init() throws Exception {
protected final void internalCleanup() throws Exception {
try {
admin.close();
pulsarClient.close();
// There are some test cases where pulsarClient is not initialized.
if (pulsarClient != null) {
pulsarClient.close();
}
pulsar.close();
mockBookKeeper.reallyShutdow();
mockZookKeeper.shutdown();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.api;

import static org.mockito.Mockito.spy;

import java.net.URI;

import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.PropertyAdmin;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;

import com.google.common.collect.Lists;
import com.google.common.collect.Sets;

public class TlsProducerConsumerBase extends ProducerConsumerBase {
private static final Logger log = LoggerFactory.getLogger(TlsProducerConsumerBase.class);

protected final String TLS_TRUST_CERT_FILE_PATH = "./src/test/resources/authentication/tls/cacert.pem";
protected final String TLS_SERVER_CERT_FILE_PATH = "./src/test/resources/authentication/tls/broker-cert.pem";
protected final String TLS_SERVER_KEY_FILE_PATH = "./src/test/resources/authentication/tls/broker-key.pem";

@BeforeMethod
@Override
protected void setup() throws Exception {

// TLS configuration for Broker
internalSetUpForBroker();

// Start Broker
super.init();
}

@AfterMethod
@Override
protected void cleanup() throws Exception {
super.internalCleanup();
}

protected void internalSetUpForBroker() throws Exception {
conf.setTlsEnabled(true);
conf.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
conf.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
conf.setClusterName("use");
}

protected void internalSetUpForClient() throws Exception {
ClientConfiguration clientConf = new ClientConfiguration();
clientConf.setTlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH);
clientConf.setUseTls(true);
String lookupUrl = new URI("pulsar+ssl://localhost:" + BROKER_PORT_TLS).toString();
pulsarClient = PulsarClient.create(lookupUrl, clientConf);
}

protected void internalSetUpForNamespace() throws Exception {
ClientConfiguration clientConf = new ClientConfiguration();
clientConf.setTlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH);
clientConf.setUseTls(true);
admin = spy(new PulsarAdmin(brokerUrlTls, clientConf));
admin.clusters().createCluster("use",
new ClusterData(brokerUrl.toString(), brokerUrlTls.toString(), "pulsar://localhost:" + BROKER_PORT,
"pulsar+ssl://localhost:" + BROKER_PORT_TLS));
admin.properties().createProperty("my-property",
new PropertyAdmin(Lists.newArrayList("appid1", "appid2"), Sets.newHashSet("use")));
admin.namespaces().createNamespace("my-property/use/my-ns");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,94 +18,37 @@
*/
package org.apache.pulsar.client.api;

import static org.mockito.Mockito.spy;

import java.net.URI;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;

import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerConfiguration;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConfiguration;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.PropertyAdmin;
import org.junit.Assert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import com.google.common.collect.Lists;
import com.google.common.collect.Sets;

public class TlsProducerConsumerTest extends ProducerConsumerBase {
private static final Logger log = LoggerFactory.getLogger(AuthenticatedProducerConsumerTest.class);

private final String TLS_TRUST_CERT_FILE_PATH = "./src/test/resources/authentication/tls/cacert.pem";
private final String TLS_SERVER_CERT_FILE_PATH = "./src/test/resources/authentication/tls/broker-cert.pem";
private final String TLS_SERVER_KEY_FILE_PATH = "./src/test/resources/authentication/tls/broker-key.pem";

@BeforeMethod
@Override
protected void setup() throws Exception {

conf.setTlsEnabled(true);
conf.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
conf.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);

conf.setClusterName("use");

super.init();
}

protected final void internalSetupForTls() throws Exception {

org.apache.pulsar.client.api.ClientConfiguration clientConf = new org.apache.pulsar.client.api.ClientConfiguration();
clientConf.setTlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH);
clientConf.setUseTls(true);

admin = spy(new PulsarAdmin(brokerUrlTls, clientConf));
String lookupUrl = new URI("pulsar+ssl://localhost:" + BROKER_PORT_TLS).toString();
pulsarClient = PulsarClient.create(lookupUrl, clientConf);
}

@AfterMethod
@Override
protected void cleanup() throws Exception {
super.internalCleanup();
}
public class TlsProducerConsumerTest extends TlsProducerConsumerBase {
private static final Logger log = LoggerFactory.getLogger(TlsProducerConsumerTest.class);

/**
* verifies that messages whose size is larger than 2^14 bytes (max size of single TLS chunk) can be
* produced/consumed
*
*
* @throws Exception
*/
@Test
@Test(timeOut = 30000)
public void testTlsLargeSizeMessage() throws Exception {
log.info("-- Starting {} test --", methodName);

final int MESSAGE_SIZE = 16 * 1024 + 1;
log.info("-- message size --", MESSAGE_SIZE);

internalSetupForTls();

admin.clusters().createCluster("use", new ClusterData(brokerUrl.toString(), brokerUrlTls.toString(),
"pulsar://localhost:" + BROKER_PORT, "pulsar+ssl://localhost:" + BROKER_PORT_TLS));
admin.properties().createProperty("my-property",
new PropertyAdmin(Lists.newArrayList("appid1", "appid2"), Sets.newHashSet("use")));
admin.namespaces().createNamespace("my-property/use/my-ns");
internalSetUpForClient();
internalSetUpForNamespace();

ConsumerConfiguration conf = new ConsumerConfiguration();
conf.setSubscriptionType(SubscriptionType.Exclusive);
Consumer consumer = pulsarClient.subscribe("persistent://my-property/use/my-ns/my-topic1", "my-subscriber-name",
conf);
Consumer consumer = pulsarClient
.subscribe("persistent://my-property/use/my-ns/my-topic1", "my-subscriber-name", conf);

ProducerConfiguration producerConf = new ProducerConfiguration();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,19 +23,14 @@
import static org.mockito.Mockito.spy;

import java.net.URI;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.security.SecureRandom;
import java.security.GeneralSecurityException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import javax.net.ssl.KeyManager;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManager;

import org.apache.bookkeeper.test.PortManager;
import org.apache.pulsar.client.api.TlsProducerConsumerTest;
import org.apache.pulsar.client.api.TlsProducerConsumerBase;
import org.apache.pulsar.common.util.SecurityUtility;
import org.apache.pulsar.websocket.WebSocketService;
import org.apache.pulsar.websocket.service.ProxyServer;
import org.apache.pulsar.websocket.service.WebSocketProxyConfiguration;
Expand All @@ -51,23 +46,18 @@
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import io.netty.handler.ssl.util.InsecureTrustManagerFactory;

public class ProxyPublishConsumeTls extends TlsProducerConsumerTest {
public class ProxyPublishConsumeTlsTest extends TlsProducerConsumerBase {
protected String methodName;
private int port;
private int tlsPort;
private static final String TLS_SERVER_CERT_FILE_PATH = "./src/test/resources/certificate/server.crt";
private static final String TLS_SERVER_KEY_FILE_PATH = "./src/test/resources/certificate/server.key";

private ProxyServer proxyServer;
private WebSocketService service;

@BeforeMethod
public void setup() throws Exception {
super.setup();

this.internalSetupForTls();
super.internalSetUpForNamespace();

port = PortManager.nextFreePort();
tlsPort = PortManager.nextFreePort();
Expand All @@ -77,6 +67,7 @@ public void setup() throws Exception {
config.setTlsEnabled(true);
config.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
config.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
config.setTlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH);
config.setClusterName("use");
config.setGlobalZookeeperServers("dummy-zk-servers");
service = spy(new WebSocketService(config));
Expand All @@ -95,20 +86,17 @@ protected void cleanup() throws Exception {

}

@Test(timeOut=10000)
public void socketTest() throws InterruptedException, NoSuchAlgorithmException, KeyManagementException {
String consumerUri = "wss://localhost:" + tlsPort + "/ws/consumer/persistent/my-property/use/my-ns/my-topic/my-sub";
@Test(timeOut = 30000)
public void socketTest() throws InterruptedException, GeneralSecurityException {
String consumerUri =
"wss://localhost:" + tlsPort + "/ws/consumer/persistent/my-property/use/my-ns/my-topic/my-sub";
String producerUri = "wss://localhost:" + tlsPort + "/ws/producer/persistent/my-property/use/my-ns/my-topic/";
URI consumeUri = URI.create(consumerUri);
URI produceUri = URI.create(producerUri);

KeyManager[] keyManagers = null;
TrustManager[] trustManagers = InsecureTrustManagerFactory.INSTANCE.getTrustManagers();
SSLContext sslCtx = SSLContext.getInstance("TLS");
sslCtx.init(keyManagers, trustManagers, new SecureRandom());

SslContextFactory sslContextFactory = new SslContextFactory();
sslContextFactory.setSslContext(sslCtx);
sslContextFactory.setSslContext(SecurityUtility
.createSslContext(false, SecurityUtility.loadCertificatesFromPemFile(TLS_TRUST_CERT_FILE_PATH)));

WebSocketClient consumeClient = new WebSocketClient(sslContextFactory);
SimpleConsumerSocket consumeSocket = new SimpleConsumerSocket();
Expand All @@ -134,6 +122,7 @@ public void socketTest() throws InterruptedException, NoSuchAlgorithmException,
Assert.assertEquals(produceSocket.getBuffer(), consumeSocket.getBuffer());
} catch (Throwable t) {
log.error(t.getMessage());
Assert.fail(t.getMessage());
} finally {
ExecutorService executor = newFixedThreadPool(1);
try {
Expand All @@ -153,5 +142,5 @@ public void socketTest() throws InterruptedException, NoSuchAlgorithmException,
}
}

private static final Logger log = LoggerFactory.getLogger(ProxyPublishConsumeTls.class);
private static final Logger log = LoggerFactory.getLogger(ProxyPublishConsumeTlsTest.class);
}

0 comments on commit fe28023

Please sign in to comment.