Skip to content

Commit

Permalink
Add default ackTimeout(30s) for dead letter policy (apache#3014)
Browse files Browse the repository at this point in the history
Fix issue apache#2987 
### Motivation

In version 2.2.0, support DeadLetterTopic feature. This feature based on message redelivery. So ack timeout is necessary.

### Modifications

Set ackTimeout(30s) when enable the dead letter policy but not set the ackTimeout;
  • Loading branch information
codelipenghui authored and sijie committed Nov 22, 2018
1 parent ac97846 commit 0a7e133
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,8 @@ public interface ConsumerBuilder<T> extends Cloneable {
* .deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(10).deadLetterTopic("your-topic-name").build())
* .subscribe();
* </pre>
* When a dead letter policy is specified, and no ackTimeoutMillis is specified,
* then the ack timeout will be set to 30000 millisecond
*/
ConsumerBuilder<T> deadLetterPolicy(DeadLetterPolicy deadLetterPolicy);
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ public class ConsumerBuilderImpl<T> implements ConsumerBuilder<T> {
private List<ConsumerInterceptor<T>> interceptorList;

private static long MIN_ACK_TIMEOUT_MILLIS = 1000;
private static long DEFAULT_ACK_TIMEOUT_MILLIS_FOR_DEAD_LETTER = 30000L;


public ConsumerBuilderImpl(PulsarClientImpl client, Schema<T> schema) {
this(client, new ConsumerConfigurationData<T>(), schema);
Expand Down Expand Up @@ -266,7 +268,12 @@ public ConsumerBuilder<T> intercept(ConsumerInterceptor<T>... interceptors) {

@Override
public ConsumerBuilder<T> deadLetterPolicy(DeadLetterPolicy deadLetterPolicy) {
conf.setDeadLetterPolicy(deadLetterPolicy);
if (deadLetterPolicy != null) {
if (conf.getAckTimeoutMillis() == 0) {
conf.setAckTimeoutMillis(DEFAULT_ACK_TIMEOUT_MILLIS_FOR_DEAD_LETTER);
}
conf.setDeadLetterPolicy(deadLetterPolicy);
}
return this;
}

Expand Down

0 comments on commit 0a7e133

Please sign in to comment.