Skip to content

Commit

Permalink
Optimize memory usage: support to shrink for pendingAcks map (apache#…
Browse files Browse the repository at this point in the history
  • Loading branch information
lordcheng10 authored Mar 5, 2022
1 parent 01b5567 commit e747b8f
Show file tree
Hide file tree
Showing 4 changed files with 1,116 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -704,6 +704,14 @@ public class ServiceConfiguration implements PulsarConfiguration {
)
private boolean isAllowAutoUpdateSchemaEnabled = true;

@FieldContext(
category = CATEGORY_SERVER,
doc = "Whether to enable the automatic shrink of pendingAcks map, "
+ "the default is false, which means it is not enabled. "
+ "When there are a large number of share or key share consumers in the cluster, "
+ "it can be enabled to reduce the memory consumption caused by pendingAcks.")
private boolean autoShrinkForConsumerPendingAcksMap = false;

@FieldContext(
category = CATEGORY_SERVER,
dynamic = true,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,6 @@
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.bookkeeper.util.collections.ConcurrentLongLongPairHashMap;
import org.apache.bookkeeper.util.collections.ConcurrentLongLongPairHashMap.LongPair;
import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.commons.lang3.tuple.MutablePair;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
Expand All @@ -61,6 +59,8 @@
import org.apache.pulsar.common.util.DateFormatter;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.BitSetRecyclable;
import org.apache.pulsar.common.util.collections.ConcurrentLongLongPairHashMap;
import org.apache.pulsar.common.util.collections.ConcurrentLongLongPairHashMap.LongPair;
import org.apache.pulsar.transaction.common.exception.TransactionConflictException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -186,7 +186,12 @@ public Consumer(Subscription subscription, SubType subType, String topicName, lo
stats.metadata = this.metadata;

if (Subscription.isIndividualAckMode(subType)) {
this.pendingAcks = new ConcurrentLongLongPairHashMap(256, 1);
this.pendingAcks = ConcurrentLongLongPairHashMap.newBuilder()
.autoShrink(subscription.getTopic().getBrokerService()
.getPulsar().getConfiguration().isAutoShrinkForConsumerPendingAcksMap())
.expectedItems(256)
.concurrencyLevel(1)
.build();
} else {
// We don't need to keep track of pending acks if the subscription is not shared
this.pendingAcks = null;
Expand Down
Loading

0 comments on commit e747b8f

Please sign in to comment.