Skip to content

Commit

Permalink
Add connectionsPerBroker setting to WebSocket proxy (apache#497)
Browse files Browse the repository at this point in the history
  • Loading branch information
nkurihar authored and merlimat committed Jun 21, 2017
1 parent a45ffe2 commit 3f39a61
Show file tree
Hide file tree
Showing 6 changed files with 60 additions and 6 deletions.
14 changes: 11 additions & 3 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,6 @@ webServicePort=8080
# Port to use to server HTTPS request
webServicePortTls=8443

# Enable the WebSocket API service in broker
webSocketServiceEnabled=false

# Hostname or IP address the service binds on, default is 0.0.0.0.
bindAddress=0.0.0.0

Expand Down Expand Up @@ -320,3 +317,14 @@ brokerServicePurgeInactiveFrequencyInSeconds=60

# Name of load manager to use
loadManagerClassName=com.yahoo.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl

### --- WebSocket --- ###

# Enable the WebSocket API service in broker
webSocketServiceEnabled=false

# Number of IO threads in Pulsar Client used in WebSocket proxy
webSocketNumIoThreads=8

# Number of connections per Broker in Pulsar Client used in WebSocket proxy
webSocketConnectionsPerBroker=8
11 changes: 11 additions & 0 deletions conf/standalone.conf
Original file line number Diff line number Diff line change
Expand Up @@ -268,3 +268,14 @@ keepAliveIntervalSeconds=30

# How often broker checks for inactive topics to be deleted (topics with no subscriptions and no one connected)
brokerServicePurgeInactiveFrequencyInSeconds=60

### --- WebSocket --- ###

# Enable the WebSocket API service in broker
webSocketServiceEnabled=true

# Number of IO threads in Pulsar Client used in WebSocket proxy
webSocketNumIoThreads=8

# Number of connections per Broker in Pulsar Client used in WebSocket proxy
webSocketConnectionsPerBroker=8
6 changes: 6 additions & 0 deletions conf/websocket.conf
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,12 @@ bindAddress=0.0.0.0
# Name of the pulsar cluster to connect to
clusterName=

# Number of IO threads in Pulsar Client used in WebSocket proxy
numIoThreads=8

# Number of connections per Broker in Pulsar Client used in WebSocket proxy
connectionsPerBroker=8

### --- Authentication --- ###

# Enable authentication
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,12 @@ public class ServiceConfiguration implements PulsarConfiguration {
@FieldContext(dynamic = true)
private boolean preferLaterVersions = false;

/**** --- WebSocket --- ****/
// Number of IO threads in Pulsar Client used in WebSocket proxy
private int webSocketNumIoThreads = Runtime.getRuntime().availableProcessors();
// Number of connections per Broker in Pulsar Client used in WebSocket proxy
private int webSocketConnectionsPerBroker = Runtime.getRuntime().availableProcessors();

public String getZookeeperServers() {
return zookeeperServers;
}
Expand Down Expand Up @@ -1019,4 +1025,12 @@ public boolean isPreferLaterVersions() {
public void setPreferLaterVersions(boolean preferLaterVersions) {
this.preferLaterVersions = preferLaterVersions;
}

public int getWebSocketNumIoThreads() { return webSocketNumIoThreads; }

public void setWebSocketNumIoThreads(int webSocketNumIoThreads) { this.webSocketNumIoThreads = webSocketNumIoThreads; }

public int getWebSocketConnectionsPerBroker() { return webSocketConnectionsPerBroker; }

public void setWebSocketConnectionsPerBroker(int webSocketConnectionsPerBroker) { this.webSocketConnectionsPerBroker = webSocketConnectionsPerBroker; }
}
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,9 @@ private PulsarClient createClientInstance(ClusterData clusterData) throws IOExce
clientConf.setUseTls(config.isTlsEnabled());
clientConf.setTlsAllowInsecureConnection(config.isTlsAllowInsecureConnection());
clientConf.setTlsTrustCertsFilePath(config.getTlsTrustCertsFilePath());
clientConf.setIoThreads(WebSocketProxyConfiguration.PULSAR_CLIENT_IO_THREADS);
clientConf.setIoThreads(config.getWebSocketNumIoThreads());
clientConf.setConnectionsPerBroker(config.getWebSocketConnectionsPerBroker());

if (config.isAuthenticationEnabled()) {
clientConf.setAuthentication(config.getBrokerClientAuthenticationPlugin(),
config.getBrokerClientAuthenticationParameters());
Expand Down Expand Up @@ -223,6 +225,8 @@ private static ServiceConfiguration createServiceConfiguration(WebSocketProxyCon
serviceConfig.setTlsCertificateFilePath(config.getTlsCertificateFilePath());
serviceConfig.setTlsKeyFilePath(config.getTlsKeyFilePath());
serviceConfig.setTlsAllowInsecureConnection(config.isTlsAllowInsecureConnection());
serviceConfig.setWebSocketNumIoThreads(config.getNumIoThreads());
serviceConfig.setWebSocketConnectionsPerBroker(config.getConnectionsPerBroker());
return serviceConfig;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,6 @@ public class WebSocketProxyConfiguration implements PulsarConfiguration {
public static final int WEBSOCKET_SERVICE_THREADS = 20;
// Number of threads used by Global ZK
public static final int GLOBAL_ZK_THREADS = 8;
// Number of IO threads in Pulsar Client
public static final int PULSAR_CLIENT_IO_THREADS = Runtime.getRuntime().availableProcessors();

// Name of the cluster to which this broker belongs to
@FieldContext(required = true)
Expand Down Expand Up @@ -69,6 +67,11 @@ public class WebSocketProxyConfiguration implements PulsarConfiguration {
private String brokerClientAuthenticationPlugin;
private String brokerClientAuthenticationParameters;

// Number of IO threads in Pulsar Client used in WebSocket proxy
private int numIoThreads = Runtime.getRuntime().availableProcessors();
// Number of connections per Broker in Pulsar Client used in WebSocket proxy
private int connectionsPerBroker = Runtime.getRuntime().availableProcessors();

/***** --- TLS --- ****/
// Enable TLS
private boolean tlsEnabled = false;
Expand Down Expand Up @@ -211,6 +214,14 @@ public void setBrokerClientAuthenticationParameters(String brokerClientAuthentic
this.brokerClientAuthenticationParameters = brokerClientAuthenticationParameters;
}

public int getNumIoThreads() { return numIoThreads; }

public void setNumIoThreads(int numIoThreads) { this.numIoThreads = numIoThreads; }

public int getConnectionsPerBroker() { return connectionsPerBroker; }

public void setConnectionsPerBroker(int connectionsPerBroker) { this.connectionsPerBroker = connectionsPerBroker; }

public boolean isTlsEnabled() {
return tlsEnabled;
}
Expand Down

0 comments on commit 3f39a61

Please sign in to comment.