Skip to content

Commit

Permalink
Support enable WebSocket on Pulsar Proxy. (apache#8613)
Browse files Browse the repository at this point in the history
### Motivation

Support enable WebSocket on Pulsar Proxy.

### Verifying this change

Integration tests added.
  • Loading branch information
codelipenghui authored Nov 21, 2020
1 parent a1cf30a commit 19767c7
Show file tree
Hide file tree
Showing 12 changed files with 282 additions and 14 deletions.
20 changes: 18 additions & 2 deletions .github/workflows/ci-integration-messaging.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,26 @@ jobs:
if: steps.docs.outputs.changed_only == 'no'
run: mvn -q -B -ntp clean install -DskipTests

- name: build pulsar image
if: steps.docs.outputs.changed_only == 'no'
run: mvn -B -f docker/pulsar/pom.xml install -am -Pdocker -DskipTests -Ddocker.nocache=true

- name: build pulsar-all image
if: steps.docs.outputs.changed_only == 'no'
run: mvn -B -f docker/pulsar-all/pom.xml install -am -Pdocker -DskipTests -Ddocker.nocache=true

- name: build artifacts and docker image
if: steps.docs.outputs.changed_only == 'no'
run: mvn -B -f tests/docker-images/pom.xml install -am -Pdocker -DskipTests
run: mvn -B -f tests/docker-images/pom.xml install -am -Pdocker -DskipTests -Ddocker.nocache=true

- name: run integration tests
- name: run integration messaging tests
if: steps.docs.outputs.changed_only == 'no'
run: mvn -B -f tests/pom.xml test -DintegrationTestSuiteFile=pulsar-messaging.xml -DintegrationTests -DredirectTestOutputToFile=false

- name: run integration proxy tests
if: steps.docs.outputs.changed_only == 'no'
run: mvn -B -f tests/pom.xml test -DintegrationTestSuiteFile=pulsar-proxy.xml -DintegrationTests -DredirectTestOutputToFile=false

- name: run integration proxy with WebSocket tests
if: steps.docs.outputs.changed_only == 'no'
run: mvn -B -f tests/pom.xml test -DintegrationTestSuiteFile=pulsar-proxy-websocket.xml -DintegrationTests -DredirectTestOutputToFile=false
8 changes: 8 additions & 0 deletions conf/proxy.conf
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,14 @@ tokenAudienceClaim=
# The token audience stands for this broker. The field `tokenAudienceClaim` of a valid token, need contains this.
tokenAudience=

### --- WebSocket config variables --- ###

# Enable or disable the WebSocket servlet.
webSocketServiceEnabled=false

# Name of the cluster to which this broker belongs to
clusterName=

### --- Deprecated config variables --- ###

# Deprecated. Use configurationStoreServers
Expand Down
6 changes: 6 additions & 0 deletions pulsar-proxy/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,12 @@
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-websocket</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ public class ProxyConfiguration implements PulsarConfiguration {
private static final String CATEGORY_SASL_AUTH = "SASL Authentication Provider";
@Category
private static final String CATEGORY_PLUGIN = "proxy plugin";
@Category
private static final String CATEGORY_WEBSOCKET = "WebSocket";

@FieldContext(
category = CATEGORY_BROKER_DISCOVERY,
Expand Down Expand Up @@ -528,6 +530,20 @@ public class ProxyConfiguration implements PulsarConfiguration {
)
}
)

/***** --- WebSocket --- ****/
@FieldContext(
category = CATEGORY_WEBSOCKET,
doc = "Enable or disable the WebSocket servlet"
)
private boolean webSocketServiceEnabled = false;

@FieldContext(
category = CATEGORY_WEBSOCKET,
doc = "Name of the cluster to which this broker belongs to"
)
private String clusterName;

private Properties properties = new Properties();

public Properties getProperties() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,13 @@
import org.apache.pulsar.broker.authentication.AuthenticationService;
import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
import org.apache.pulsar.proxy.server.plugin.servlet.ProxyAdditionalServletWithClassLoader;
import org.apache.pulsar.websocket.WebSocketConsumerServlet;
import org.apache.pulsar.websocket.WebSocketProducerServlet;
import org.apache.pulsar.websocket.WebSocketReaderServlet;
import org.apache.pulsar.websocket.WebSocketService;
import org.eclipse.jetty.proxy.ProxyServlet;
import org.eclipse.jetty.servlet.ServletHolder;
import org.eclipse.jetty.websocket.servlet.WebSocketServlet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -184,7 +189,7 @@ public static void main(String[] args) throws Exception {
public static void addWebServerHandlers(WebServer server,
ProxyConfiguration config,
ProxyService service,
BrokerDiscoveryProvider discoveryProvider) {
BrokerDiscoveryProvider discoveryProvider) throws Exception {
server.addServlet("/metrics", new ServletHolder(MetricsServlet.class), Collections.emptyList(), config.isAuthenticateMetricsEndpoint());
server.addRestResources("/", VipStatus.class.getPackage().getName(),
VipStatus.ATTRIBUTE_STATUS_FILE_PATH, config.getStatusFilePath());
Expand Down Expand Up @@ -215,6 +220,30 @@ public static void addWebServerHandlers(WebServer server,
log.info("proxy add additional servlet basePath {} ", servletWithClassLoader.getBasePath());
}
}

if (config.isWebSocketServiceEnabled()) {
// add WebSocket servlet
// Use local broker address to avoid different IP address when using a VIP for service discovery
WebSocketService webSocketService = new WebSocketService(null, PulsarConfigurationLoader.convertFrom(config));
webSocketService.start();
final WebSocketServlet producerWebSocketServlet = new WebSocketProducerServlet(webSocketService);
server.addServlet(WebSocketProducerServlet.SERVLET_PATH,
new ServletHolder(producerWebSocketServlet));
server.addServlet(WebSocketProducerServlet.SERVLET_PATH_V2,
new ServletHolder(producerWebSocketServlet));

final WebSocketServlet consumerWebSocketServlet = new WebSocketConsumerServlet(webSocketService);
server.addServlet(WebSocketConsumerServlet.SERVLET_PATH,
new ServletHolder(consumerWebSocketServlet));
server.addServlet(WebSocketConsumerServlet.SERVLET_PATH_V2,
new ServletHolder(consumerWebSocketServlet));

final WebSocketServlet readerWebSocketServlet = new WebSocketReaderServlet(webSocketService);
server.addServlet(WebSocketReaderServlet.SERVLET_PATH,
new ServletHolder(readerWebSocketServlet));
server.addServlet(WebSocketReaderServlet.SERVLET_PATH_V2,
new ServletHolder(readerWebSocketServlet));
}
}

private static final Logger log = LoggerFactory.getLogger(ProxyServiceStarter.class);
Expand Down
5 changes: 5 additions & 0 deletions tests/integration/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,11 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.tests.integration.containers.ProxyContainer;
import org.apache.pulsar.tests.integration.suites.PulsarTestSuite;
import org.apache.pulsar.tests.integration.topologies.PulsarClusterSpec;
import org.testng.annotations.Test;
Expand All @@ -44,19 +43,11 @@
*/
public class TestProxy extends PulsarTestSuite {
private final static Logger log = LoggerFactory.getLogger(TestProxy.class);
private ProxyContainer proxyViaURL;

@Override
protected PulsarClusterSpec.PulsarClusterSpecBuilder beforeSetupCluster(
String clusterName,
PulsarClusterSpec.PulsarClusterSpecBuilder specBuilder) {
proxyViaURL = new ProxyContainer(clusterName, "proxy-via-url")
.withEnv("brokerServiceURL", "pulsar://pulsar-broker-0:6650")
.withEnv("brokerWebServiceURL", "http://pulsar-broker-0:8080")
.withEnv("clusterName", clusterName);

specBuilder.externalService("proxy-via-url", proxyViaURL);

return super.beforeSetupCluster(clusterName, specBuilder);
}

Expand Down Expand Up @@ -107,7 +98,7 @@ public void testProxyWithServiceDiscovery() throws Exception {

@Test
public void testProxyWithNoServiceDiscoveryProxyConnectsViaURL() throws Exception {
testProxy(proxyViaURL.getPlainTextServiceUrl(), proxyViaURL.getHttpServiceUrl());
testProxy(pulsarCluster.getProxy().getPlainTextServiceUrl(), pulsarCluster.getProxy().getHttpServiceUrl());
}

@Test
Expand All @@ -119,7 +110,7 @@ public void testProxyRequestBodyRedirect() throws Exception {

@Cleanup
PulsarAdmin admin = PulsarAdmin.builder()
.serviceHttpUrl(pulsarCluster.getPlainTextServiceUrl())
.serviceHttpUrl(pulsarCluster.getProxy().getHttpServiceUrl())
.build();

admin.tenants().createTenant(tenant,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.tests.integration.proxy;

import lombok.Cleanup;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.tests.integration.containers.CSContainer;
import org.apache.pulsar.tests.integration.containers.ProxyContainer;
import org.apache.pulsar.tests.integration.suites.PulsarTestSuite;
import org.apache.pulsar.tests.integration.topologies.PulsarClusterSpec;
import org.awaitility.Awaitility;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.WebSocketListener;
import org.eclipse.jetty.websocket.api.annotations.WebSocket;
import org.eclipse.jetty.websocket.client.WebSocketClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.Test;

import java.net.URI;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import static org.apache.pulsar.tests.integration.containers.PulsarContainer.CS_PORT;

/**
* Test cases for proxy.
*/
public class TestProxyWithWebSocket extends PulsarTestSuite {
private final static Logger log = LoggerFactory.getLogger(TestProxyWithWebSocket.class);

@Override
protected PulsarClusterSpec.PulsarClusterSpecBuilder beforeSetupCluster(
String clusterName,
PulsarClusterSpec.PulsarClusterSpecBuilder specBuilder) {
Map<String, String> envs = new HashMap<>();
envs.put("webSocketServiceEnabled", "true");
specBuilder.proxyEnvs(envs);
return super.beforeSetupCluster(clusterName, specBuilder);
}

@Test
public void testWebSocket() throws Exception {

final String tenant = "proxy-test-" + randomName(10);
final String namespace = tenant + "/ns1";

@Cleanup
PulsarAdmin admin = PulsarAdmin.builder()
.serviceHttpUrl(pulsarCluster.getHttpServiceUrl())
.build();

admin.tenants().createTenant(tenant,
new TenantInfo(Collections.emptySet(), Collections.singleton(pulsarCluster.getClusterName())));

admin.namespaces().createNamespace(namespace, Collections.singleton(pulsarCluster.getClusterName()));

HttpClient httpClient = new HttpClient();
WebSocketClient webSocketClient = new WebSocketClient(httpClient);
webSocketClient.start();
MyWebSocket myWebSocket = new MyWebSocket();
String webSocketUri = pulsarCluster.getProxy().getHttpServiceUrl().replaceFirst("http", "ws")
+ "/ws/v2/producer/persistent/" + namespace + "/my-topic";
Future<Session> sessionFuture = webSocketClient.connect(myWebSocket,
URI.create(webSocketUri));
sessionFuture.get().getRemote().sendString("{\n" +
" \"payload\": \"SGVsbG8gV29ybGQ=\",\n" +
" \"properties\": {\"key1\": \"value1\", \"key2\": \"value2\"},\n" +
" \"context\": \"1\"\n" +
"}");

Awaitility.await().atMost(3, TimeUnit.SECONDS).untilAsserted(() -> {
String response = myWebSocket.getResponse();
Assert.assertNotNull(response);
Assert.assertTrue(response.contains("ok"));
});
}

@WebSocket
public static class MyWebSocket implements WebSocketListener {
Queue<String> incomingMessages = new ArrayBlockingQueue<>(10);
@Override
public void onWebSocketBinary(byte[] bytes, int i, int i1) {
}

@Override
public void onWebSocketText(String s) {
incomingMessages.add(s);
}

@Override
public void onWebSocketClose(int i, String s) {
}

@Override
public void onWebSocketConnect(Session session) {

}

@Override
public void onWebSocketError(Throwable throwable) {

}

public String getResponse() {
return incomingMessages.poll();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,9 @@ private PulsarCluster(PulsarClusterSpec spec) {
.withEnv("zookeeperServers", ZKContainer.NAME)
.withEnv("configurationStoreServers", CSContainer.NAME + ":" + CS_PORT)
.withEnv("clusterName", clusterName);
if (spec.proxyEnvs != null) {
spec.proxyEnvs.forEach(this.proxyContainer::withEnv);
}

// create bookies
bookieContainers.putAll(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,4 +132,9 @@ public class PulsarClusterSpec {
*/
@Default
String pulsarTestImage = PulsarContainer.DEFAULT_IMAGE_NAME;

/**
* Specify envs for proxy.
*/
Map<String, String> proxyEnvs;
}
28 changes: 28 additions & 0 deletions tests/integration/src/test/resources/pulsar-proxy-websocket.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<!DOCTYPE suite SYSTEM "https://testng.org/testng-1.0.dtd" >
<suite name="Pulsar (Proxy with WebSocket) Integration Tests" verbose="2" annotations="JDK">
<test name="messaging-test-suite" preserve-order="true">
<classes>
<class name="org.apache.pulsar.tests.integration.proxy.TestProxyWithWebSocket" />
</classes>
</test>
</suite>
Loading

0 comments on commit 19767c7

Please sign in to comment.