Skip to content

Commit

Permalink
Cleanup some Slowness in ThreadContext (elastic#85073)
Browse files Browse the repository at this point in the history
Some of this shows up in profiling, albeit not too heavily. Still, the adjusted spots
were needlessly heavy, so I cleaned them up to make reasoning about transport-thread
performance a little less noisy.
  • Loading branch information
original-brownbear authored Mar 17, 2022
1 parent a780558 commit 6766e88
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 78 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -311,11 +311,13 @@ public static String getXOpaqueId() {
}

/**
 * Returns the value of the given request header from the first registered
 * thread context that has it set, or the empty string if none does.
 *
 * @param headerName name of the request header to look up
 * @return the first non-null value found for {@code headerName}, or {@code ""}
 */
private static String getSingleValue(String headerName) {
    // Plain loop instead of a stream pipeline: this runs on hot transport
    // paths, and the previous stream version called getHeader twice per
    // context (once in filter, once in map).
    for (ThreadContext threadContext : THREAD_CONTEXT) {
        final String header = threadContext.getHeader(headerName);
        if (header != null) {
            return header;
        }
    }
    return "";
}

public static void addWarning(String message, Object... params) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,18 +25,13 @@
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collector;
import java.util.stream.Stream;

import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_MAX_WARNING_HEADER_COUNT;
import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_MAX_WARNING_HEADER_SIZE;
Expand Down Expand Up @@ -111,8 +106,17 @@ public StoredContext stashContext() {
* Otherwise, when the context is stashed, it should be empty.
*/

if (HEADERS_TO_COPY.stream().anyMatch(header -> context.requestHeaders.containsKey(header))) {
Map<String, String> map = headers(context, HEADERS_TO_COPY);
boolean hasHeadersToCopy = false;
if (context.requestHeaders.isEmpty() == false) {
for (String header : HEADERS_TO_COPY) {
if (context.requestHeaders.containsKey(header)) {
hasHeadersToCopy = true;
break;
}
}
}
if (hasHeadersToCopy) {
Map<String, String> map = headers(context);
ThreadContextStruct threadContextStruct = DEFAULT_CONTEXT.putHeaders(map);
threadLocal.set(threadContextStruct);
} else {
Expand All @@ -126,11 +130,12 @@ public StoredContext stashContext() {
};
}

private Map<String, String> headers(ThreadContextStruct context, Set<String> headersToCopy) {
Map<String, String> map = Maps.newMapWithExpectedSize(headersToCopy.size());
for (String header : headersToCopy) {
if (context.requestHeaders.containsKey(header)) {
map.put(header, context.requestHeaders.get(header));
private Map<String, String> headers(ThreadContextStruct context) {
Map<String, String> map = Maps.newMapWithExpectedSize(org.elasticsearch.tasks.Task.HEADERS_TO_COPY.size());
for (String header : org.elasticsearch.tasks.Task.HEADERS_TO_COPY) {
final String value = context.requestHeaders.get(header);
if (value != null) {
map.put(header, value);
}
}
return map;
Expand Down Expand Up @@ -482,13 +487,13 @@ default void restore() {
/**
 * Builds the immutable map of default request headers configured via
 * {@code DEFAULT_HEADERS_SETTING}.
 *
 * @param settings node settings to read the default headers from
 * @return an immutable map of header name to value; empty if none configured
 */
public static Map<String, String> buildDefaultHeaders(Settings settings) {
    Settings headers = DEFAULT_HEADERS_SETTING.get(settings);
    if (headers == null) {
        return Map.of();
    } else {
        Map<String, String> defaultHeader = new HashMap<>();
        for (String key : headers.names()) {
            defaultHeader.put(key, headers.get(key));
        }
        // Map.copyOf yields a compact immutable copy; note it rejects null
        // values, so settings values are assumed non-null here — TODO confirm.
        return Map.copyOf(defaultHeader);
    }
}

Expand Down Expand Up @@ -521,11 +526,7 @@ private ThreadContextStruct(
Map<String, Object> transientHeaders,
boolean isSystemContext
) {
this.requestHeaders = requestHeaders;
this.responseHeaders = responseHeaders;
this.transientHeaders = transientHeaders;
this.isSystemContext = isSystemContext;
this.warningHeadersSize = 0L;
this(requestHeaders, responseHeaders, transientHeaders, isSystemContext, 0L);
}

private ThreadContextStruct(
Expand Down Expand Up @@ -580,15 +581,11 @@ private ThreadContextStruct putResponseHeaders(Map<String, Set<String>> headers)
}
final Map<String, Set<String>> newResponseHeaders = new HashMap<>(this.responseHeaders);
for (Map.Entry<String, Set<String>> entry : headers.entrySet()) {
String key = entry.getKey();
final Set<String> existingValues = newResponseHeaders.get(key);
if (existingValues != null) {
final Set<String> newValues = Stream.concat(entry.getValue().stream(), existingValues.stream())
.collect(LINKED_HASH_SET_COLLECTOR);
newResponseHeaders.put(key, Collections.unmodifiableSet(newValues));
} else {
newResponseHeaders.put(key, entry.getValue());
}
newResponseHeaders.merge(entry.getKey(), entry.getValue(), (existing, added) -> {
final Set<String> updated = new LinkedHashSet<>(added);
updated.addAll(existing);
return Collections.unmodifiableSet(updated);
});
}
return new ThreadContextStruct(requestHeaders, newResponseHeaders, transientHeaders, isSystemContext);
}
Expand Down Expand Up @@ -635,16 +632,16 @@ private ThreadContextStruct putResponse(

final Map<String, Set<String>> newResponseHeaders;
final Set<String> existingValues = responseHeaders.get(key);
if (existingValues != null && existingValues.contains(uniqueValue.apply(value))) {
return this;
}
newResponseHeaders = new HashMap<>(responseHeaders);
if (existingValues != null) {
if (existingValues.contains(uniqueValue.apply(value))) {
return this;
}
// preserve insertion order
final Set<String> newValues = Stream.concat(existingValues.stream(), Stream.of(value)).collect(LINKED_HASH_SET_COLLECTOR);
newResponseHeaders = new HashMap<>(responseHeaders);
final Set<String> newValues = new LinkedHashSet<>(existingValues);
newValues.add(value);
newResponseHeaders.put(key, Collections.unmodifiableSet(newValues));
} else {
newResponseHeaders = new HashMap<>(responseHeaders);
newResponseHeaders.put(key, Collections.singleton(value));
}

Expand Down Expand Up @@ -787,40 +784,4 @@ public AbstractRunnable unwrap() {
}
}

// Collector used to merge response-header value sets while preserving
// insertion order (hence LinkedHashSet rather than HashSet).
private static final Collector<String, Set<String>, Set<String>> LINKED_HASH_SET_COLLECTOR = new LinkedHashSetCollector<>();

// Custom Collector implementation that accumulates stream elements into a
// LinkedHashSet. Equivalent to Collectors.toCollection(LinkedHashSet::new),
// but avoids allocating a new collector instance per call site.
private static class LinkedHashSetCollector<T> implements Collector<T, Set<T>, Set<T>> {
@Override
public Supplier<Set<T>> supplier() {
// New accumulation container per stream evaluation.
return LinkedHashSet::new;
}

@Override
public BiConsumer<Set<T>, T> accumulator() {
// Fold each element into the set; duplicates are dropped by Set semantics.
return Set::add;
}

@Override
public BinaryOperator<Set<T>> combiner() {
// Merge partial results (only used by parallel streams).
return (left, right) -> {
left.addAll(right);
return left;
};
}

@Override
public Function<Set<T>, Set<T>> finisher() {
// IDENTITY_FINISH below guarantees this is never transformed further.
return Function.identity();
}

// Shared immutable characteristics set: the accumulation container is the
// final result, so no finishing transformation is applied.
private static final Set<Characteristics> CHARACTERISTICS = Collections.unmodifiableSet(
EnumSet.of(Collector.Characteristics.IDENTITY_FINISH)
);

@Override
public Set<Characteristics> characteristics() {
return CHARACTERISTICS;
}
}

}
7 changes: 4 additions & 3 deletions server/src/main/java/org/elasticsearch/tasks/TaskManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.elasticsearch.cluster.ClusterStateApplier;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
Expand Down Expand Up @@ -74,7 +75,7 @@ public class TaskManager implements ClusterStateApplier {
private static final TimeValue WAIT_FOR_COMPLETION_POLL = timeValueMillis(100);

/** Rest headers that are copied to the task */
private final List<String> taskHeaders;
private final String[] taskHeaders;
private final ThreadPool threadPool;

private final Map<Long, Task> tasks = ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency();
Expand All @@ -97,7 +98,7 @@ public class TaskManager implements ClusterStateApplier {

public TaskManager(Settings settings, ThreadPool threadPool, Set<String> taskHeaders) {
this.threadPool = threadPool;
this.taskHeaders = new ArrayList<>(taskHeaders);
this.taskHeaders = taskHeaders.toArray(Strings.EMPTY_ARRAY);
this.maxHeaderSize = SETTING_HTTP_MAX_HEADER_SIZE.get(settings);
}

Expand Down Expand Up @@ -739,6 +740,6 @@ public void cancelTaskAndDescendants(CancellableTask task, String reason, boolea
}

/**
 * Returns the request headers that are copied onto tasks, as an immutable list.
 *
 * @return immutable list view of the configured task headers
 */
public List<String> getTaskHeaders() {
    // taskHeaders is stored as a String[] internally; List.of wraps a copy
    // so callers cannot mutate the manager's state.
    return List.of(taskHeaders);
}
}

0 comments on commit 6766e88

Please sign in to comment.