diff --git a/conf/presto/catalog/pulsar.properties b/conf/presto/catalog/pulsar.properties index c104b1e0eb181..349e7262f650a 100644 --- a/conf/presto/catalog/pulsar.properties +++ b/conf/presto/catalog/pulsar.properties @@ -72,3 +72,32 @@ pulsar.rewrite-namespace-delimiter=/ ## Path for the trusted TLS certificate file #pulsar.tls-trust-cert-file-path = + +####### BOOKKEEPER CONFIGS ####### + +# Throttling limit on the number of entries read per second. 0 disables the throttle. Default is 0. +pulsar.bookkeeper-throttle-value = 0 + +# The number of threads used by Netty to handle TCP connections, +# default is 2 * Runtime.getRuntime().availableProcessors(). +# pulsar.bookkeeper-num-io-threads = + +# The number of worker threads used by the bookkeeper client to submit operations, +# default is Runtime.getRuntime().availableProcessors(). +# pulsar.bookkeeper-num-worker-threads = + + +####### MANAGED LEDGER CONFIGS ####### + +# Amount of memory, in MB, to use for caching data payload in managed ledger. This memory +# is allocated from JVM direct memory and is shared across all the managed ledgers +# running in the same SQL worker. 0 disables the cache. Default is 0. +pulsar.managed-ledger-cache-size-MB = 0 + +# Number of threads to be used for managed ledger tasks dispatching, +# default is Runtime.getRuntime().availableProcessors(). +# pulsar.managed-ledger-num-worker-threads = + +# Number of threads to be used for managed ledger scheduled tasks, +# default is Runtime.getRuntime().availableProcessors(). 
+# pulsar.managed-ledger-num-scheduler-threads = diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheManager.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheManager.java index 59937291c00f5..6f481194dc3eb 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheManager.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheManager.java @@ -26,6 +26,7 @@ import com.google.common.primitives.Longs; import io.netty.buffer.ByteBuf; import java.util.Enumeration; +import java.util.Iterator; import java.util.List; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; @@ -38,6 +39,7 @@ import org.apache.bookkeeper.mledger.AsyncCallbacks; import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback; import org.apache.bookkeeper.mledger.Entry; +import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.commons.lang3.tuple.Pair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -224,6 +226,30 @@ public void asyncReadEntry(ReadHandle lh, long firstEntry, long lastEntry, boole @Override public void asyncReadEntry(ReadHandle lh, PositionImpl position, AsyncCallbacks.ReadEntryCallback callback, Object ctx) { + lh.readAsync(position.getEntryId(), position.getEntryId()).whenCompleteAsync( + (ledgerEntries, exception) -> { + if (exception != null) { + ml.invalidateLedgerHandle(lh, exception); + callback.readEntryFailed(createManagedLedgerException(exception), ctx); + return; + } + + try { + Iterator<LedgerEntry> iterator = ledgerEntries.iterator(); /* typed iterator: a raw Iterator returns Object from next() */ + if (iterator.hasNext()) { + LedgerEntry ledgerEntry = iterator.next(); + EntryImpl returnEntry = EntryImpl.create(ledgerEntry); + + mlFactoryMBean.recordCacheMiss(1, returnEntry.getLength()); + ml.getMBean().addReadEntriesSample(1, returnEntry.getLength()); + callback.readEntryComplete(returnEntry, ctx); + } else { + callback.readEntryFailed(new 
ManagedLedgerException("Could not read given position"), ctx); + } + } finally { + ledgerEntries.close(); + } + }, ml.getExecutor().chooseThread(ml.getName())); } @Override diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheManagerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheManagerTest.java index 8971e11ac3d9d..9dabd81c3db94 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheManagerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheManagerTest.java @@ -18,21 +18,29 @@ */ package org.apache.bookkeeper.mledger.impl; +import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import static org.mockito.Mockito.verify; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertTrue; +import java.lang.reflect.Method; import java.util.List; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import org.apache.bookkeeper.client.api.ReadHandle; import org.apache.bookkeeper.common.util.OrderedScheduler; +import org.apache.bookkeeper.mledger.AsyncCallbacks; import org.apache.bookkeeper.mledger.Entry; import org.apache.bookkeeper.mledger.ManagedCursor; +import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig; import org.apache.bookkeeper.test.MockedBookKeeperTestCase; -import org.testng.annotations.BeforeClass; +import org.testng.Assert; +import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; @Test @@ -41,13 +49,16 @@ public class EntryCacheManagerTest extends MockedBookKeeperTestCase { ManagedLedgerImpl ml1; ManagedLedgerImpl ml2; - @BeforeClass - void setup() throws Exception { + @BeforeMethod + void setup(Method method) throws Exception { + 
super.setUp(method); OrderedScheduler executor = OrderedScheduler.newSchedulerBuilder().numThreads(1).build(); ml1 = mock(ManagedLedgerImpl.class); when(ml1.getScheduledExecutor()).thenReturn(executor); when(ml1.getName()).thenReturn("cache1"); + when(ml1.getMBean()).thenReturn(new ManagedLedgerMBeanImpl(ml1)); + when(ml1.getExecutor()).thenReturn(super.executor); ml2 = mock(ManagedLedgerImpl.class); when(ml2.getScheduledExecutor()).thenReturn(executor); @@ -309,4 +320,32 @@ void verifyTimeBasedEviction() throws Exception { factory.shutdown(); } + @Test(timeOut = 5000) + void entryCacheDisabledAsyncReadEntry() throws Exception { + ReadHandle lh = EntryCacheTest.getLedgerHandle(); + + ManagedLedgerFactoryConfig config = new ManagedLedgerFactoryConfig(); + config.setMaxCacheSize(0); + ManagedLedgerFactoryImpl factory = new ManagedLedgerFactoryImpl(bkc, bkc.getZkHandle(), config); + EntryCacheManager cacheManager = factory.getEntryCacheManager(); + EntryCache entryCache = cacheManager.getEntryCache(ml1); + + final CountDownLatch counter = new CountDownLatch(1); + entryCache.asyncReadEntry(lh, new PositionImpl(1L ,1L), new AsyncCallbacks.ReadEntryCallback() { + public void readEntryComplete(Entry entry, Object ctx) { + Assert.assertNotEquals(entry, null); + entry.release(); + counter.countDown(); + } + + public void readEntryFailed(ManagedLedgerException exception, Object ctx) { + Assert.fail("should not have failed"); + counter.countDown(); + } + }, null); + counter.await(); + + verify(lh).readAsync(anyLong(), anyLong()); + } + } diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheTest.java index aa2730c971da2..7cfc97f84a9ce 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheTest.java @@ -239,7 +239,7 @@ public void 
readEntriesFailed(ManagedLedgerException exception, Object ctx) { counter.await(); } - private static ReadHandle getLedgerHandle() { + static ReadHandle getLedgerHandle() { final ReadHandle lh = mock(ReadHandle.class); final LedgerEntry ledgerEntry = mock(LedgerEntry.class, Mockito.CALLS_REAL_METHODS); doReturn(Unpooled.wrappedBuffer(new byte[10])).when(ledgerEntry).getEntryBuffer(); diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java index 0c385af6589bf..1af86a2225a06 100644 --- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java +++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java @@ -31,6 +31,7 @@ import org.apache.bookkeeper.mledger.LedgerOffloaderFactory; import org.apache.bookkeeper.mledger.ManagedLedgerConfig; import org.apache.bookkeeper.mledger.ManagedLedgerFactory; +import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig; import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl; import org.apache.bookkeeper.mledger.impl.NullLedgerOffloader; import org.apache.bookkeeper.mledger.offload.OffloaderUtils; @@ -88,12 +89,23 @@ public static PulsarConnectorCache getConnectorCache(PulsarConnectorConfig pulsa private static ManagedLedgerFactory initManagedLedgerFactory(PulsarConnectorConfig pulsarConnectorConfig) throws Exception { ClientConfiguration bkClientConfiguration = new ClientConfiguration() + .setZkServers(pulsarConnectorConfig.getZookeeperUri()) .setMetadataServiceUri("zk://" + pulsarConnectorConfig.getZookeeperUri() + "/ledgers") .setClientTcpNoDelay(false) .setUseV2WireProtocol(true) .setStickyReadsEnabled(false) - .setReadEntryTimeout(60); - return new ManagedLedgerFactoryImpl(bkClientConfiguration, pulsarConnectorConfig.getZookeeperUri()); + .setReadEntryTimeout(60) + 
.setThrottleValue(pulsarConnectorConfig.getBookkeeperThrottleValue()) + .setNumIOThreads(pulsarConnectorConfig.getBookkeeperNumIOThreads()) + .setNumWorkerThreads(pulsarConnectorConfig.getBookkeeperNumWorkerThreads()); + + ManagedLedgerFactoryConfig managedLedgerFactoryConfig = new ManagedLedgerFactoryConfig(); + managedLedgerFactoryConfig.setMaxCacheSize(pulsarConnectorConfig.getManagedLedgerCacheSizeMB()); + managedLedgerFactoryConfig.setNumManagedLedgerWorkerThreads( + pulsarConnectorConfig.getManagedLedgerNumWorkerThreads()); + managedLedgerFactoryConfig.setNumManagedLedgerSchedulerThreads( + pulsarConnectorConfig.getManagedLedgerNumSchedulerThreads()); + return new ManagedLedgerFactoryImpl(bkClientConfiguration, managedLedgerFactoryConfig); } public ManagedLedgerConfig getManagedLedgerConfig() { diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorConfig.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorConfig.java index b53c589b4ea69..b2b27558eb05b 100644 --- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorConfig.java +++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorConfig.java @@ -64,6 +64,16 @@ public class PulsarConnectorConfig implements AutoCloseable { private PulsarAdmin pulsarAdmin; + // --- Bookkeeper + private int bookkeeperThrottleValue = 0; + private int bookkeeperNumIOThreads = 2 * Runtime.getRuntime().availableProcessors(); + private int bookkeeperNumWorkerThreads = Runtime.getRuntime().availableProcessors(); + + // --- ManagedLedger + private long managedLedgerCacheSizeMB = 0L; + private int managedLedgerNumWorkerThreads = Runtime.getRuntime().availableProcessors(); + private int managedLedgerNumSchedulerThreads = Runtime.getRuntime().availableProcessors(); + @NotNull public String getBrokerServiceUrl() { return brokerServiceUrl; @@ -285,6 +295,69 @@ public PulsarConnectorConfig 
setTlsTrustCertsFilePath(String tlsTrustCertsFilePa return this; } + // --- Bookkeeper Config --- + + public int getBookkeeperThrottleValue() { + return bookkeeperThrottleValue; + } + + @Config("pulsar.bookkeeper-throttle-value") + public PulsarConnectorConfig setBookkeeperThrottleValue(int bookkeeperThrottleValue) { + this.bookkeeperThrottleValue = bookkeeperThrottleValue; + return this; + } + + public int getBookkeeperNumIOThreads() { + return bookkeeperNumIOThreads; + } + + @Config("pulsar.bookkeeper-num-io-threads") + public PulsarConnectorConfig setBookkeeperNumIOThreads(int bookkeeperNumIOThreads) { + this.bookkeeperNumIOThreads = bookkeeperNumIOThreads; + return this; + } + + public int getBookkeeperNumWorkerThreads() { + return bookkeeperNumWorkerThreads; + } + + @Config("pulsar.bookkeeper-num-worker-threads") + public PulsarConnectorConfig setBookkeeperNumWorkerThreads(int bookkeeperNumWorkerThreads) { + this.bookkeeperNumWorkerThreads = bookkeeperNumWorkerThreads; + return this; + } + + // --- ManagedLedger + public long getManagedLedgerCacheSizeMB() { + return managedLedgerCacheSizeMB; + } + + @Config("pulsar.managed-ledger-cache-size-MB") + public PulsarConnectorConfig setManagedLedgerCacheSizeMB(int managedLedgerCacheSizeMB) { + this.managedLedgerCacheSizeMB = managedLedgerCacheSizeMB * 1024L * 1024L; /* MB -> bytes in long arithmetic; int math overflows from 2048 MB */ + return this; + } + + public int getManagedLedgerNumWorkerThreads() { + return managedLedgerNumWorkerThreads; + } + + @Config("pulsar.managed-ledger-num-worker-threads") + public PulsarConnectorConfig setManagedLedgerNumWorkerThreads(int managedLedgerNumWorkerThreads) { + this.managedLedgerNumWorkerThreads = managedLedgerNumWorkerThreads; + return this; + } + + public int getManagedLedgerNumSchedulerThreads() { + return managedLedgerNumSchedulerThreads; + } + + @Config("pulsar.managed-ledger-num-scheduler-threads") + public PulsarConnectorConfig setManagedLedgerNumSchedulerThreads(int managedLedgerNumSchedulerThreads) { + 
this.managedLedgerNumSchedulerThreads = managedLedgerNumSchedulerThreads; + return this; + } + @NotNull public PulsarAdmin getPulsarAdmin() throws PulsarClientException { if (this.pulsarAdmin == null) { diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnectorConfig.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnectorConfig.java index 82b8c977bf588..faf2bbc2639a3 100644 --- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnectorConfig.java +++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnectorConfig.java @@ -49,4 +49,23 @@ public void testNamespaceRewriteDelimiterRestriction() { connectorConfig.setRewriteNamespaceDelimiter("--&"); Assert.assertEquals("--&", (connectorConfig.getRewriteNamespaceDelimiter())); } + + @Test + public void testDefaultBookkeeperConfig() { + int availableProcessors = Runtime.getRuntime().availableProcessors(); + PulsarConnectorConfig connectorConfig = new PulsarConnectorConfig(); + Assert.assertEquals(0, connectorConfig.getBookkeeperThrottleValue()); + Assert.assertEquals(2 * availableProcessors, connectorConfig.getBookkeeperNumIOThreads()); + Assert.assertEquals(availableProcessors, connectorConfig.getBookkeeperNumWorkerThreads()); + } + + @Test + public void testDefaultManagedLedgerConfig() { + int availableProcessors = Runtime.getRuntime().availableProcessors(); + PulsarConnectorConfig connectorConfig = new PulsarConnectorConfig(); + Assert.assertEquals(0L, connectorConfig.getManagedLedgerCacheSizeMB()); + Assert.assertEquals(availableProcessors, connectorConfig.getManagedLedgerNumWorkerThreads()); + Assert.assertEquals(availableProcessors, connectorConfig.getManagedLedgerNumSchedulerThreads()); + } + }