Skip to content

Commit

Permalink
Expose underlying processor to blame for thrown exception within Comp…
Browse files Browse the repository at this point in the history
…oundProcessor (elastic#18342)

Fixes elastic#17823
  • Loading branch information
talevy committed May 24, 2016
1 parent 84dfa36 commit 0fa67e1
Show file tree
Hide file tree
Showing 6 changed files with 119 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,13 @@

package org.elasticsearch.ingest.core;

import org.elasticsearch.common.util.iterable.Iterables;
import org.elasticsearch.ElasticsearchException;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;

/**
Expand Down Expand Up @@ -94,30 +93,38 @@ public void execute(IngestDocument ingestDocument) throws Exception {
try {
processor.execute(ingestDocument);
} catch (Exception e) {
ElasticsearchException compoundProcessorException = newCompoundProcessorException(e, processor.getType(), processor.getTag());
if (onFailureProcessors.isEmpty()) {
throw e;
throw compoundProcessorException;
} else {
executeOnFailure(ingestDocument, e, processor.getType(), processor.getTag());
executeOnFailure(ingestDocument, compoundProcessorException);
}
break;
}
}
}

void executeOnFailure(IngestDocument ingestDocument, Exception cause, String failedProcessorType, String failedProcessorTag) throws Exception {
void executeOnFailure(IngestDocument ingestDocument, ElasticsearchException exception) throws Exception {
try {
putFailureMetadata(ingestDocument, cause, failedProcessorType, failedProcessorTag);
putFailureMetadata(ingestDocument, exception);
for (Processor processor : onFailureProcessors) {
processor.execute(ingestDocument);
try {
processor.execute(ingestDocument);
} catch (Exception e) {
throw newCompoundProcessorException(e, processor.getType(), processor.getTag());
}
}
} finally {
removeFailureMetadata(ingestDocument);
}
}

private void putFailureMetadata(IngestDocument ingestDocument, Exception cause, String failedProcessorType, String failedProcessorTag) {
private void putFailureMetadata(IngestDocument ingestDocument, ElasticsearchException cause) {
List<String> processorTypeHeader = cause.getHeader("processor_type");
List<String> processorTagHeader = cause.getHeader("processor_tag");
String failedProcessorType = (processorTypeHeader != null) ? processorTypeHeader.get(0) : null;
String failedProcessorTag = (processorTagHeader != null) ? processorTagHeader.get(0) : null;
Map<String, String> ingestMetadata = ingestDocument.getIngestMetadata();
ingestMetadata.put(ON_FAILURE_MESSAGE_FIELD, cause.getMessage());
ingestMetadata.put(ON_FAILURE_MESSAGE_FIELD, cause.getRootCause().getMessage());
ingestMetadata.put(ON_FAILURE_PROCESSOR_TYPE_FIELD, failedProcessorType);
ingestMetadata.put(ON_FAILURE_PROCESSOR_TAG_FIELD, failedProcessorTag);
}
Expand All @@ -128,4 +135,21 @@ private void removeFailureMetadata(IngestDocument ingestDocument) {
ingestMetadata.remove(ON_FAILURE_PROCESSOR_TYPE_FIELD);
ingestMetadata.remove(ON_FAILURE_PROCESSOR_TAG_FIELD);
}

private ElasticsearchException newCompoundProcessorException(Exception e, String processorType, String processorTag) {
if (e instanceof ElasticsearchException && ((ElasticsearchException)e).getHeader("processor_type") != null) {
return (ElasticsearchException) e;
}

ElasticsearchException exception = new ElasticsearchException(new IllegalArgumentException(e));

if (processorType != null) {
exception.addHeader("processor_type", processorType);
}
if (processorTag != null) {
exception.addHeader("processor_tag", processorTag);
}

return exception;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.elasticsearch.action.ingest;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.ingest.RandomDocumentPicks;
import org.elasticsearch.ingest.TestProcessor;
Expand Down Expand Up @@ -167,7 +168,8 @@ public void testExecuteItemWithFailure() throws Exception {
SimulateDocumentBaseResult simulateDocumentBaseResult = (SimulateDocumentBaseResult) actualItemResponse;
assertThat(simulateDocumentBaseResult.getIngestDocument(), nullValue());
assertThat(simulateDocumentBaseResult.getFailure(), instanceOf(RuntimeException.class));
RuntimeException runtimeException = (RuntimeException) simulateDocumentBaseResult.getFailure();
assertThat(runtimeException.getMessage(), equalTo("processor failed"));
Exception exception = simulateDocumentBaseResult.getFailure();
assertThat(exception, instanceOf(ElasticsearchException.class));
assertThat(exception.getMessage(), equalTo("java.lang.IllegalArgumentException: java.lang.RuntimeException: processor failed"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.elasticsearch.ingest;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.bulk.BulkItemResponse;
Expand Down Expand Up @@ -154,7 +155,8 @@ public void testBulkWithIngestFailures() throws Exception {
BulkItemResponse itemResponse = response.getItems()[i];
if (i % 2 == 0) {
BulkItemResponse.Failure failure = itemResponse.getFailure();
assertThat(failure.getMessage(), equalTo("java.lang.IllegalArgumentException: test processor failed"));
ElasticsearchException compoundProcessorException = (ElasticsearchException) failure.getCause();
assertThat(compoundProcessorException.getRootCause().getMessage(), equalTo("test processor failed"));
} else {
IndexResponse indexResponse = itemResponse.getResponse();
assertThat("Expected a successful response but found failure [" + itemResponse.getFailure() + "].",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.elasticsearch.ingest;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.bulk.BulkRequest;
Expand Down Expand Up @@ -188,6 +189,8 @@ public void testExecuteFailure() throws Exception {

public void testExecuteSuccessWithOnFailure() throws Exception {
Processor processor = mock(Processor.class);
when(processor.getType()).thenReturn("mock_processor_type");
when(processor.getTag()).thenReturn("mock_processor_tag");
Processor onFailureProcessor = mock(Processor.class);
CompoundProcessor compoundProcessor = new CompoundProcessor(Collections.singletonList(processor), Collections.singletonList(new CompoundProcessor(onFailureProcessor)));
when(store.get("_id")).thenReturn(new Pipeline("_id", "_description", compoundProcessor));
Expand All @@ -198,7 +201,7 @@ public void testExecuteSuccessWithOnFailure() throws Exception {
@SuppressWarnings("unchecked")
Consumer<Boolean> completionHandler = mock(Consumer.class);
executionService.executeIndexRequest(indexRequest, failureHandler, completionHandler);
verify(failureHandler, never()).accept(any(RuntimeException.class));
verify(failureHandler, never()).accept(any(ElasticsearchException.class));
verify(completionHandler, times(1)).accept(true);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,17 @@

package org.elasticsearch.ingest.core;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ingest.TestProcessor;
import org.elasticsearch.ingest.TestTemplateService;
import org.elasticsearch.ingest.processor.AppendProcessor;
import org.elasticsearch.ingest.processor.SetProcessor;
import org.elasticsearch.ingest.processor.SplitProcessor;
import org.elasticsearch.test.ESTestCase;
import org.junit.Before;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;

public class CompoundProcessorTests extends ESTestCase {
Expand Down Expand Up @@ -70,8 +66,8 @@ public void testSingleProcessorWithException() throws Exception {
try {
compoundProcessor.execute(ingestDocument);
fail("should throw exception");
} catch (Exception e) {
assertThat(e.getMessage(), equalTo("error"));
} catch (ElasticsearchException e) {
assertThat(e.getRootCause().getMessage(), equalTo("error"));
}
assertThat(processor.getInvokedCounter(), equalTo(1));
}
Expand Down Expand Up @@ -117,4 +113,68 @@ public void testSingleProcessorWithNestedFailures() throws Exception {
assertThat(processorToFail.getInvokedCounter(), equalTo(1));
assertThat(lastProcessor.getInvokedCounter(), equalTo(1));
}

public void testCompoundProcessorExceptionFailWithoutOnFailure() throws Exception {
TestProcessor firstProcessor = new TestProcessor("id1", "first", ingestDocument -> {throw new RuntimeException("error");});
TestProcessor secondProcessor = new TestProcessor("id3", "second", ingestDocument -> {
Map<String, String> ingestMetadata = ingestDocument.getIngestMetadata();
assertThat(ingestMetadata.entrySet(), hasSize(3));
assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_MESSAGE_FIELD), equalTo("error"));
assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_TYPE_FIELD), equalTo("first"));
assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_TAG_FIELD), equalTo("id1"));
});

CompoundProcessor failCompoundProcessor = new CompoundProcessor(firstProcessor);

CompoundProcessor compoundProcessor = new CompoundProcessor(Collections.singletonList(failCompoundProcessor),
Collections.singletonList(secondProcessor));
compoundProcessor.execute(ingestDocument);

assertThat(firstProcessor.getInvokedCounter(), equalTo(1));
assertThat(secondProcessor.getInvokedCounter(), equalTo(1));
}

public void testCompoundProcessorExceptionFail() throws Exception {
TestProcessor firstProcessor = new TestProcessor("id1", "first", ingestDocument -> {throw new RuntimeException("error");});
TestProcessor failProcessor = new TestProcessor("tag_fail", "fail", ingestDocument -> {throw new RuntimeException("custom error message");});
TestProcessor secondProcessor = new TestProcessor("id3", "second", ingestDocument -> {
Map<String, String> ingestMetadata = ingestDocument.getIngestMetadata();
assertThat(ingestMetadata.entrySet(), hasSize(3));
assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_MESSAGE_FIELD), equalTo("custom error message"));
assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_TYPE_FIELD), equalTo("fail"));
assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_TAG_FIELD), equalTo("tag_fail"));
});

CompoundProcessor failCompoundProcessor = new CompoundProcessor(Collections.singletonList(firstProcessor),
Collections.singletonList(failProcessor));

CompoundProcessor compoundProcessor = new CompoundProcessor(Collections.singletonList(failCompoundProcessor),
Collections.singletonList(secondProcessor));
compoundProcessor.execute(ingestDocument);

assertThat(firstProcessor.getInvokedCounter(), equalTo(1));
assertThat(secondProcessor.getInvokedCounter(), equalTo(1));
}

public void testCompoundProcessorExceptionFailInOnFailure() throws Exception {
TestProcessor firstProcessor = new TestProcessor("id1", "first", ingestDocument -> {throw new RuntimeException("error");});
TestProcessor failProcessor = new TestProcessor("tag_fail", "fail", ingestDocument -> {throw new RuntimeException("custom error message");});
TestProcessor secondProcessor = new TestProcessor("id3", "second", ingestDocument -> {
Map<String, String> ingestMetadata = ingestDocument.getIngestMetadata();
assertThat(ingestMetadata.entrySet(), hasSize(3));
assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_MESSAGE_FIELD), equalTo("custom error message"));
assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_TYPE_FIELD), equalTo("fail"));
assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_TAG_FIELD), equalTo("tag_fail"));
});

CompoundProcessor failCompoundProcessor = new CompoundProcessor(Collections.singletonList(firstProcessor),
Collections.singletonList(new CompoundProcessor(failProcessor)));

CompoundProcessor compoundProcessor = new CompoundProcessor(Collections.singletonList(failCompoundProcessor),
Collections.singletonList(secondProcessor));
compoundProcessor.execute(ingestDocument);

assertThat(firstProcessor.getInvokedCounter(), equalTo(1));
assertThat(secondProcessor.getInvokedCounter(), equalTo(1));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.elasticsearch.ingest.processor;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ingest.SimulateProcessorResult;
import org.elasticsearch.ingest.TestProcessor;
import org.elasticsearch.ingest.core.CompoundProcessor;
Expand Down Expand Up @@ -73,8 +74,9 @@ public void testActualCompoundProcessorWithoutOnFailure() throws Exception {

try {
trackingProcessor.execute(ingestDocument);
} catch (Exception e) {
assertThat(e.getMessage(), equalTo(exception.getMessage()));
fail("processor should throw exception");
} catch (ElasticsearchException e) {
assertThat(e.getRootCause().getMessage(), equalTo(exception.getMessage()));
}

SimulateProcessorResult expectedFirstResult = new SimulateProcessorResult(testProcessor.getTag(), ingestDocument);
Expand Down Expand Up @@ -121,8 +123,8 @@ public void testActualCompoundProcessorWithOnFailure() throws Exception {

metadata = resultList.get(3).getIngestDocument().getIngestMetadata();
assertThat(metadata.get(ON_FAILURE_MESSAGE_FIELD), equalTo("fail"));
assertThat(metadata.get(ON_FAILURE_PROCESSOR_TYPE_FIELD), equalTo("compound"));
assertThat(metadata.get(ON_FAILURE_PROCESSOR_TAG_FIELD), equalTo("CompoundProcessor-fail-success-success-fail"));
assertThat(metadata.get(ON_FAILURE_PROCESSOR_TYPE_FIELD), equalTo("test"));
assertThat(metadata.get(ON_FAILURE_PROCESSOR_TAG_FIELD), equalTo("fail"));
assertThat(resultList.get(3).getFailure(), nullValue());
assertThat(resultList.get(3).getProcessorTag(), equalTo(expectedSuccessResult.getProcessorTag()));
}
Expand Down

0 comments on commit 0fa67e1

Please sign in to comment.