Close scan scroll (even if it's completed/closed)
Consider the scroll_id when parsing the response
Forcefully close the scroll even if it was completed

While the behavior remains unchanged, the approach no longer relies
on the particular behavior of scan scrolls and instead treats all
scrolls the same

Also add an HTTP DELETE method with a body

relates elastic#537
costin committed Aug 21, 2015
1 parent 574fe6f commit 278972f
Showing 13 changed files with 140 additions and 63 deletions.
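In short, the commit makes the scroll id a first-class part of every response and makes cleanup unconditional. A minimal model of the resulting lifecycle, for orientation before the diffs (illustration only; `ScrollPage`, `ScrollTransport`, and `ScrollConsumer` are made-up names, not types from this commit):

```java
import java.util.List;

// Toy interfaces standing in for the real RestRepository/ScrollReader pair.
interface ScrollPage {
    String scrollId();      // the _scroll_id carried by this response
    List<Object[]> hits();
}

interface ScrollTransport {
    ScrollPage nextPage(String scrollId);
    void deleteScroll(String scrollId); // DELETE _search/scroll, id as body; 404 tolerated
}

class ScrollConsumer {
    static void drain(ScrollTransport transport, String scrollId) {
        try {
            ScrollPage page;
            while (!(page = transport.nextPage(scrollId)).hits().isEmpty()) {
                scrollId = page.scrollId(); // follow the id returned by each page
            }
        } finally {
            transport.deleteScroll(scrollId); // close even if the scroll completed on its own
        }
    }
}
```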
@@ -213,7 +213,7 @@ public void testUpsertParamJsonScript() throws Exception {
properties.put(ConfigurationOptions.ES_WRITE_OPERATION, "upsert");
properties.put(ConfigurationOptions.ES_MAPPING_ID, "number");
properties.put(ConfigurationOptions.ES_INPUT_JSON, "yes");
properties.put(ConfigurationOptions.ES_UPDATE_SCRIPT, "counter += param1; anothercounter += param2");
properties.put(ConfigurationOptions.ES_UPDATE_SCRIPT, "ctx._source.counter += param1; ctx._source.anothercounter += param2");
properties.put(ConfigurationOptions.ES_UPDATE_SCRIPT_LANG, "groovy");
properties.put(ConfigurationOptions.ES_UPDATE_SCRIPT_PARAMS_JSON, "{ \"param1\":1, \"param2\":2}");

@@ -37,7 +37,7 @@ public class HiveValueReaderTest {
public void testDateMapping() throws Exception {
ScrollReader reader = new ScrollReader(new HiveValueReader(), mapping("hive-date.json"), false, "_mapping", false);
InputStream stream = getClass().getResourceAsStream("hive-date-source.json");
-List<Object[]> read = reader.read(stream);
+List<Object[]> read = reader.read(stream).getHits();
assertEquals(1, read.size());
Object[] doc = read.get(0);
Map map = (Map) doc[1];
10 changes: 9 additions & 1 deletion mr/src/main/java/org/elasticsearch/hadoop/rest/RestClient.java
@@ -176,7 +176,7 @@ private boolean retryFailedEntries(InputStream content, TrackingBytesArray data)
ObjectReader r = JsonFactory.objectReader(mapper, Map.class);
JsonParser parser = mapper.getJsonFactory().createJsonParser(content);
try {
if (ParsingUtils.seek("items", new JacksonJsonParser(parser)) == null) {
if (ParsingUtils.seek(new JacksonJsonParser(parser), "items") == null) {
// recorded bytes are ack here
stats.bytesAccepted += data.length();
stats.docsAccepted += data.entries();
@@ -323,6 +323,10 @@ protected Response execute(Method method, String path, ByteSequence buffer) {
return execute(new SimpleRequest(method, null, path, null, buffer), true);
}

+protected Response execute(Method method, String path, ByteSequence buffer, boolean checkStatus) {
+    return execute(new SimpleRequest(method, null, path, null, buffer), checkStatus);
+}

protected Response execute(Request request, boolean checkStatus) {
Response response = network.execute(request);

@@ -372,6 +376,10 @@ public InputStream scroll(String scrollId) {
}
}

+public void deleteScroll(String scrollId) {
+    execute(DELETE, "_search/scroll", new BytesArray(scrollId.getBytes(StringUtils.UTF_8)), false);
+}

public boolean exists(String indexOrType) {
return (execute(HEAD, indexOrType, false).hasSucceeded());
}
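The `checkStatus` flag threaded through `execute` is what lets `deleteScroll` ignore the expected 404. A rough self-contained model of that contract (illustration only; `MiniClient`/`MiniResponse` are made-up names, not the project's classes):

```java
// Toy model of execute(..., checkStatus): status checking becomes the caller's choice.
class MiniResponse {
    final int status;
    MiniResponse(int status) { this.status = status; }
    boolean ok() { return status < 300; }
}

class MiniClient {
    MiniResponse execute(String method, String path, byte[] body, boolean checkStatus) {
        MiniResponse r = network(method, path, body);
        if (checkStatus && !r.ok()) {
            throw new IllegalStateException(method + " " + path + " failed with " + r.status);
        }
        return r; // with checkStatus=false, a 404 from an already-released scroll passes through
    }

    private MiniResponse network(String method, String path, byte[] body) {
        return new MiniResponse(404); // pretend the scroll already expired
    }
}
```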
RestRepository.java

@@ -37,6 +37,7 @@
import org.elasticsearch.hadoop.rest.stats.Stats;
import org.elasticsearch.hadoop.rest.stats.StatsAware;
import org.elasticsearch.hadoop.serialization.ScrollReader;
+import org.elasticsearch.hadoop.serialization.ScrollReader.Scroll;
import org.elasticsearch.hadoop.serialization.builder.JdkValueReader;
import org.elasticsearch.hadoop.serialization.bulk.BulkCommand;
import org.elasticsearch.hadoop.serialization.bulk.BulkCommands;
@@ -404,7 +405,7 @@ public Field getMapping() {
return Field.parseField(client.getMapping(resourceR.mapping()));
}

-public List<Object[]> scroll(String scrollId, ScrollReader reader) throws IOException {
+public Scroll scroll(String scrollId, ScrollReader reader) throws IOException {
InputStream scroll = client.scroll(scrollId);
try {
return reader.read(scroll);
17 changes: 14 additions & 3 deletions mr/src/main/java/org/elasticsearch/hadoop/rest/ScrollQuery.java
@@ -29,6 +29,7 @@
import org.elasticsearch.hadoop.rest.stats.Stats;
import org.elasticsearch.hadoop.rest.stats.StatsAware;
import org.elasticsearch.hadoop.serialization.ScrollReader;
+import org.elasticsearch.hadoop.serialization.ScrollReader.Scroll;

/**
* Result streaming data from a ElasticSearch query using the scan/scroll. Performs batching underneath to retrieve data in chunks.
@@ -48,6 +49,8 @@ public class ScrollQuery implements Iterator<Object>, Closeable, StatsAware {

private final Stats stats = new Stats();

+private boolean closed = false;

ScrollQuery(RestRepository client, String scrollId, long size, ScrollReader reader) {
this.repository = client;
this.scrollId = scrollId;
@@ -57,8 +60,14 @@ public class ScrollQuery implements Iterator<Object>, Closeable, StatsAware {

@Override
public void close() {
-finished = true;
-batch = Collections.emptyList();
+if (!closed) {
+    closed = true;
+    finished = true;
+    batch = Collections.emptyList();
+    // typically the scroll is closed after it is consumed so this will trigger a 404
+    // however we're closing it either way
+    repository.getRestClient().deleteScroll(scrollId);
+}
}

@Override
@@ -73,7 +82,9 @@ public boolean hasNext() {
}

try {
-batch = repository.scroll(scrollId, reader);
+Scroll scroll = repository.scroll(scrollId, reader);
+scrollId = scroll.getScrollId();
+batch = scroll.getHits();
} catch (IOException ex) {
throw new EsHadoopIllegalStateException("Cannot retrieve scroll [" + scrollId + "]", ex);
}
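The `closed` flag makes `close()` idempotent, which matters because the iterator also closes itself once the scroll is exhausted and callers typically call `close()` again afterwards. A runnable sketch of just that pattern (names made up, not from the diff):

```java
import java.io.Closeable;

class IdempotentClose implements Closeable {
    private boolean closed = false;

    @Override
    public void close() {
        if (!closed) {
            closed = true;
            // remote cleanup runs at most once, no matter how often close() is invoked
            System.out.println("DELETE _search/scroll (a 404 here is expected and tolerated)");
        }
    }

    public static void main(String[] args) {
        IdempotentClose scroll = new IdempotentClose();
        scroll.close(); // triggers the delete
        scroll.close(); // no-op
    }
}
```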
CommonsHttpTransport.java

@@ -37,7 +37,6 @@
import org.apache.commons.httpclient.URIException;
import org.apache.commons.httpclient.UsernamePasswordCredentials;
import org.apache.commons.httpclient.auth.AuthScope;
-import org.apache.commons.httpclient.methods.DeleteMethod;
import org.apache.commons.httpclient.methods.EntityEnclosingMethod;
import org.apache.commons.httpclient.methods.GetMethod;
import org.apache.commons.httpclient.methods.HeadMethod;
@@ -360,7 +359,7 @@ public Response execute(Request request) throws IOException {

switch (request.method()) {
case DELETE:
-http = new DeleteMethod();
+http = new HttpDeleteWithBody();
break;
case HEAD:
http = new HeadMethod();
@@ -400,6 +399,9 @@ public Response execute(Request request) throws IOException {

ByteSequence ba = request.body();
if (ba != null && ba.length() > 0) {
+if (!(http instanceof EntityEnclosingMethod)) {
+    throw new IllegalStateException(String.format("Method %s cannot contain body - implementation bug", request.method().name()));
+}
EntityEnclosingMethod entityMethod = (EntityEnclosingMethod) http;
entityMethod.setRequestEntity(new BytesArrayRequestEntity(ba));
entityMethod.setContentChunked(false);
HttpDeleteWithBody.java (new file)
@@ -0,0 +1,35 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.hadoop.rest.commonshttp;

import org.apache.commons.httpclient.methods.EntityEnclosingMethod;

public class HttpDeleteWithBody extends EntityEnclosingMethod {

public HttpDeleteWithBody() {}

public HttpDeleteWithBody(String uri) {
super(uri);
}

public String getName() {
return "DELETE";
}
}
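For reference, commons-httpclient's stock `DeleteMethod` cannot carry a request body, which is why this class exists. A usage sketch (the host and scroll id are placeholders, not values from the commit):

```java
import org.apache.commons.httpclient.HttpClient;
import org.apache.commons.httpclient.methods.StringRequestEntity;
import org.elasticsearch.hadoop.rest.commonshttp.HttpDeleteWithBody;

public class ClearScrollExample {
    public static void main(String[] args) throws Exception {
        HttpClient client = new HttpClient();
        HttpDeleteWithBody delete = new HttpDeleteWithBody("http://localhost:9200/_search/scroll");
        // the scroll id travels as the raw request body
        delete.setRequestEntity(new StringRequestEntity("<scroll-id-here>", "text/plain", "UTF-8"));
        try {
            int status = client.executeMethod(delete);
            System.out.println("status=" + status); // 404 simply means the scroll is already gone
        } finally {
            delete.releaseConnection();
        }
    }
}
```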

ParsingUtils.java

@@ -38,7 +38,7 @@ public abstract class ParsingUtils {
* @param parser
* @return token associated with the given path or null if not found
*/
-public static Token seek(String path, Parser parser) {
+public static Token seek(Parser parser, String path) {
// return current token if no path is given
if (!StringUtils.hasText(path)) {
return null;
ScrollReader.java

@@ -123,6 +123,24 @@ public String toString() {
}
}

+public static class Scroll {
+    private final String scrollId;
+    private final List<Object[]> hits;
+
+    private Scroll(String scrollId, List<Object[]> hits) {
+        this.scrollId = scrollId;
+        this.hits = hits;
+    }
+
+    public String getScrollId() {
+        return scrollId;
+    }
+
+    public List<Object[]> getHits() {
+        return hits;
+    }
+}

private static final Log log = LogFactory.getLog(ScrollReader.class);

private Parser parser;
@@ -134,6 +152,7 @@ public String toString() {
private final String metadataField;
private final boolean returnRawJson;

+private static final String[] SCROLL_ID = new String[] { "_scroll_id" };
private static final String[] HITS = new String[] { "hits" };
private static final String[] ID = new String[] { "_id" };
private static final String[] FIELDS = new String[] { "fields" };
@@ -149,7 +168,7 @@ public ScrollReader(ValueReader reader, Field rootField, boolean readMetadata, S
this.returnRawJson = returnRawJson;
}

-public List<Object[]> read(InputStream content) throws IOException {
+public Scroll read(InputStream content) throws IOException {
Assert.notNull(content);

BytesArray copy = null;
@@ -170,14 +189,20 @@ public List<Object[]> read(InputStream content) throws IOException {
}
}

-private List<Object[]> read(BytesArray input) {
+private Scroll read(BytesArray input) {
+    // get scroll_id
+    Token token = ParsingUtils.seek(parser, SCROLL_ID);
+    Assert.isTrue(token == Token.VALUE_STRING, "invalid response");
+    String scrollId = parser.text();

// check hits/total
if (hits() == 0) {
return null;
}

// move to hits/hits
-Token token = ParsingUtils.seek(parser, HITS);
+token = ParsingUtils.seek(parser, HITS);

// move through the list and for each hit, extract the _id and _source
Assert.isTrue(token == Token.START_ARRAY, "invalid response");
@@ -284,7 +309,7 @@ private List<Object[]> read(BytesArray input) {
}
}

-return results;
+return new Scroll(scrollId, results);
}

private Object[] readHit() {
@@ -299,14 +324,14 @@ private Object[] readHitAsMap() {
Object id = null;

Token t = parser.currentToken();

if (parsingCallback != null) {
parsingCallback.beginDoc();
}
// read everything until SOURCE or FIELDS is encountered
if (readMetadata) {
if (parsingCallback != null) {
parsingCallback.beginLeadMetadata();
}

metadata = reader.createMap();
@@ -334,9 +359,9 @@ private Object[] readHitAsMap() {
}

if (parsingCallback != null) {
parsingCallback.endLeadMetadata();
}

Assert.notNull(id, "no id found");
result[0] = id;
}
@@ -351,15 +376,15 @@ private Object[] readHitAsMap() {
Object data = Collections.emptyMap();

if (t != null) {
if (parsingCallback != null) {
parsingCallback.beginSource();
}

data = read(t, null);
if (parsingCallback != null) {
parsingCallback.endSource();
}


if (readMetadata) {
reader.addToMap(data, reader.wrapString(metadataField), metadata);
@@ -375,11 +400,11 @@ private Object[] readHitAsMap() {
result[1] = data;

if (readMetadata) {
if (parsingCallback != null) {
parsingCallback.beginTrailMetadata();
}
}

// in case of additional fields (matched_query), add them to the metadata
while (parser.currentToken() == Token.FIELD_NAME) {
String name = parser.currentName();
@@ -394,13 +419,13 @@ private Object[] readHitAsMap() {
}

if (readMetadata) {
if (parsingCallback != null) {
parsingCallback.endTrailMetadata();
}
}

if (parsingCallback != null) {
parsingCallback.endDoc();
}

if (trace) {
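The ordering in `read(BytesArray)` above is deliberate: the parser is forward-only, and Elasticsearch emits `_scroll_id` before `hits` in the scroll response, so the id has to be captured on the way in. A standalone illustration using Jackson 2 streaming (the project itself uses its own `Parser` abstraction; this is only a sketch of the idea):

```java
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonToken;

public class ScrollIdFirst {
    public static void main(String[] args) throws Exception {
        String json = "{\"_scroll_id\":\"abc\",\"hits\":{\"total\":1,\"hits\":[{\"_id\":\"1\"}]}}";
        JsonParser p = new JsonFactory().createParser(json);
        String scrollId = null;
        while (p.nextToken() != null) {
            if (p.getCurrentToken() == JsonToken.FIELD_NAME && "_scroll_id".equals(p.getCurrentName())) {
                p.nextToken();  // move onto the VALUE_STRING
                scrollId = p.getText();
                break;          // the same parser then continues forward toward hits/hits
            }
        }
        System.out.println("scrollId=" + scrollId);
    }
}
```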
@@ -39,7 +39,7 @@ public void testParseItems() throws IOException {
ObjectMapper mapper = new ObjectMapper();
InputStream in = getClass().getResourceAsStream("bulk-error.json");
JsonParser parser = mapper.getJsonFactory().createJsonParser(in);
ParsingUtils.seek("items", new JacksonJsonParser(parser));
ParsingUtils.seek(new JacksonJsonParser(parser), "itemsb");

BackportedObjectReader r = BackportedObjectReader.create(mapper, Map.class);
