Skip to content

Commit

Permalink
[Broker][Proxy][Function worker] Fix backpressure handling in Jetty w…
Browse files Browse the repository at this point in the history
…eb server configuration (apache#14353)

* [Broker] Improve Jetty configuration to handle backpressure

- Fix maxConcurrentHttpRequests by using QoSFilter to limit concurrent requests
  - previous solution didn't limit concurrent http requests
- Replace previous hardcoded connection limit of 10000 http connections with configurable setting
  - use Jetty's built-in connection limit instead of PulsarServerConnector's custom solution
- Rate limiting should happen in the beginning of the filter chain
- Let Jetty tune selectors and acceptors based on number of cores
  - JETTY_AVAILABLE_PROCESSORS=n environment variable can be used to override the number of cores reported by the OS
    - This is useful when CPU limit isn't set on k8s and the number of cores is the number of total cores available on the k8s node
    - use can also use -XX:ActiveProcessorCount=n to make Java's Runtime.getRuntime().availableProcessors() return n
- Make accept queue capacity configurable
- Make thread pool queue capacity bounded and make it configurable

* [Functions] Add http backpressure handling for Functions worker's http server
  • Loading branch information
lhotari authored Apr 12, 2022
1 parent 8ff87ac commit ec38211
Show file tree
Hide file tree
Showing 16 changed files with 307 additions and 147 deletions.
15 changes: 12 additions & 3 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -105,9 +105,6 @@ numCacheExecutorThreadPoolSize=10
# reduce the number of IO threads and BK client threads to only have few CPU cores busy.
enableBusyWait=false

# Max concurrent web requests
maxConcurrentHttpRequests=1024

# Flag to control features that are meant to be used when running in standalone mode
isRunningStandalone=

Expand Down Expand Up @@ -783,6 +780,18 @@ httpRequestsLimitEnabled=false
# Max HTTP requests per seconds allowed. The excess of requests will be rejected with HTTP code 429 (Too many requests)
httpRequestsMaxPerSecond=100.0

# Capacity for thread pool queue in the HTTP server
httpServerThreadPoolQueueSize=8192

# Capacity for accept queue in the HTTP server
httpServerAcceptQueueSize=8192

# Maximum number of inbound http connections. (0 to disable limiting)
maxHttpServerConnections=2048

# Max concurrent web requests
maxConcurrentHttpRequests=1024

### --- BookKeeper Client --- ###

# Metadata service uri that bookkeeper is used for loading corresponding metadata driver
Expand Down
11 changes: 11 additions & 0 deletions conf/proxy.conf
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,17 @@ httpRequestsLimitEnabled=false
# Max HTTP requests per seconds allowed. The excess of requests will be rejected with HTTP code 429 (Too many requests)
httpRequestsMaxPerSecond=100.0

# Capacity for thread pool queue in the HTTP server
httpServerThreadPoolQueueSize=8192

# Capacity for accept queue in the HTTP server
httpServerAcceptQueueSize=8192

# Maximum number of inbound http connections. (0 to disable limiting)
maxHttpServerConnections=2048

# Max concurrent web requests
maxConcurrentHttpRequests=1024

### --- Token Authentication Provider --- ###

Expand Down
12 changes: 12 additions & 0 deletions conf/websocket.conf
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,18 @@ webSocketSessionIdleTimeoutMillis=300000
# The maximum size of a text message during parsing in WebSocket proxy
webSocketMaxTextFrameSize=1048576

# Capacity for thread pool queue in the HTTP server
httpServerThreadPoolQueueSize=8192

# Capacity for accept queue in the HTTP server
httpServerAcceptQueueSize=8192

# Maximum number of inbound http connections. (0 to disable limiting)
maxHttpServerConnections=2048

# Max concurrent web requests
maxConcurrentHttpRequests=1024

### --- Authentication --- ###

# Enable authentication
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,24 @@ public class ServiceConfiguration implements PulsarConfiguration {
@FieldContext(category = CATEGORY_SERVER, doc = "Max concurrent web requests")
private int maxConcurrentHttpRequests = 1024;

@FieldContext(
category = CATEGORY_SERVER,
doc = "Capacity for thread pool queue in the HTTP server"
+ " Default is set to 8192."
)
private int httpServerThreadPoolQueueSize = 8192;

@FieldContext(
category = CATEGORY_SERVER,
doc = "Capacity for accept queue in the HTTP server"
+ " Default is set to 8192."
)
private int httpServerAcceptQueueSize = 8192;

@FieldContext(category = CATEGORY_SERVER, doc = "Maximum number of inbound http connections. "
+ "(0 to disable limiting)")
private int maxHttpServerConnections = 2048;

@FieldContext(category = CATEGORY_SERVER, doc = "Whether to enable the delayed delivery for messages.")
private boolean delayedDeliveryEnabled = true;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.web;

import java.util.EnumSet;
import java.util.Map;
import javax.servlet.DispatcherType;
import javax.servlet.Filter;
import org.eclipse.jetty.servlet.FilterHolder;
import org.eclipse.jetty.servlet.ServletContextHandler;

public class Filters {
private static final String MATCH_ALL = "/*";

/**
* Adds a filter instance to the servlet context handler.
* The filter will be used for all requests.
*
* @param context servlet context handler instance
* @param filter filter instance
*/
public static void addFilter(ServletContextHandler context, Filter filter) {
addFilterHolder(context, new FilterHolder(filter));
}

private static void addFilterHolder(ServletContextHandler context, FilterHolder filter) {
context.addFilter(filter,
MATCH_ALL, EnumSet.allOf(DispatcherType.class));
}

/**
* Adds a filter to the servlet context handler which gets instantiated and configured when the server starts.
*
* @param context servlet context handler instance
* @param filter filter class
* @param initParams initialization parameters used for configuring the filter instance
*/
public static void addFilterClass(ServletContextHandler context, Class<? extends Filter> filter,
Map<String, String> initParams) {
FilterHolder holder = new FilterHolder(filter);
holder.setInitParameters(initParams);
addFilterHolder(context, holder);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,19 @@

import io.netty.util.concurrent.DefaultThreadFactory;
import java.util.concurrent.ThreadFactory;
import org.eclipse.jetty.util.BlockingArrayQueue;
import org.eclipse.jetty.util.thread.ExecutorThreadPool;

public class WebExecutorThreadPool extends ExecutorThreadPool {

private final ThreadFactory threadFactory;

public WebExecutorThreadPool(String namePrefix) {
this(Runtime.getRuntime().availableProcessors(), namePrefix);
public WebExecutorThreadPool(int maxThreads, String namePrefix) {
this(maxThreads, namePrefix, 8192);
}

public WebExecutorThreadPool(int maxThreads, String namePrefix) {
super(maxThreads);
public WebExecutorThreadPool(int maxThreads, String namePrefix, int queueCapacity) {
super(maxThreads, Math.min(8, maxThreads), new BlockingArrayQueue<>(queueCapacity, queueCapacity));
this.threadFactory = new DefaultThreadFactory(namePrefix);
}

Expand Down

This file was deleted.

Loading

0 comments on commit ec38211

Please sign in to comment.