Skip to content

Commit

Permalink
Ensure producer/consumer are creating before upgrading from HTTP to W…
Browse files Browse the repository at this point in the history
…ebSocket (apache#779)

* Ensure producer/consumer are creating before upgrading from HTTP to WebSocket

* Addressed PR comments
  • Loading branch information
merlimat authored and rdhabalia committed Sep 21, 2017
1 parent 696c4a9 commit 577b897
Show file tree
Hide file tree
Showing 8 changed files with 105 additions and 87 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
import javax.net.ssl.TrustManager;

import org.apache.bookkeeper.test.PortManager;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.TlsProducerConsumerTest;
import org.apache.pulsar.websocket.WebSocketService;
import org.apache.pulsar.websocket.service.ProxyServer;
import org.apache.pulsar.websocket.service.WebSocketProxyConfiguration;
Expand All @@ -53,7 +53,7 @@

import io.netty.handler.ssl.util.InsecureTrustManagerFactory;

public class ProxyPublishConsumeTls extends ProducerConsumerBase {
public class ProxyPublishConsumeTls extends TlsProducerConsumerTest {
protected String methodName;
private int port;
private int tlsPort;
Expand All @@ -65,8 +65,9 @@ public class ProxyPublishConsumeTls extends ProducerConsumerBase {

@BeforeMethod
public void setup() throws Exception {
super.internalSetup();
super.producerBaseSetup();
super.setup();

this.internalSetupForTls();

port = PortManager.nextFreePort();
tlsPort = PortManager.nextFreePort();
Expand All @@ -87,7 +88,7 @@ public void setup() throws Exception {

@AfterMethod
protected void cleanup() throws Exception {
super.internalCleanup();
super.cleanup();
service.close();
proxyServer.stop();
log.info("Finished Cleaning Up Test setup");
Expand All @@ -112,7 +113,6 @@ public void socketTest() throws InterruptedException, NoSuchAlgorithmException,
WebSocketClient consumeClient = new WebSocketClient(sslContextFactory);
SimpleConsumerSocket consumeSocket = new SimpleConsumerSocket();
WebSocketClient produceClient = new WebSocketClient(sslContextFactory);
SimpleProducerSocket produceSocket = new SimpleProducerSocket();

try {
consumeClient.start();
Expand All @@ -121,8 +121,7 @@ public void socketTest() throws InterruptedException, NoSuchAlgorithmException,
log.info("Connecting to : {}", consumeUri);
Assert.assertTrue(consumerFuture.get().isOpen());

Thread.sleep(500);

SimpleProducerSocket produceSocket = new SimpleProducerSocket();
ClientUpgradeRequest produceRequest = new ClientUpgradeRequest();
produceClient.start();
Future<Session> producerFuture = produceClient.connect(produceSocket, produceUri, produceRequest);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,15 @@
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;

import javax.naming.AuthenticationException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import org.apache.pulsar.common.naming.DestinationName;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.WebSocketAdapter;
import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -46,7 +47,7 @@ public abstract class AbstractWebSocketHandler extends WebSocketAdapter implemen
protected final String topic;
protected final Map<String, String> queryParams;

public AbstractWebSocketHandler(WebSocketService service, HttpServletRequest request) {
public AbstractWebSocketHandler(WebSocketService service, HttpServletRequest request, ServletUpgradeResponse response) {
this.service = service;
this.request = request;
this.topic = extractTopicName(request);
Expand All @@ -55,44 +56,58 @@ public AbstractWebSocketHandler(WebSocketService service, HttpServletRequest req
request.getParameterMap().forEach((key, values) -> {
queryParams.put(key, values[0]);
});
}

@Override
public void onWebSocketConnect(Session session) {
super.onWebSocketConnect(session);
log.info("[{}] New WebSocket session on topic {}", session.getRemoteAddress(), topic);
checkAuth(response);
}

private void checkAuth(ServletUpgradeResponse response) {
String authRole = "<none>";
if (service.isAuthenticationEnabled()) {
try {
authRole = service.getAuthenticationService().authenticateHttpRequest(request);
log.info("[{}] Authenticated WebSocket client {} on topic {}", session.getRemoteAddress(), authRole,
topic);
log.info("[{}:{}] Authenticated WebSocket client {} on topic {}", request.getRemoteAddr(),
request.getRemotePort(), authRole, topic);

} catch (AuthenticationException e) {
log.warn("[{}] Failed to authenticated WebSocket client {} on topic {}: {}",
session.getRemoteAddress(), authRole, topic, e.getMessage());
close(WebSocketError.AuthenticationError);
log.warn("[{}:{}] Failed to authenticated WebSocket client {} on topic {}: {}", request.getRemoteAddr(),
request.getRemotePort(), authRole, topic, e.getMessage());
try {
response.sendError(HttpServletResponse.SC_UNAUTHORIZED, "Failed to authenticate");
} catch (IOException e1) {
log.warn("[{}:{}] Failed to send error: {}", request.getRemoteAddr(), request.getRemotePort(),
e1.getMessage(), e1);
}
return;
}
}

if (service.isAuthorizationEnabled()) {
try {
if (!isAuthorized(authRole)) {
log.warn("[{}] WebSocket Client [{}] is not authorized on topic {}", session.getRemoteAddress(), authRole,
topic);
close(WebSocketError.NotAuthorizedError);
log.warn("[{}:{}] WebSocket Client [{}] is not authorized on topic {}", request.getRemoteAddr(),
request.getRemotePort(), authRole, topic);
response.sendError(HttpServletResponse.SC_FORBIDDEN, "Not authorized");
return;
}
} catch (Exception e) {
log.warn("[{}] Got an exception when authorizing WebSocket client {} on topic {} on: {}",
session.getRemoteAddress(), authRole, topic, e.getMessage());
close(WebSocketError.UnknownError);
log.warn("[{}:{}] Got an exception when authorizing WebSocket client {} on topic {} on: {}",
request.getRemoteAddr(), request.getRemotePort(), authRole, topic, e.getMessage());
try {
response.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR, "Server error");
} catch (IOException e1) {
log.warn("[{}:{}] Failed to send error: {}", request.getRemoteAddr(), request.getRemotePort(),
e1.getMessage(), e1);
}
return;
}
}
createClient(session);

}

@Override
public void onWebSocketConnect(Session session) {
super.onWebSocketConnect(session);
log.info("[{}] New WebSocket session on topic {}", session.getRemoteAddress(), topic);
}

@Override
Expand Down Expand Up @@ -153,7 +168,5 @@ private String extractTopicName(HttpServletRequest request) {

protected abstract Boolean isAuthorized(String authRole) throws Exception;

protected abstract void createClient(Session session);

private static final Logger log = LoggerFactory.getLogger(AbstractWebSocketHandler.class);
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,13 @@
import java.time.format.DateTimeFormatter;
import java.util.Base64;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.LongAdder;

import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerConfiguration;
Expand All @@ -42,8 +42,8 @@
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.websocket.data.ConsumerAck;
import org.apache.pulsar.websocket.data.ConsumerMessage;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.WriteCallback;
import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -69,35 +69,36 @@ public class ConsumerHandler extends AbstractWebSocketHandler {

private final int maxPendingMessages;
private final AtomicInteger pendingMessages = new AtomicInteger();

private final LongAdder numMsgsDelivered;
private final LongAdder numBytesDelivered;
private final LongAdder numMsgsAcked;
private volatile long msgDeliveredCounter = 0;
private static final AtomicLongFieldUpdater<ConsumerHandler> MSG_DELIVERED_COUNTER_UPDATER =
AtomicLongFieldUpdater.newUpdater(ConsumerHandler.class, "msgDeliveredCounter");

public ConsumerHandler(WebSocketService service, HttpServletRequest request) {
super(service, request);
public ConsumerHandler(WebSocketService service, HttpServletRequest request, ServletUpgradeResponse response) {
super(service, request, response);
this.subscription = extractSubscription(request);
this.conf = getConsumerConfiguration();
this.maxPendingMessages = (conf.getReceiverQueueSize() == 0) ? 1 : conf.getReceiverQueueSize();
this.numMsgsDelivered = new LongAdder();
this.numBytesDelivered = new LongAdder();
this.numMsgsAcked = new LongAdder();

}

@Override
protected void createClient(Session session) {
try {
this.consumer = service.getPulsarClient().subscribe(topic, subscription, conf);
this.service.addConsumer(this);
receiveMessage();
} catch (Exception e) {
log.warn("[{}] Failed in creating subscription {} on topic {}", session.getRemoteAddress(), subscription,
topic, e);
close(WebSocketError.FailedToSubscribe, e.getMessage());
log.warn("[{}:{}] Failed in creating subscription {} on topic {}", request.getRemoteAddr(),
request.getRemotePort(), subscription, topic, e);
try {
response.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR, "Failed to subscribe");
} catch (IOException e1) {
log.warn("[{}:{}] Failed to send error: {}", request.getRemoteAddr(), request.getRemotePort(),
e1.getMessage(), e1);
}
}
}

Expand Down Expand Up @@ -224,7 +225,7 @@ public long getAndResetNumMsgsAcked() {
public long getMsgDeliveredCounter() {
return MSG_DELIVERED_COUNTER_UPDATER.get(this);
}

protected void updateDeliverMsgStat(long msgSize) {
numMsgsDelivered.increment();
MSG_DELIVERED_COUNTER_UPDATER.incrementAndGet(this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.util.concurrent.atomic.LongAdder;

import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.Message;
Expand All @@ -45,6 +46,8 @@
import org.apache.pulsar.websocket.data.ProducerMessage;
import org.apache.pulsar.websocket.stats.StatsBuckets;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.UpgradeResponse;
import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -70,16 +73,31 @@ public class ProducerHandler extends AbstractWebSocketHandler {
private volatile long msgPublishedCounter = 0;
private static final AtomicLongFieldUpdater<ProducerHandler> MSG_PUBLISHED_COUNTER_UPDATER =
AtomicLongFieldUpdater.newUpdater(ProducerHandler.class, "msgPublishedCounter");

public static final long[] ENTRY_LATENCY_BUCKETS_USEC = { 500, 1_000, 5_000, 10_000, 20_000, 50_000, 100_000,
200_000, 1000_000 };

public ProducerHandler(WebSocketService service, HttpServletRequest request) {
super(service, request);
public ProducerHandler(WebSocketService service, HttpServletRequest request, ServletUpgradeResponse response) {
super(service, request, response);
this.numMsgsSent = new LongAdder();
this.numBytesSent = new LongAdder();
this.numMsgsFailed = new LongAdder();
this.publishLatencyStatsUSec = new StatsBuckets(ENTRY_LATENCY_BUCKETS_USEC);

try {
ProducerConfiguration conf = getProducerConfiguration();
this.producer = service.getPulsarClient().createProducer(topic, conf);
this.service.addProducer(this);
} catch (Exception e) {
log.warn("[{}:{}] Failed in creating producer on topic {}", request.getRemoteAddr(),
request.getRemotePort(), topic, e);
try {
response.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR, "Failed to create producer");
} catch (IOException e1) {
log.warn("[{}:{}] Failed to send error: {}", request.getRemoteAddr(), request.getRemotePort(),
e1.getMessage(), e1);
}
}
}

@Override
Expand All @@ -97,19 +115,6 @@ public void close() throws IOException {
}
}

@Override
protected void createClient(Session session) {
try {
ProducerConfiguration conf = getProducerConfiguration();
this.producer = service.getPulsarClient().createProducer(topic, conf);
this.service.addProducer(this);
} catch (Exception e) {
log.warn("[{}] Failed in creating producer on topic {}", session.getRemoteAddress(),
topic, e);
close(FailedToCreateProducer, e.getMessage());
}
}

@Override
public void onWebSocketText(String message) {
ProducerMessage sendRequest;
Expand Down Expand Up @@ -201,7 +206,7 @@ private void sendAckResponse(ProducerAck response) {
log.warn("[{}] Failed to send ack {}", producer.getTopic(), e.getMessage(), e);
}
}

private void updateSentMsgStats(long msgSize, long latencyUsec) {
this.publishLatencyStatsUSec.addValue(latencyUsec);
this.numBytesSent.add(msgSize);
Expand All @@ -214,7 +219,7 @@ private ProducerConfiguration getProducerConfiguration() {

// Set to false to prevent the server thread from being blocked if a lot of messages are pending.
conf.setBlockIfQueueFull(false);

if (queryParams.containsKey("sendTimeoutMillis")) {
conf.setSendTimeout(Integer.parseInt(queryParams.get("sendTimeoutMillis")), TimeUnit.MILLISECONDS);
}
Expand Down
Loading

0 comments on commit 577b897

Please sign in to comment.