Skip to content

Commit

Permalink
[WebSocket Proxy] Make websocket proxy thread pool size configurable (a…
Browse files Browse the repository at this point in the history
…pache#15117)

- it was hard coded to 20 threads
- make thread pool size configurable by webSocketNumServiceThreads
- remove unused orderedExecutor
  • Loading branch information
lhotari authored Apr 12, 2022
1 parent 4bee192 commit 189d235
Show file tree
Hide file tree
Showing 5 changed files with 18 additions and 15 deletions.
3 changes: 3 additions & 0 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -1219,6 +1219,9 @@ webSocketServiceEnabled=false
# Number of IO threads in Pulsar Client used in WebSocket proxy
webSocketNumIoThreads=

# Number of threads used by Websocket service
webSocketNumServiceThreads=

# Number of connections per Broker in Pulsar Client used in WebSocket proxy
webSocketConnectionsPerBroker=

Expand Down
3 changes: 3 additions & 0 deletions conf/websocket.conf
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ clusterName=
# Number of IO threads in Pulsar Client used in WebSocket proxy
webSocketNumIoThreads=

# Number of threads used by Websocket service
webSocketNumServiceThreads=

# Number of threads to use in HTTP server. Default is Runtime.getRuntime().availableProcessors()
numHttpServerThreads=

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2270,6 +2270,11 @@ public class ServiceConfiguration implements PulsarConfiguration {
doc = "Number of IO threads in Pulsar Client used in WebSocket proxy"
)
private int webSocketNumIoThreads = Runtime.getRuntime().availableProcessors();

@FieldContext(category = CATEGORY_WEBSOCKET,
doc = "Number of threads used by Websocket service")
private int webSocketNumServiceThreads = 20;

@FieldContext(
category = CATEGORY_WEBSOCKET,
doc = "Number of connections per Broker in Pulsar Client used in WebSocket proxy"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import java.util.concurrent.TimeUnit;
import javax.servlet.ServletException;
import javax.websocket.DeploymentException;
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.authentication.AuthenticationService;
Expand Down Expand Up @@ -59,11 +58,7 @@ public class WebSocketService implements Closeable {
AuthorizationService authorizationService;
PulsarClient pulsarClient;

private final ScheduledExecutorService executor = Executors
.newScheduledThreadPool(WebSocketProxyConfiguration.WEBSOCKET_SERVICE_THREADS,
new DefaultThreadFactory("pulsar-websocket"));
private final OrderedScheduler orderedExecutor = OrderedScheduler.newSchedulerBuilder()
.numThreads(WebSocketProxyConfiguration.GLOBAL_ZK_THREADS).name("pulsar-websocket-ordered").build();
private final ScheduledExecutorService executor;
private PulsarResources pulsarResources;
private MetadataStoreExtended configMetadataStore;
private ServiceConfiguration config;
Expand All @@ -80,6 +75,9 @@ public WebSocketService(WebSocketProxyConfiguration config) {

public WebSocketService(ClusterData localCluster, ServiceConfiguration config) {
this.config = config;
this.executor = Executors
.newScheduledThreadPool(config.getWebSocketNumServiceThreads(),
new DefaultThreadFactory("pulsar-websocket"));
this.localCluster = localCluster;
this.topicProducerMap =
ConcurrentOpenHashMap.<String,
Expand Down Expand Up @@ -145,7 +143,6 @@ public void close() throws IOException {
}

executor.shutdown();
orderedExecutor.shutdown();
}

public AuthenticationService getAuthenticationService() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,6 @@
@Getter
@Setter
public class WebSocketProxyConfiguration implements PulsarConfiguration {

// Number of threads used by Proxy server
public static final int PROXY_SERVER_EXECUTOR_THREADS = 2 * Runtime.getRuntime().availableProcessors();
// Number of threads used by Websocket service
public static final int WEBSOCKET_SERVICE_THREADS = 20;
// Number of threads used by Global ZK
public static final int GLOBAL_ZK_THREADS = 8;

@FieldContext(required = true, doc = "Name of the cluster to which this broker belongs to")
private String clusterName;

Expand Down Expand Up @@ -140,6 +132,9 @@ public class WebSocketProxyConfiguration implements PulsarConfiguration {
@FieldContext(doc = "Number of threads to used in HTTP server")
private int numHttpServerThreads = Math.max(6, Runtime.getRuntime().availableProcessors());

@FieldContext(doc = "Number of threads used by Websocket service")
private int webSocketNumServiceThreads = 20;

@FieldContext(doc = "Number of connections per broker in Pulsar client used in WebSocket proxy")
private int webSocketConnectionsPerBroker = Runtime.getRuntime().availableProcessors();

Expand Down

0 comments on commit 189d235

Please sign in to comment.