Skip to content

Commit

Permalink
Add pull-mode on Websockets (apache#3058)
Browse files Browse the repository at this point in the history
* Add pull-mode on Websockets

Fix apache#3052

* Add WebSocket pull-mode test

* Add doc on WebSocket pull mode
  • Loading branch information
cbornet authored and merlimat committed Dec 3, 2018
1 parent 5d06b1e commit 66a68d5
Show file tree
Hide file tree
Showing 5 changed files with 138 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -465,6 +465,66 @@ public void consumeMessagesInPartitionedTopicTest() throws Exception {
}
}

@Test(timeOut = 10000)
public void socketPullModeTest() throws Exception {
final String topic = "my-property/my-ns/my-topic8";
final String subscription = "my-sub";
final String consumerUri = String.format(
"ws://localhost:%d/ws/v2/consumer/persistent/%s/%s?pullMode=true&subscriptionType=Shared",
port, topic, subscription
);
final String producerUri = String.format("ws://localhost:%d/ws/v2/producer/persistent/%s", port, topic);

URI consumeUri = URI.create(consumerUri);
URI produceUri = URI.create(producerUri);

WebSocketClient consumeClient1 = new WebSocketClient();
SimpleConsumerSocket consumeSocket1 = new SimpleConsumerSocket();
WebSocketClient consumeClient2 = new WebSocketClient();
SimpleConsumerSocket consumeSocket2 = new SimpleConsumerSocket();
WebSocketClient produceClient = new WebSocketClient();
SimpleProducerSocket produceSocket = new SimpleProducerSocket();

try {
consumeClient1.start();
consumeClient2.start();
ClientUpgradeRequest consumeRequest1 = new ClientUpgradeRequest();
ClientUpgradeRequest consumeRequest2 = new ClientUpgradeRequest();
Future<Session> consumerFuture1 = consumeClient1.connect(consumeSocket1, consumeUri, consumeRequest1);
Future<Session> consumerFuture2 = consumeClient2.connect(consumeSocket2, consumeUri, consumeRequest2);
log.info("Connecting to : {}", consumeUri);

// let it connect
Assert.assertTrue(consumerFuture1.get().isOpen());
Assert.assertTrue(consumerFuture2.get().isOpen());

ClientUpgradeRequest produceRequest = new ClientUpgradeRequest();
produceClient.start();
Future<Session> producerFuture = produceClient.connect(produceSocket, produceUri, produceRequest);
Assert.assertTrue(producerFuture.get().isOpen());
produceSocket.sendMessage(100);

Thread.sleep(500);

// Verify no messages received despite production
Assert.assertEquals(consumeSocket1.getReceivedMessagesCount(), 0);
Assert.assertEquals(consumeSocket2.getReceivedMessagesCount(), 0);

consumeSocket1.sendPermits(3);
consumeSocket2.sendPermits(2);
consumeSocket2.sendPermits(2);
consumeSocket2.sendPermits(2);

Thread.sleep(500);

Assert.assertEquals(consumeSocket1.getReceivedMessagesCount(), 3);
Assert.assertEquals(consumeSocket2.getReceivedMessagesCount(), 6);

} finally {
stopWebSocketClient(consumeClient1, consumeClient2, produceClient);
}
}

private void verifyTopicStat(Client client, String baseUrl, String topic) {
String statUrl = baseUrl + topic + "/stats";
WebTarget webTarget = client.target(statUrl);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,13 @@ public synchronized void onMessage(String msg) throws JsonParseException, IOExce
this.getRemote().sendString(ack.toString());
}

public void sendPermits(int nbPermits) throws IOException {
JsonObject permitMessage = new JsonObject();
permitMessage.add("type", new JsonPrimitive("permit"));
permitMessage.add("permitMessages", new JsonPrimitive(nbPermits));
this.getRemote().sendString(permitMessage.toString());
}

public RemoteEndpoint getRemote() {
return this.session.getRemote();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
import org.apache.pulsar.client.impl.ConsumerBuilderImpl;
import org.apache.pulsar.common.util.DateFormatter;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.websocket.data.ConsumerAck;
import org.apache.pulsar.websocket.data.ConsumerCommand;
import org.apache.pulsar.websocket.data.ConsumerMessage;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.WriteCallback;
Expand All @@ -72,8 +72,9 @@ public class ConsumerHandler extends AbstractWebSocketHandler {
private SubscriptionType subscriptionType;
private Consumer<byte[]> consumer;

private int maxPendingMessages;
private int maxPendingMessages = 0;
private final AtomicInteger pendingMessages = new AtomicInteger();
private final boolean pullMode;

private final LongAdder numMsgsDelivered;
private final LongAdder numBytesDelivered;
Expand All @@ -90,13 +91,17 @@ public ConsumerHandler(WebSocketService service, HttpServletRequest request, Ser
this.numMsgsDelivered = new LongAdder();
this.numBytesDelivered = new LongAdder();
this.numMsgsAcked = new LongAdder();
this.pullMode = Boolean.valueOf(queryParams.get("pullMode"));

try {
// checkAuth() and getConsumerConfiguration() should be called after assigning a value to this.subscription
this.subscription = extractSubscription(request);
builder = (ConsumerBuilderImpl<byte[]>) getConsumerConfiguration(service.getPulsarClient());
this.maxPendingMessages = (builder.getConf().getReceiverQueueSize() == 0) ? 1
: builder.getConf().getReceiverQueueSize();

if (!this.pullMode) {
this.maxPendingMessages = (builder.getConf().getReceiverQueueSize() == 0) ? 1
: builder.getConf().getReceiverQueueSize();
}
this.subscriptionType = builder.getConf().getSubscriptionType();

if (!checkAuth(response)) {
Expand Down Expand Up @@ -209,31 +214,43 @@ public void writeSuccess() {
@Override
public void onWebSocketConnect(Session session) {
super.onWebSocketConnect(session);
receiveMessage();
if (!pullMode) {
receiveMessage();
}
}

@Override
public void onWebSocketText(String message) {
super.onWebSocketText(message);

// We should have received an ack

MessageId msgId;
try {
ConsumerAck ack = ObjectMapperFactory.getThreadLocal().readValue(message, ConsumerAck.class);
msgId = MessageId.fromByteArrayWithTopic(Base64.getDecoder().decode(ack.messageId), topic);
ConsumerCommand command = ObjectMapperFactory.getThreadLocal().readValue(message, ConsumerCommand.class);
if ("permit".equals(command.type)) {
if (command.permitMessages == null) {
throw new IOException("Missing required permitMessages field for 'permit' command");
}
if (this.pullMode) {
int pending = pendingMessages.getAndAdd(-command.permitMessages);
if (pending >= 0) {
// Resume delivery
receiveMessage();
}
}
} else {
// We should have received an ack
MessageId msgId = MessageId.fromByteArrayWithTopic(Base64.getDecoder().decode(command.messageId), topic);
consumer.acknowledgeAsync(msgId).thenAccept(consumer -> numMsgsAcked.increment());
if (!this.pullMode) {
int pending = pendingMessages.getAndDecrement();
if (pending >= maxPendingMessages) {
// Resume delivery
receiveMessage();
}
}
}
} catch (IOException e) {
log.warn("Failed to deserialize message id: {}", message, e);
close(WebSocketError.FailedToDeserializeFromJSON);
return;
}

consumer.acknowledgeAsync(msgId).thenAccept(consumer -> numMsgsAcked.increment());

int pending = pendingMessages.getAndDecrement();
if (pending >= maxPendingMessages) {
// Resume delivery
receiveMessage();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,8 @@
*/
package org.apache.pulsar.websocket.data;

public class ConsumerAck {
public class ConsumerCommand {
public String type;
public String messageId;

public ConsumerAck() {
}

public ConsumerAck(String messageId) {
this.messageId = messageId;
}
public Integer permitMessages;
}
32 changes: 32 additions & 0 deletions site2/docs/client-libraries-websocket.md
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,11 @@ Key | Type | Required? | Explanation
`priorityLevel` | int | no | Define a [priority](http://pulsar.apache.org/api/client/org/apache/pulsar/client/api/ConsumerConfiguration.html#setPriorityLevel-int-) for the consumer
`maxRedeliverCount` | int | no | Define a [maxRedeliverCount](http://pulsar.apache.org/api/client/org/apache/pulsar/client/api/ConsumerBuilder.html#deadLetterPolicy-org.apache.pulsar.client.api.DeadLetterPolicy-) for the consumer (default: 0). Activates [Dead Letter Topic](https://github.com/apache/pulsar/wiki/PIP-22%3A-Pulsar-Dead-Letter-Topic) feature.
`deadLetterTopic` | string | no | Define a [deadLetterTopic](http://pulsar.apache.org/api/client/org/apache/pulsar/client/api/ConsumerBuilder.html#deadLetterPolicy-org.apache.pulsar.client.api.DeadLetterPolicy-) for the consumer (default: {topic}-{subscription}-DLQ). Activates [Dead Letter Topic](https://github.com/apache/pulsar/wiki/PIP-22%3A-Pulsar-Dead-Letter-Topic) feature.
`pullMode` | boolean | no | Enable pull mode (default: false). See "Flow Control" below.

NB: these parameter (except `pullMode`) apply to the internal consumer of the WebSocket service.
So messages will be subject to the redelivery settings as soon as the get into the receive queue,
even if the client doesn't consume on the WebSocket.

##### Receiving messages

Expand Down Expand Up @@ -181,6 +186,33 @@ Key | Type | Required? | Explanation
:---|:-----|:----------|:-----------
`messageId`| string | yes | Message ID of the processed message

#### Flow control

##### Push Mode

By default (`pullMode=false`), the consumer endpoint will use the `receiverQueueSize` parameter both to size its
internal receive queue and to limit the number of unacknowledged messages that are passed to the WebSocket client.
In this mode, if you don't send acknowledgements, the Pulsar WebSocket service will stop sending messages after reaching
`receiverQueueSize` unacked messages sent to the WebSocket client.

##### Pull Mode

If you set `pullMode` to `true`, the WebSocket client will need to send `permit` commands to permit the
Pulsar WebSocket service to send more messages.

```json
{
"type": "permit",
"permitMessages": 100
}
```

Key | Type | Required? | Explanation
:---|:-----|:----------|:-----------
`type`| string | yes | Type of command. Must be `permit`
`permitMessages`| int | yes | Number of messages to permit

NB: in this mode it's possible to acknowledge messages in a different connection.

### Reader endpoint

Expand Down

0 comments on commit 66a68d5

Please sign in to comment.