Skip to content

Commit

Permalink
add check for non-existent pipelines provided to simulate requests (e…
Browse files Browse the repository at this point in the history
  • Loading branch information
talevy committed May 6, 2016
1 parent d0d2d2b commit c1afcb5
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,9 @@ static Parsed parseWithPipelineId(String pipelineId, Map<String, Object> config,
throw new IllegalArgumentException("param [pipeline] is null");
}
Pipeline pipeline = pipelineStore.get(pipelineId);
if (pipeline == null) {
throw new IllegalArgumentException("pipeline [" + pipelineId + "] does not exist");
}
List<IngestDocument> ingestDocumentList = parseDocs(config);
return new Parsed(pipeline, ingestDocumentList, verbose);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import java.util.Map;

import static org.elasticsearch.action.ingest.SimulatePipelineRequest.Fields;
import static org.elasticsearch.action.ingest.SimulatePipelineRequest.SIMULATED_PIPELINE_ID;
import static org.elasticsearch.ingest.core.IngestDocument.MetaData.ID;
import static org.elasticsearch.ingest.core.IngestDocument.MetaData.INDEX;
import static org.elasticsearch.ingest.core.IngestDocument.MetaData.TYPE;
Expand All @@ -55,12 +56,12 @@ public class SimulatePipelineRequestParsingTests extends ESTestCase {
public void init() throws IOException {
TestProcessor processor = new TestProcessor(ingestDocument -> {});
CompoundProcessor pipelineCompoundProcessor = new CompoundProcessor(processor);
Pipeline pipeline = new Pipeline(SimulatePipelineRequest.SIMULATED_PIPELINE_ID, null, pipelineCompoundProcessor);
Pipeline pipeline = new Pipeline(SIMULATED_PIPELINE_ID, null, pipelineCompoundProcessor);
ProcessorsRegistry.Builder processorRegistryBuilder = new ProcessorsRegistry.Builder();
processorRegistryBuilder.registerProcessor("mock_processor", ((templateService, registry) -> mock(Processor.Factory.class)));
ProcessorsRegistry processorRegistry = processorRegistryBuilder.build(TestTemplateService.instance());
store = mock(PipelineStore.class);
when(store.get(SimulatePipelineRequest.SIMULATED_PIPELINE_ID)).thenReturn(pipeline);
when(store.get(SIMULATED_PIPELINE_ID)).thenReturn(pipeline);
when(store.getProcessorRegistry()).thenReturn(processorRegistry);
}

Expand Down Expand Up @@ -91,7 +92,7 @@ public void testParseUsingPipelineStore() throws Exception {
expectedDocs.add(expectedDoc);
}

SimulatePipelineRequest.Parsed actualRequest = SimulatePipelineRequest.parseWithPipelineId(SimulatePipelineRequest.SIMULATED_PIPELINE_ID, requestContent, false, store);
SimulatePipelineRequest.Parsed actualRequest = SimulatePipelineRequest.parseWithPipelineId(SIMULATED_PIPELINE_ID, requestContent, false, store);
assertThat(actualRequest.isVerbose(), equalTo(false));
assertThat(actualRequest.getDocuments().size(), equalTo(numDocs));
Iterator<Map<String, Object>> expectedDocsIterator = expectedDocs.iterator();
Expand All @@ -104,7 +105,7 @@ public void testParseUsingPipelineStore() throws Exception {
assertThat(ingestDocument.getSourceAndMetadata(), equalTo(expectedDocument.get(Fields.SOURCE)));
}

assertThat(actualRequest.getPipeline().getId(), equalTo(SimulatePipelineRequest.SIMULATED_PIPELINE_ID));
assertThat(actualRequest.getPipeline().getId(), equalTo(SIMULATED_PIPELINE_ID));
assertThat(actualRequest.getPipeline().getDescription(), nullValue());
assertThat(actualRequest.getPipeline().getProcessors().size(), equalTo(1));
}
Expand Down Expand Up @@ -177,8 +178,27 @@ public void testParseWithProvidedPipeline() throws Exception {
assertThat(ingestDocument.getSourceAndMetadata(), equalTo(expectedDocument.get(Fields.SOURCE)));
}

assertThat(actualRequest.getPipeline().getId(), equalTo(SimulatePipelineRequest.SIMULATED_PIPELINE_ID));
assertThat(actualRequest.getPipeline().getId(), equalTo(SIMULATED_PIPELINE_ID));
assertThat(actualRequest.getPipeline().getDescription(), nullValue());
assertThat(actualRequest.getPipeline().getProcessors().size(), equalTo(numProcessors));
}

public void testNullPipelineId() {
Map<String, Object> requestContent = new HashMap<>();
List<Map<String, Object>> docs = new ArrayList<>();
requestContent.put(Fields.DOCS, docs);
Exception e = expectThrows(IllegalArgumentException.class,
() -> SimulatePipelineRequest.parseWithPipelineId(null, requestContent, false, store));
assertThat(e.getMessage(), equalTo("param [pipeline] is null"));
}

public void testNonExistentPipelineId() {
String pipelineId = randomAsciiOfLengthBetween(1, 10);
Map<String, Object> requestContent = new HashMap<>();
List<Map<String, Object>> docs = new ArrayList<>();
requestContent.put(Fields.DOCS, docs);
Exception e = expectThrows(IllegalArgumentException.class,
() -> SimulatePipelineRequest.parseWithPipelineId(pipelineId, requestContent, false, store));
assertThat(e.getMessage(), equalTo("pipeline [" + pipelineId + "] does not exist"));
}
}

0 comments on commit c1afcb5

Please sign in to comment.