Skip to content

Commit

Permalink
Allow RerouteService to reroute at lower priority (elastic#44338)
Browse files Browse the repository at this point in the history
Today the `BatchedRerouteService` submits its delayed reroute task at `HIGH`
priority, but in some cases a lower priority would be more appropriate. This
commit adds the facility to submit delayed reroute tasks at different
priorities, such that each submitted reroute task runs at a priority no lower
than the one requested. It does not change the fact that all delayed reroute
tasks are submitted at `HIGH` priority, but at least it makes this explicit.
  • Loading branch information
DaveCTurner authored Jul 15, 2019
1 parent dfa40e6 commit 10eb9d7
Show file tree
Hide file tree
Showing 13 changed files with 197 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,7 @@ public void clusterStatePublished(ClusterChangedEvent clusterChangedEvent) {
// assign it again, even if that means putting it back on the node on which it previously failed:
final String reason = String.format(Locale.ROOT, "[%d] unassigned shards after failing shards", numberOfUnassignedShards);
logger.trace("{}, scheduling a reroute", reason);
rerouteService.reroute(reason, ActionListener.wrap(
rerouteService.reroute(reason, Priority.HIGH, ActionListener.wrap(
r -> logger.trace("{}, reroute completed", reason),
e -> logger.debug(new ParameterizedMessage("{}, reroute failed", reason), e)));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.RerouteService;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.common.Priority;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;

import java.util.ArrayList;
Expand Down Expand Up @@ -150,7 +151,7 @@ public ClusterTasksResult<Task> execute(ClusterState currentState, List<Task> jo
results.success(joinTask);
}
if (nodesChanged) {
rerouteService.reroute("post-join reroute", ActionListener.wrap(
rerouteService.reroute("post-join reroute", Priority.HIGH, ActionListener.wrap(
r -> logger.trace("post-join reroute completed"),
e -> logger.debug("post-join reroute failed", e)));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,15 @@
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.PlainListenableActionFuture;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.NotMasterException;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Priority;

import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;

/**
Expand All @@ -49,7 +50,8 @@ public class BatchedRerouteService implements RerouteService {

private final Object mutex = new Object();
@Nullable // null if no reroute is currently pending
private PlainListenableActionFuture<Void> pendingRerouteListeners;
private List<ActionListener<Void>> pendingRerouteListeners;
private Priority pendingTaskPriority = Priority.LANGUID;

/**
* @param reroute Function that computes the updated cluster state after it has been rerouted.
Expand All @@ -63,29 +65,55 @@ public BatchedRerouteService(ClusterService clusterService, BiFunction<ClusterSt
* Initiates a reroute.
*/
@Override
public final void reroute(String reason, ActionListener<Void> listener) {
final PlainListenableActionFuture<Void> currentListeners;
public final void reroute(String reason, Priority priority, ActionListener<Void> listener) {
final List<ActionListener<Void>> currentListeners;
synchronized (mutex) {
if (pendingRerouteListeners != null) {
logger.trace("already has pending reroute, adding [{}] to batch", reason);
pendingRerouteListeners.addListener(listener);
return;
if (priority.sameOrAfter(pendingTaskPriority)) {
logger.trace("already has pending reroute at priority [{}], adding [{}] with priority [{}] to batch",
pendingTaskPriority, reason, priority);
pendingRerouteListeners.add(listener);
return;
} else {
logger.trace("already has pending reroute at priority [{}], promoting batch to [{}] and adding [{}]",
pendingTaskPriority, priority, reason);
currentListeners = new ArrayList<>(1 + pendingRerouteListeners.size());
currentListeners.add(listener);
currentListeners.addAll(pendingRerouteListeners);
pendingRerouteListeners.clear();
pendingRerouteListeners = currentListeners;
pendingTaskPriority = priority;
}
} else {
logger.trace("no pending reroute, scheduling reroute [{}] at priority [{}]", reason, priority);
currentListeners = new ArrayList<>(1);
currentListeners.add(listener);
pendingRerouteListeners = currentListeners;
pendingTaskPriority = priority;
}
currentListeners = PlainListenableActionFuture.newListenableFuture();
currentListeners.addListener(listener);
pendingRerouteListeners = currentListeners;
}
logger.trace("rerouting [{}]", reason);
try {
clusterService.submitStateUpdateTask(CLUSTER_UPDATE_TASK_SOURCE + "(" + reason + ")",
new ClusterStateUpdateTask(Priority.HIGH) {
new ClusterStateUpdateTask(priority) {

@Override
public ClusterState execute(ClusterState currentState) {
final boolean currentListenersArePending;
synchronized (mutex) {
assert pendingRerouteListeners == currentListeners;
pendingRerouteListeners = null;
assert currentListeners.isEmpty() == (pendingRerouteListeners != currentListeners)
: "currentListeners=" + currentListeners + ", pendingRerouteListeners=" + pendingRerouteListeners;
currentListenersArePending = pendingRerouteListeners == currentListeners;
if (currentListenersArePending) {
pendingRerouteListeners = null;
}
}
if (currentListenersArePending) {
logger.trace("performing batched reroute [{}]", reason);
return reroute.apply(currentState, reason);
} else {
logger.trace("batched reroute [{}] was promoted", reason);
return currentState;
}
return reroute.apply(currentState, reason);
}

@Override
Expand All @@ -95,7 +123,7 @@ public void onNoLongerMaster(String source) {
pendingRerouteListeners = null;
}
}
currentListeners.onFailure(new NotMasterException("delayed reroute [" + reason + "] cancelled"));
ActionListener.onFailure(currentListeners, new NotMasterException("delayed reroute [" + reason + "] cancelled"));
// no big deal, the new master will reroute again
}

Expand All @@ -114,22 +142,26 @@ public void onFailure(String source, Exception e) {
logger.error(() -> new ParameterizedMessage("unexpected failure during [{}], current state version [{}]",
source, state.version()), e);
}
currentListeners.onFailure(new ElasticsearchException("delayed reroute [" + reason + "] failed", e));
ActionListener.onFailure(currentListeners,
new ElasticsearchException("delayed reroute [" + reason + "] failed", e));
}

@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
currentListeners.onResponse(null);
ActionListener.onResponse(currentListeners, null);
}
});
} catch (Exception e) {
synchronized (mutex) {
assert pendingRerouteListeners == currentListeners;
pendingRerouteListeners = null;
assert currentListeners.isEmpty() == (pendingRerouteListeners != currentListeners);
if (pendingRerouteListeners == currentListeners) {
pendingRerouteListeners = null;
}
}
ClusterState state = clusterService.state();
logger.warn(() -> new ParameterizedMessage("failed to reroute routing table, current state:\n{}", state), e);
currentListeners.onFailure(new ElasticsearchException("delayed reroute [" + reason + "] could not be submitted", e));
ActionListener.onFailure(currentListeners,
new ElasticsearchException("delayed reroute [" + reason + "] could not be submitted", e));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,19 @@
package org.elasticsearch.cluster.routing;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.Priority;

/**
* Asynchronously performs a cluster reroute, updating any shard states and rebalancing the cluster if appropriate.
*/
@FunctionalInterface
public interface RerouteService {
void reroute(String reason, ActionListener<Void> listener);

/**
* Schedule a cluster reroute.
* @param priority the (minimum) priority at which to run this reroute. If there is already a pending reroute at a higher priority then
* this reroute is batched with the pending one; if there is already a pending reroute at a lower priority then
* the priority of the pending batch is raised to the given priority.
*/
void reroute(String reason, Priority priority, ActionListener<Void> listener);
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.elasticsearch.cluster.routing.RerouteService;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.settings.ClusterSettings;
Expand Down Expand Up @@ -185,7 +186,7 @@ public void onNewInfo(ClusterInfo info) {

if (reroute) {
logger.info("rerouting shards: [{}]", explanation);
rerouteService.reroute("disk threshold monitor", ActionListener.wrap(r -> {
rerouteService.reroute("disk threshold monitor", Priority.HIGH, ActionListener.wrap(r -> {
setLastRunTimeMillis();
listener.onResponse(r);
}, e -> {
Expand Down
8 changes: 8 additions & 0 deletions server/src/main/java/org/elasticsearch/common/Priority.java
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,18 @@ public static Priority fromByte(byte b) {
this.value = value;
}

/**
* @return whether tasks of {@code this} priority will run after those of priority {@code p}.
* For instance, {@code Priority.URGENT.after(Priority.IMMEDIATE)} returns {@code true}.
*/
public boolean after(Priority p) {
return this.compareTo(p) > 0;
}

/**
* @return whether tasks of {@code this} priority will run no earlier than those of priority {@code p}.
* For instance, {@code Priority.URGENT.sameOrAfter(Priority.IMMEDIATE)} returns {@code true}.
*/
public boolean sameOrAfter(Priority p) {
return this.compareTo(p) >= 0;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.elasticsearch.cluster.routing.allocation.AllocateUnassignedDecision;
import org.elasticsearch.cluster.routing.allocation.FailedShard;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
Expand Down Expand Up @@ -106,6 +107,8 @@ public void applyFailedShards(final RoutingAllocation allocation, final List<Fai
}

public void allocateUnassigned(final RoutingAllocation allocation) {
assert primaryShardAllocator != null;
assert replicaShardAllocator != null;
innerAllocatedUnassigned(allocation, primaryShardAllocator, replicaShardAllocator);
}

Expand All @@ -127,8 +130,10 @@ protected static void innerAllocatedUnassigned(RoutingAllocation allocation,
*/
public AllocateUnassignedDecision decideUnassignedShardAllocation(ShardRouting unassignedShard, RoutingAllocation routingAllocation) {
if (unassignedShard.primary()) {
assert primaryShardAllocator != null;
return primaryShardAllocator.makeAllocationDecision(unassignedShard, routingAllocation, logger);
} else {
assert replicaShardAllocator != null;
return replicaShardAllocator.makeAllocationDecision(unassignedShard, routingAllocation, logger);
}
}
Expand All @@ -142,7 +147,8 @@ class InternalAsyncFetch<T extends BaseNodeResponse> extends AsyncShardFetch<T>
@Override
protected void reroute(ShardId shardId, String reason) {
logger.trace("{} scheduling reroute for {}", shardId, reason);
rerouteService.reroute("async_shard_fetch", ActionListener.wrap(
assert rerouteService != null;
rerouteService.reroute("async_shard_fetch", Priority.HIGH, ActionListener.wrap(
r -> logger.trace("{} scheduled reroute completed for {}", shardId, reason),
e -> logger.debug(new ParameterizedMessage("{} scheduled reroute failed for {}", shardId, reason), e)));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public void testJoinDeduplication() {
x -> localNode, null, Collections.emptySet());
JoinHelper joinHelper = new JoinHelper(Settings.EMPTY, null, null, transportService, () -> 0L, () -> null,
(joinRequest, joinCallback) -> { throw new AssertionError(); }, startJoinRequest -> { throw new AssertionError(); },
Collections.emptyList(), (s, r) -> {});
Collections.emptyList(), (s, p, r) -> {});
transportService.start();

DiscoveryNode node1 = new DiscoveryNode("node1", buildNewFakeTransportAddress(), Version.CURRENT);
Expand Down Expand Up @@ -153,7 +153,7 @@ public void testJoinValidationRejectsMismatchedClusterUUID() {
x -> localNode, null, Collections.emptySet());
new JoinHelper(Settings.EMPTY, null, null, transportService, () -> 0L, () -> localClusterState,
(joinRequest, joinCallback) -> { throw new AssertionError(); }, startJoinRequest -> { throw new AssertionError(); },
Collections.emptyList(), (s, r) -> {}); // registers request handler
Collections.emptyList(), (s, p, r) -> {}); // registers request handler
transportService.start();
transportService.acceptIncomingRequests();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ transportService, writableRegistry(),
() -> new InMemoryPersistedState(term, initialState), r -> emptyList(),
new NoOpClusterApplier(),
Collections.emptyList(),
random, (s, r) -> {}, ElectionStrategy.DEFAULT_INSTANCE);
random, (s, p, r) -> {}, ElectionStrategy.DEFAULT_INSTANCE);
transportService.start();
transportService.acceptIncomingRequests();
transport = capturingTransport;
Expand Down
Loading

0 comments on commit 10eb9d7

Please sign in to comment.