diff --git a/conf/broker.conf b/conf/broker.conf index 85ffc11d34f42..04f38c958e1ad 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -1172,6 +1172,14 @@ managedLedgerCursorRolloverTimeInSeconds=14400 # crashes. managedLedgerMaxUnackedRangesToPersist=10000 +# Maximum amount of memory used hold data read from storage (or from the cache). +# This mechanism prevents the broker to have too many concurrent +# reads from storage and fall into Out of Memory errors in case +# of multiple concurrent reads to multiple concurrent consumers. +# Set 0 in order to disable the feature. +# +managedLedgerMaxReadsInFlightSizeInMB=0 + # Max number of "acknowledgment holes" that can be stored in MetadataStore. If number of unack message range is higher # than this limit then broker will persist unacked ranges into bookkeeper to avoid additional data overhead into # MetadataStore. diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryConfig.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryConfig.java index 3a5a920bf1098..5aa4e8374d73a 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryConfig.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryConfig.java @@ -57,6 +57,11 @@ public class ManagedLedgerFactoryConfig { */ private boolean copyEntriesInCache = false; + /** + * Maximum number of (estimated) data in-flight reading from storage and the cache. + */ + private long managedLedgerMaxReadsInFlightSize = 0; + /** * Whether trace managed ledger task execution time. */ diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryImpl.java index 574e2c17b6de7..6512399173f0a 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryImpl.java @@ -44,6 +44,8 @@ protected EntryImpl newObject(Handle handle) { private long entryId; ByteBuf data; + private Runnable onDeallocate; + public static EntryImpl create(LedgerEntry ledgerEntry) { EntryImpl entry = RECYCLER.get(); entry.timestamp = System.nanoTime(); @@ -102,6 +104,22 @@ private EntryImpl(Recycler.Handle recyclerHandle) { this.recyclerHandle = recyclerHandle; } + public void onDeallocate(Runnable r) { + if (this.onDeallocate == null) { + this.onDeallocate = r; + } else { + // this is not expected to happen + Runnable previous = this.onDeallocate; + this.onDeallocate = () -> { + try { + previous.run(); + } finally { + r.run(); + } + }; + } + } + public long getTimestamp() { return timestamp; } @@ -167,6 +185,13 @@ public ReferenceCounted touch(Object hint) { @Override protected void deallocate() { // This method is called whenever the ref-count of the EntryImpl reaches 0, so that now we can recycle it + if (onDeallocate != null) { + try { + onDeallocate.run(); + } finally { + onDeallocate = null; + } + } data.release(); data = null; timestamp = -1; diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/InflightReadsLimiter.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/InflightReadsLimiter.java new file mode 100644 index 0000000000000..b946dc09a0c71 --- /dev/null +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/InflightReadsLimiter.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.bookkeeper.mledger.impl.cache; + +import com.google.common.annotations.VisibleForTesting; +import io.prometheus.client.Gauge; +import lombok.AllArgsConstructor; +import lombok.ToString; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class InflightReadsLimiter { + + private static final Gauge PULSAR_ML_READS_BUFFER_SIZE = Gauge + .build() + .name("pulsar_ml_reads_inflight_bytes") + .help("Estimated number of bytes retained by data read from storage or cache") + .register(); + + private static final Gauge PULSAR_ML_READS_AVAILABLE_BUFFER_SIZE = Gauge + .build() + .name("pulsar_ml_reads_available_inflight_bytes") + .help("Available space for inflight data read from storage or cache") + .register(); + + private final long maxReadsInFlightSize; + private long remainingBytes; + + public InflightReadsLimiter(long maxReadsInFlightSize) { + if (maxReadsInFlightSize <= 0) { + // set it to -1 in order to show in the metrics that the metric is not available + PULSAR_ML_READS_BUFFER_SIZE.set(-1); + PULSAR_ML_READS_AVAILABLE_BUFFER_SIZE.set(-1); + } + this.maxReadsInFlightSize = maxReadsInFlightSize; + this.remainingBytes = maxReadsInFlightSize; + } + + @VisibleForTesting + public synchronized long getRemainingBytes() { + return remainingBytes; + } + + @AllArgsConstructor + @ToString + static class Handle { + final long acquiredPermits; + final boolean success; + final int trials; + + final long creationTime; + } + + private static final Handle DISABLED = new Handle(0, true, 0, -1); + + Handle acquire(long permits, Handle current) { + if (maxReadsInFlightSize <= 0) { + // feature is disabled + return DISABLED; + } + synchronized (this) { + try { + if (current == null) { + if (remainingBytes == 0) { + return new Handle(0, false, 1, System.currentTimeMillis()); + } + if (remainingBytes >= permits) { + remainingBytes -= permits; + return new Handle(permits, true, 1, System.currentTimeMillis()); + } else { + long possible = remainingBytes; + remainingBytes = 0; + return new Handle(possible, false, 1, System.currentTimeMillis()); + } + } else { + if (current.trials >= 4 && current.acquiredPermits > 0) { + remainingBytes += current.acquiredPermits; + return new Handle(0, false, 1, current.creationTime); + } + if (remainingBytes == 0) { + return new Handle(current.acquiredPermits, false, current.trials + 1, + current.creationTime); + } + long needed = permits - current.acquiredPermits; + if (remainingBytes >= needed) { + remainingBytes -= needed; + return new Handle(permits, true, current.trials + 1, current.creationTime); + } else { + long possible = remainingBytes; + remainingBytes = 0; + return new Handle(current.acquiredPermits + possible, false, + current.trials + 1, current.creationTime); + } + } + } finally { + updateMetrics(); + } + } + } + + void release(Handle handle) { + if (handle == DISABLED) { + return; + } + synchronized (this) { + remainingBytes += handle.acquiredPermits; + updateMetrics(); + } + } + + private synchronized void updateMetrics() { + PULSAR_ML_READS_BUFFER_SIZE.set(maxReadsInFlightSize - remainingBytes); + PULSAR_ML_READS_AVAILABLE_BUFFER_SIZE.set(remainingBytes); + } + + public boolean isDisabled() { + return maxReadsInFlightSize <= 0; + } + + +} diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/PendingReadsManager.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/PendingReadsManager.java index 09b8f02897eb1..8eefefa0f51f5 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/PendingReadsManager.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/PendingReadsManager.java @@ -28,6 +28,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import lombok.AllArgsConstructor; import lombok.Value; +import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.client.api.ReadHandle; import org.apache.bookkeeper.mledger.AsyncCallbacks; import org.apache.bookkeeper.mledger.Entry; @@ -37,6 +38,7 @@ /** * PendingReadsManager tries to prevent sending duplicate reads to BK. */ +@Slf4j public class PendingReadsManager { private static final Counter COUNT_ENTRIES_READ_FROM_BK = Counter @@ -315,8 +317,6 @@ synchronized boolean addListener(AsyncCallbacks.ReadEntriesCallback callback, void readEntries(ReadHandle lh, long firstEntry, long lastEntry, boolean shouldCacheEntry, final AsyncCallbacks.ReadEntriesCallback callback, Object ctx) { - - final PendingReadKey key = new PendingReadKey(firstEntry, lastEntry); Map pendingReadsForLedger = @@ -442,6 +442,7 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { } } + void clear() { cachedPendingReads.clear(); } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java index f1ead03da5db5..95b0fbcfd4e99 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java @@ -29,11 +29,15 @@ import java.util.Iterator; import java.util.List; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.bookkeeper.client.api.BKException; import org.apache.bookkeeper.client.api.LedgerEntry; import org.apache.bookkeeper.client.api.ReadHandle; +import org.apache.bookkeeper.mledger.AsyncCallbacks; import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntryCallback; +import org.apache.bookkeeper.mledger.Entry; +import org.apache.bookkeeper.mledger.ManagedLedgerConfig; import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.bookkeeper.mledger.impl.EntryImpl; import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; @@ -49,6 +53,11 @@ */ public class RangeEntryCacheImpl implements EntryCache { + /** + * Overhead per-entry to take into account the envelope. + */ + private static final long BOOKKEEPER_READ_OVERHEAD_PER_ENTRY = 64; + private final RangeEntryCacheManagerImpl manager; final ManagedLedgerImpl ml; private ManagedLedgerInterceptor interceptor; @@ -56,14 +65,18 @@ public class RangeEntryCacheImpl implements EntryCache { private final boolean copyEntries; private final PendingReadsManager pendingReadsManager; + private volatile long estimatedEntrySize = 10 * 1024; + + private final long readEntryTimeoutMillis; private static final double MB = 1024 * 1024; public RangeEntryCacheImpl(RangeEntryCacheManagerImpl manager, ManagedLedgerImpl ml, boolean copyEntries) { this.manager = manager; - this.pendingReadsManager = new PendingReadsManager(this); this.ml = ml; + this.pendingReadsManager = new PendingReadsManager(this); this.interceptor = ml.getManagedLedgerInterceptor(); + this.readEntryTimeoutMillis = getManagedLedgerConfig().getReadEntryTimeoutSeconds(); this.entries = new RangeCache<>(EntryImpl::getLength, EntryImpl::getTimestamp); this.copyEntries = copyEntries; @@ -77,11 +90,21 @@ ManagedLedgerImpl getManagedLedger() { return ml; } + @VisibleForTesting + ManagedLedgerConfig getManagedLedgerConfig() { + return ml.getConfig(); + } + @Override public String getName() { return ml.getName(); } + @VisibleForTesting + InflightReadsLimiter getPendingReadsLimiter() { + return manager.getInflightReadsLimiter(); + } + public static final PooledByteBufAllocator ALLOCATOR = new PooledByteBufAllocator(true, // preferDirect 0, // nHeapArenas, PooledByteBufAllocator.defaultNumDirectArena(), // nDirectArena @@ -271,6 +294,19 @@ public void asyncReadEntry(ReadHandle lh, long firstEntry, long lastEntry, boole @SuppressWarnings({ "unchecked", "rawtypes" }) void asyncReadEntry0(ReadHandle lh, long firstEntry, long lastEntry, boolean shouldCacheEntry, final ReadEntriesCallback callback, Object ctx) { + asyncReadEntry0WithLimits(lh, firstEntry, lastEntry, shouldCacheEntry, callback, ctx, null); + } + + void asyncReadEntry0WithLimits(ReadHandle lh, long firstEntry, long lastEntry, boolean shouldCacheEntry, + final ReadEntriesCallback originalCallback, Object ctx, InflightReadsLimiter.Handle handle) { + + final AsyncCallbacks.ReadEntriesCallback callback = + handlePendingReadsLimits(lh, firstEntry, lastEntry, shouldCacheEntry, + originalCallback, ctx, handle); + if (callback == null) { + return; + } + final long ledgerId = lh.getId(); final int entriesToRead = (int) (lastEntry - firstEntry) + 1; final PositionImpl firstPosition = PositionImpl.get(lh.getId(), firstEntry); @@ -313,6 +349,74 @@ void asyncReadEntry0(ReadHandle lh, long firstEntry, long lastEntry, boolean sho } } + private AsyncCallbacks.ReadEntriesCallback handlePendingReadsLimits(ReadHandle lh, + long firstEntry, long lastEntry, + boolean shouldCacheEntry, + AsyncCallbacks.ReadEntriesCallback originalCallback, + Object ctx, InflightReadsLimiter.Handle handle) { + InflightReadsLimiter pendingReadsLimiter = getPendingReadsLimiter(); + if (pendingReadsLimiter.isDisabled()) { + return originalCallback; + } + long estimatedReadSize = (1 + lastEntry - firstEntry) + * (estimatedEntrySize + BOOKKEEPER_READ_OVERHEAD_PER_ENTRY); + final AsyncCallbacks.ReadEntriesCallback callback; + InflightReadsLimiter.Handle newHandle = pendingReadsLimiter.acquire(estimatedReadSize, handle); + if (!newHandle.success) { + long now = System.currentTimeMillis(); + if (now - newHandle.creationTime > readEntryTimeoutMillis) { + String message = "Time-out elapsed while acquiring enough permits " + + "on the memory limiter to read from ledger " + + lh.getId() + + ", " + getName() + + ", estimated read size " + estimatedReadSize + " bytes" + + " for " + (1 + lastEntry - firstEntry) + + " entries (check managedLedgerMaxReadsInFlightSizeInMB)"; + log.error(message); + pendingReadsLimiter.release(newHandle); + originalCallback.readEntriesFailed( + new ManagedLedgerException.TooManyRequestsException(message), ctx); + return null; + } + ml.getExecutor().submitOrdered(lh.getId(), () -> { + asyncReadEntry0WithLimits(lh, firstEntry, lastEntry, shouldCacheEntry, + originalCallback, ctx, newHandle); + return null; + }); + return null; + } else { + callback = new AsyncCallbacks.ReadEntriesCallback() { + + @Override + public void readEntriesComplete(List entries, Object ctx) { + if (!entries.isEmpty()) { + long size = entries.get(0).getLength(); + estimatedEntrySize = size; + + AtomicInteger remainingCount = new AtomicInteger(entries.size()); + for (Entry entry : entries) { + ((EntryImpl) entry).onDeallocate(() -> { + if (remainingCount.decrementAndGet() <= 0) { + pendingReadsLimiter.release(newHandle); + } + }); + } + } else { + pendingReadsLimiter.release(newHandle); + } + originalCallback.readEntriesComplete(entries, ctx); + } + + @Override + public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { + pendingReadsLimiter.release(newHandle); + originalCallback.readEntriesFailed(exception, ctx); + } + }; + } + return callback; + } + /** * Reads the entries from Storage. * @param lh the handle diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheManagerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheManagerImpl.java index 9087fc6d30293..080c70b5873cd 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheManagerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheManagerImpl.java @@ -51,14 +51,16 @@ public class RangeEntryCacheManagerImpl implements EntryCacheManager { private final ManagedLedgerFactoryImpl mlFactory; protected final ManagedLedgerFactoryMBeanImpl mlFactoryMBean; + private final InflightReadsLimiter inflightReadsLimiter; protected static final double MB = 1024 * 1024; - private static final double evictionTriggerThresholdPercent = 0.98; public RangeEntryCacheManagerImpl(ManagedLedgerFactoryImpl factory) { this.maxSize = factory.getConfig().getMaxCacheSize(); + this.inflightReadsLimiter = new InflightReadsLimiter( + factory.getConfig().getManagedLedgerMaxReadsInFlightSize()); this.evictionTriggerThreshold = (long) (maxSize * evictionTriggerThresholdPercent); this.cacheEvictionWatermark = factory.getConfig().getCacheEvictionWatermark(); this.evictionPolicy = new EntryCacheDefaultEvictionPolicy(); @@ -195,5 +197,9 @@ public static EntryImpl create(LedgerEntry ledgerEntry, ManagedLedgerInterceptor return returnEntry; } + public InflightReadsLimiter getInflightReadsLimiter() { + return inflightReadsLimiter; + } + private static final Logger log = LoggerFactory.getLogger(RangeEntryCacheManagerImpl.class); } diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheManagerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheManagerTest.java index 78c7f7442bc9f..ec865018b34b0 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheManagerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheManagerTest.java @@ -38,6 +38,7 @@ import org.apache.bookkeeper.mledger.AsyncCallbacks; import org.apache.bookkeeper.mledger.Entry; import org.apache.bookkeeper.mledger.ManagedCursor; +import org.apache.bookkeeper.mledger.ManagedLedgerConfig; import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig; import org.apache.bookkeeper.mledger.impl.cache.EntryCache; @@ -62,10 +63,12 @@ protected void setUpTestCase() throws Exception { when(ml1.getMbean()).thenReturn(new ManagedLedgerMBeanImpl(ml1)); when(ml1.getExecutor()).thenReturn(super.executor); when(ml1.getFactory()).thenReturn(factory); + when(ml1.getConfig()).thenReturn(new ManagedLedgerConfig()); ml2 = mock(ManagedLedgerImpl.class); when(ml2.getScheduledExecutor()).thenReturn(executor); when(ml2.getName()).thenReturn("cache2"); + when(ml2.getConfig()).thenReturn(new ManagedLedgerConfig()); } @Test diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheTest.java index c538cae0ee703..c8338798f271b 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheTest.java @@ -41,6 +41,7 @@ import org.apache.bookkeeper.client.impl.LedgerEntryImpl; import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback; import org.apache.bookkeeper.mledger.Entry; +import org.apache.bookkeeper.mledger.ManagedLedgerConfig; import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.bookkeeper.mledger.impl.cache.EntryCache; import org.apache.bookkeeper.mledger.impl.cache.EntryCacheManager; @@ -58,6 +59,7 @@ protected void setUpTestCase() throws Exception { when(ml.getName()).thenReturn("name"); when(ml.getExecutor()).thenReturn(executor); when(ml.getMbean()).thenReturn(new ManagedLedgerMBeanImpl(ml)); + when(ml.getConfig()).thenReturn(new ManagedLedgerConfig()); } @Test(timeOut = 5000) diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/InflightReadsLimiterTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/InflightReadsLimiterTest.java new file mode 100644 index 0000000000000..2b69581ca2c73 --- /dev/null +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/InflightReadsLimiterTest.java @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.bookkeeper.mledger.impl.cache; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertTrue; + +import lombok.extern.slf4j.Slf4j; +import org.testng.annotations.Test; + +@Slf4j +public class InflightReadsLimiterTest { + + @Test + public void testDisabled() throws Exception { + + InflightReadsLimiter limiter = new InflightReadsLimiter(0); + assertTrue(limiter.isDisabled()); + + limiter = new InflightReadsLimiter(-1); + assertTrue(limiter.isDisabled()); + + limiter = new InflightReadsLimiter(1); + assertFalse(limiter.isDisabled()); + } + + @Test + public void testBasicAcquireRelease() throws Exception { + InflightReadsLimiter limiter = new InflightReadsLimiter(100); + assertEquals(100, limiter.getRemainingBytes()); + InflightReadsLimiter.Handle handle = limiter.acquire(100, null); + assertEquals(0, limiter.getRemainingBytes()); + assertTrue(handle.success); + assertEquals(handle.acquiredPermits, 100); + assertEquals(1, handle.trials); + limiter.release(handle); + assertEquals(100, limiter.getRemainingBytes()); + } + + @Test + public void testNotEnoughPermits() throws Exception { + InflightReadsLimiter limiter = new InflightReadsLimiter(100); + assertEquals(100, limiter.getRemainingBytes()); + InflightReadsLimiter.Handle handle = limiter.acquire(100, null); + assertEquals(0, limiter.getRemainingBytes()); + assertTrue(handle.success); + assertEquals(handle.acquiredPermits, 100); + assertEquals(1, handle.trials); + + InflightReadsLimiter.Handle handle2 = limiter.acquire(100, null); + assertEquals(0, limiter.getRemainingBytes()); + assertFalse(handle2.success); + assertEquals(handle2.acquiredPermits, 0); + assertEquals(1, handle2.trials); + + limiter.release(handle); + assertEquals(100, limiter.getRemainingBytes()); + + handle2 = limiter.acquire(100, handle2); + assertEquals(0, limiter.getRemainingBytes()); + assertTrue(handle2.success); + assertEquals(handle2.acquiredPermits, 100); + assertEquals(2, handle2.trials); + + limiter.release(handle2); + assertEquals(100, limiter.getRemainingBytes()); + + } + + @Test + public void testPartialAcquire() throws Exception { + InflightReadsLimiter limiter = new InflightReadsLimiter(100); + assertEquals(100, limiter.getRemainingBytes()); + + InflightReadsLimiter.Handle handle = limiter.acquire(30, null); + assertEquals(70, limiter.getRemainingBytes()); + assertTrue(handle.success); + assertEquals(handle.acquiredPermits, 30); + assertEquals(1, handle.trials); + + InflightReadsLimiter.Handle handle2 = limiter.acquire(100, null); + assertEquals(0, limiter.getRemainingBytes()); + assertFalse(handle2.success); + assertEquals(handle2.acquiredPermits, 70); + assertEquals(1, handle2.trials); + + limiter.release(handle); + + handle2 = limiter.acquire(100, handle2); + assertEquals(0, limiter.getRemainingBytes()); + assertTrue(handle2.success); + assertEquals(handle2.acquiredPermits, 100); + assertEquals(2, handle2.trials); + + limiter.release(handle2); + assertEquals(100, limiter.getRemainingBytes()); + + } + + @Test + public void testTooManyTrials() throws Exception { + InflightReadsLimiter limiter = new InflightReadsLimiter(100); + assertEquals(100, limiter.getRemainingBytes()); + + InflightReadsLimiter.Handle handle = limiter.acquire(30, null); + assertEquals(70, limiter.getRemainingBytes()); + assertTrue(handle.success); + assertEquals(handle.acquiredPermits, 30); + assertEquals(1, handle.trials); + + InflightReadsLimiter.Handle handle2 = limiter.acquire(100, null); + assertEquals(0, limiter.getRemainingBytes()); + assertFalse(handle2.success); + assertEquals(handle2.acquiredPermits, 70); + assertEquals(1, handle2.trials); + + handle2 = limiter.acquire(100, handle2); + assertEquals(0, limiter.getRemainingBytes()); + assertFalse(handle2.success); + assertEquals(handle2.acquiredPermits, 70); + assertEquals(2, handle2.trials); + + handle2 = limiter.acquire(100, handle2); + assertEquals(0, limiter.getRemainingBytes()); + assertFalse(handle2.success); + assertEquals(handle2.acquiredPermits, 70); + assertEquals(3, handle2.trials); + + handle2 = limiter.acquire(100, handle2); + assertEquals(0, limiter.getRemainingBytes()); + assertFalse(handle2.success); + assertEquals(handle2.acquiredPermits, 70); + assertEquals(4, handle2.trials); + + // too many trials, start from scratch + handle2 = limiter.acquire(100, handle2); + assertEquals(70, limiter.getRemainingBytes()); + assertFalse(handle2.success); + assertEquals(handle2.acquiredPermits, 0); + assertEquals(1, handle2.trials); + + limiter.release(handle); + + handle2 = limiter.acquire(100, handle2); + assertEquals(0, limiter.getRemainingBytes()); + assertTrue(handle2.success); + assertEquals(handle2.acquiredPermits, 100); + assertEquals(2, handle2.trials); + + limiter.release(handle2); + assertEquals(100, limiter.getRemainingBytes()); + + } + +} diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/PendingReadsManagerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/PendingReadsManagerTest.java index e1d429396d761..04eb95b2b3ff0 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/PendingReadsManagerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/PendingReadsManagerTest.java @@ -24,6 +24,7 @@ import org.apache.bookkeeper.common.util.OrderedExecutor; import org.apache.bookkeeper.mledger.AsyncCallbacks; import org.apache.bookkeeper.mledger.Entry; +import org.apache.bookkeeper.mledger.ManagedLedgerConfig; import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.impl.EntryImpl; @@ -80,12 +81,19 @@ void after() { RangeEntryCacheImpl rangeEntryCache; PendingReadsManager pendingReadsManager; + InflightReadsLimiter inflighReadsLimiter; ReadHandle lh; ManagedLedgerImpl ml; @BeforeMethod(alwaysRun = true) void setupMocks() { rangeEntryCache = mock(RangeEntryCacheImpl.class); + ManagedLedgerConfig config = new ManagedLedgerConfig(); + config.setReadEntryTimeoutSeconds(10000); + when(rangeEntryCache.getName()).thenReturn("my-topic"); + when(rangeEntryCache.getManagedLedgerConfig()).thenReturn(config); + inflighReadsLimiter = new InflightReadsLimiter(0); + when(rangeEntryCache.getPendingReadsLimiter()).thenReturn(inflighReadsLimiter); pendingReadsManager = new PendingReadsManager(rangeEntryCache); doAnswer(new Answer() { @Override diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index 7e1e339ccb3d8..d7617ef277444 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -1864,6 +1864,12 @@ The delayed message index bucket time step(in seconds) in per bucket snapshot se @FieldContext(category = CATEGORY_STORAGE_ML, doc = "Whether we should make a copy of the entry payloads when " + "inserting in cache") private boolean managedLedgerCacheCopyEntries = false; + + @FieldContext(category = CATEGORY_STORAGE_ML, doc = "Maximum buffer size for bytes read from storage." + + " This is the memory retained by data read from storage (or cache) until it has been delivered to the" + + " Consumer Netty channel. Use O to disable") + private long managedLedgerMaxReadsInFlightSizeInMB = 0; + @FieldContext( category = CATEGORY_STORAGE_ML, dynamic = true, diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java index 29d815860742f..b16b9a7dd4833 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java @@ -63,6 +63,8 @@ public void initialize(ServiceConfiguration conf, MetadataStoreExtended metadata managedLedgerFactoryConfig.setCacheEvictionTimeThresholdMillis( conf.getManagedLedgerCacheEvictionTimeThresholdMillis()); managedLedgerFactoryConfig.setCopyEntriesInCache(conf.isManagedLedgerCacheCopyEntries()); + managedLedgerFactoryConfig.setManagedLedgerMaxReadsInFlightSize( + conf.getManagedLedgerMaxReadsInFlightSizeInMB() * 1024L * 1024L); managedLedgerFactoryConfig.setPrometheusStatsLatencyRolloverSeconds( conf.getManagedLedgerPrometheusStatsLatencyRolloverSeconds()); managedLedgerFactoryConfig.setTraceTaskExecution(conf.isManagedLedgerTraceTaskExecution()); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java index f9e8e61d40060..02400f6cdeee3 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java @@ -19,6 +19,7 @@ package org.apache.pulsar.broker.service; import io.netty.buffer.ByteBuf; +import io.prometheus.client.Gauge; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -49,6 +50,12 @@ @Slf4j public abstract class AbstractBaseDispatcher extends EntryFilterSupport implements Dispatcher { + private static final Gauge PENDING_BYTES_TO_DISPATCH = Gauge + .build() + .name("pulsar_broker_pending_bytes_to_dispatch") + .help("Amount of bytes loaded in memory to be dispatched to Consumers") + .register(); + protected final ServiceConfiguration serviceConfig; protected final boolean dispatchThrottlingOnBatchMessageEnabled; private final LongAdder filterProcessedMsgs = new LongAdder(); @@ -370,4 +377,8 @@ public long getFilterRejectedMsgCount() { public long getFilterRescheduledMsgCount() { return this.filterRescheduledMsgs.longValue(); } + + protected final void updatePendingBytesToDispatch(long size) { + PENDING_BYTES_TO_DISPATCH.inc(size); + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java index 84e23733cfdde..2bc933e75fd88 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java @@ -22,6 +22,7 @@ import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPromise; +import java.util.ArrayList; import java.util.List; import java.util.Optional; import lombok.extern.slf4j.Slf4j; @@ -229,6 +230,10 @@ public ChannelPromise sendMessagesToConsumer(long consumerId, String topicName, final ChannelHandlerContext ctx = cnx.ctx(); final ChannelPromise writePromise = ctx.newPromise(); ctx.channel().eventLoop().execute(() -> { + // this list is always accessed in the same thread (the eventLoop here) + // and in the completion of the writePromise + // it is safe to use a simple ArrayList + List entriesToRelease = new ArrayList<>(entries.size()); for (int i = 0; i < entries.size(); i++) { Entry entry = entries.get(i); if (entry == null) { @@ -277,11 +282,20 @@ public ChannelPromise sendMessagesToConsumer(long consumerId, String topicName, redeliveryCount, metadataAndPayload, batchIndexesAcks == null ? null : batchIndexesAcks.getAckSet(i), topicName, epoch), ctx.voidPromise()); - entry.release(); + entriesToRelease.add(entry); } // Use an empty write here so that we can just tie the flush with the write promise for last entry ctx.writeAndFlush(Unpooled.EMPTY_BUFFER, writePromise); + writePromise.addListener((future) -> { + // release the entries only after flushing the channel + // + // InflightReadsLimiter tracks the amount of memory retained by in-flight data to the + // consumer. It counts the memory as being released when the entry is deallocated + // that is that it reaches refcnt=0. + // so we need to call release only when we are sure that Netty released the internal ByteBuf + entriesToRelease.forEach(Entry::release); + }); batchSizes.recyle(); if (batchIndexesAcks != null) { batchIndexesAcks.recycle(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java index 193ee07561d2d..1d58e60de30c3 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java @@ -549,6 +549,9 @@ public final synchronized void readEntriesComplete(List entries, Object c log.debug("[{}] Distributing {} messages to {} consumers", name, entries.size(), consumerList.size()); } + long size = entries.stream().mapToLong(Entry::getLength).sum(); + updatePendingBytesToDispatch(size); + // dispatch messages to a separate thread, but still in order for this subscription // sendMessagesToConsumers is responsible for running broker-side filters // that may be quite expensive @@ -558,12 +561,18 @@ public final synchronized void readEntriesComplete(List entries, Object c sendInProgress = true; dispatchMessagesThread.execute(safeRun(() -> { if (sendMessagesToConsumers(readType, entries)) { + updatePendingBytesToDispatch(-size); readMoreEntries(); + } else { + updatePendingBytesToDispatch(-size); } })); } else { if (sendMessagesToConsumers(readType, entries)) { + updatePendingBytesToDispatch(-size); readMoreEntriesAsync(); + } else { + updatePendingBytesToDispatch(-size); } } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherMultipleConsumers.java index 66f2473a639e2..2a91b45b3f910 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherMultipleConsumers.java @@ -105,6 +105,8 @@ public synchronized void readEntryComplete(Entry entry, PendingReadEntryRequest cursor.seek(((ManagedLedgerImpl) cursor.getManagedLedger()) .getNextValidPosition((PositionImpl) entry.getPosition())); + long size = entry.getLength(); + updatePendingBytesToDispatch(size); // dispatch messages to a separate thread, but still in order for this subscription // sendMessagesToConsumers is responsible for running broker-side filters // that may be quite expensive @@ -115,11 +117,15 @@ public synchronized void readEntryComplete(Entry entry, PendingReadEntryRequest dispatchMessagesThread.execute(safeRun(() -> { if (sendMessagesToConsumers(readType, Lists.newArrayList(entry), ctx.isLast())) { readMoreEntries(); + } else { + updatePendingBytesToDispatch(-size); } })); } else { if (sendMessagesToConsumers(readType, Lists.newArrayList(entry), ctx.isLast())) { readMoreEntriesAsync(); + } else { + updatePendingBytesToDispatch(-size); } } ctx.recycle();