Skip to content

Commit

Permalink
[pulsar-broker] Keep max-concurrent http web-request configurable (ap…
Browse files Browse the repository at this point in the history
  • Loading branch information
rdhabalia authored Oct 29, 2020
1 parent da61b56 commit a9e2f7e
Show file tree
Hide file tree
Showing 4 changed files with 12 additions and 2 deletions.
3 changes: 3 additions & 0 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,9 @@ numExecutorThreadPoolSize=
# Default is 10
numCacheExecutorThreadPoolSize=10

# Max concurrent web requests
maxConcurrentHttpRequests=1024

# Flag to control features that are meant to be used when running in standalone mode
isRunningStandalone=

Expand Down
3 changes: 3 additions & 0 deletions conf/standalone.conf
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ numExecutorThreadPoolSize=
# Default is 10
numCacheExecutorThreadPoolSize=10

# Max concurrent web requests
maxConcurrentHttpRequests=1024

# Name of the cluster to which this broker belongs to
clusterName=standalone

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,9 @@ public class ServiceConfiguration implements PulsarConfiguration {
)
private int numCacheExecutorThreadPoolSize = 10;

@FieldContext(category = CATEGORY_SERVER, doc = "Max concurrent web requests")
private int maxConcurrentHttpRequests = 1024;

@FieldContext(category = CATEGORY_SERVER, doc = "Whether to enable the delayed delivery for messages.")
private boolean delayedDeliveryEnabled = true;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,12 +63,12 @@ public class WebService implements AutoCloseable {

public static final String ATTRIBUTE_PULSAR_NAME = "pulsar";
public static final String HANDLER_CACHE_CONTROL = "max-age=3600";
public static final int MAX_CONCURRENT_REQUESTS = 1024; // make it configurable?

private final PulsarService pulsar;
private final Server server;
private final List<Handler> handlers;
private final WebExecutorThreadPool webServiceExecutor;
public final int maxConcurrentRequests;

private final ServerConnector httpConnector;
private final ServerConnector httpsConnector;
Expand All @@ -80,6 +80,7 @@ public WebService(PulsarService pulsar) throws PulsarServerException {
pulsar.getConfiguration().getNumHttpServerThreads(),
"pulsar-web");
this.server = new Server(webServiceExecutor);
this.maxConcurrentRequests = pulsar.getConfiguration().getMaxConcurrentHttpRequests();
List<ServerConnector> connectors = new ArrayList<>();

Optional<Integer> port = pulsar.getConfiguration().getWebServicePort();
Expand Down Expand Up @@ -131,7 +132,7 @@ public WebService(PulsarService pulsar) throws PulsarServerException {
}

// Limit number of concurrent HTTP connections to avoid getting out of file descriptors
connectors.forEach(c -> c.setAcceptQueueSize(WebService.MAX_CONCURRENT_REQUESTS / connectors.size()));
connectors.forEach(c -> c.setAcceptQueueSize(maxConcurrentRequests / connectors.size()));
server.setConnectors(connectors.toArray(new ServerConnector[connectors.size()]));
}

Expand Down

0 comments on commit a9e2f7e

Please sign in to comment.