From ccb4d92f25c9046464959cd26a66696e98dc9522 Mon Sep 17 00:00:00 2001 From: hrsakai Date: Wed, 23 Aug 2017 09:34:21 +0900 Subject: [PATCH] added new entrypoint for reader to websocket proxy (#620) * added new entrypoint for reader to websocket proxy * supported non-persistent topic with websocket proxy * added warn log in ReaderHandler * call ReadeHandler.receiveMessage() on every readNext() * receive ack from client in ReaderHandler --- .../apache/pulsar/broker/PulsarService.java | 3 + .../proxy/ProxyPublishConsumeTest.java | 29 +- .../apache/pulsar/client/impl/ReaderImpl.java | 2 +- .../websocket/AbstractWebSocketHandler.java | 6 +- .../pulsar/websocket/ConsumerHandler.java | 2 +- .../pulsar/websocket/ReaderHandler.java | 252 ++++++++++++++++++ .../websocket/WebSocketReaderServlet.java | 42 +++ .../pulsar/websocket/WebSocketService.java | 19 ++ .../websocket/admin/WebSocketProxyStats.java | 33 ++- .../service/WebSocketServiceStarter.java | 2 + .../websocket/stats/ProxyTopicStat.java | 8 + 11 files changed, 383 insertions(+), 15 deletions(-) create mode 100644 pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ReaderHandler.java create mode 100644 pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketReaderServlet.java diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index db88c526148b3..5ac670eee1719 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -58,6 +58,7 @@ import org.apache.pulsar.utils.PulsarBrokerVersionStringUtils; import org.apache.pulsar.websocket.WebSocketConsumerServlet; import org.apache.pulsar.websocket.WebSocketProducerServlet; +import org.apache.pulsar.websocket.WebSocketReaderServlet; import org.apache.pulsar.websocket.WebSocketService; import org.apache.pulsar.zookeeper.GlobalZooKeeperCache; import org.apache.pulsar.zookeeper.LocalZooKeeperCache; @@ -283,6 +284,8 @@ public void start() throws PulsarServerException { new ServletHolder(new WebSocketProducerServlet(webSocketService)), true); this.webService.addServlet(WebSocketConsumerServlet.SERVLET_PATH, new ServletHolder(new WebSocketConsumerServlet(webSocketService)), true); + this.webService.addServlet(WebSocketReaderServlet.SERVLET_PATH, + new ServletHolder(new WebSocketReaderServlet(webSocketService)), true); } if (LOG.isDebugEnabled()) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java index de281ac3f9578..2322f287357c4 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java @@ -99,15 +99,19 @@ protected void cleanup() throws Exception { public void socketTest() throws Exception { String consumerUri = "ws://localhost:" + port + "/ws/consumer/persistent/my-property/use/my-ns/my-topic1/my-sub1?subscriptionType=Failover"; + String readerUri = "ws://localhost:" + port + "/ws/reader/persistent/my-property/use/my-ns/my-topic1"; String producerUri = "ws://localhost:" + port + "/ws/producer/persistent/my-property/use/my-ns/my-topic1/"; URI consumeUri = URI.create(consumerUri); + URI readUri = URI.create(readerUri); URI produceUri = URI.create(producerUri); WebSocketClient consumeClient1 = new WebSocketClient(); SimpleConsumerSocket consumeSocket1 = new SimpleConsumerSocket(); WebSocketClient consumeClient2 = new WebSocketClient(); SimpleConsumerSocket consumeSocket2 = new SimpleConsumerSocket(); + WebSocketClient readClient = new WebSocketClient(); + SimpleConsumerSocket readSocket = new SimpleConsumerSocket(); WebSocketClient produceClient = new WebSocketClient(); SimpleProducerSocket produceSocket = new SimpleProducerSocket(); @@ -120,17 +124,24 @@ public void socketTest() throws Exception { Future consumerFuture2 = consumeClient2.connect(consumeSocket2, consumeUri, consumeRequest2); log.info("Connecting to : {}", consumeUri); + readClient.start(); + ClientUpgradeRequest readRequest = new ClientUpgradeRequest(); + Future readerFuture = readClient.connect(readSocket, readUri, readRequest); + log.info("Connecting to : {}", readUri); + ClientUpgradeRequest produceRequest = new ClientUpgradeRequest(); produceClient.start(); Future producerFuture = produceClient.connect(produceSocket, produceUri, produceRequest); // let it connect Assert.assertTrue(consumerFuture1.get().isOpen()); Assert.assertTrue(consumerFuture2.get().isOpen()); + Assert.assertTrue(readerFuture.get().isOpen()); Assert.assertTrue(producerFuture.get().isOpen()); int retry = 0; int maxRetry = 400; - while (consumeSocket1.getReceivedMessagesCount() < 10 && consumeSocket2.getReceivedMessagesCount() < 10) { + while (consumeSocket1.getReceivedMessagesCount() < 10 && consumeSocket2.getReceivedMessagesCount() < 10 + && readSocket.getReceivedMessagesCount() < 10) { Thread.sleep(10); if (retry++ > maxRetry) { final String msg = String.format("Consumer still has not received the message after %s ms", @@ -150,6 +161,7 @@ public void socketTest() throws Exception { } else { Assert.assertEquals(produceSocket.getBuffer(), consumeSocket2.getBuffer()); } + Assert.assertEquals(produceSocket.getBuffer(), readSocket.getBuffer()); } finally { ExecutorService executor = newFixedThreadPool(1); try { @@ -157,6 +169,7 @@ public void socketTest() throws Exception { try { consumeClient1.stop(); consumeClient2.stop(); + readClient.stop(); produceClient.stop(); log.info("proxy clients are stopped successfully"); } catch (Exception e) { @@ -219,14 +232,18 @@ public void testProxyStats() throws Exception { final String consumerUri = "ws://localhost:" + port + "/ws/consumer/persistent/" + topic + "/my-sub?subscriptionType=Failover"; final String producerUri = "ws://localhost:" + port + "/ws/producer/persistent/" + topic + "/"; + final String readerUri = "ws://localhost:" + port + "/ws/reader/persistent/" + topic; System.out.println(consumerUri+", "+producerUri); URI consumeUri = URI.create(consumerUri); URI produceUri = URI.create(producerUri); + URI readUri = URI.create(readerUri); WebSocketClient consumeClient1 = new WebSocketClient(); SimpleConsumerSocket consumeSocket1 = new SimpleConsumerSocket(); WebSocketClient produceClient = new WebSocketClient(); SimpleProducerSocket produceSocket = new SimpleProducerSocket(); + WebSocketClient readClient = new WebSocketClient(); + SimpleConsumerSocket readSocket = new SimpleConsumerSocket(); try { consumeClient1.start(); @@ -234,11 +251,17 @@ public void testProxyStats() throws Exception { Future consumerFuture1 = consumeClient1.connect(consumeSocket1, consumeUri, consumeRequest1); log.info("Connecting to : {}", consumeUri); + readClient.start(); + ClientUpgradeRequest readRequest = new ClientUpgradeRequest(); + Future readerFuture = readClient.connect(readSocket, readUri, readRequest); + log.info("Connecting to : {}", readUri); + ClientUpgradeRequest produceRequest = new ClientUpgradeRequest(); produceClient.start(); Future producerFuture = produceClient.connect(produceSocket, produceUri, produceRequest); // let it connect Assert.assertTrue(consumerFuture1.get().isOpen()); + Assert.assertTrue(readerFuture.get().isOpen()); Assert.assertTrue(producerFuture.get().isOpen()); // sleep so, proxy can deliver few messages to consumers for stats @@ -317,8 +340,8 @@ private void verifyProxyStats(Client client, String baseUrl, String topic) { Entry entry = data.entrySet().iterator().next(); Assert.assertEquals(entry.getKey(), "persistent://" + topic); ProxyTopicStat stats = entry.getValue(); - // number of consumers are connected = 1 - Assert.assertEquals(stats.consumerStats.size(), 1); + // number of consumers are connected = 2 (one is reader) + Assert.assertEquals(stats.consumerStats.size(), 2); ConsumerStats consumerStats = stats.consumerStats.iterator().next(); // Assert.assertTrue(consumerStats.numberOfMsgDelivered > 0); Assert.assertNotNull(consumerStats.remoteConnection); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java index 5b7dc011011eb..fc3a64fe3ab5e 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java @@ -81,7 +81,7 @@ public String getTopic() { return consumer.getTopic(); } - ConsumerImpl getConsumer() { + public ConsumerImpl getConsumer() { return consumer; } diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/AbstractWebSocketHandler.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/AbstractWebSocketHandler.java index 0c9e4d8097de4..7c6763d321b9b 100644 --- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/AbstractWebSocketHandler.java +++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/AbstractWebSocketHandler.java @@ -141,11 +141,13 @@ private String extractTopicName(HttpServletRequest request) { // /ws/producer/persistent/my-property/my-cluster/my-ns/my-topic // or // /ws/consumer/persistent/my-property/my-cluster/my-ns/my-topic/my-subscription + // or + // /ws/reader/persistent/my-property/my-cluster/my-ns/my-topic checkArgument(parts.size() >= 8, "Invalid topic name format"); checkArgument(parts.get(1).equals("ws")); - checkArgument(parts.get(3).equals("persistent")); + checkArgument(parts.get(3).equals("persistent") || parts.get(3).equals("non-persistent")); - DestinationName dn = DestinationName.get("persistent", parts.get(4), parts.get(5), parts.get(6), parts.get(7)); + DestinationName dn = DestinationName.get(parts.get(3), parts.get(4), parts.get(5), parts.get(6), parts.get(7)); return dn.toString(); } diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java index d748fb15cc494..2c43531ccd610 100644 --- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java +++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java @@ -266,7 +266,7 @@ private static String extractSubscription(HttpServletRequest request) { // /ws/consumer/persistent/my-property/my-cluster/my-ns/my-topic/my-subscription checkArgument(parts.size() == 9, "Invalid topic name format"); checkArgument(parts.get(1).equals("ws")); - checkArgument(parts.get(3).equals("persistent")); + checkArgument(parts.get(3).equals("persistent")|| parts.get(3).equals("non-persistent")); checkArgument(parts.get(8).length() > 0, "Empty subscription name"); return parts.get(8); diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ReaderHandler.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ReaderHandler.java new file mode 100644 index 0000000000000..22d5d049999b5 --- /dev/null +++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ReaderHandler.java @@ -0,0 +1,252 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.websocket; + +import static com.google.common.base.Preconditions.checkArgument; + +import java.io.IOException; +import java.time.Instant; +import java.time.ZoneId; +import java.time.format.DateTimeFormatter; +import java.util.Base64; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLongFieldUpdater; +import java.util.concurrent.atomic.LongAdder; + +import javax.servlet.http.HttpServletRequest; + +import static org.apache.commons.lang3.StringUtils.isNotBlank; + +import org.apache.pulsar.client.api.ReaderConfiguration; +import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.client.impl.MessageIdImpl; +import org.apache.pulsar.client.impl.ReaderImpl; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.Reader; +import org.apache.pulsar.common.naming.DestinationName; +import org.apache.pulsar.common.util.ObjectMapperFactory; +import org.apache.pulsar.websocket.data.ConsumerMessage; +import org.eclipse.jetty.websocket.api.Session; +import org.eclipse.jetty.websocket.api.WriteCallback; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.google.common.base.Splitter; + +/** + * + * WebSocket end-point url handler to handle incoming receive. + *

+ * receive: socket-proxy keeps pushing messages to client by writing into session.
+ *

+ * + */ +public class ReaderHandler extends AbstractWebSocketHandler { + private String subscription; + private final ReaderConfiguration conf; + private Reader reader; + + private final int maxPendingMessages; + private final AtomicInteger pendingMessages = new AtomicInteger(); + + private final LongAdder numMsgsDelivered; + private final LongAdder numBytesDelivered; + private volatile long msgDeliveredCounter = 0; + private static final AtomicLongFieldUpdater MSG_DELIVERED_COUNTER_UPDATER = + AtomicLongFieldUpdater.newUpdater(ReaderHandler.class, "msgDeliveredCounter"); + + public ReaderHandler(WebSocketService service, HttpServletRequest request) { + super(service, request); + this.subscription = ""; + this.conf = getReaderConfiguration(); + this.maxPendingMessages = (conf.getReceiverQueueSize() == 0) ? 1 : conf.getReceiverQueueSize(); + this.numMsgsDelivered = new LongAdder(); + this.numBytesDelivered = new LongAdder(); + } + + @Override + protected void createClient(Session session) { + + try { + this.reader = service.getPulsarClient().createReader(topic, getMessageId(), conf); + this.subscription = ((ReaderImpl)this.reader).getConsumer().getSubscription(); + this.service.addReader(this); + receiveMessage(); + } catch (Exception e) { + log.warn("[{}] Failed in creating subscription {} on topic {}", session.getRemoteAddress(), subscription, + topic, e); + close(WebSocketError.FailedToSubscribe, e.getMessage()); + } + } + + private void receiveMessage() { + if (log.isDebugEnabled()) { + log.debug("[{}] [{}] [{}] Receive next message", getSession().getRemoteAddress(), topic, subscription); + } + + reader.readNextAsync().thenAccept(msg -> { + if (log.isDebugEnabled()) { + log.debug("[{}] [{}] [{}] Got message {}", getSession().getRemoteAddress(), topic, subscription, + msg.getMessageId()); + } + + ConsumerMessage dm = new ConsumerMessage(); + dm.messageId = Base64.getEncoder().encodeToString(msg.getMessageId().toByteArray()); + dm.payload = Base64.getEncoder().encodeToString(msg.getData()); + dm.properties = msg.getProperties(); + dm.publishTime = DATE_FORMAT.format(Instant.ofEpochMilli(msg.getPublishTime())); + if (msg.hasKey()) { + dm.key = msg.getKey(); + } + final long msgSize = msg.getData().length; + + try { + getSession().getRemote() + .sendString(ObjectMapperFactory.getThreadLocal().writeValueAsString(dm), new WriteCallback() { + @Override + public void writeFailed(Throwable th) { + log.warn("[{}/{}] Failed to deliver msg to {} {}", reader.getTopic(), subscription, + getRemote().getInetSocketAddress().toString(), th.getMessage()); + pendingMessages.decrementAndGet(); + // schedule receive as one of the delivery failed + service.getExecutor().execute(() -> receiveMessage()); + } + + @Override + public void writeSuccess() { + if (log.isDebugEnabled()) { + log.debug("[{}/{}] message is delivered successfully to {} ", reader.getTopic(), + subscription, getRemote().getInetSocketAddress().toString()); + } + updateDeliverMsgStat(msgSize); + } + }); + } catch (JsonProcessingException e) { + close(WebSocketError.FailedToSerializeToJSON); + } + + int pending = pendingMessages.incrementAndGet(); + if (pending < maxPendingMessages) { + // Start next read in a separate thread to avoid recursion + service.getExecutor().execute(() -> receiveMessage()); + } + }).exceptionally(exception -> { + log.warn("[{}/{}] Failed to deliver msg to {} {}", reader.getTopic(), + subscription, getRemote().getInetSocketAddress().toString(), exception); + return null; + }); + } + + @Override + public void onWebSocketText(String message) { + super.onWebSocketText(message); + + // We should have received an ack + // but reader doesn't send an ack to broker here because already reader did + + int pending = pendingMessages.getAndDecrement(); + if (pending >= maxPendingMessages) { + // Resume delivery + receiveMessage(); + } + } + + @Override + public void close() throws IOException { + if (reader != null) { + this.service.removeReader(this); + reader.closeAsync().thenAccept(x -> { + if (log.isDebugEnabled()) { + log.debug("[{}] Closed reader asynchronously", reader.getTopic()); + } + }).exceptionally(exception -> { + log.warn("[{}] Failed to close reader", reader.getTopic(), exception); + return null; + }); + } + } + + public Consumer getConsumer() { + return ((ReaderImpl)reader).getConsumer(); + } + + public String getSubscription() { + return subscription; + } + + public SubscriptionType getSubscriptionType() { + return SubscriptionType.Exclusive; + } + + public long getAndResetNumMsgsDelivered() { + return numMsgsDelivered.sumThenReset(); + } + + public long getAndResetNumBytesDelivered() { + return numBytesDelivered.sumThenReset(); + } + + public long getMsgDeliveredCounter() { + return MSG_DELIVERED_COUNTER_UPDATER.get(this); + } + + protected void updateDeliverMsgStat(long msgSize) { + numMsgsDelivered.increment(); + MSG_DELIVERED_COUNTER_UPDATER.incrementAndGet(this); + numBytesDelivered.add(msgSize); + } + + private ReaderConfiguration getReaderConfiguration() { + ReaderConfiguration conf = new ReaderConfiguration(); + + if (queryParams.containsKey("readerName")) { + conf.setReaderName(queryParams.get("readerName")); + } + + if (queryParams.containsKey("receiverQueueSize")) { + conf.setReceiverQueueSize(Math.min(Integer.parseInt(queryParams.get("receiverQueueSize")), 1000)); + } + return conf; + } + + @Override + protected Boolean isAuthorized(String authRole) throws Exception { + return service.getAuthorizationManager().canConsume(DestinationName.get(topic), authRole); + } + + private MessageId getMessageId() throws IOException { + MessageId messageId = MessageId.latest; + if (isNotBlank(queryParams.get("messageId"))) { + if (queryParams.get("messageId").equals("earliest")) { + messageId = MessageId.earliest; + } else if (!queryParams.get("messageId").equals("latest")) { + messageId = MessageIdImpl.fromByteArray(Base64.getDecoder().decode(queryParams.get("messageId"))); + } + } + return messageId; + } + + private static final DateTimeFormatter DATE_FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSZ").withZone(ZoneId.systemDefault()); + + private static final Logger log = LoggerFactory.getLogger(ReaderHandler.class); + +} diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketReaderServlet.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketReaderServlet.java new file mode 100644 index 0000000000000..9f51681c61f09 --- /dev/null +++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketReaderServlet.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.websocket; + +import org.eclipse.jetty.websocket.servlet.WebSocketServlet; +import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory; + +public class WebSocketReaderServlet extends WebSocketServlet { + private static final long serialVersionUID = 1L; + + public static final String SERVLET_PATH = "/ws/reader"; + + WebSocketService service; + + public WebSocketReaderServlet(WebSocketService service) { + super(); + this.service = service; + } + + @Override + public void configure(WebSocketServletFactory factory) { + factory.getPolicy().setMaxTextMessageSize(WebSocketService.MaxTextFrameSize); + + factory.setCreator((request, response) -> new ReaderHandler(service, request.getHttpServletRequest())); + } +} \ No newline at end of file diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java index 587211f20ed78..298d85bde7e3b 100644 --- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java +++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java @@ -80,6 +80,7 @@ public class WebSocketService implements Closeable { private ClusterData localCluster; private final ConcurrentOpenHashMap> topicProducerMap; private final ConcurrentOpenHashMap> topicConsumerMap; + private final ConcurrentOpenHashMap> topicReaderMap; private final ProxyStats proxyStats; public WebSocketService(WebSocketProxyConfiguration config) { @@ -91,6 +92,7 @@ public WebSocketService(ClusterData localCluster, ServiceConfiguration config) { this.localCluster = localCluster; this.topicProducerMap = new ConcurrentOpenHashMap<>(); this.topicConsumerMap = new ConcurrentOpenHashMap<>(); + this.topicReaderMap = new ConcurrentOpenHashMap<>(); this.proxyStats = new ProxyStats(this); } @@ -305,6 +307,23 @@ public boolean removeConsumer(ConsumerHandler consumer) { } return false; } + + public boolean addReader(ReaderHandler reader) { + return topicReaderMap.computeIfAbsent(reader.getConsumer().getTopic(), topic -> Lists.newArrayList()) + .add(reader); + } + + public ConcurrentOpenHashMap> getReaders() { + return topicReaderMap; + } + + public boolean removeReader(ReaderHandler reader) { + final String topicName = reader.getConsumer().getTopic(); + if (topicReaderMap.containsKey(topicName)) { + return topicReaderMap.get(topicName).remove(reader); + } + return false; + } public ServiceConfiguration getConfig() { return config; diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/admin/WebSocketProxyStats.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/admin/WebSocketProxyStats.java index 8066469fc492a..4307e728615cd 100644 --- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/admin/WebSocketProxyStats.java +++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/admin/WebSocketProxyStats.java @@ -94,20 +94,32 @@ public Map getProxyStats() { public ProxyTopicStat getStat(String topicName) { - if (!service().getProducers().containsKey(topicName) && !service().getConsumers().containsKey(topicName)) { + if (!service().getProducers().containsKey(topicName) + && !service().getConsumers().containsKey(topicName) + && !service().getReaders().containsKey(topicName)) { LOG.warn("topic doesn't exist {}", topicName); throw new RestException(Status.NOT_FOUND, "Topic does not exist"); } ProxyTopicStat topicStat = new ProxyTopicStat(); - service().getProducers().get(topicName).forEach(handler -> { - ProducerStats stat = new ProducerStats(handler); - topicStat.producerStats.add(stat); + if (service().getProducers().containsKey(topicName)){ + service().getProducers().get(topicName).forEach(handler -> { + ProducerStats stat = new ProducerStats(handler); + topicStat.producerStats.add(stat); - }); + }); + } - service().getConsumers().get(topicName).forEach(handler -> { - topicStat.consumerStats.add(new ConsumerStats(handler)); - }); + if (service().getConsumers().containsKey(topicName)){ + service().getConsumers().get(topicName).forEach(handler -> { + topicStat.consumerStats.add(new ConsumerStats(handler)); + }); + } + + if (service().getReaders().containsKey(topicName)){ + service().getReaders().get(topicName).forEach(handler -> { + topicStat.consumerStats.add(new ConsumerStats(handler)); + }); + } return topicStat; } @@ -125,6 +137,11 @@ public Map getStat() { handlers.forEach(handler -> topicStat.consumerStats.add(new ConsumerStats(handler))); statMap.put(topicName, topicStat); }); + service().getReaders().forEach((topicName, handlers) -> { + ProxyTopicStat topicStat = statMap.computeIfAbsent(topicName, t -> new ProxyTopicStat()); + handlers.forEach(handler -> topicStat.consumerStats.add(new ConsumerStats(handler))); + statMap.put(topicName, topicStat); + }); return statMap; } diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketServiceStarter.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketServiceStarter.java index 8cf0534f23cfa..690ad6a170592 100644 --- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketServiceStarter.java +++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/service/WebSocketServiceStarter.java @@ -24,6 +24,7 @@ import org.apache.pulsar.common.configuration.PulsarConfigurationLoader; import org.apache.pulsar.websocket.WebSocketConsumerServlet; import org.apache.pulsar.websocket.WebSocketProducerServlet; +import org.apache.pulsar.websocket.WebSocketReaderServlet; import org.apache.pulsar.websocket.WebSocketService; import org.apache.pulsar.websocket.admin.WebSocketProxyStats; import org.slf4j.Logger; @@ -51,6 +52,7 @@ public static void main(String args[]) throws Exception { public static void start(ProxyServer proxyServer, WebSocketService service) throws Exception { proxyServer.addWebSocketServlet(WebSocketProducerServlet.SERVLET_PATH, new WebSocketProducerServlet(service)); proxyServer.addWebSocketServlet(WebSocketConsumerServlet.SERVLET_PATH, new WebSocketConsumerServlet(service)); + proxyServer.addWebSocketServlet(WebSocketReaderServlet.SERVLET_PATH, new WebSocketReaderServlet(service)); proxyServer.addRestResources(ADMIN_PATH, WebSocketProxyStats.class.getPackage().getName(), service); proxyServer.start(); service.start(); diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/stats/ProxyTopicStat.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/stats/ProxyTopicStat.java index 21f31b68946e2..ce6bd91643c65 100644 --- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/stats/ProxyTopicStat.java +++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/stats/ProxyTopicStat.java @@ -23,6 +23,7 @@ import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.websocket.ConsumerHandler; import org.apache.pulsar.websocket.ProducerHandler; +import org.apache.pulsar.websocket.ReaderHandler; import com.google.common.collect.Sets; @@ -64,6 +65,13 @@ public ConsumerStats(ConsumerHandler handler) { this.remoteConnection = handler.getRemote().getInetSocketAddress().toString(); this.numberOfMsgDelivered = handler.getMsgDeliveredCounter(); } + + public ConsumerStats(ReaderHandler handler) { + this.subscriptionName = handler.getSubscription(); + this.subscriptionType = handler.getSubscriptionType(); + this.remoteConnection = handler.getRemote().getInetSocketAddress().toString(); + this.numberOfMsgDelivered = handler.getMsgDeliveredCounter(); + } public String remoteConnection; public String subscriptionName;