Skip to content

Commit

Permalink
make pulsar executor pool size configurable (apache#7782)
Browse files Browse the repository at this point in the history
#### Motivation
The pulsar executor pool size number is hard code to `20` when pulsar service start, it should be configurable in broker.conf.
```
private final ScheduledExecutorService executor = Executors.newScheduledThreadPool(20,
            new DefaultThreadFactory("pulsar"));
```

#### Changes
make the executor pool size configurable in broker.conf
  • Loading branch information
hangc0276 authored Aug 10, 2020
1 parent c7ead15 commit bd09aa0
Show file tree
Hide file tree
Showing 25 changed files with 52 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ protected void internalSetUpForBroker() throws Exception {
Set<String> tlsProtocols = Sets.newConcurrentHashSet();
tlsProtocols.add("TLSv1.2");
conf.setTlsProtocols(tlsProtocols);
conf.setNumExecutorThreadPoolSize(5);
}

protected void internalSetUpForClient(boolean addCertificates, String lookupUrl) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ protected void internalSetUpForBroker() throws Exception {
Set<String> tlsProtocols = Sets.newConcurrentHashSet();
tlsProtocols.add("TLSv1.2");
conf.setTlsProtocols(tlsProtocols);
conf.setNumExecutorThreadPoolSize(5);

// load bcfips in
URL bouncyCastleUrl = this.getClass().getClassLoader().getResource("bouncy-castle-bcfips.nar");
Expand Down
6 changes: 6 additions & 0 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,12 @@ numIOThreads=
# Number of threads to use for HTTP requests processing. Default is set to 2 * Runtime.getRuntime().availableProcessors()
numHttpServerThreads=

# Number of thread pool size to use for pulsar broker service.
# The executor in thread pool will do basic broker operation like load/unload bundle, update managedLedgerConfig,
# update topic/subscription/replicator message dispatch rate, do leader election etc.
# Default is Runtime.getRuntime().availableProcessors()
numExecutorThreadPoolSize=

# Flag to control features that are meant to be used when running in standalone mode
isRunningStandalone=

Expand Down
6 changes: 6 additions & 0 deletions conf/standalone.conf
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,12 @@ numIOThreads=
# Number of threads to use for HTTP requests processing. Default is set to 2 * Runtime.getRuntime().availableProcessors()
numHttpServerThreads=

# Number of thread pool size to use for pulsar broker service.
# The executor in thread pool will do basic broker operation like load/unload bundle, update managedLedgerConfig,
# update topic/subscription/replicator message dispatch rate, do leader election etc.
# Default is Runtime.getRuntime().availableProcessors()
numExecutorThreadPoolSize=

# Name of the cluster to which this broker belongs to
clusterName=standalone

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,15 @@ public class ServiceConfiguration implements PulsarConfiguration {
// waiting for another HTTP call to complete in same thread.
private int numHttpServerThreads = Math.max(8, 2 * Runtime.getRuntime().availableProcessors());

@FieldContext(
category = CATEGORY_SERVER,
doc = "Number of threads to use for pulsar broker service."
+ " The executor in thread pool will do basic broker operation like load/unload bundle,"
+ " update managedLedgerConfig, update topic/subscription/replicator message dispatch rate,"
+ " do leader election etc. Default is set to 20 "
)
private int numExecutorThreadPoolSize = Runtime.getRuntime().availableProcessors();

@FieldContext(category = CATEGORY_SERVER, doc = "Whether to enable the delayed delivery for messages.")
private boolean delayedDeliveryEnabled = true;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,8 +175,8 @@ public class PulsarService implements AutoCloseable {
private LocalZooKeeperConnectionService localZooKeeperConnectionProvider;
private Compactor compactor;

private final ScheduledExecutorService executor = Executors.newScheduledThreadPool(20,
new DefaultThreadFactory("pulsar"));
private final ScheduledExecutorService executor;

private final ScheduledExecutorService cacheExecutor = Executors.newScheduledThreadPool(10,
new DefaultThreadFactory("zk-cache-callback"));
private OrderedExecutor orderedExecutor;
Expand Down Expand Up @@ -258,6 +258,8 @@ public PulsarService(ServiceConfiguration config, Optional<WorkerService> functi
this.loadManagerExecutor = Executors
.newSingleThreadScheduledExecutor(new DefaultThreadFactory("pulsar-load-manager"));
this.functionWorkerService = functionWorkerService;
this.executor = Executors.newScheduledThreadPool(config.getNumExecutorThreadPoolSize(),
new DefaultThreadFactory("pulsar"));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ public void setup() throws Exception {
conf.setMessageExpiryCheckIntervalInMinutes(1);
conf.setSubscriptionExpiryCheckIntervalInMinutes(1);
conf.setBrokerDeleteInactiveTopicsEnabled(false);
conf.setNumExecutorThreadPoolSize(5);

super.internalSetup();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ public void setup() throws Exception {
String.format("tlsCertFile:%s,tlsKeyFile:%s", getTLSFile("admin.cert"), getTLSFile("admin.key-pk8")));
conf.setBrokerClientTrustCertsFilePath(getTLSFile("ca.cert"));
conf.setBrokerClientTlsEnabled(true);
conf.setNumExecutorThreadPoolSize(5);

super.internalSetup();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ private void buildConf(ServiceConfiguration conf) {
conf.setBrokerClientAuthenticationPlugin("org.apache.pulsar.client.impl.auth.AuthenticationTls");
conf.setBrokerClientTrustCertsFilePath(getTLSFile("ca.cert"));
conf.setTlsAllowInsecureConnection(true);
conf.setNumExecutorThreadPoolSize(5);
}

@AfterMethod
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ public void setup() throws Exception {
conf.setWebServicePortTls(Optional.of(0));
conf.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
conf.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
conf.setNumExecutorThreadPoolSize(5);

super.internalSetup();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ protected void resetConfig() {
this.conf.setWebServicePort(Optional.of(0));
this.conf.setWebServicePortTls(Optional.of(0));
this.conf.setBookkeeperClientExposeStatsToPrometheus(true);
this.conf.setNumExecutorThreadPoolSize(5);
}

protected final void internalSetup() throws Exception {
Expand Down Expand Up @@ -147,6 +148,7 @@ protected void doInitConf() throws Exception {
this.conf.setAdvertisedAddress("localhost");
this.conf.setWebServicePort(Optional.of(0));
this.conf.setWebServicePortTls(Optional.of(0));
this.conf.setNumExecutorThreadPoolSize(5);
}

protected final void init() throws Exception {
Expand All @@ -170,6 +172,7 @@ protected final void init(boolean isPreciseDispatcherFlowControl) throws Excepti
this.conf.setWebServicePort(Optional.of(0));
this.conf.setWebServicePortTls(Optional.of(0));
this.conf.setPreciseDispatcherFlowControl(isPreciseDispatcherFlowControl);
this.conf.setNumExecutorThreadPoolSize(5);

sameThreadOrderedSafeExecutor = new SameThreadOrderedSafeExecutor();
bkExecutor = Executors.newSingleThreadExecutor(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -471,6 +471,7 @@ public void testTlsEnabled() throws Exception {
conf.setWebServicePortTls(Optional.of(0));
conf.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
conf.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
conf.setNumExecutorThreadPoolSize(5);
restartBroker();

// Case 1: Access without TLS
Expand Down Expand Up @@ -554,6 +555,7 @@ public void testTlsAuthAllowInsecure() throws Exception {
conf.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
conf.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
conf.setTlsAllowInsecureConnection(true);
conf.setNumExecutorThreadPoolSize(5);
restartBroker();

Map<String, String> authParams = new HashMap<>();
Expand Down Expand Up @@ -616,6 +618,7 @@ public void testTlsAuthDisallowInsecure() throws Exception {
conf.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
conf.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
conf.setTlsAllowInsecureConnection(false);
conf.setNumExecutorThreadPoolSize(5);
restartBroker();

Map<String, String> authParams = new HashMap<>();
Expand Down Expand Up @@ -678,6 +681,7 @@ public void testTlsAuthUseTrustCert() throws Exception {
conf.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
conf.setTlsAllowInsecureConnection(false);
conf.setTlsTrustCertsFilePath(TLS_CLIENT_CERT_FILE_PATH);
conf.setNumExecutorThreadPoolSize(5);
restartBroker();

Map<String, String> authParams = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ protected void setup() throws Exception {
conf.setAuthenticationProviders(providers);

conf.setClusterName("test");

conf.setNumExecutorThreadPoolSize(5);
super.init();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ protected void setup() throws Exception {
conf.setAuthenticationProviders(providers);

conf.setClusterName("test");
conf.setNumExecutorThreadPoolSize(5);

super.init();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,7 @@ public void testWebserviceServiceTls() throws Exception {
conf.setTlsAllowInsecureConnection(true);
conf.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
conf.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
conf.setNumExecutorThreadPoolSize(5);
stopBroker();
startBroker();
pulsar.getLoadManager().get().writeLoadReportOnZookeeper();
Expand Down Expand Up @@ -539,6 +540,7 @@ public void testDiscoveryLookupTls() throws Exception {
conf.setTlsAllowInsecureConnection(true);
conf.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
conf.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
conf.setNumExecutorThreadPoolSize(5);
stopBroker();
startBroker();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ protected void internalSetUpForBroker() throws Exception {
Set<String> tlsProtocols = Sets.newConcurrentHashSet();
tlsProtocols.add("TLSv1.2");
conf.setTlsProtocols(tlsProtocols);
conf.setNumExecutorThreadPoolSize(5);
}

protected void internalSetUpForClient(boolean addCertificates, String lookupUrl) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ public void setup() throws Exception {
conf.setBrokerClientAuthenticationParameters(mapToString(authParams));
conf.setBrokerClientTlsTrustStore(BROKER_TRUSTSTORE_FILE_PATH);
conf.setBrokerClientTlsTrustStorePassword(BROKER_TRUSTSTORE_PW);
conf.setNumExecutorThreadPoolSize(5);

super.init();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ protected void internalSetUpForBroker() {
Set<String> providers = new HashSet<>();
providers.add(AuthenticationProviderTls.class.getName());
conf.setAuthenticationProviders(providers);
conf.setNumExecutorThreadPoolSize(5);
}

protected void internalSetUpForClient(boolean addCertificates, String lookupUrl) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ protected void internalSetUpForBroker() throws Exception {
conf.setTlsRequireTrustedClientCertOnConnect(true);
tlsProtocols.add("TLSv1.2");
conf.setTlsProtocols(tlsProtocols);
conf.setNumExecutorThreadPoolSize(5);
}

protected void internalSetUpForClient(boolean addCertificates, String lookupUrl) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ protected void setup() throws Exception {
conf.setSuperUserRoles(ImmutableSet.of("admin"));
conf.setProxyRoles(ImmutableSet.of("proxy"));
conf.setAuthenticationProviders(ImmutableSet.of(AuthenticationProviderTls.class.getName()));
conf.setNumExecutorThreadPoolSize(5);

super.internalSetup();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ protected void setup() throws Exception {
conf.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
conf.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
conf.setTlsAllowInsecureConnection(true);
conf.setNumExecutorThreadPoolSize(5);

Set<String> superUserRoles = new HashSet<>();
superUserRoles.add("localhost");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ protected void setup() throws Exception {
conf.setAuthenticationProviders(providers);

conf.setClusterName("proxy-authorization-neg");
conf.setNumExecutorThreadPoolSize(5);

super.init();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ protected void setup() throws Exception {
conf.setAuthenticationProviders(providers);

conf.setClusterName("proxy-authorization");
conf.setNumExecutorThreadPoolSize(5);

super.init();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ protected void setup() throws Exception {
conf.setAuthenticationProviders(providers);

conf.setClusterName("without-service-discovery");
conf.setNumExecutorThreadPoolSize(5);

super.init();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ protected void setup() throws Exception {
conf.setSuperUserRoles(ImmutableSet.of("admin", "superproxy"));
conf.setProxyRoles(ImmutableSet.of("superproxy"));
conf.setAuthenticationProviders(ImmutableSet.of(AuthenticationProviderTls.class.getName()));
conf.setNumExecutorThreadPoolSize(5);

super.internalSetup();

Expand Down

0 comments on commit bd09aa0

Please sign in to comment.