From 66a68d58172ce06930ec84a9b0e41944a2c8e5b6 Mon Sep 17 00:00:00 2001 From: Christophe Bornet Date: Mon, 3 Dec 2018 23:11:38 +0100 Subject: [PATCH] Add pull-mode on Websockets (#3058) * Add pull-mode on Websockets Fix #3052 * Add WebSocket pull-mode test * Add doc on WebSocket pull mode --- .../proxy/ProxyPublishConsumeTest.java | 60 +++++++++++++++++++ .../websocket/proxy/SimpleConsumerSocket.java | 7 +++ .../pulsar/websocket/ConsumerHandler.java | 55 +++++++++++------ ...{ConsumerAck.java => ConsumerCommand.java} | 11 +--- site2/docs/client-libraries-websocket.md | 32 ++++++++++ 5 files changed, 138 insertions(+), 27 deletions(-) rename pulsar-websocket/src/main/java/org/apache/pulsar/websocket/data/{ConsumerAck.java => ConsumerCommand.java} (85%) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java index 7f1b5aa45cc49..9ddf9554842d6 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java @@ -465,6 +465,66 @@ public void consumeMessagesInPartitionedTopicTest() throws Exception { } } + @Test(timeOut = 10000) + public void socketPullModeTest() throws Exception { + final String topic = "my-property/my-ns/my-topic8"; + final String subscription = "my-sub"; + final String consumerUri = String.format( + "ws://localhost:%d/ws/v2/consumer/persistent/%s/%s?pullMode=true&subscriptionType=Shared", + port, topic, subscription + ); + final String producerUri = String.format("ws://localhost:%d/ws/v2/producer/persistent/%s", port, topic); + + URI consumeUri = URI.create(consumerUri); + URI produceUri = URI.create(producerUri); + + WebSocketClient consumeClient1 = new WebSocketClient(); + SimpleConsumerSocket consumeSocket1 = new SimpleConsumerSocket(); + WebSocketClient consumeClient2 = new WebSocketClient(); + SimpleConsumerSocket consumeSocket2 = new SimpleConsumerSocket(); + WebSocketClient produceClient = new WebSocketClient(); + SimpleProducerSocket produceSocket = new SimpleProducerSocket(); + + try { + consumeClient1.start(); + consumeClient2.start(); + ClientUpgradeRequest consumeRequest1 = new ClientUpgradeRequest(); + ClientUpgradeRequest consumeRequest2 = new ClientUpgradeRequest(); + Future consumerFuture1 = consumeClient1.connect(consumeSocket1, consumeUri, consumeRequest1); + Future consumerFuture2 = consumeClient2.connect(consumeSocket2, consumeUri, consumeRequest2); + log.info("Connecting to : {}", consumeUri); + + // let it connect + Assert.assertTrue(consumerFuture1.get().isOpen()); + Assert.assertTrue(consumerFuture2.get().isOpen()); + + ClientUpgradeRequest produceRequest = new ClientUpgradeRequest(); + produceClient.start(); + Future producerFuture = produceClient.connect(produceSocket, produceUri, produceRequest); + Assert.assertTrue(producerFuture.get().isOpen()); + produceSocket.sendMessage(100); + + Thread.sleep(500); + + // Verify no messages received despite production + Assert.assertEquals(consumeSocket1.getReceivedMessagesCount(), 0); + Assert.assertEquals(consumeSocket2.getReceivedMessagesCount(), 0); + + consumeSocket1.sendPermits(3); + consumeSocket2.sendPermits(2); + consumeSocket2.sendPermits(2); + consumeSocket2.sendPermits(2); + + Thread.sleep(500); + + Assert.assertEquals(consumeSocket1.getReceivedMessagesCount(), 3); + Assert.assertEquals(consumeSocket2.getReceivedMessagesCount(), 6); + + } finally { + stopWebSocketClient(consumeClient1, consumeClient2, produceClient); + } + } + private void verifyTopicStat(Client client, String baseUrl, String topic) { String statUrl = baseUrl + topic + "/stats"; WebTarget webTarget = client.target(statUrl); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/SimpleConsumerSocket.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/SimpleConsumerSocket.java index 09b29c47db067..c303ed50c5b0a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/SimpleConsumerSocket.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/SimpleConsumerSocket.java @@ -81,6 +81,13 @@ public synchronized void onMessage(String msg) throws JsonParseException, IOExce this.getRemote().sendString(ack.toString()); } + public void sendPermits(int nbPermits) throws IOException { + JsonObject permitMessage = new JsonObject(); + permitMessage.add("type", new JsonPrimitive("permit")); + permitMessage.add("permitMessages", new JsonPrimitive(nbPermits)); + this.getRemote().sendString(permitMessage.toString()); + } + public RemoteEndpoint getRemote() { return this.session.getRemote(); } diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java index 4736cbe602d31..d8993d1773879 100644 --- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java +++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/ConsumerHandler.java @@ -47,7 +47,7 @@ import org.apache.pulsar.client.impl.ConsumerBuilderImpl; import org.apache.pulsar.common.util.DateFormatter; import org.apache.pulsar.common.util.ObjectMapperFactory; -import org.apache.pulsar.websocket.data.ConsumerAck; +import org.apache.pulsar.websocket.data.ConsumerCommand; import org.apache.pulsar.websocket.data.ConsumerMessage; import org.eclipse.jetty.websocket.api.Session; import org.eclipse.jetty.websocket.api.WriteCallback; @@ -72,8 +72,9 @@ public class ConsumerHandler extends AbstractWebSocketHandler { private SubscriptionType subscriptionType; private Consumer consumer; - private int maxPendingMessages; + private int maxPendingMessages = 0; private final AtomicInteger pendingMessages = new AtomicInteger(); + private final boolean pullMode; private final LongAdder numMsgsDelivered; private final LongAdder numBytesDelivered; @@ -90,13 +91,17 @@ public ConsumerHandler(WebSocketService service, HttpServletRequest request, Ser this.numMsgsDelivered = new LongAdder(); this.numBytesDelivered = new LongAdder(); this.numMsgsAcked = new LongAdder(); + this.pullMode = Boolean.valueOf(queryParams.get("pullMode")); try { // checkAuth() and getConsumerConfiguration() should be called after assigning a value to this.subscription this.subscription = extractSubscription(request); builder = (ConsumerBuilderImpl) getConsumerConfiguration(service.getPulsarClient()); - this.maxPendingMessages = (builder.getConf().getReceiverQueueSize() == 0) ? 1 - : builder.getConf().getReceiverQueueSize(); + + if (!this.pullMode) { + this.maxPendingMessages = (builder.getConf().getReceiverQueueSize() == 0) ? 1 + : builder.getConf().getReceiverQueueSize(); + } this.subscriptionType = builder.getConf().getSubscriptionType(); if (!checkAuth(response)) { @@ -209,31 +214,43 @@ public void writeSuccess() { @Override public void onWebSocketConnect(Session session) { super.onWebSocketConnect(session); - receiveMessage(); + if (!pullMode) { + receiveMessage(); + } } @Override public void onWebSocketText(String message) { super.onWebSocketText(message); - // We should have received an ack - - MessageId msgId; try { - ConsumerAck ack = ObjectMapperFactory.getThreadLocal().readValue(message, ConsumerAck.class); - msgId = MessageId.fromByteArrayWithTopic(Base64.getDecoder().decode(ack.messageId), topic); + ConsumerCommand command = ObjectMapperFactory.getThreadLocal().readValue(message, ConsumerCommand.class); + if ("permit".equals(command.type)) { + if (command.permitMessages == null) { + throw new IOException("Missing required permitMessages field for 'permit' command"); + } + if (this.pullMode) { + int pending = pendingMessages.getAndAdd(-command.permitMessages); + if (pending >= 0) { + // Resume delivery + receiveMessage(); + } + } + } else { + // We should have received an ack + MessageId msgId = MessageId.fromByteArrayWithTopic(Base64.getDecoder().decode(command.messageId), topic); + consumer.acknowledgeAsync(msgId).thenAccept(consumer -> numMsgsAcked.increment()); + if (!this.pullMode) { + int pending = pendingMessages.getAndDecrement(); + if (pending >= maxPendingMessages) { + // Resume delivery + receiveMessage(); + } + } + } } catch (IOException e) { log.warn("Failed to deserialize message id: {}", message, e); close(WebSocketError.FailedToDeserializeFromJSON); - return; - } - - consumer.acknowledgeAsync(msgId).thenAccept(consumer -> numMsgsAcked.increment()); - - int pending = pendingMessages.getAndDecrement(); - if (pending >= maxPendingMessages) { - // Resume delivery - receiveMessage(); } } diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/data/ConsumerAck.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/data/ConsumerCommand.java similarity index 85% rename from pulsar-websocket/src/main/java/org/apache/pulsar/websocket/data/ConsumerAck.java rename to pulsar-websocket/src/main/java/org/apache/pulsar/websocket/data/ConsumerCommand.java index a7493fb001373..88112a8dc53e3 100644 --- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/data/ConsumerAck.java +++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/data/ConsumerCommand.java @@ -18,13 +18,8 @@ */ package org.apache.pulsar.websocket.data; -public class ConsumerAck { +public class ConsumerCommand { + public String type; public String messageId; - - public ConsumerAck() { - } - - public ConsumerAck(String messageId) { - this.messageId = messageId; - } + public Integer permitMessages; } diff --git a/site2/docs/client-libraries-websocket.md b/site2/docs/client-libraries-websocket.md index 886604ff682c2..e2fbaf9d70c15 100644 --- a/site2/docs/client-libraries-websocket.md +++ b/site2/docs/client-libraries-websocket.md @@ -144,6 +144,11 @@ Key | Type | Required? | Explanation `priorityLevel` | int | no | Define a [priority](http://pulsar.apache.org/api/client/org/apache/pulsar/client/api/ConsumerConfiguration.html#setPriorityLevel-int-) for the consumer `maxRedeliverCount` | int | no | Define a [maxRedeliverCount](http://pulsar.apache.org/api/client/org/apache/pulsar/client/api/ConsumerBuilder.html#deadLetterPolicy-org.apache.pulsar.client.api.DeadLetterPolicy-) for the consumer (default: 0). Activates [Dead Letter Topic](https://github.com/apache/pulsar/wiki/PIP-22%3A-Pulsar-Dead-Letter-Topic) feature. `deadLetterTopic` | string | no | Define a [deadLetterTopic](http://pulsar.apache.org/api/client/org/apache/pulsar/client/api/ConsumerBuilder.html#deadLetterPolicy-org.apache.pulsar.client.api.DeadLetterPolicy-) for the consumer (default: {topic}-{subscription}-DLQ). Activates [Dead Letter Topic](https://github.com/apache/pulsar/wiki/PIP-22%3A-Pulsar-Dead-Letter-Topic) feature. +`pullMode` | boolean | no | Enable pull mode (default: false). See "Flow Control" below. + +NB: these parameter (except `pullMode`) apply to the internal consumer of the WebSocket service. +So messages will be subject to the redelivery settings as soon as the get into the receive queue, +even if the client doesn't consume on the WebSocket. ##### Receiving messages @@ -181,6 +186,33 @@ Key | Type | Required? | Explanation :---|:-----|:----------|:----------- `messageId`| string | yes | Message ID of the processed message +#### Flow control + +##### Push Mode + +By default (`pullMode=false`), the consumer endpoint will use the `receiverQueueSize` parameter both to size its +internal receive queue and to limit the number of unacknowledged messages that are passed to the WebSocket client. +In this mode, if you don't send acknowledgements, the Pulsar WebSocket service will stop sending messages after reaching +`receiverQueueSize` unacked messages sent to the WebSocket client. + +##### Pull Mode + +If you set `pullMode` to `true`, the WebSocket client will need to send `permit` commands to permit the +Pulsar WebSocket service to send more messages. + +```json +{ + "type": "permit", + "permitMessages": 100 +} +``` + +Key | Type | Required? | Explanation +:---|:-----|:----------|:----------- +`type`| string | yes | Type of command. Must be `permit` +`permitMessages`| int | yes | Number of messages to permit + +NB: in this mode it's possible to acknowledge messages in a different connection. ### Reader endpoint