From f13af487699dcb36b7458ff4872a771837feefc6 Mon Sep 17 00:00:00 2001 From: Matteo Merli Date: Sun, 21 Jul 2019 01:51:05 -0700 Subject: [PATCH] Allow to configure ack-timeout tick time (#4760) ### Motivation After the changes in #3118, there has been a sharp increase in memory utilization for the UnackedMessageTracker due to the time buckets being created. This is especially true when the acktimeout is set to a larger value (e.g. 1h) where 3600 time-buckets are being created. This led to using 20MB per partition even when no message is tracked. This change allows configuring the tick time so that applications can tune it based on their needs. Additionally, fixed the logic that keeps creating hash maps and throwing them away at each tick time iteration, since that creates a lot of garbage and doesn't take care of the fact that the hash maps are expanding based on the required capacity (so next time they are already of the "right" size). On a final note: the current default of 1sec seems very wasteful. Something like 10s should be more appropriate as the default. --- .../pulsar/client/api/ConsumerBuilder.java | 25 ++++++++++++---- .../client/impl/ConsumerBuilderImpl.java | 9 ++++++ .../client/impl/UnAckedMessageTracker.java | 30 ++++++++++++------- .../sql/presto/PulsarConnectorCache.java | 2 +- .../pulsar/sql/presto/PulsarSplitManager.java | 2 +- 5 files changed, 51 insertions(+), 17 deletions(-) diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java index f8c5e0a5ff800..c4bf5a91833ad 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java @@ -170,7 +170,7 @@ public interface ConsumerBuilder extends Cloneable { /** * Set the timeout for unacked messages, truncated to the nearest millisecond. 
The timeout needs to be greater than - * 10 seconds. + * 1 second. *

* By default, the acknowledge timeout is disabled and that means that messages delivered to a * consumer will not be re-delivered unless the consumer crashes. @@ -187,6 +187,21 @@ public interface ConsumerBuilder extends Cloneable { */ ConsumerBuilder ackTimeout(long ackTimeout, TimeUnit timeUnit); + /** + * Define the granularity of the ack-timeout redelivery. + *

+ * By default, the tick time is set to 1 second. Using an higher tick time will + * reduce the memory overhead to track messages when the ack-timeout is set to + * bigger values (eg: 1hour). + * + * @param tickTime + * the min precision for the ack timeout messages tracker + * @param timeUnit + * unit in which the timeout is provided. + * @return the consumer builder instance + */ + ConsumerBuilder ackTimeoutTickTime(long tickTime, TimeUnit timeUnit); + /** * Set the delay to wait before re-delivering messages that have failed to be process. *

@@ -386,7 +401,7 @@ public interface ConsumerBuilder extends Cloneable { * C5 1 1 * Order in which broker dispatches messages to consumers: C1, C2, C3, C1, C4, C5, C4 * - * + * * Failover subscription * Broker selects active consumer for a failover-subscription based on consumer's priority-level and lexicographical sorting of a consumer name. * eg: @@ -395,15 +410,15 @@ public interface ConsumerBuilder extends Cloneable { * Consumer PriorityLevel Name * C1 0 aaa * C2 0 bbb - * + * * 2. Active consumer = C2 : Consumer with highest priority * Consumer PriorityLevel Name * C1 1 aaa * C2 0 bbb - * + * * Partitioned-topics: * Broker evenly assigns partitioned topics to highest priority consumers. - * + * * * * @param priorityLevel the priority of this consumer diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java index 691344253d27d..9eed128ac9c25 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java @@ -58,6 +58,7 @@ public class ConsumerBuilderImpl implements ConsumerBuilder { private List> interceptorList; private static long MIN_ACK_TIMEOUT_MILLIS = 1000; + private static long MIN_TICK_TIME_MILLIS = 100; private static long DEFAULT_ACK_TIMEOUT_MILLIS_FOR_DEAD_LETTER = 30000L; @@ -156,6 +157,14 @@ public ConsumerBuilder ackTimeout(long ackTimeout, TimeUnit timeUnit) { return this; } + @Override + public ConsumerBuilder ackTimeoutTickTime(long tickTime, TimeUnit timeUnit) { + checkArgument(timeUnit.toMillis(tickTime) >= MIN_TICK_TIME_MILLIS, + "Ack timeout tick time should be greater than " + MIN_TICK_TIME_MILLIS + " ms"); + conf.setTickDurationMillis(timeUnit.toMillis(tickTime)); + return this; + } + @Override public ConsumerBuilder negativeAckRedeliveryDelay(long redeliveryDelay, TimeUnit timeUnit) { 
checkArgument(redeliveryDelay >= 0, "redeliveryDelay needs to be >= 0"); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java index 0ab17d98b7d6d..aa576b5d058b3 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java @@ -21,12 +21,15 @@ import com.google.common.base.Preconditions; import io.netty.util.Timeout; import io.netty.util.TimerTask; +import io.netty.util.concurrent.FastThreadLocal; + import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.Closeable; +import java.util.ArrayDeque; import java.util.HashSet; import java.util.Iterator; import java.util.LinkedList; @@ -40,7 +43,7 @@ public class UnAckedMessageTracker implements Closeable { private static final Logger log = LoggerFactory.getLogger(UnAckedMessageTracker.class); protected final ConcurrentHashMap> messageIdPartitionMap; - protected final LinkedList> timePartitions; + protected final ArrayDeque> timePartitions; protected final Lock readLock; protected final Lock writeLock; @@ -94,6 +97,13 @@ public UnAckedMessageTracker(PulsarClientImpl client, ConsumerBase consumerBa this(client, consumerBase, ackTimeoutMillis, ackTimeoutMillis); } + private static final FastThreadLocal> TL_MESSAGE_IDS_SET = new FastThreadLocal>() { + @Override + protected HashSet initialValue() throws Exception { + return new HashSet<>(); + } + }; + public UnAckedMessageTracker(PulsarClientImpl client, ConsumerBase consumerBase, long ackTimeoutMillis, long tickDurationInMs) { Preconditions.checkArgument(tickDurationInMs > 0 && ackTimeoutMillis >= tickDurationInMs); this.ackTimeoutMillis = ackTimeoutMillis; @@ -102,20 +112,21 @@ public 
UnAckedMessageTracker(PulsarClientImpl client, ConsumerBase consumerBa this.readLock = readWriteLock.readLock(); this.writeLock = readWriteLock.writeLock(); this.messageIdPartitionMap = new ConcurrentHashMap<>(); - this.timePartitions = new LinkedList<>(); + this.timePartitions = new ArrayDeque<>(); int blankPartitions = (int)Math.ceil((double)this.ackTimeoutMillis / this.tickDurationInMs); for (int i = 0; i < blankPartitions + 1; i++) { - timePartitions.add(new ConcurrentOpenHashSet<>()); + timePartitions.add(new ConcurrentOpenHashSet<>(16, 1)); } timeout = client.timer().newTimeout(new TimerTask() { @Override public void run(Timeout t) throws Exception { - Set messageIds = new HashSet<>(); + Set messageIds = TL_MESSAGE_IDS_SET.get(); + messageIds.clear(); + writeLock.lock(); try { - timePartitions.addLast(new ConcurrentOpenHashSet<>()); ConcurrentOpenHashSet headPartition = timePartitions.removeFirst(); if (!headPartition.isEmpty()) { log.warn("[{}] {} messages have timed-out", consumerBase, timePartitions.size()); @@ -124,6 +135,9 @@ public void run(Timeout t) throws Exception { messageIdPartitionMap.remove(messageId); }); } + + headPartition.clear(); + timePartitions.addLast(headPartition); } finally { writeLock.unlock(); } @@ -140,11 +154,7 @@ public void clear() { writeLock.lock(); try { messageIdPartitionMap.clear(); - timePartitions.clear(); - int blankPartitions = (int)Math.ceil((double)ackTimeoutMillis / tickDurationInMs); - for (int i = 0; i < blankPartitions + 1; i++) { - timePartitions.add(new ConcurrentOpenHashSet<>()); - } + timePartitions.forEach(tp -> tp.clear()); } finally { writeLock.unlock(); } diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java index a9cd070d3f13e..36ba1d2b0df13 100644 --- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java +++ 
b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java @@ -86,7 +86,7 @@ private static ManagedLedgerFactory initManagedLedgerFactory(PulsarConnectorConf .setZkServers(pulsarConnectorConfig.getZookeeperUri()) .setClientTcpNoDelay(false) .setUseV2WireProtocol(true) - .setStickyReadsEnabled(true) + .setStickyReadsEnabled(false) .setReadEntryTimeout(60); return new ManagedLedgerFactoryImpl(bkClientConfiguration); } diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplitManager.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplitManager.java index 40f31c95e831c..445564a0077d2 100644 --- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplitManager.java +++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplitManager.java @@ -128,7 +128,7 @@ ManagedLedgerFactory getManagedLedgerFactory() throws Exception { ClientConfiguration bkClientConfiguration = new ClientConfiguration() .setZkServers(this.pulsarConnectorConfig.getZookeeperUri()) .setClientTcpNoDelay(false) - .setStickyReadsEnabled(true) + .setStickyReadsEnabled(false) .setUseV2WireProtocol(true); return new ManagedLedgerFactoryImpl(bkClientConfiguration); }