Skip to content

Commit

Permalink
added new entrypoint for reader to websocket proxy (apache#620)
Browse files Browse the repository at this point in the history
* added new entrypoint for reader to websocket proxy

* supported non-persistent topic with websocket proxy

* added warn log in ReaderHandler

* call ReadeHandler.receiveMessage() on every readNext()

* receive ack from client in ReaderHandler
  • Loading branch information
hrsakai authored and Yuki Shiga committed Aug 23, 2017
1 parent 3ddc24d commit ccb4d92
Show file tree
Hide file tree
Showing 11 changed files with 383 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import org.apache.pulsar.utils.PulsarBrokerVersionStringUtils;
import org.apache.pulsar.websocket.WebSocketConsumerServlet;
import org.apache.pulsar.websocket.WebSocketProducerServlet;
import org.apache.pulsar.websocket.WebSocketReaderServlet;
import org.apache.pulsar.websocket.WebSocketService;
import org.apache.pulsar.zookeeper.GlobalZooKeeperCache;
import org.apache.pulsar.zookeeper.LocalZooKeeperCache;
Expand Down Expand Up @@ -283,6 +284,8 @@ public void start() throws PulsarServerException {
new ServletHolder(new WebSocketProducerServlet(webSocketService)), true);
this.webService.addServlet(WebSocketConsumerServlet.SERVLET_PATH,
new ServletHolder(new WebSocketConsumerServlet(webSocketService)), true);
this.webService.addServlet(WebSocketReaderServlet.SERVLET_PATH,
new ServletHolder(new WebSocketReaderServlet(webSocketService)), true);
}

if (LOG.isDebugEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,15 +99,19 @@ protected void cleanup() throws Exception {
public void socketTest() throws Exception {
String consumerUri = "ws://localhost:" + port
+ "/ws/consumer/persistent/my-property/use/my-ns/my-topic1/my-sub1?subscriptionType=Failover";
String readerUri = "ws://localhost:" + port + "/ws/reader/persistent/my-property/use/my-ns/my-topic1";
String producerUri = "ws://localhost:" + port + "/ws/producer/persistent/my-property/use/my-ns/my-topic1/";

URI consumeUri = URI.create(consumerUri);
URI readUri = URI.create(readerUri);
URI produceUri = URI.create(producerUri);

WebSocketClient consumeClient1 = new WebSocketClient();
SimpleConsumerSocket consumeSocket1 = new SimpleConsumerSocket();
WebSocketClient consumeClient2 = new WebSocketClient();
SimpleConsumerSocket consumeSocket2 = new SimpleConsumerSocket();
WebSocketClient readClient = new WebSocketClient();
SimpleConsumerSocket readSocket = new SimpleConsumerSocket();
WebSocketClient produceClient = new WebSocketClient();
SimpleProducerSocket produceSocket = new SimpleProducerSocket();

Expand All @@ -120,17 +124,24 @@ public void socketTest() throws Exception {
Future<Session> consumerFuture2 = consumeClient2.connect(consumeSocket2, consumeUri, consumeRequest2);
log.info("Connecting to : {}", consumeUri);

readClient.start();
ClientUpgradeRequest readRequest = new ClientUpgradeRequest();
Future<Session> readerFuture = readClient.connect(readSocket, readUri, readRequest);
log.info("Connecting to : {}", readUri);

ClientUpgradeRequest produceRequest = new ClientUpgradeRequest();
produceClient.start();
Future<Session> producerFuture = produceClient.connect(produceSocket, produceUri, produceRequest);
// let it connect
Assert.assertTrue(consumerFuture1.get().isOpen());
Assert.assertTrue(consumerFuture2.get().isOpen());
Assert.assertTrue(readerFuture.get().isOpen());
Assert.assertTrue(producerFuture.get().isOpen());

int retry = 0;
int maxRetry = 400;
while (consumeSocket1.getReceivedMessagesCount() < 10 && consumeSocket2.getReceivedMessagesCount() < 10) {
while (consumeSocket1.getReceivedMessagesCount() < 10 && consumeSocket2.getReceivedMessagesCount() < 10
&& readSocket.getReceivedMessagesCount() < 10) {
Thread.sleep(10);
if (retry++ > maxRetry) {
final String msg = String.format("Consumer still has not received the message after %s ms",
Expand All @@ -150,13 +161,15 @@ public void socketTest() throws Exception {
} else {
Assert.assertEquals(produceSocket.getBuffer(), consumeSocket2.getBuffer());
}
Assert.assertEquals(produceSocket.getBuffer(), readSocket.getBuffer());
} finally {
ExecutorService executor = newFixedThreadPool(1);
try {
executor.submit(() -> {
try {
consumeClient1.stop();
consumeClient2.stop();
readClient.stop();
produceClient.stop();
log.info("proxy clients are stopped successfully");
} catch (Exception e) {
Expand Down Expand Up @@ -219,26 +232,36 @@ public void testProxyStats() throws Exception {
final String consumerUri = "ws://localhost:" + port + "/ws/consumer/persistent/" + topic
+ "/my-sub?subscriptionType=Failover";
final String producerUri = "ws://localhost:" + port + "/ws/producer/persistent/" + topic + "/";
final String readerUri = "ws://localhost:" + port + "/ws/reader/persistent/" + topic;
System.out.println(consumerUri+", "+producerUri);
URI consumeUri = URI.create(consumerUri);
URI produceUri = URI.create(producerUri);
URI readUri = URI.create(readerUri);

WebSocketClient consumeClient1 = new WebSocketClient();
SimpleConsumerSocket consumeSocket1 = new SimpleConsumerSocket();
WebSocketClient produceClient = new WebSocketClient();
SimpleProducerSocket produceSocket = new SimpleProducerSocket();
WebSocketClient readClient = new WebSocketClient();
SimpleConsumerSocket readSocket = new SimpleConsumerSocket();

try {
consumeClient1.start();
ClientUpgradeRequest consumeRequest1 = new ClientUpgradeRequest();
Future<Session> consumerFuture1 = consumeClient1.connect(consumeSocket1, consumeUri, consumeRequest1);
log.info("Connecting to : {}", consumeUri);

readClient.start();
ClientUpgradeRequest readRequest = new ClientUpgradeRequest();
Future<Session> readerFuture = readClient.connect(readSocket, readUri, readRequest);
log.info("Connecting to : {}", readUri);

ClientUpgradeRequest produceRequest = new ClientUpgradeRequest();
produceClient.start();
Future<Session> producerFuture = produceClient.connect(produceSocket, produceUri, produceRequest);
// let it connect
Assert.assertTrue(consumerFuture1.get().isOpen());
Assert.assertTrue(readerFuture.get().isOpen());
Assert.assertTrue(producerFuture.get().isOpen());

// sleep so, proxy can deliver few messages to consumers for stats
Expand Down Expand Up @@ -317,8 +340,8 @@ private void verifyProxyStats(Client client, String baseUrl, String topic) {
Entry<String, ProxyTopicStat> entry = data.entrySet().iterator().next();
Assert.assertEquals(entry.getKey(), "persistent://" + topic);
ProxyTopicStat stats = entry.getValue();
// number of consumers are connected = 1
Assert.assertEquals(stats.consumerStats.size(), 1);
// number of consumers are connected = 2 (one is reader)
Assert.assertEquals(stats.consumerStats.size(), 2);
ConsumerStats consumerStats = stats.consumerStats.iterator().next();
// Assert.assertTrue(consumerStats.numberOfMsgDelivered > 0);
Assert.assertNotNull(consumerStats.remoteConnection);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public String getTopic() {
return consumer.getTopic();
}

ConsumerImpl getConsumer() {
public ConsumerImpl getConsumer() {
return consumer;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,11 +141,13 @@ private String extractTopicName(HttpServletRequest request) {
// /ws/producer/persistent/my-property/my-cluster/my-ns/my-topic
// or
// /ws/consumer/persistent/my-property/my-cluster/my-ns/my-topic/my-subscription
// or
// /ws/reader/persistent/my-property/my-cluster/my-ns/my-topic
checkArgument(parts.size() >= 8, "Invalid topic name format");
checkArgument(parts.get(1).equals("ws"));
checkArgument(parts.get(3).equals("persistent"));
checkArgument(parts.get(3).equals("persistent") || parts.get(3).equals("non-persistent"));

DestinationName dn = DestinationName.get("persistent", parts.get(4), parts.get(5), parts.get(6), parts.get(7));
DestinationName dn = DestinationName.get(parts.get(3), parts.get(4), parts.get(5), parts.get(6), parts.get(7));
return dn.toString();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ private static String extractSubscription(HttpServletRequest request) {
// /ws/consumer/persistent/my-property/my-cluster/my-ns/my-topic/my-subscription
checkArgument(parts.size() == 9, "Invalid topic name format");
checkArgument(parts.get(1).equals("ws"));
checkArgument(parts.get(3).equals("persistent"));
checkArgument(parts.get(3).equals("persistent")|| parts.get(3).equals("non-persistent"));
checkArgument(parts.get(8).length() > 0, "Empty subscription name");

return parts.get(8);
Expand Down
Loading

0 comments on commit ccb4d92

Please sign in to comment.