Skip to content

Commit

Permalink
[websocket] support pong in websocket (apache#10035)
Browse files Browse the repository at this point in the history
Fixes apache#10014 

### Motivation
Add "pong" logic in websocket server

### Modifications
add WebSocketPingPongServlet to apply "pong" when on ping
  • Loading branch information
linlinnn authored Mar 26, 2021
1 parent d81f507 commit 60081af
Show file tree
Hide file tree
Showing 6 changed files with 243 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@
import org.apache.pulsar.policies.data.loadbalancer.AdvertisedListener;
import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreProvider;
import org.apache.pulsar.websocket.WebSocketConsumerServlet;
import org.apache.pulsar.websocket.WebSocketPingPongServlet;
import org.apache.pulsar.websocket.WebSocketProducerServlet;
import org.apache.pulsar.websocket.WebSocketReaderServlet;
import org.apache.pulsar.websocket.WebSocketService;
Expand Down Expand Up @@ -622,6 +623,12 @@ public Boolean get() {
new ServletHolder(readerWebSocketServlet), true, attributeMap);
this.webService.addServlet(WebSocketReaderServlet.SERVLET_PATH_V2,
new ServletHolder(readerWebSocketServlet), true, attributeMap);

final WebSocketServlet pingPongWebSocketServlet = new WebSocketPingPongServlet(webSocketService);
this.webService.addServlet(WebSocketPingPongServlet.SERVLET_PATH,
new ServletHolder(pingPongWebSocketServlet), true, attributeMap);
this.webService.addServlet(WebSocketPingPongServlet.SERVLET_PATH_V2,
new ServletHolder(pingPongWebSocketServlet), true, attributeMap);
}

if (LOG.isDebugEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
import org.apache.pulsar.broker.web.plugin.servlet.AdditionalServletWithClassLoader;
import org.apache.pulsar.websocket.WebSocketConsumerServlet;
import org.apache.pulsar.websocket.WebSocketPingPongServlet;
import org.apache.pulsar.websocket.WebSocketProducerServlet;
import org.apache.pulsar.websocket.WebSocketReaderServlet;
import org.apache.pulsar.websocket.WebSocketService;
Expand Down Expand Up @@ -243,6 +244,12 @@ public static void addWebServerHandlers(WebServer server,
new ServletHolder(readerWebSocketServlet));
server.addServlet(WebSocketReaderServlet.SERVLET_PATH_V2,
new ServletHolder(readerWebSocketServlet));

final WebSocketServlet pingPongWebSocketServlet = new WebSocketPingPongServlet(webSocketService);
server.addServlet(WebSocketPingPongServlet.SERVLET_PATH,
new ServletHolder(pingPongWebSocketServlet));
server.addServlet(WebSocketPingPongServlet.SERVLET_PATH_V2,
new ServletHolder(pingPongWebSocketServlet));
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.websocket;

import java.io.IOException;
import java.nio.ByteBuffer;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.websocket.api.WebSocketAdapter;
import org.eclipse.jetty.websocket.api.WebSocketPingPongListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PingPongHandler extends WebSocketAdapter implements WebSocketPingPongListener {

private static final Logger log = LoggerFactory.getLogger(PingPongHandler.class);

@Override
public void onWebSocketPing(ByteBuffer payload) {
try {
if (log.isDebugEnabled()) {
log.debug("PING: {}", BufferUtil.toDetailString(payload));
}
getRemote().sendPong(payload);
} catch (IOException e) {
log.warn("Failed to send pong: {}", e.getMessage());
}
}

@Override
public void onWebSocketPong(ByteBuffer payload) {

}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.websocket;

import org.eclipse.jetty.websocket.servlet.WebSocketServlet;
import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory;

public class WebSocketPingPongServlet extends WebSocketServlet {
private static final long serialVersionUID = 1L;

public static final String SERVLET_PATH = "/ws/pingpong";
public static final String SERVLET_PATH_V2 = "/ws/v2/pingpong";

private final transient WebSocketService service;

public WebSocketPingPongServlet(WebSocketService service) {
this.service = service;
}

@Override
public void configure(WebSocketServletFactory factory) {
factory.getPolicy().setMaxTextMessageSize(service.getConfig().getWebSocketMaxTextFrameSize());
if (service.getConfig().getWebSocketSessionIdleTimeoutMillis() > 0) {
factory.getPolicy().setIdleTimeout(service.getConfig().getWebSocketSessionIdleTimeoutMillis());
}
factory.setCreator((request, response) -> new PingPongHandler());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
import org.apache.pulsar.common.configuration.VipStatus;
import org.apache.pulsar.websocket.WebSocketConsumerServlet;
import org.apache.pulsar.websocket.WebSocketPingPongServlet;
import org.apache.pulsar.websocket.WebSocketProducerServlet;
import org.apache.pulsar.websocket.WebSocketReaderServlet;
import org.apache.pulsar.websocket.WebSocketService;
Expand Down Expand Up @@ -57,11 +58,12 @@ public static void start(ProxyServer proxyServer, WebSocketService service) thro
proxyServer.addWebSocketServlet(WebSocketProducerServlet.SERVLET_PATH, new WebSocketProducerServlet(service));
proxyServer.addWebSocketServlet(WebSocketConsumerServlet.SERVLET_PATH, new WebSocketConsumerServlet(service));
proxyServer.addWebSocketServlet(WebSocketReaderServlet.SERVLET_PATH, new WebSocketReaderServlet(service));

proxyServer.addWebSocketServlet(WebSocketPingPongServlet.SERVLET_PATH, new WebSocketPingPongServlet(service));

proxyServer.addWebSocketServlet(WebSocketProducerServlet.SERVLET_PATH_V2, new WebSocketProducerServlet(service));
proxyServer.addWebSocketServlet(WebSocketConsumerServlet.SERVLET_PATH_V2, new WebSocketConsumerServlet(service));
proxyServer.addWebSocketServlet(WebSocketReaderServlet.SERVLET_PATH_V2, new WebSocketReaderServlet(service));
proxyServer.addWebSocketServlet(WebSocketPingPongServlet.SERVLET_PATH_V2, new WebSocketPingPongServlet(service));

proxyServer.addRestResources(ADMIN_PATH_V1, WebSocketProxyStatsV1.class.getPackage().getName(), ATTRIBUTE_PROXY_SERVICE_NAME, service);
proxyServer.addRestResources(ADMIN_PATH_V2, WebSocketProxyStatsV2.class.getPackage().getName(), ATTRIBUTE_PROXY_SERVICE_NAME, service);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.websocket;

import java.net.URI;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Future;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.web.WebExecutorThreadPool;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.WebSocketAdapter;
import org.eclipse.jetty.websocket.api.WebSocketPingPongListener;
import org.eclipse.jetty.websocket.api.annotations.WebSocket;
import org.eclipse.jetty.websocket.client.WebSocketClient;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertTrue;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

public class PingPongHandlerTest {

private static Server server;

private static final WebExecutorThreadPool executor = new WebExecutorThreadPool(6, "pulsar-websocket-web-test");

@BeforeClass
public static void setup() throws Exception {
server = new Server(executor);
List<ServerConnector> connectors = new ArrayList<>();
ServerConnector connector = new ServerConnector(server);
connector.setPort(8080);
connectors.add(connector);
connectors.forEach(c -> c.setAcceptQueueSize(1024 / connectors.size()));
server.setConnectors(connectors.toArray(new ServerConnector[connectors.size()]));

WebSocketService service = mock(WebSocketService.class);
ServiceConfiguration config = mock(ServiceConfiguration.class);

when(service.getConfig()).thenReturn(config);
when(config.getWebSocketMaxTextFrameSize()).thenReturn(1048576);
when(config.getWebSocketSessionIdleTimeoutMillis()).thenReturn(300000);

ServletHolder servletHolder = new ServletHolder("ws-events", new WebSocketPingPongServlet(service));
ServletContextHandler context = new ServletContextHandler(ServletContextHandler.SESSIONS);
context.setContextPath(WebSocketPingPongServlet.SERVLET_PATH);
context.addServlet(servletHolder, "/*");
server.setHandler(context);
try {
server.start();
} catch (Exception e) {
e.printStackTrace();
}
}

@AfterClass(alwaysRun = true)
public static void tearDown() throws Exception {
if (server != null) {
server.stop();
}
executor.stop();
}

@Test
public void testPingPong() throws Exception {
HttpClient httpClient = new HttpClient();
WebSocketClient webSocketClient = new WebSocketClient(httpClient);
webSocketClient.start();
MyWebSocket myWebSocket = new MyWebSocket();
String webSocketUri = "ws://localhost:8080/ws/pingpong";
Future<Session> sessionFuture = webSocketClient.connect(myWebSocket, URI.create(webSocketUri));
sessionFuture.get().getRemote().sendPing(ByteBuffer.wrap("test".getBytes()));
assertTrue(myWebSocket.getResponse().contains("test"));
}

@WebSocket
public static class MyWebSocket extends WebSocketAdapter implements WebSocketPingPongListener {

ArrayBlockingQueue<String> incomingMessages = new ArrayBlockingQueue<>(10);

@Override
public void onWebSocketClose(int i, String s) {
}

@Override
public void onWebSocketConnect(Session session) {
}

@Override
public void onWebSocketError(Throwable throwable) {
}

@Override
public void onWebSocketPing(ByteBuffer payload) {
}

@Override
public void onWebSocketPong(ByteBuffer payload) {
incomingMessages.add(BufferUtil.toDetailString(payload));
}

public String getResponse() throws InterruptedException {
return incomingMessages.take();
}
}
}

0 comments on commit 60081af

Please sign in to comment.