Skip to content

Commit

Permalink
[pulsar-sql]Expose configurations of managed ledger and bookkeeper cl…
Browse files Browse the repository at this point in the history
…ient. (apache#5702)

Motivation
Expose more configurations of managed ledger and bookkeeper client, this will provide ability for users to optimize performance of entries reading.

Modifications
Expose some configurations related to managed ledger and bookkeeper client in pulsar.config.

Verifying this change
Add unit tests to verify the default value of the configurations.
  • Loading branch information
gaoran10 authored and jiazhai committed Dec 17, 2019
1 parent 3683b59 commit 2d81c57
Show file tree
Hide file tree
Showing 7 changed files with 204 additions and 6 deletions.
29 changes: 29 additions & 0 deletions conf/presto/catalog/pulsar.properties
Original file line number Diff line number Diff line change
Expand Up @@ -72,3 +72,32 @@ pulsar.rewrite-namespace-delimiter=/

## Path for the trusted TLS certificate file
#pulsar.tls-trust-cert-file-path =

####### BOOKKEEPER CONFIGS #######

# Entries read count throttling-limit per seconds, 0 is represents disable the throttle, default is 0.
pulsar.bookkeeper-throttle-value = 0

# The number of threads used by Netty to handle TCP connections,
# default is 2 * Runtime.getRuntime().availableProcessors().
# pulsar.bookkeeper-num-io-threads =

# The number of worker threads used by bookkeeper client to submit operations,
# default is Runtime.getRuntime().availableProcessors().
# pulsar.bookkeeper-num-worker-threads =


####### MANAGED LEDGER CONFIGS #######

# Amount of memory to use for caching data payload in managed ledger. This memory
# is allocated from JVM direct memory and it's shared across all the managed ledgers
# running in same sql worker. 0 is represents disable the cache, default is 0.
pulsar.managed-ledger-cache-size-MB = 0

# Number of threads to be used for managed ledger tasks dispatching,
# default is Runtime.getRuntime().availableProcessors().
# pulsar.managed-ledger-num-worker-threads =

# Number of threads to be used for managed ledger scheduled tasks,
# default is Runtime.getRuntime().availableProcessors().
# pulsar.managed-ledger-num-scheduler-threads =
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.google.common.primitives.Longs;
import io.netty.buffer.ByteBuf;
import java.util.Enumeration;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
Expand All @@ -38,6 +39,7 @@
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -224,6 +226,30 @@ public void asyncReadEntry(ReadHandle lh, long firstEntry, long lastEntry, boole
@Override
public void asyncReadEntry(ReadHandle lh, PositionImpl position, AsyncCallbacks.ReadEntryCallback callback,
Object ctx) {
lh.readAsync(position.getEntryId(), position.getEntryId()).whenCompleteAsync(
(ledgerEntries, exception) -> {
if (exception != null) {
ml.invalidateLedgerHandle(lh, exception);
callback.readEntryFailed(createManagedLedgerException(exception), ctx);
return;
}

try {
Iterator<LedgerEntry> iterator = ledgerEntries.iterator();
if (iterator.hasNext()) {
LedgerEntry ledgerEntry = iterator.next();
EntryImpl returnEntry = EntryImpl.create(ledgerEntry);

mlFactoryMBean.recordCacheMiss(1, returnEntry.getLength());
ml.getMBean().addReadEntriesSample(1, returnEntry.getLength());
callback.readEntryComplete(returnEntry, ctx);
} else {
callback.readEntryFailed(new ManagedLedgerException("Could not read given position"), ctx);
}
} finally {
ledgerEntries.close();
}
}, ml.getExecutor().chooseThread(ml.getName()));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,29 @@
*/
package org.apache.bookkeeper.mledger.impl;

import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.verify;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;

import java.lang.reflect.Method;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.apache.bookkeeper.client.api.ReadHandle;
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig;
import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
import org.testng.annotations.BeforeClass;
import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

@Test
Expand All @@ -41,13 +49,16 @@ public class EntryCacheManagerTest extends MockedBookKeeperTestCase {
ManagedLedgerImpl ml1;
ManagedLedgerImpl ml2;

@BeforeClass
void setup() throws Exception {
@BeforeMethod
void setup(Method method) throws Exception {
super.setUp(method);
OrderedScheduler executor = OrderedScheduler.newSchedulerBuilder().numThreads(1).build();

ml1 = mock(ManagedLedgerImpl.class);
when(ml1.getScheduledExecutor()).thenReturn(executor);
when(ml1.getName()).thenReturn("cache1");
when(ml1.getMBean()).thenReturn(new ManagedLedgerMBeanImpl(ml1));
when(ml1.getExecutor()).thenReturn(super.executor);

ml2 = mock(ManagedLedgerImpl.class);
when(ml2.getScheduledExecutor()).thenReturn(executor);
Expand Down Expand Up @@ -309,4 +320,32 @@ void verifyTimeBasedEviction() throws Exception {
factory.shutdown();
}

@Test(timeOut = 5000)
void entryCacheDisabledAsyncReadEntry() throws Exception {
ReadHandle lh = EntryCacheTest.getLedgerHandle();

ManagedLedgerFactoryConfig config = new ManagedLedgerFactoryConfig();
config.setMaxCacheSize(0);
ManagedLedgerFactoryImpl factory = new ManagedLedgerFactoryImpl(bkc, bkc.getZkHandle(), config);
EntryCacheManager cacheManager = factory.getEntryCacheManager();
EntryCache entryCache = cacheManager.getEntryCache(ml1);

final CountDownLatch counter = new CountDownLatch(1);
entryCache.asyncReadEntry(lh, new PositionImpl(1L ,1L), new AsyncCallbacks.ReadEntryCallback() {
public void readEntryComplete(Entry entry, Object ctx) {
Assert.assertNotEquals(entry, null);
entry.release();
counter.countDown();
}

public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
Assert.fail("should not have failed");
counter.countDown();
}
}, null);
counter.await();

verify(lh).readAsync(anyLong(), anyLong());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
counter.await();
}

private static ReadHandle getLedgerHandle() {
static ReadHandle getLedgerHandle() {
final ReadHandle lh = mock(ReadHandle.class);
final LedgerEntry ledgerEntry = mock(LedgerEntry.class, Mockito.CALLS_REAL_METHODS);
doReturn(Unpooled.wrappedBuffer(new byte[10])).when(ledgerEntry).getEntryBuffer();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.bookkeeper.mledger.LedgerOffloaderFactory;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
import org.apache.bookkeeper.mledger.impl.NullLedgerOffloader;
import org.apache.bookkeeper.mledger.offload.OffloaderUtils;
Expand Down Expand Up @@ -88,12 +89,23 @@ public static PulsarConnectorCache getConnectorCache(PulsarConnectorConfig pulsa
private static ManagedLedgerFactory initManagedLedgerFactory(PulsarConnectorConfig pulsarConnectorConfig)
throws Exception {
ClientConfiguration bkClientConfiguration = new ClientConfiguration()
.setZkServers(pulsarConnectorConfig.getZookeeperUri())
.setMetadataServiceUri("zk://" + pulsarConnectorConfig.getZookeeperUri() + "/ledgers")
.setClientTcpNoDelay(false)
.setUseV2WireProtocol(true)
.setStickyReadsEnabled(false)
.setReadEntryTimeout(60);
return new ManagedLedgerFactoryImpl(bkClientConfiguration, pulsarConnectorConfig.getZookeeperUri());
.setReadEntryTimeout(60)
.setThrottleValue(pulsarConnectorConfig.getBookkeeperThrottleValue())
.setNumIOThreads(pulsarConnectorConfig.getBookkeeperNumIOThreads())
.setNumWorkerThreads(pulsarConnectorConfig.getBookkeeperNumWorkerThreads());

ManagedLedgerFactoryConfig managedLedgerFactoryConfig = new ManagedLedgerFactoryConfig();
managedLedgerFactoryConfig.setMaxCacheSize(pulsarConnectorConfig.getManagedLedgerCacheSizeMB());
managedLedgerFactoryConfig.setNumManagedLedgerWorkerThreads(
pulsarConnectorConfig.getManagedLedgerNumWorkerThreads());
managedLedgerFactoryConfig.setNumManagedLedgerSchedulerThreads(
pulsarConnectorConfig.getManagedLedgerNumSchedulerThreads());
return new ManagedLedgerFactoryImpl(bkClientConfiguration, managedLedgerFactoryConfig);
}

public ManagedLedgerConfig getManagedLedgerConfig() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,16 @@ public class PulsarConnectorConfig implements AutoCloseable {

private PulsarAdmin pulsarAdmin;

// --- Bookkeeper
private int bookkeeperThrottleValue = 0;
private int bookkeeperNumIOThreads = 2 * Runtime.getRuntime().availableProcessors();
private int bookkeeperNumWorkerThreads = Runtime.getRuntime().availableProcessors();

// --- ManagedLedger
private long managedLedgerCacheSizeMB = 0L;
private int managedLedgerNumWorkerThreads = Runtime.getRuntime().availableProcessors();
private int managedLedgerNumSchedulerThreads = Runtime.getRuntime().availableProcessors();

@NotNull
public String getBrokerServiceUrl() {
return brokerServiceUrl;
Expand Down Expand Up @@ -285,6 +295,69 @@ public PulsarConnectorConfig setTlsTrustCertsFilePath(String tlsTrustCertsFilePa
return this;
}

// --- Bookkeeper Config ---

public int getBookkeeperThrottleValue() {
return bookkeeperThrottleValue;
}

@Config("pulsar.bookkeeper-throttle-value")
public PulsarConnectorConfig setBookkeeperThrottleValue(int bookkeeperThrottleValue) {
this.bookkeeperThrottleValue = bookkeeperThrottleValue;
return this;
}

public int getBookkeeperNumIOThreads() {
return bookkeeperNumIOThreads;
}

@Config("pulsar.bookkeeper-num-io-threads")
public PulsarConnectorConfig setBookkeeperNumIOThreads(int bookkeeperNumIOThreads) {
this.bookkeeperNumIOThreads = bookkeeperNumIOThreads;
return this;
}

public int getBookkeeperNumWorkerThreads() {
return bookkeeperNumWorkerThreads;
}

@Config("pulsar.bookkeeper-num-worker-threads")
public PulsarConnectorConfig setBookkeeperNumWorkerThreads(int bookkeeperNumWorkerThreads) {
this.bookkeeperNumWorkerThreads = bookkeeperNumWorkerThreads;
return this;
}

// --- ManagedLedger
public long getManagedLedgerCacheSizeMB() {
return managedLedgerCacheSizeMB;
}

@Config("pulsar.managed-ledger-cache-size-MB")
public PulsarConnectorConfig setManagedLedgerCacheSizeMB(int managedLedgerCacheSizeMB) {
this.managedLedgerCacheSizeMB = managedLedgerCacheSizeMB * 1024 * 1024;
return this;
}

public int getManagedLedgerNumWorkerThreads() {
return managedLedgerNumWorkerThreads;
}

@Config("pulsar.managed-ledger-num-worker-threads")
public PulsarConnectorConfig setManagedLedgerNumWorkerThreads(int managedLedgerNumWorkerThreads) {
this.managedLedgerNumWorkerThreads = managedLedgerNumWorkerThreads;
return this;
}

public int getManagedLedgerNumSchedulerThreads() {
return managedLedgerNumSchedulerThreads;
}

@Config("pulsar.managed-ledger-num-scheduler-threads")
public PulsarConnectorConfig setManagedLedgerNumSchedulerThreads(int managedLedgerNumSchedulerThreads) {
this.managedLedgerNumSchedulerThreads = managedLedgerNumSchedulerThreads;
return this;
}

@NotNull
public PulsarAdmin getPulsarAdmin() throws PulsarClientException {
if (this.pulsarAdmin == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,4 +49,23 @@ public void testNamespaceRewriteDelimiterRestriction() {
connectorConfig.setRewriteNamespaceDelimiter("--&");
Assert.assertEquals("--&", (connectorConfig.getRewriteNamespaceDelimiter()));
}

@Test
public void testDefaultBookkeeperConfig() {
int availableProcessors = Runtime.getRuntime().availableProcessors();
PulsarConnectorConfig connectorConfig = new PulsarConnectorConfig();
Assert.assertEquals(0, connectorConfig.getBookkeeperThrottleValue());
Assert.assertEquals(2 * availableProcessors, connectorConfig.getBookkeeperNumIOThreads());
Assert.assertEquals(availableProcessors, connectorConfig.getBookkeeperNumWorkerThreads());
}

@Test
public void testDefaultManagedLedgerConfig() {
int availableProcessors = Runtime.getRuntime().availableProcessors();
PulsarConnectorConfig connectorConfig = new PulsarConnectorConfig();
Assert.assertEquals(0L, connectorConfig.getManagedLedgerCacheSizeMB());
Assert.assertEquals(availableProcessors, connectorConfig.getManagedLedgerNumWorkerThreads());
Assert.assertEquals(availableProcessors, connectorConfig.getManagedLedgerNumSchedulerThreads());
}

}

0 comments on commit 2d81c57

Please sign in to comment.