Skip to content

Commit

Permalink
Make it possible to set idle WebSocket session timeout period (apache…
Browse files Browse the repository at this point in the history
  • Loading branch information
massakam authored and sijie committed Jul 9, 2018
1 parent cd35677 commit b113e90
Show file tree
Hide file tree
Showing 8 changed files with 39 additions and 5 deletions.
2 changes: 2 additions & 0 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -462,6 +462,8 @@ webSocketNumIoThreads=8
# Number of connections per Broker in Pulsar Client used in WebSocket proxy
webSocketConnectionsPerBroker=8

# Time in milliseconds that idle WebSocket session times out
webSocketSessionIdleTimeoutMillis=300000

### --- Metrics --- ###

Expand Down
2 changes: 2 additions & 0 deletions conf/standalone.conf
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,8 @@ webSocketNumIoThreads=8
# Number of connections per Broker in Pulsar Client used in WebSocket proxy
webSocketConnectionsPerBroker=8

# Time in milliseconds that idle WebSocket session times out
webSocketSessionIdleTimeoutMillis=300000

### --- Metrics --- ###

Expand Down
3 changes: 3 additions & 0 deletions conf/websocket.conf
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ numIoThreads=8
# Number of connections per Broker in Pulsar Client used in WebSocket proxy
connectionsPerBroker=8

# Time in milliseconds that idle WebSocket session times out
webSocketSessionIdleTimeoutMillis=300000

### --- Authentication --- ###

# Enable authentication
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -464,6 +464,8 @@ public class ServiceConfiguration implements PulsarConfiguration {
private int webSocketNumIoThreads = Runtime.getRuntime().availableProcessors();
// Number of connections per Broker in Pulsar Client used in WebSocket proxy
private int webSocketConnectionsPerBroker = Runtime.getRuntime().availableProcessors();
// Time in milliseconds that idle WebSocket session times out
private int webSocketSessionIdleTimeoutMillis = 300000;

/**** --- Metrics --- ****/
// If true, export topic level metrics otherwise namespace level
Expand Down Expand Up @@ -1582,6 +1584,14 @@ public void setPreferLaterVersions(boolean preferLaterVersions) {

public void setWebSocketConnectionsPerBroker(int webSocketConnectionsPerBroker) { this.webSocketConnectionsPerBroker = webSocketConnectionsPerBroker; }

public int getWebSocketSessionIdleTimeoutMillis() {
return webSocketSessionIdleTimeoutMillis;
}

public void setWebSocketSessionIdleTimeoutMillis(int webSocketSessionIdleTimeoutMillis) {
this.webSocketSessionIdleTimeoutMillis = webSocketSessionIdleTimeoutMillis;
}

public boolean exposeTopicLevelMetricsInPrometheus() {
return exposeTopicLevelMetricsInPrometheus;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,10 @@ public WebSocketConsumerServlet(WebSocketService service) {
@Override
public void configure(WebSocketServletFactory factory) {
factory.getPolicy().setMaxTextMessageSize(WebSocketService.MaxTextFrameSize);

if (service.getConfig().getWebSocketSessionIdleTimeoutMillis() > 0) {
factory.getPolicy().setIdleTimeout(service.getConfig().getWebSocketSessionIdleTimeoutMillis());
}
factory.setCreator(
(request, response) -> new ConsumerHandler(service, request.getHttpServletRequest(), response));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ public WebSocketProducerServlet(WebSocketService service) {
@Override
public void configure(WebSocketServletFactory factory) {
factory.getPolicy().setMaxTextMessageSize(WebSocketService.MaxTextFrameSize);
if (service.getConfig().getWebSocketSessionIdleTimeoutMillis() > 0) {
factory.getPolicy().setIdleTimeout(service.getConfig().getWebSocketSessionIdleTimeoutMillis());
}
factory.setCreator((request, response) -> new ProducerHandler(service, request.getHttpServletRequest(), response));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,10 @@ public WebSocketReaderServlet(WebSocketService service) {
@Override
public void configure(WebSocketServletFactory factory) {
factory.getPolicy().setMaxTextMessageSize(WebSocketService.MaxTextFrameSize);

if (service.getConfig().getWebSocketSessionIdleTimeoutMillis() > 0) {
factory.getPolicy().setIdleTimeout(service.getConfig().getWebSocketSessionIdleTimeoutMillis());
}
factory.setCreator(
(request, response) -> new ReaderHandler(service, request.getHttpServletRequest(), response));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ public class WebSocketProxyConfiguration implements PulsarConfiguration {
private int numIoThreads = Runtime.getRuntime().availableProcessors();
// Number of connections per Broker in Pulsar Client used in WebSocket proxy
private int connectionsPerBroker = Runtime.getRuntime().availableProcessors();
// Time in milliseconds that idle WebSocket session times out
private int webSocketSessionIdleTimeoutMillis = 300000;

// When this parameter is not empty, unauthenticated users perform as anonymousUserRole
private String anonymousUserRole = null;
Expand Down Expand Up @@ -299,6 +301,14 @@ public void setConnectionsPerBroker(int connectionsPerBroker) {
this.connectionsPerBroker = connectionsPerBroker;
}

public int getWebSocketSessionIdleTimeoutMillis() {
return webSocketSessionIdleTimeoutMillis;
}

public void setWebSocketSessionIdleTimeoutMillis(int webSocketSessionIdleTimeoutMillis) {
this.webSocketSessionIdleTimeoutMillis = webSocketSessionIdleTimeoutMillis;
}

public String getAnonymousUserRole() {
return anonymousUserRole;
}
Expand Down

0 comments on commit b113e90

Please sign in to comment.