Skip to content

Commit

Permalink
Fix update_by_query's default size parameter (elastic#26784)
Browse files Browse the repository at this point in the history
We were accidentally defaulting it to the scroll size.
Untwists some of the tricks that we play with parsing
so that the size is no longer scrambled.

Closes elastic#26761
  • Loading branch information
nik9000 authored Sep 25, 2017
1 parent b8cd82e commit eb754a7
Show file tree
Hide file tree
Showing 9 changed files with 206 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public abstract class AbstractBulkByScrollRequest<Self extends AbstractBulkByScr

public static final int SIZE_ALL_MATCHES = -1;
private static final TimeValue DEFAULT_SCROLL_TIMEOUT = timeValueMinutes(5);
private static final int DEFAULT_SCROLL_SIZE = 1000;
static final int DEFAULT_SCROLL_SIZE = 1000;

public static final int AUTO_SLICES = 0;
public static final String AUTO_SLICES_VALUE = "auto";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.Set;
import java.util.function.IntConsumer;

import static org.elasticsearch.common.unit.TimeValue.parseTimeValue;
import static org.elasticsearch.rest.RestRequest.Method.GET;
Expand Down Expand Up @@ -73,8 +74,21 @@ public String getName() {
@Override
public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {
SearchRequest searchRequest = new SearchRequest();
/*
* We have to pull out the call to `source().size(size)` because
* _update_by_query and _delete_by_query uses this same parsing
* path but sets a different variable when it sees the `size`
* url parameter.
*
* Note that we can't use `searchRequest.source()::size` because
* `searchRequest.source()` is null right now. We don't have to
* guard against it being null in the IntConsumer because it can't
* be null later. If that is confusing to you then you are in good
* company.
*/
IntConsumer setSize = size -> searchRequest.source().size(size);
request.withContentOrSourceParamParserOrNull(parser ->
parseSearchRequest(searchRequest, request, parser));
parseSearchRequest(searchRequest, request, parser, setSize));

return channel -> client.search(searchRequest, new RestStatusToXContentListener<>(channel));
}
Expand All @@ -84,9 +98,11 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
*
* @param requestContentParser body of the request to read. This method does not attempt to read the body from the {@code request}
* parameter
* @param setSize how the size url parameter is handled. {@code udpate_by_query} and regular search differ here.
*/
public static void parseSearchRequest(SearchRequest searchRequest, RestRequest request,
XContentParser requestContentParser) throws IOException {
XContentParser requestContentParser,
IntConsumer setSize) throws IOException {

if (searchRequest.source() == null) {
searchRequest.source(new SearchSourceBuilder());
Expand Down Expand Up @@ -118,7 +134,7 @@ public static void parseSearchRequest(SearchRequest searchRequest, RestRequest r
} else {
searchRequest.searchType(searchType);
}
parseSearchSource(searchRequest.source(), request);
parseSearchSource(searchRequest.source(), request, setSize);
searchRequest.requestCache(request.paramAsBoolean("request_cache", null));

String scroll = request.param("scroll");
Expand All @@ -136,7 +152,7 @@ public static void parseSearchRequest(SearchRequest searchRequest, RestRequest r
* Parses the rest request on top of the SearchSourceBuilder, preserving
* values that are not overridden by the rest request.
*/
private static void parseSearchSource(final SearchSourceBuilder searchSourceBuilder, RestRequest request) {
private static void parseSearchSource(final SearchSourceBuilder searchSourceBuilder, RestRequest request, IntConsumer setSize) {
QueryBuilder queryBuilder = RestActions.urlParamsToQueryBuilder(request);
if (queryBuilder != null) {
searchSourceBuilder.query(queryBuilder);
Expand All @@ -148,7 +164,7 @@ private static void parseSearchSource(final SearchSourceBuilder searchSourceBuil
}
int size = request.paramAsInt("size", -1);
if (size != -1) {
searchSourceBuilder.size(size);
setSize.accept(size);
}

if (request.hasParam("explain")) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

package org.elasticsearch.script.mustache;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.ParseField;
Expand Down Expand Up @@ -94,7 +93,7 @@ public String getName() {
public RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException {
// Creates the search request with all required params
SearchRequest searchRequest = new SearchRequest();
RestSearchAction.parseSearchRequest(searchRequest, request, null);
RestSearchAction.parseSearchRequest(searchRequest, request, null, size -> searchRequest.source().size(size));

// Creates the search template request
SearchTemplateRequest searchTemplateRequest;
Expand Down
4 changes: 2 additions & 2 deletions modules/reindex/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,12 @@ esplugin {
}

integTestCluster {
// Whitelist reindexing from the local node so we can test it.
// Whitelist reindexing from the local node so we can test reindex-from-remote.
setting 'reindex.remote.whitelist', '127.0.0.1:*'
}

run {
// Whitelist reindexing from the local node so we can test it.
// Whitelist reindexing from the local node so we can test reindex-from-remote.
setting 'reindex.remote.whitelist', '127.0.0.1:*'
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,14 +49,12 @@ protected void parseInternalRequest(Request internal, RestRequest restRequest,
assert restRequest != null : "RestRequest should not be null";

SearchRequest searchRequest = internal.getSearchRequest();
int scrollSize = searchRequest.source().size();

try (XContentParser parser = extractRequestSpecificFields(restRequest, bodyConsumers)) {
RestSearchAction.parseSearchRequest(searchRequest, restRequest, parser);
RestSearchAction.parseSearchRequest(searchRequest, restRequest, parser, internal::setSize);
}

internal.setSize(searchRequest.source().size());
searchRequest.source().size(restRequest.paramAsInt("scroll_size", scrollSize));
searchRequest.source().size(restRequest.paramAsInt("scroll_size", searchRequest.source().size()));

String conflicts = restRequest.param("conflicts");
if (conflicts != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

package org.elasticsearch.index.reindex;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.settings.Settings;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.index.reindex;

import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.elasticsearch.client.Response;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.test.rest.ESRestTestCase;
import org.junit.Before;

import java.io.IOException;
import java.util.Map;

import static java.util.Collections.emptyMap;
import static java.util.Collections.singletonMap;
import static org.hamcrest.Matchers.hasEntry;

/**
* Tests {@code _update_by_query}, {@code _delete_by_query}, and {@code _reindex}
* of many documents over REST. It is important to test many documents to make
* sure that we don't change the default behavior of touching <strong>all</strong>
* documents in the request.
*/
public class ManyDocumentsIT extends ESRestTestCase {
private final int count = between(150, 2000);

@Before
public void setupTestIndex() throws IOException {
StringBuilder bulk = new StringBuilder();
for (int i = 0; i < count; i++) {
bulk.append("{\"index\":{}}\n");
bulk.append("{\"test\":\"test\"}\n");
}
client().performRequest("POST", "/test/test/_bulk", singletonMap("refresh", "true"),
new StringEntity(bulk.toString(), ContentType.APPLICATION_JSON));
}

public void testReindex() throws IOException {
Map<String, Object> response = toMap(client().performRequest("POST", "/_reindex", emptyMap(), new StringEntity(
"{\"source\":{\"index\":\"test\"}, \"dest\":{\"index\":\"des\"}}",
ContentType.APPLICATION_JSON)));
assertThat(response, hasEntry("total", count));
assertThat(response, hasEntry("created", count));
}

public void testReindexFromRemote() throws IOException {
Map<?, ?> nodesInfo = toMap(client().performRequest("GET", "/_nodes/http"));
nodesInfo = (Map<?, ?>) nodesInfo.get("nodes");
Map<?, ?> nodeInfo = (Map<?, ?>) nodesInfo.values().iterator().next();
Map<?, ?> http = (Map<?, ?>) nodeInfo.get("http");
String remote = "http://"+ http.get("publish_address");
Map<String, Object> response = toMap(client().performRequest("POST", "/_reindex", emptyMap(), new StringEntity(
"{\"source\":{\"index\":\"test\",\"remote\":{\"host\":\"" + remote + "\"}}, \"dest\":{\"index\":\"des\"}}",
ContentType.APPLICATION_JSON)));
assertThat(response, hasEntry("total", count));
assertThat(response, hasEntry("created", count));
}


public void testUpdateByQuery() throws IOException {
Map<String, Object> response = toMap(client().performRequest("POST", "/test/_update_by_query"));
assertThat(response, hasEntry("total", count));
assertThat(response, hasEntry("updated", count));
}

public void testDeleteByQuery() throws IOException {
Map<String, Object> response = toMap(client().performRequest("POST", "/test/_delete_by_query", emptyMap(), new StringEntity(
"{\"query\":{\"match_all\":{}}}",
ContentType.APPLICATION_JSON)));
assertThat(response, hasEntry("total", count));
assertThat(response, hasEntry("deleted", count));
}

static Map<String, Object> toMap(Response response) throws IOException {
return XContentHelper.convertToMap(JsonXContent.jsonXContent, response.getEntity().getContent(), false);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.index.reindex;

import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.rest.FakeRestRequest;

import java.io.IOException;

import static java.util.Collections.emptyList;
import static org.mockito.Mockito.mock;

public class RestDeleteByQueryActionTests extends ESTestCase {
public void testParseEmpty() throws IOException {
RestDeleteByQueryAction action = new RestDeleteByQueryAction(Settings.EMPTY, mock(RestController.class));
DeleteByQueryRequest request = action.buildRequest(new FakeRestRequest.Builder(new NamedXContentRegistry(emptyList()))
.build());
assertEquals(AbstractBulkByScrollRequest.SIZE_ALL_MATCHES, request.getSize());
assertEquals(AbstractBulkByScrollRequest.DEFAULT_SCROLL_SIZE, request.getSearchRequest().source().size());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.index.reindex;

import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.rest.FakeRestRequest;

import java.io.IOException;

import static java.util.Collections.emptyList;
import static org.mockito.Mockito.mock;

public class RestUpdateByQueryActionTests extends ESTestCase {
public void testParseEmpty() throws IOException {
RestUpdateByQueryAction action = new RestUpdateByQueryAction(Settings.EMPTY, mock(RestController.class));
UpdateByQueryRequest request = action.buildRequest(new FakeRestRequest.Builder(new NamedXContentRegistry(emptyList()))
.build());
assertEquals(AbstractBulkByScrollRequest.SIZE_ALL_MATCHES, request.getSize());
assertEquals(AbstractBulkByScrollRequest.DEFAULT_SCROLL_SIZE, request.getSearchRequest().source().size());
}
}

0 comments on commit eb754a7

Please sign in to comment.