Skip to content

Commit

Permalink
Use binary protocol lookup for connection between WebSocket proxy and…
Browse files Browse the repository at this point in the history
… broker (apache#363)
  • Loading branch information
Yuki Shiga authored and saandrews committed Apr 19, 2017
1 parent d58cc0b commit 3b99f30
Show file tree
Hide file tree
Showing 5 changed files with 120 additions and 7 deletions.
2 changes: 2 additions & 0 deletions conf/websocket.conf
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ globalZookeeperServers=
// Pulsar cluster url to connect to broker (optional if globalZookeeperServers present)
serviceUrl=
serviceUrlTls=
brokerServiceUrl=
brokerServiceUrlTls=

# Port to use to server HTTP request
webServicePort=8080
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,8 @@ public void start() throws PulsarServerException {

if (config.isWebSocketServiceEnabled()) {
// Use local broker address to avoid different IP address when using a VIP for service discovery
this.webSocketService = new WebSocketService(new ClusterData(webServiceAddress, webServiceAddressTls),
this.webSocketService = new WebSocketService(
new ClusterData(webServiceAddress, webServiceAddressTls, brokerServiceUrl, brokerServiceUrlTls),
config);
this.webSocketService.start();
this.webService.addServlet(WebSocketProducerServlet.SERVLET_PATH,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,6 @@ public synchronized PulsarClient getPulsarClient() throws IOException {
// If not explicitly set, read clusters data from ZK
localCluster = retrieveClusterData();
}

pulsarClient = createClientInstance(localCluster);
}
return pulsarClient;
Expand All @@ -168,15 +167,23 @@ private PulsarClient createClientInstance(ClusterData clusterData) throws IOExce
config.getBrokerClientAuthenticationParameters());
}

if (config.isTlsEnabled() && !clusterData.getServiceUrlTls().isEmpty()) {
return PulsarClient.create(clusterData.getServiceUrlTls(), clientConf);
} else {
return PulsarClient.create(clusterData.getServiceUrl(), clientConf);
if (config.isTlsEnabled()) {
if (isNotBlank(clusterData.getBrokerServiceUrlTls())) {
return PulsarClient.create(clusterData.getBrokerServiceUrlTls(), clientConf);
} else if (isNotBlank(clusterData.getServiceUrlTls())) {
return PulsarClient.create(clusterData.getServiceUrlTls(), clientConf);
}
} else if (isNotBlank(clusterData.getBrokerServiceUrl())) {
return PulsarClient.create(clusterData.getBrokerServiceUrl(), clientConf);
}
return PulsarClient.create(clusterData.getServiceUrl(), clientConf);
}

private static ClusterData createClusterData(WebSocketProxyConfiguration config) {
if (isNotBlank(config.getServiceUrl()) || isNotBlank(config.getServiceUrlTls())) {
if (isNotBlank(config.getBrokerServiceUrl()) || isNotBlank(config.getBrokerServiceUrlTls())) {
return new ClusterData(config.getServiceUrl(), config.getServiceUrlTls(), config.getBrokerServiceUrl(),
config.getBrokerServiceUrlTls());
} else if (isNotBlank(config.getServiceUrl()) || isNotBlank(config.getServiceUrlTls())) {
return new ClusterData(config.getServiceUrl(), config.getServiceUrlTls());
} else {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ public class WebSocketProxyConfiguration implements PulsarConfiguration {
// Pulsar cluster url to connect to broker (optional if globalZookeeperServers present)
private String serviceUrl;
private String serviceUrlTls;
private String brokerServiceUrl;
private String brokerServiceUrlTls;

// Global Zookeeper quorum connection string
private String globalZookeeperServers;
Expand Down Expand Up @@ -96,6 +98,22 @@ public void setServiceUrlTls(String serviceUrlTls) {
this.serviceUrlTls = serviceUrlTls;
}

public String getBrokerServiceUrl() {
return brokerServiceUrl;
}

public void setBrokerServiceUrl(String brokerServiceUrl) {
this.brokerServiceUrl = brokerServiceUrl;
}

public String getBrokerServiceUrlTls() {
return brokerServiceUrlTls;
}

public void setBrokerServiceUrlTls(String brokerServiceUrlTls) {
this.brokerServiceUrlTls = brokerServiceUrlTls;
}

public String getGlobalZookeeperServers() {
return globalZookeeperServers;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/**
* Copyright 2016 Yahoo Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.yahoo.pulsar.websocket;

import java.lang.reflect.Field;

import org.testng.Assert;
import org.testng.annotations.Test;

import com.yahoo.pulsar.websocket.service.WebSocketProxyConfiguration;
import com.yahoo.pulsar.client.impl.PulsarClientImpl;

public class LookupProtocolTest {
@Test
public void httpLookupTest() throws Exception{
WebSocketProxyConfiguration conf = new WebSocketProxyConfiguration();
conf.setServiceUrl("http://localhost:8080");
conf.setServiceUrlTls("https://localhost:8443");
WebSocketService service = new WebSocketService(conf);
PulsarClientImpl testClient = (PulsarClientImpl) service.getPulsarClient();
Field lookupField = PulsarClientImpl.class.getDeclaredField("lookup");
lookupField.setAccessible(true);
Assert.assertEquals(lookupField.get(testClient).getClass().getName(), "com.yahoo.pulsar.client.impl.HttpLookupService");
Assert.assertFalse(testClient.getConfiguration().isUseTls());
}

@Test
public void httpsLookupTest() throws Exception{
WebSocketProxyConfiguration conf = new WebSocketProxyConfiguration();
conf.setServiceUrl("http://localhost:8080");
conf.setServiceUrlTls("https://localhost:8443");
conf.setBrokerServiceUrl("pulsar://localhost:6650");
conf.setTlsEnabled(true);
WebSocketService service = new WebSocketService(conf);
PulsarClientImpl testClient = (PulsarClientImpl) service.getPulsarClient();
Field lookupField = PulsarClientImpl.class.getDeclaredField("lookup");
lookupField.setAccessible(true);
Assert.assertEquals(lookupField.get(testClient).getClass().getName(), "com.yahoo.pulsar.client.impl.HttpLookupService");
Assert.assertTrue(testClient.getConfiguration().isUseTls());
}

@Test
public void binaryLookupTest() throws Exception{
WebSocketProxyConfiguration conf = new WebSocketProxyConfiguration();
conf.setServiceUrl("http://localhost:8080");
conf.setServiceUrlTls("https://localhost:8443");
conf.setBrokerServiceUrl("pulsar://localhost:6650");
conf.setBrokerServiceUrlTls("pulsar+ssl://localhost:6651");
WebSocketService service = new WebSocketService(conf);
PulsarClientImpl testClient = (PulsarClientImpl) service.getPulsarClient();
Field lookupField = PulsarClientImpl.class.getDeclaredField("lookup");
lookupField.setAccessible(true);
Assert.assertEquals(lookupField.get(testClient).getClass().getName(), "com.yahoo.pulsar.client.impl.BinaryProtoLookupService");
Assert.assertFalse(testClient.getConfiguration().isUseTls());
}

@Test
public void binaryTlsLookupTest() throws Exception{
WebSocketProxyConfiguration conf = new WebSocketProxyConfiguration();
conf.setServiceUrl("http://localhost:8080");
conf.setServiceUrlTls("https://localhost:8443");
conf.setBrokerServiceUrl("pulsar://localhost:6650");
conf.setBrokerServiceUrlTls("pulsar+ssl://localhost:6651");
conf.setTlsEnabled(true);
WebSocketService service = new WebSocketService(conf);
PulsarClientImpl testClient = (PulsarClientImpl) service.getPulsarClient();
Field lookupField = PulsarClientImpl.class.getDeclaredField("lookup");
lookupField.setAccessible(true);
Assert.assertEquals(lookupField.get(testClient).getClass().getName(), "com.yahoo.pulsar.client.impl.BinaryProtoLookupService");
Assert.assertTrue(testClient.getConfiguration().isUseTls());
}
}

0 comments on commit 3b99f30

Please sign in to comment.