Skip to content

Commit

Permalink
Reindex: Only ask for _version we need it
Browse files Browse the repository at this point in the history
`_reindex` only needs the `_version` if the `dest` has
`"version_type": "external"`. So it shouldn't ask for it unless it does.

`_update_by_query` and `_delete_by_query` always need the `_version`.

Closes elastic#19135
  • Loading branch information
nik9000 committed Jul 29, 2016
1 parent bdebd02 commit 6f24866
Show file tree
Hide file tree
Showing 8 changed files with 68 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,14 @@ public AbstractAsyncBulkByScrollAction(BulkByScrollTask task, ESLogger logger, P
if (sorts == null || sorts.isEmpty()) {
mainRequest.getSearchRequest().source().sort(fieldSort("_doc"));
}
mainRequest.getSearchRequest().source().version(needsSourceDocumentVersions());
}

/**
* Does this operation need the versions of the source documents?
*/
protected abstract boolean needsSourceDocumentVersions();

protected abstract BulkRequest buildBulk(Iterable<? extends ScrollableHitSource.Hit> docs);

protected ScrollableHitSource buildScrollableResultSource(BackoffPolicy backoffPolicy) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,6 @@ public AbstractBulkByScrollRequest(SearchRequest source) {
// Set the defaults which differ from SearchRequest's defaults.
source.scroll(DEFAULT_SCROLL_TIMEOUT);
source.source(new SearchSourceBuilder());
source.source().version(true);
source.source().size(DEFAULT_SCROLL_SIZE);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,18 +157,43 @@ public String getScrollId() {
* methods.
*/
public interface Hit {
/**
* The index in which the hit is stored.
*/
String getIndex();
/**
* The type that the hit has.
*/
String getType();
/**
* The document id of the hit.
*/
String getId();
/**
* The version of the match or {@code -1} if the version wasn't requested. The {@code -1} keeps it inline with Elasticsearch's
* internal APIs.
*/
long getVersion();
/**
* The source of the hit. Returns null if the source didn't come back from the search, usually because it source wasn't stored at
* all.
*/
@Nullable BytesReference getSource();
/**
* The document id of the parent of the hit if there is a parent or null if there isn't.
*/
@Nullable String getParent();
/**
* The routing on the hit if there is any or null if there isn't.
*/
@Nullable String getRouting();
/**
* The {@code _timestamp} on the hit if one was stored with the hit or null if one wasn't.
*/
@Nullable Long getTimestamp();
/**
* The {@code _ttl} on the hit if one was set on it or null one wasn't.
*/
@Nullable Long getTTL();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,15 @@ public AsyncDeleteBySearchAction(BulkByScrollTask task, ESLogger logger, ParentT
super(task, logger, client, threadPool, request, listener, scriptService, clusterState);
}

@Override
protected boolean needsSourceDocumentVersions() {
/*
* We always need the version of the source document so we can report a version conflict if we try to delete it and it has been
* changed.
*/
return true;
}

@Override
protected boolean accept(ScrollableHitSource.Hit doc) {
// Delete-by-query does not require the source to delete a document
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.http.HttpInfo;
import org.elasticsearch.http.HttpServer;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.mapper.internal.TTLFieldMapper;
import org.elasticsearch.index.mapper.internal.VersionFieldMapper;
import org.elasticsearch.index.reindex.ScrollableHitSource.SearchFailure;
Expand Down Expand Up @@ -231,6 +232,15 @@ public AsyncIndexBySearchAction(BulkByScrollTask task, ESLogger logger, ParentTa
super(task, logger, client, threadPool, request, listener, scriptService, clusterState);
}

@Override
protected boolean needsSourceDocumentVersions() {
/*
* We only need the source version if we're going to use it when write and we only do that when the destination request uses
* external versioning.
*/
return mainRequest.getDestination().versionType() != VersionType.INTERNAL;
}

@Override
protected ScrollableHitSource buildScrollableResultSource(BackoffPolicy backoffPolicy) {
if (mainRequest.getRemoteInfo() != null) {
Expand Down Expand Up @@ -284,6 +294,7 @@ protected RequestWrapper<IndexRequest> buildRequest(ScrollableHitSource.Hit doc)
*/
index.versionType(mainRequest.getDestination().versionType());
if (index.versionType() == INTERNAL) {
assert doc.getVersion() == -1 : "fetched version when we didn't have to";
index.version(mainRequest.getDestination().version());
} else {
index.version(doc.getVersion());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,15 @@ public AsyncIndexBySearchAction(BulkByScrollTask task, ESLogger logger, ParentTa
super(task, logger, client, threadPool, request, listener, scriptService, clusterState);
}

@Override
protected boolean needsSourceDocumentVersions() {
/*
* We always need the version of the source document so we can report a version conflict if we try to delete it and it has been
* changed.
*/
return true;
}

@Override
protected BiFunction<RequestWrapper<?>, ScrollableHitSource.Hit, RequestWrapper<?>> buildScriptApplier() {
Script script = mainRequest.getScript();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,14 +64,14 @@ private RemoteResponseParsers() {}
String index = (String) a[i++];
String type = (String) a[i++];
String id = (String) a[i++];
long version = (long) a[i++];
return new BasicHit(index, type, id, version);
Long version = (Long) a[i++];
return new BasicHit(index, type, id, version == null ? -1 : version);
});
static {
HIT_PARSER.declareString(constructorArg(), new ParseField("_index"));
HIT_PARSER.declareString(constructorArg(), new ParseField("_type"));
HIT_PARSER.declareString(constructorArg(), new ParseField("_id"));
HIT_PARSER.declareLong(constructorArg(), new ParseField("_version"));
HIT_PARSER.declareLong(optionalConstructorArg(), new ParseField("_version"));
HIT_PARSER.declareObject(BasicHit::setSource, (p, s) -> {
try {
/*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -659,6 +659,11 @@ public DummyAbstractAsyncBulkByScrollAction() {
AsyncBulkByScrollActionTests.this.threadPool, testRequest, listener);
}

@Override
protected boolean needsSourceDocumentVersions() {
return randomBoolean();
}

@Override
protected BulkRequest buildBulk(Iterable<? extends ScrollableHitSource.Hit> docs) {
return new BulkRequest();
Expand Down

0 comments on commit 6f24866

Please sign in to comment.