Skip to content

Commit

Permalink
make zk cache executor pool size configurable (apache#7794)
Browse files Browse the repository at this point in the history
### Motivation
The zk cache executor thread pool size is hard code to 10 when pulsar service start, it should be configurable in broker.conf.
```
private final ScheduledExecutorService cacheExecutor = Executors.newScheduledThreadPool(10,
            new DefaultThreadFactory("zk-cache-callback"));
```
#### Changes
make the zk cache executor threads pool size configurable in broker.conf
  • Loading branch information
hangc0276 authored Aug 11, 2020
1 parent bd8d2ba commit 1e8f4b6
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 2 deletions.
5 changes: 5 additions & 0 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,11 @@ numHttpServerThreads=
# Default is Runtime.getRuntime().availableProcessors()
numExecutorThreadPoolSize=

# Number of thread pool size to use for pulsar zookeeper callback service
# The cache executor thread pool is used for restarting global zookeeper session.
# Default is 10
numCacheExecutorThreadPoolSize=10

# Flag to control features that are meant to be used when running in standalone mode
isRunningStandalone=

Expand Down
5 changes: 5 additions & 0 deletions conf/standalone.conf
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,11 @@ numHttpServerThreads=
# Default is Runtime.getRuntime().availableProcessors()
numExecutorThreadPoolSize=

# Number of thread pool size to use for pulsar zookeeper callback service
# The cache executor thread pool is used for restarting global zookeeper session.
# Default is 10
numCacheExecutorThreadPoolSize=10

# Name of the cluster to which this broker belongs to
clusterName=standalone

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,14 @@ public class ServiceConfiguration implements PulsarConfiguration {
)
private int numExecutorThreadPoolSize = Runtime.getRuntime().availableProcessors();

@FieldContext(
category = CATEGORY_SERVER,
doc = "Number of thread pool size to use for pulsar zookeeper callback service."
+ "The cache executor thread pool is used for restarting global zookeeper session. "
+ "Default is 10"
)
private int numCacheExecutorThreadPoolSize = 10;

@FieldContext(category = CATEGORY_SERVER, doc = "Whether to enable the delayed delivery for messages.")
private boolean delayedDeliveryEnabled = true;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,9 +176,8 @@ public class PulsarService implements AutoCloseable {
private Compactor compactor;

private final ScheduledExecutorService executor;
private final ScheduledExecutorService cacheExecutor;

private final ScheduledExecutorService cacheExecutor = Executors.newScheduledThreadPool(10,
new DefaultThreadFactory("zk-cache-callback"));
private OrderedExecutor orderedExecutor;
private final ScheduledExecutorService loadManagerExecutor;
private ScheduledExecutorService compactorExecutor;
Expand Down Expand Up @@ -260,6 +259,8 @@ public PulsarService(ServiceConfiguration config, Optional<WorkerService> functi
this.functionWorkerService = functionWorkerService;
this.executor = Executors.newScheduledThreadPool(config.getNumExecutorThreadPoolSize(),
new DefaultThreadFactory("pulsar"));
this.cacheExecutor = Executors.newScheduledThreadPool(config.getNumCacheExecutorThreadPoolSize(),
new DefaultThreadFactory("zk-cache-callback"));
}

/**
Expand Down

0 comments on commit 1e8f4b6

Please sign in to comment.