Skip to content

Commit

Permalink
Allow to configure ack-timeout tick time (apache#4760)
Browse files Browse the repository at this point in the history
### Motivation

After the changes in apache#3118, there has a been a sharp increase of memory utilization for the UnackedMessageTracker due to the time buckets being created. 

This is especially true when the acktimeout is set to a larger value (eg: 1h) where 3600 time-buckets are being created. This lead to use 20MB per partition even when no message is tracked.

Allowing to configure the tick time so that application can tune it based on needs.

Additionally, fixed the logic that keeps creating hash maps and throwing them away at each tick time iteration, since that creates a lot of garbage and doesn't take care of the fact that the hash maps are expanding based on the required capacity (so next time they are already of the "right" size). 

On a final note: the current default of 1sec seems very wasteful. Something like 10s should be more appropriate as default.
  • Loading branch information
merlimat authored and sijie committed Jul 21, 2019
1 parent 5cff169 commit f13af48
Show file tree
Hide file tree
Showing 5 changed files with 51 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ public interface ConsumerBuilder<T> extends Cloneable {

/**
* Set the timeout for unacked messages, truncated to the nearest millisecond. The timeout needs to be greater than
* 10 seconds.
* 1 second.
* <p>
* By default, the acknowledge timeout is disabled and that means that messages delivered to a
* consumer will not be re-delivered unless the consumer crashes.
Expand All @@ -187,6 +187,21 @@ public interface ConsumerBuilder<T> extends Cloneable {
*/
ConsumerBuilder<T> ackTimeout(long ackTimeout, TimeUnit timeUnit);

/**
* Define the granularity of the ack-timeout redelivery.
* <p>
* By default, the tick time is set to 1 second. Using an higher tick time will
* reduce the memory overhead to track messages when the ack-timeout is set to
* bigger values (eg: 1hour).
*
* @param tickTime
* the min precision for the ack timeout messages tracker
* @param timeUnit
* unit in which the timeout is provided.
* @return the consumer builder instance
*/
ConsumerBuilder<T> ackTimeoutTickTime(long tickTime, TimeUnit timeUnit);

/**
* Set the delay to wait before re-delivering messages that have failed to be process.
* <p>
Expand Down Expand Up @@ -386,7 +401,7 @@ public interface ConsumerBuilder<T> extends Cloneable {
* C5 1 1
* Order in which broker dispatches messages to consumers: C1, C2, C3, C1, C4, C5, C4
* </pre>
*
*
* <b>Failover subscription</b>
* Broker selects active consumer for a failover-subscription based on consumer's priority-level and lexicographical sorting of a consumer name.
* eg:
Expand All @@ -395,15 +410,15 @@ public interface ConsumerBuilder<T> extends Cloneable {
* Consumer PriorityLevel Name
* C1 0 aaa
* C2 0 bbb
*
*
* 2. Active consumer = C2 : Consumer with highest priority
* Consumer PriorityLevel Name
* C1 1 aaa
* C2 0 bbb
*
*
* Partitioned-topics:
* Broker evenly assigns partitioned topics to highest priority consumers.
*
*
* </pre>
*
* @param priorityLevel the priority of this consumer
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ public class ConsumerBuilderImpl<T> implements ConsumerBuilder<T> {
private List<ConsumerInterceptor<T>> interceptorList;

private static long MIN_ACK_TIMEOUT_MILLIS = 1000;
private static long MIN_TICK_TIME_MILLIS = 100;
private static long DEFAULT_ACK_TIMEOUT_MILLIS_FOR_DEAD_LETTER = 30000L;


Expand Down Expand Up @@ -156,6 +157,14 @@ public ConsumerBuilder<T> ackTimeout(long ackTimeout, TimeUnit timeUnit) {
return this;
}

@Override
public ConsumerBuilder<T> ackTimeoutTickTime(long tickTime, TimeUnit timeUnit) {
checkArgument(timeUnit.toMillis(tickTime) >= MIN_TICK_TIME_MILLIS,
"Ack timeout tick time should be greater than " + MIN_TICK_TIME_MILLIS + " ms");
conf.setTickDurationMillis(timeUnit.toMillis(tickTime));
return this;
}

@Override
public ConsumerBuilder<T> negativeAckRedeliveryDelay(long redeliveryDelay, TimeUnit timeUnit) {
checkArgument(redeliveryDelay >= 0, "redeliveryDelay needs to be >= 0");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,15 @@
import com.google.common.base.Preconditions;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import io.netty.util.concurrent.FastThreadLocal;

import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Closeable;
import java.util.ArrayDeque;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
Expand All @@ -40,7 +43,7 @@ public class UnAckedMessageTracker implements Closeable {
private static final Logger log = LoggerFactory.getLogger(UnAckedMessageTracker.class);

protected final ConcurrentHashMap<MessageId, ConcurrentOpenHashSet<MessageId>> messageIdPartitionMap;
protected final LinkedList<ConcurrentOpenHashSet<MessageId>> timePartitions;
protected final ArrayDeque<ConcurrentOpenHashSet<MessageId>> timePartitions;

protected final Lock readLock;
protected final Lock writeLock;
Expand Down Expand Up @@ -94,6 +97,13 @@ public UnAckedMessageTracker(PulsarClientImpl client, ConsumerBase<?> consumerBa
this(client, consumerBase, ackTimeoutMillis, ackTimeoutMillis);
}

private static final FastThreadLocal<HashSet<MessageId>> TL_MESSAGE_IDS_SET = new FastThreadLocal<HashSet<MessageId>>() {
@Override
protected HashSet<MessageId> initialValue() throws Exception {
return new HashSet<>();
}
};

public UnAckedMessageTracker(PulsarClientImpl client, ConsumerBase<?> consumerBase, long ackTimeoutMillis, long tickDurationInMs) {
Preconditions.checkArgument(tickDurationInMs > 0 && ackTimeoutMillis >= tickDurationInMs);
this.ackTimeoutMillis = ackTimeoutMillis;
Expand All @@ -102,20 +112,21 @@ public UnAckedMessageTracker(PulsarClientImpl client, ConsumerBase<?> consumerBa
this.readLock = readWriteLock.readLock();
this.writeLock = readWriteLock.writeLock();
this.messageIdPartitionMap = new ConcurrentHashMap<>();
this.timePartitions = new LinkedList<>();
this.timePartitions = new ArrayDeque<>();

int blankPartitions = (int)Math.ceil((double)this.ackTimeoutMillis / this.tickDurationInMs);
for (int i = 0; i < blankPartitions + 1; i++) {
timePartitions.add(new ConcurrentOpenHashSet<>());
timePartitions.add(new ConcurrentOpenHashSet<>(16, 1));
}

timeout = client.timer().newTimeout(new TimerTask() {
@Override
public void run(Timeout t) throws Exception {
Set<MessageId> messageIds = new HashSet<>();
Set<MessageId> messageIds = TL_MESSAGE_IDS_SET.get();
messageIds.clear();

writeLock.lock();
try {
timePartitions.addLast(new ConcurrentOpenHashSet<>());
ConcurrentOpenHashSet<MessageId> headPartition = timePartitions.removeFirst();
if (!headPartition.isEmpty()) {
log.warn("[{}] {} messages have timed-out", consumerBase, timePartitions.size());
Expand All @@ -124,6 +135,9 @@ public void run(Timeout t) throws Exception {
messageIdPartitionMap.remove(messageId);
});
}

headPartition.clear();
timePartitions.addLast(headPartition);
} finally {
writeLock.unlock();
}
Expand All @@ -140,11 +154,7 @@ public void clear() {
writeLock.lock();
try {
messageIdPartitionMap.clear();
timePartitions.clear();
int blankPartitions = (int)Math.ceil((double)ackTimeoutMillis / tickDurationInMs);
for (int i = 0; i < blankPartitions + 1; i++) {
timePartitions.add(new ConcurrentOpenHashSet<>());
}
timePartitions.forEach(tp -> tp.clear());
} finally {
writeLock.unlock();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ private static ManagedLedgerFactory initManagedLedgerFactory(PulsarConnectorConf
.setZkServers(pulsarConnectorConfig.getZookeeperUri())
.setClientTcpNoDelay(false)
.setUseV2WireProtocol(true)
.setStickyReadsEnabled(true)
.setStickyReadsEnabled(false)
.setReadEntryTimeout(60);
return new ManagedLedgerFactoryImpl(bkClientConfiguration);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ ManagedLedgerFactory getManagedLedgerFactory() throws Exception {
ClientConfiguration bkClientConfiguration = new ClientConfiguration()
.setZkServers(this.pulsarConnectorConfig.getZookeeperUri())
.setClientTcpNoDelay(false)
.setStickyReadsEnabled(true)
.setStickyReadsEnabled(false)
.setUseV2WireProtocol(true);
return new ManagedLedgerFactoryImpl(bkClientConfiguration);
}
Expand Down

0 comments on commit f13af48

Please sign in to comment.