Skip to content

Commit

Permalink
Websocket dead letter topic (PIP22) (apache#2968)
Browse files Browse the repository at this point in the history
* Add support for dead letter policy in the websocket proxy

* Add doc for websocket dead letter topic params
  • Loading branch information
cbornet authored and merlimat committed Nov 9, 2018
1 parent 037c153 commit 34ab423
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,13 @@
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.DeadLetterPolicy;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException.AlreadyClosedException;
import org.apache.pulsar.client.api.PulsarClientException.ConsumerBusyException;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.ConsumerBuilderImpl;
import org.apache.pulsar.client.impl.TopicMessageIdImpl;
import org.apache.pulsar.common.util.DateFormatter;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.websocket.data.ConsumerAck;
Expand Down Expand Up @@ -92,13 +92,13 @@ public ConsumerHandler(WebSocketService service, HttpServletRequest request, Ser
this.numMsgsAcked = new LongAdder();

try {
// checkAuth() and getConsumerConfiguration() should be called after assigning a value to this.subscription
this.subscription = extractSubscription(request);
builder = (ConsumerBuilderImpl<byte[]>) getConsumerConfiguration(service.getPulsarClient());
this.maxPendingMessages = (builder.getConf().getReceiverQueueSize() == 0) ? 1
: builder.getConf().getReceiverQueueSize();
this.subscriptionType = builder.getConf().getSubscriptionType();

// checkAuth() should be called after assigning a value to this.subscription
this.subscription = extractSubscription(request);
if (!checkAuth(response)) {
return;
}
Expand Down Expand Up @@ -191,7 +191,7 @@ public void writeSuccess() {
int pending = pendingMessages.incrementAndGet();
if (pending < maxPendingMessages) {
// Start next read in a separate thread to avoid recursion
service.getExecutor().execute(() -> receiveMessage());
service.getExecutor().execute(this::receiveMessage);
}
}).exceptionally(exception -> {
if (exception.getCause() instanceof AlreadyClosedException) {
Expand Down Expand Up @@ -313,6 +313,19 @@ private ConsumerBuilder<byte[]> getConsumerConfiguration(PulsarClient client) {
builder.priorityLevel(Integer.parseInt(queryParams.get("priorityLevel")));
}

if (queryParams.containsKey("maxRedeliverCount") || queryParams.containsKey("deadLetterTopic")) {
DeadLetterPolicy.DeadLetterPolicyBuilder dlpBuilder = DeadLetterPolicy.builder();
if (queryParams.containsKey("maxRedeliverCount")) {
dlpBuilder.maxRedeliverCount(Integer.parseInt(queryParams.get("maxRedeliverCount")))
.deadLetterTopic(String.format("%s-%s-DLQ", topic, subscription));
}

if (queryParams.containsKey("deadLetterTopic")) {
dlpBuilder.deadLetterTopic(queryParams.get("deadLetterTopic"));
}
builder.deadLetterPolicy(dlpBuilder.build());
}

return builder;
}

Expand Down
2 changes: 2 additions & 0 deletions site2/docs/client-libraries-websocket.md
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,8 @@ Key | Type | Required? | Explanation
`receiverQueueSize` | int | no | Size of the consumer receive queue (default: 1000)
`consumerName` | string | no | Consumer name
`priorityLevel` | int | no | Define a [priority](http://pulsar.apache.org/api/client/org/apache/pulsar/client/api/ConsumerConfiguration.html#setPriorityLevel-int-) for the consumer
`maxRedeliverCount` | int | no | Define a [maxRedeliverCount](http://pulsar.apache.org/api/client/org/apache/pulsar/client/api/ConsumerBuilder.html#deadLetterPolicy-org.apache.pulsar.client.api.DeadLetterPolicy-) for the consumer (default: 0). Activates [Dead Letter Topic](https://github.com/apache/pulsar/wiki/PIP-22%3A-Pulsar-Dead-Letter-Topic) feature.
`deadLetterTopic` | string | no | Define a [deadLetterTopic](http://pulsar.apache.org/api/client/org/apache/pulsar/client/api/ConsumerBuilder.html#deadLetterPolicy-org.apache.pulsar.client.api.DeadLetterPolicy-) for the consumer (default: {topic}-{subscription}-DLQ). Activates [Dead Letter Topic](https://github.com/apache/pulsar/wiki/PIP-22%3A-Pulsar-Dead-Letter-Topic) feature.

##### Receiving messages

Expand Down

0 comments on commit 34ab423

Please sign in to comment.