Skip to content

Commit

Permalink
new ScriptProcessor for Ingest (elastic#18193)
Browse files Browse the repository at this point in the history
add new ScriptProcessor for executing ES Scripts within pipelines
  • Loading branch information
talevy authored Jun 15, 2016
1 parent 154d750 commit a26260f
Show file tree
Hide file tree
Showing 29 changed files with 559 additions and 74 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.elasticsearch.ingest;

import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.ingest.core.IngestInfo;
import org.elasticsearch.ingest.core.Processor;
Expand Down Expand Up @@ -55,8 +56,8 @@ public PipelineExecutionService getPipelineExecutionService() {
return pipelineExecutionService;
}

public void setScriptService(ScriptService scriptService) {
pipelineStore.buildProcessorFactoryRegistry(processorsRegistryBuilder, scriptService);
public void buildProcessorsFactoryRegistry(ScriptService scriptService, ClusterService clusterService) {
pipelineStore.buildProcessorFactoryRegistry(processorsRegistryBuilder, scriptService, clusterService);
}

public IngestInfo info() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,8 @@ public PipelineStore(Settings settings) {
super(settings);
}

public void buildProcessorFactoryRegistry(ProcessorsRegistry.Builder processorsRegistryBuilder, ScriptService scriptService) {
TemplateService templateService = new InternalTemplateService(scriptService);
this.processorRegistry = processorsRegistryBuilder.build(templateService);
public void buildProcessorFactoryRegistry(ProcessorsRegistry.Builder processorsRegistryBuilder, ScriptService scriptService, ClusterService clusterService) {
this.processorRegistry = processorsRegistryBuilder.build(scriptService, clusterService);
}

@Override
Expand Down
41 changes: 30 additions & 11 deletions core/src/main/java/org/elasticsearch/ingest/ProcessorsRegistry.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,10 @@
package org.elasticsearch.ingest;

import org.apache.lucene.util.IOUtils;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.ingest.core.Processor;
import org.elasticsearch.ingest.core.ProcessorInfo;
import org.elasticsearch.ingest.core.TemplateService;
import org.elasticsearch.script.ScriptService;

import java.io.Closeable;
import java.io.IOException;
Expand All @@ -31,21 +32,39 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Function;

public final class ProcessorsRegistry implements Closeable {

private final Map<String, Processor.Factory> processorFactories;
private final TemplateService templateService;
private final ScriptService scriptService;
private final ClusterService clusterService;

private ProcessorsRegistry(TemplateService templateService,
Map<String, BiFunction<TemplateService, ProcessorsRegistry, Processor.Factory<?>>> providers) {
private ProcessorsRegistry(ScriptService scriptService, ClusterService clusterService,
Map<String, Function<ProcessorsRegistry, Processor.Factory<?>>> providers) {
this.templateService = new InternalTemplateService(scriptService);
this.scriptService = scriptService;
this.clusterService = clusterService;
Map<String, Processor.Factory> processorFactories = new HashMap<>();
for (Map.Entry<String, BiFunction<TemplateService, ProcessorsRegistry, Processor.Factory<?>>> entry : providers.entrySet()) {
processorFactories.put(entry.getKey(), entry.getValue().apply(templateService, this));
for (Map.Entry<String, Function<ProcessorsRegistry, Processor.Factory<?>>> entry : providers.entrySet()) {
processorFactories.put(entry.getKey(), entry.getValue().apply(this));
}
this.processorFactories = Collections.unmodifiableMap(processorFactories);
}

public TemplateService getTemplateService() {
return templateService;
}

public ScriptService getScriptService() {
return scriptService;
}

public ClusterService getClusterService() {
return clusterService;
}

public Processor.Factory getProcessorFactory(String name) {
return processorFactories.get(name);
}
Expand All @@ -68,20 +87,20 @@ Map<String, Processor.Factory> getProcessorFactories() {

public static final class Builder {

private final Map<String, BiFunction<TemplateService, ProcessorsRegistry, Processor.Factory<?>>> providers = new HashMap<>();
private final Map<String, Function<ProcessorsRegistry, Processor.Factory<?>>> providers = new HashMap<>();

/**
* Adds a processor factory under a specific name.
*/
public void registerProcessor(String name, BiFunction<TemplateService, ProcessorsRegistry, Processor.Factory<?>> provider) {
BiFunction<TemplateService, ProcessorsRegistry, Processor.Factory<?>> previous = this.providers.putIfAbsent(name, provider);
public void registerProcessor(String name, Function<ProcessorsRegistry, Processor.Factory<?>> provider) {
Function<ProcessorsRegistry, Processor.Factory<?>> previous = this.providers.putIfAbsent(name, provider);
if (previous != null) {
throw new IllegalArgumentException("Processor factory already registered for name [" + name + "]");
}
}

public ProcessorsRegistry build(TemplateService templateService) {
return new ProcessorsRegistry(templateService, providers);
public ProcessorsRegistry build(ScriptService scriptService, ClusterService clusterService) {
return new ProcessorsRegistry(scriptService, clusterService, providers);
}

}
Expand Down
5 changes: 2 additions & 3 deletions core/src/main/java/org/elasticsearch/node/NodeModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,10 @@
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.ingest.ProcessorsRegistry;
import org.elasticsearch.ingest.core.Processor;
import org.elasticsearch.ingest.core.TemplateService;
import org.elasticsearch.monitor.MonitorService;
import org.elasticsearch.node.service.NodeService;

import java.util.function.BiFunction;
import java.util.function.Function;

/**
*
Expand Down Expand Up @@ -71,7 +70,7 @@ public Node getNode() {
/**
* Adds a processor factory under a specific type name.
*/
public void registerProcessor(String type, BiFunction<TemplateService, ProcessorsRegistry, Processor.Factory<?>> provider) {
public void registerProcessor(String type, Function<ProcessorsRegistry, Processor.Factory<?>> provider) {
processorsRegistryBuilder.registerProcessor(type, provider);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ public class NodeService extends AbstractComponent implements Closeable {
private final CircuitBreakerService circuitBreakerService;
private final IngestService ingestService;
private final SettingsFilter settingsFilter;
private ClusterService clusterService;
private ScriptService scriptService;

@Nullable
Expand All @@ -87,6 +88,7 @@ public NodeService(Settings settings, ThreadPool threadPool, MonitorService moni
this.version = version;
this.pluginService = pluginService;
this.circuitBreakerService = circuitBreakerService;
this.clusterService = clusterService;
this.ingestService = new IngestService(settings, threadPool, processorsRegistryBuilder);
this.settingsFilter = settingsFilter;
clusterService.add(ingestService.getPipelineStore());
Expand All @@ -97,7 +99,7 @@ public NodeService(Settings settings, ThreadPool threadPool, MonitorService moni
@Inject(optional = true)
public void setScriptService(ScriptService scriptService) {
this.scriptService = scriptService;
this.ingestService.setScriptService(scriptService);
this.ingestService.buildProcessorsFactoryRegistry(scriptService, clusterService);
}

public void setHttpServer(@Nullable HttpServer httpServer) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.elasticsearch.action.ingest;

import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.ingest.PipelineStore;
import org.elasticsearch.ingest.ProcessorsRegistry;
import org.elasticsearch.ingest.TestProcessor;
Expand All @@ -27,6 +28,7 @@
import org.elasticsearch.ingest.core.IngestDocument;
import org.elasticsearch.ingest.core.Pipeline;
import org.elasticsearch.ingest.core.Processor;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.test.ESTestCase;
import org.junit.Before;

Expand Down Expand Up @@ -58,8 +60,8 @@ public void init() throws IOException {
CompoundProcessor pipelineCompoundProcessor = new CompoundProcessor(processor);
Pipeline pipeline = new Pipeline(SIMULATED_PIPELINE_ID, null, pipelineCompoundProcessor);
ProcessorsRegistry.Builder processorRegistryBuilder = new ProcessorsRegistry.Builder();
processorRegistryBuilder.registerProcessor("mock_processor", ((templateService, registry) -> mock(Processor.Factory.class)));
ProcessorsRegistry processorRegistry = processorRegistryBuilder.build(TestTemplateService.instance());
processorRegistryBuilder.registerProcessor("mock_processor", ((registry) -> mock(Processor.Factory.class)));
ProcessorsRegistry processorRegistry = processorRegistryBuilder.build(mock(ScriptService.class), mock(ClusterService.class));
store = mock(PipelineStore.class);
when(store.get(SIMULATED_PIPELINE_ID)).thenReturn(pipeline);
when(store.getProcessorRegistry()).thenReturn(processorRegistry);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public String description() {
}

public void onModule(NodeModule nodeModule) {
nodeModule.registerProcessor("test", (templateService, registry) -> new Factory());
nodeModule.registerProcessor("test", (registry) -> new Factory());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.LocalTransportAddress;
Expand All @@ -36,6 +37,7 @@
import org.elasticsearch.ingest.core.Pipeline;
import org.elasticsearch.ingest.core.Processor;
import org.elasticsearch.ingest.core.ProcessorInfo;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.test.ESTestCase;
import org.junit.Before;

Expand All @@ -51,6 +53,7 @@
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import static org.mockito.Mockito.mock;

public class PipelineStoreTests extends ESTestCase {

Expand All @@ -60,7 +63,7 @@ public class PipelineStoreTests extends ESTestCase {
public void init() throws Exception {
store = new PipelineStore(Settings.EMPTY);
ProcessorsRegistry.Builder registryBuilder = new ProcessorsRegistry.Builder();
registryBuilder.registerProcessor("set", (templateService, registry) -> config -> {
registryBuilder.registerProcessor("set", (registry) -> config -> {
String field = (String) config.remove("field");
String value = (String) config.remove("value");
return new Processor() {
Expand All @@ -80,7 +83,7 @@ public String getTag() {
}
};
});
registryBuilder.registerProcessor("remove", (templateService, registry) -> config -> {
registryBuilder.registerProcessor("remove", (registry) -> config -> {
String field = (String) config.remove("field");
return new Processor() {
@Override
Expand All @@ -99,7 +102,7 @@ public String getTag() {
}
};
});
store.buildProcessorFactoryRegistry(registryBuilder, null);
store.buildProcessorFactoryRegistry(registryBuilder, mock(ScriptService.class), mock(ClusterService.class));
}

public void testUpdatePipelines() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,28 +19,31 @@

package org.elasticsearch.ingest;

import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.test.ESTestCase;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.sameInstance;
import static org.mockito.Mockito.mock;

public class ProcessorsRegistryTests extends ESTestCase {

public void testBuildProcessorRegistry() {
ProcessorsRegistry.Builder builder = new ProcessorsRegistry.Builder();
TestProcessor.Factory factory1 = new TestProcessor.Factory();
builder.registerProcessor("1", (templateService, registry) -> factory1);
builder.registerProcessor("1", (registry) -> factory1);
TestProcessor.Factory factory2 = new TestProcessor.Factory();
builder.registerProcessor("2", (templateService, registry) -> factory2);
builder.registerProcessor("2", (registry) -> factory2);
TestProcessor.Factory factory3 = new TestProcessor.Factory();
try {
builder.registerProcessor("1", (templateService, registry) -> factory3);
builder.registerProcessor("1", (registry) -> factory3);
fail("addProcessor should have failed");
} catch(IllegalArgumentException e) {
assertThat(e.getMessage(), equalTo("Processor factory already registered for name [1]"));
}

ProcessorsRegistry registry = builder.build(TestTemplateService.instance());
ProcessorsRegistry registry = builder.build(mock(ScriptService.class), mock(ClusterService.class));
assertThat(registry.getProcessorFactories().size(), equalTo(2));
assertThat(registry.getProcessorFactory("1"), sameInstance(factory1));
assertThat(registry.getProcessorFactory("2"), sameInstance(factory2));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,13 @@
package org.elasticsearch.ingest.core;

import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.ingest.ProcessorsRegistry;
import org.elasticsearch.ingest.TestTemplateService;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.test.ClusterServiceUtils;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.TestThreadPool;
import org.junit.Before;

import java.util.ArrayList;
Expand Down Expand Up @@ -96,8 +100,8 @@ public void testOptional_InvalidType() {
public void testReadProcessors() throws Exception {
Processor processor = mock(Processor.class);
ProcessorsRegistry.Builder builder = new ProcessorsRegistry.Builder();
builder.registerProcessor("test_processor", (templateService, registry) -> config -> processor);
ProcessorsRegistry registry = builder.build(TestTemplateService.instance());
builder.registerProcessor("test_processor", (registry) -> config -> processor);
ProcessorsRegistry registry = builder.build(mock(ScriptService.class), mock(ClusterService.class));


List<Map<String, Map<String, Object>>> config = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,14 @@
package org.elasticsearch.ingest.core;

import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.ingest.ProcessorsRegistry;
import org.elasticsearch.ingest.TestProcessor;
import org.elasticsearch.ingest.TestTemplateService;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.test.ClusterServiceUtils;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.TestThreadPool;

import java.util.Arrays;
import java.util.Collections;
Expand All @@ -34,6 +38,7 @@
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;
import static org.mockito.Mockito.mock;

public class PipelineFactoryTests extends ESTestCase {

Expand Down Expand Up @@ -152,8 +157,8 @@ public void testFlattenProcessors() throws Exception {
private ProcessorsRegistry createProcessorRegistry(Map<String, Processor.Factory> processorRegistry) {
ProcessorsRegistry.Builder builder = new ProcessorsRegistry.Builder();
for (Map.Entry<String, Processor.Factory> entry : processorRegistry.entrySet()) {
builder.registerProcessor(entry.getKey(), ((templateService, registry) -> entry.getValue()));
builder.registerProcessor(entry.getKey(), ((registry) -> entry.getValue()));
}
return builder.build(TestTemplateService.instance());
return builder.build(mock(ScriptService.class), mock(ClusterService.class));
}
}
40 changes: 40 additions & 0 deletions docs/reference/ingest/ingest-node.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -1258,6 +1258,46 @@ Renames an existing field. If the field doesn't exist or the new name is already
}
--------------------------------------------------

[[script-processor]]
=== Script Processor

Allows inline, stored, and file scripts to be executed within ingest pipelines.

See <<modules-scripting-using, How to use scripts>> to learn more about writing scripts. The Script Processor
leverages caching of compiled scripts for improved performance. Since the
script specified within the processor is potentially re-compiled per document, it is important
to understand how script caching works. To learn more about
caching see <<modules-scripting-using-caching, Script Caching>>.

[[script-options]]
.Script Options
[options="header"]
|======
| Name | Required | Default | Description
| `field` | yes | - | The field to set
| `lang` | no | - | The scripting language
| `file` | no | - | The script file to refer to
| `id` | no | - | The stored script id to refer to
| `inline` | no | - | An inline script to be executed
|======

You can access the current ingest document from within the script context by using the `ctx` variable.

The following example sets a new field called `field_a_plus_b` to be the sum of two existing
numeric fields `field_a` and `field_b`:

[source,js]
--------------------------------------------------
{
"script": {
"field": "field_a_plus_b",
"lang": "painless",
"inline": "return ctx.field_a + ctx.field_b"
}
}
--------------------------------------------------


[[set-processor]]
=== Set Processor
Sets one field and associates it with the specified value. If the field already exists,
Expand Down
Loading

0 comments on commit a26260f

Please sign in to comment.