Skip to content

Commit

Permalink
Make refresh a replicated action
Browse files Browse the repository at this point in the history
prerequisite to elastic#9421
see also elastic#12600
  • Loading branch information
brwe committed Aug 31, 2015
1 parent 1230cb0 commit d81f426
Show file tree
Hide file tree
Showing 30 changed files with 1,048 additions and 457 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
/**
* Base class for write action responses.
*/
public abstract class ActionWriteResponse extends ActionResponse {
public class ActionWriteResponse extends ActionResponse {

public final static ActionWriteResponse.ShardInfo.Failure[] EMPTY = new ActionWriteResponse.ShardInfo.Failure[0];

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,7 @@

import org.elasticsearch.action.ShardOperationFailedException;
import org.elasticsearch.action.support.broadcast.BroadcastResponse;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;

import java.io.IOException;
import java.util.List;

/**
Expand All @@ -42,13 +39,4 @@ public class FlushResponse extends BroadcastResponse {
super(totalShards, successfulShards, failedShards, shardFailures);
}

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,27 +19,27 @@

package org.elasticsearch.action.admin.indices.flush;

import org.elasticsearch.action.support.broadcast.BroadcastShardRequest;
import org.elasticsearch.action.support.replication.ReplicationRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.index.shard.ShardId;

import java.io.IOException;

/**
*
*/
class ShardFlushRequest extends BroadcastShardRequest {
public class ShardFlushRequest extends ReplicationRequest<ShardFlushRequest> {

private FlushRequest request = new FlushRequest();

ShardFlushRequest() {
public ShardFlushRequest(FlushRequest request) {
super(request);
this.request = request;
}

ShardFlushRequest(ShardId shardId, FlushRequest request) {
super(shardId, request);
this.request = request;
public ShardFlushRequest() {
}

FlushRequest getRequest() {
return request;
}

@Override
public void readFrom(StreamInput in) throws IOException {
Expand All @@ -53,7 +53,5 @@ public void writeTo(StreamOutput out) throws IOException {
request.writeTo(out);
}

FlushRequest getRequest() {
return request;
}

}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -19,99 +19,45 @@

package org.elasticsearch.action.admin.indices.flush;

import org.elasticsearch.action.ActionWriteResponse;
import org.elasticsearch.action.ShardOperationFailedException;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.DefaultShardOperationFailedException;
import org.elasticsearch.action.support.broadcast.BroadcastShardOperationFailedException;
import org.elasticsearch.action.support.broadcast.TransportBroadcastAction;
import org.elasticsearch.action.support.replication.TransportBroadcastReplicationAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReferenceArray;

/**
* Flush Action.
*/
public class TransportFlushAction extends TransportBroadcastAction<FlushRequest, FlushResponse, ShardFlushRequest, ShardFlushResponse> {

private final IndicesService indicesService;
public class TransportFlushAction extends TransportBroadcastReplicationAction<FlushRequest, FlushResponse, ShardFlushRequest, ActionWriteResponse> {

@Inject
public TransportFlushAction(Settings settings, ThreadPool threadPool, ClusterService clusterService,
TransportService transportService, IndicesService indicesService,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) {
super(settings, FlushAction.NAME, threadPool, clusterService, transportService, actionFilters, indexNameExpressionResolver,
FlushRequest.class, ShardFlushRequest.class, ThreadPool.Names.FLUSH);
this.indicesService = indicesService;
}

@Override
protected FlushResponse newResponse(FlushRequest request, AtomicReferenceArray shardsResponses, ClusterState clusterState) {
int successfulShards = 0;
int failedShards = 0;
List<ShardOperationFailedException> shardFailures = null;
for (int i = 0; i < shardsResponses.length(); i++) {
Object shardResponse = shardsResponses.get(i);
if (shardResponse == null) {
// a non active shard, ignore
} else if (shardResponse instanceof BroadcastShardOperationFailedException) {
failedShards++;
if (shardFailures == null) {
shardFailures = new ArrayList<>();
}
shardFailures.add(new DefaultShardOperationFailedException((BroadcastShardOperationFailedException) shardResponse));
} else {
successfulShards++;
}
}
return new FlushResponse(shardsResponses.length(), successfulShards, failedShards, shardFailures);
}

@Override
protected ShardFlushRequest newShardRequest(int numShards, ShardRouting shard, FlushRequest request) {
return new ShardFlushRequest(shard.shardId(), request);
}

@Override
protected ShardFlushResponse newShardResponse() {
return new ShardFlushResponse();
}

@Override
protected ShardFlushResponse shardOperation(ShardFlushRequest request) {
IndexShard indexShard = indicesService.indexServiceSafe(request.shardId().getIndex()).shardSafe(request.shardId().id());
indexShard.flush(request.getRequest());
return new ShardFlushResponse(request.shardId());
TransportService transportService, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver,
TransportShardFlushAction replicatedFlushAction) {
super(FlushAction.NAME, FlushRequest.class, settings, threadPool, clusterService, transportService, actionFilters, indexNameExpressionResolver, replicatedFlushAction);
}

/**
* The refresh request works against *all* shards.
*/
@Override
protected GroupShardsIterator shards(ClusterState clusterState, FlushRequest request, String[] concreteIndices) {
return clusterState.routingTable().allActiveShardsGrouped(concreteIndices, true, true);
protected ActionWriteResponse newShardResponse() {
return new ActionWriteResponse();
}

@Override
protected ClusterBlockException checkGlobalBlock(ClusterState state, FlushRequest request) {
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
protected ShardFlushRequest newShardRequest(FlushRequest request, ShardId shardId) {
return new ShardFlushRequest(request).setShardId(shardId).timeout("0ms");
}

@Override
protected ClusterBlockException checkRequestBlock(ClusterState state, FlushRequest countRequest, String[] concreteIndices) {
return state.blocks().indicesBlockedException(ClusterBlockLevel.METADATA_WRITE, concreteIndices);
protected FlushResponse newResponse(int successfulShards, int failedShards, int totalNumCopies, List<ShardOperationFailedException> shardFailures) {
return new FlushResponse(totalNumCopies, successfulShards, failedShards, shardFailures);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.action.admin.indices.flush;

import org.elasticsearch.action.ActionWriteResponse;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.replication.TransportReplicationAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.action.index.MappingUpdatedAction;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

/**
*
*/
public class TransportShardFlushAction extends TransportReplicationAction<ShardFlushRequest, ShardFlushRequest, ActionWriteResponse> {

public static final String NAME = "indices:data/write/flush";

@Inject
public TransportShardFlushAction(Settings settings, TransportService transportService, ClusterService clusterService,
IndicesService indicesService, ThreadPool threadPool, ShardStateAction shardStateAction,
MappingUpdatedAction mappingUpdatedAction, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver) {
super(settings, NAME, transportService, clusterService, indicesService, threadPool, shardStateAction, mappingUpdatedAction,
actionFilters, indexNameExpressionResolver, ShardFlushRequest.class, ShardFlushRequest.class, ThreadPool.Names.FLUSH);
}

@Override
protected ActionWriteResponse newResponseInstance() {
return new ActionWriteResponse();
}

@Override
protected Tuple<ActionWriteResponse, ShardFlushRequest> shardOperationOnPrimary(ClusterState clusterState, PrimaryOperationRequest shardRequest) throws Throwable {
IndexShard indexShard = indicesService.indexServiceSafe(shardRequest.shardId.getIndex()).shardSafe(shardRequest.shardId.id());
indexShard.flush(shardRequest.request.getRequest());
logger.trace("{} flush request executed on primary", indexShard.shardId());
return new Tuple<>(new ActionWriteResponse(), shardRequest.request);
}

@Override
protected void shardOperationOnReplica(ShardId shardId, ShardFlushRequest request) {
IndexShard indexShard = indicesService.indexServiceSafe(request.shardId().getIndex()).shardSafe(request.shardId().id());
indexShard.flush(request.getRequest());
logger.trace("{} flush request executed on replica", indexShard.shardId());
}

@Override
protected boolean checkWriteConsistency() {
return false;
}

@Override
protected ShardIterator shards(ClusterState clusterState, InternalRequest request) {
return clusterState.getRoutingTable().indicesRouting().get(request.concreteIndex()).getShards().get(request.request().shardId().getId()).shardsIt();
}

@Override
protected ClusterBlockException checkGlobalBlock(ClusterState state) {
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
}

@Override
protected ClusterBlockException checkRequestBlock(ClusterState state, InternalRequest request) {
return state.blocks().indicesBlockedException(ClusterBlockLevel.METADATA_WRITE, new String[]{request.concreteIndex()});
}

@Override
protected boolean shouldExecuteReplication(Settings settings) {
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@
import org.elasticsearch.action.Action;
import org.elasticsearch.client.ElasticsearchClient;

/**
*/
public class RefreshAction extends Action<RefreshRequest, RefreshResponse, RefreshRequestBuilder> {

public static final RefreshAction INSTANCE = new RefreshAction();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
*/
public class RefreshRequest extends BroadcastRequest<RefreshRequest> {


RefreshRequest() {
}

Expand All @@ -48,5 +47,4 @@ public RefreshRequest(ActionRequest originalRequest) {
public RefreshRequest(String... indices) {
super(indices);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,34 +21,18 @@

import org.elasticsearch.action.ShardOperationFailedException;
import org.elasticsearch.action.support.broadcast.BroadcastResponse;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;

import java.io.IOException;
import java.util.List;

/**
* The response of a refresh action.
*
*
*/
public class RefreshResponse extends BroadcastResponse {

RefreshResponse() {

}

RefreshResponse(int totalShards, int successfulShards, int failedShards, List<ShardOperationFailedException> shardFailures) {
super(totalShards, successfulShards, failedShards, shardFailures);
}

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
}
}
Loading

0 comments on commit d81f426

Please sign in to comment.