diff --git a/logstash-core/src/main/java/org/logstash/config/ir/compiler/ClassFields.java b/logstash-core/src/main/java/org/logstash/config/ir/compiler/ClassFields.java index 15c02d7bab8..e2712648e7f 100644 --- a/logstash-core/src/main/java/org/logstash/config/ir/compiler/ClassFields.java +++ b/logstash-core/src/main/java/org/logstash/config/ir/compiler/ClassFields.java @@ -14,18 +14,6 @@ final class ClassFields { private final Collection afterInit = new ArrayList<>(); - /** - * Add a field of given type that is initialized by the given {@link SyntaxElement} that will - * be executed in the class body. - * Renders as e.g. {@code private final Ruby field5 = RubyUtil.RUBY}. - * @param type Type of the field - * @param initializer Syntax to initialize it in-line. - * @return The field's syntax element that can be used in method bodies - */ - public ValueSyntaxElement add(final Class type, final SyntaxElement initializer) { - return addField(FieldDefinition.withInitializer(definitions.size(), type, initializer)); - } - /** * Adds a field holding the given {@link Object}. * @param obj Object to add field for diff --git a/logstash-core/src/main/java/org/logstash/config/ir/compiler/ComputeStepSyntaxElement.java b/logstash-core/src/main/java/org/logstash/config/ir/compiler/ComputeStepSyntaxElement.java index 79d0cab2821..faecd1e18fd 100644 --- a/logstash-core/src/main/java/org/logstash/config/ir/compiler/ComputeStepSyntaxElement.java +++ b/logstash-core/src/main/java/org/logstash/config/ir/compiler/ComputeStepSyntaxElement.java @@ -13,7 +13,6 @@ import java.util.Collection; import java.util.HashMap; import java.util.Map; -import java.util.concurrent.atomic.AtomicInteger; import java.util.regex.Pattern; import java.util.stream.Collectors; import java.util.stream.StreamSupport; @@ -25,7 +24,7 @@ /** * One step of a compiled pipeline that compiles to a {@link Dataset}. */ -public final class ComputeStepSyntaxElement implements SyntaxElement { +public final class ComputeStepSyntaxElement { private static final Path SOURCE_DIR = debugDir(); @@ -37,36 +36,26 @@ public final class ComputeStepSyntaxElement implements Syntax private static final Map, Class> CLASS_CACHE = new HashMap<>(); - /** - * Sequence number to ensure unique naming for runtime compiled classes. - */ - private static final AtomicInteger SEQUENCE = new AtomicInteger(0); - /** * Pattern to remove redundant {@code ;} from formatted code since {@link Formatter} does not * remove those. */ private static final Pattern REDUNDANT_SEMICOLON = Pattern.compile("\n[ ]*;\n"); - private final String name; - private final Iterable methods; private final ClassFields fields; private final Class type; - ComputeStepSyntaxElement(final Iterable methods, - final ClassFields fields, final Class interfce) { - this( - String.format("CompiledDataset%d", SEQUENCE.incrementAndGet()), methods, fields, - interfce - ); + public static ComputeStepSyntaxElement create( + final Iterable methods, final ClassFields fields, + final Class interfce) { + return new ComputeStepSyntaxElement<>(methods, fields, interfce); } - private ComputeStepSyntaxElement(final String name, final Iterable methods, + private ComputeStepSyntaxElement(final Iterable methods, final ClassFields fields, final Class interfce) { - this.name = name; this.methods = methods; this.fields = fields; type = interfce; @@ -82,7 +71,8 @@ public T instantiate() { if (CLASS_CACHE.containsKey(this)) { clazz = CLASS_CACHE.get(this); } else { - final String code = generateCode(); + final String name = String.format("CompiledDataset%d", CLASS_CACHE.size()); + final String code = generateCode(name); final Path sourceFile = SOURCE_DIR.resolve(String.format("%s.java", name)); Files.write(sourceFile, code.getBytes(StandardCharsets.UTF_8)); COMPILER.cookFile(sourceFile.toFile()); @@ -102,7 +92,17 @@ public T instantiate() { } @Override - public String generateCode() { + public int hashCode() { + return normalizedSource().hashCode(); + } + + @Override + public boolean equals(final Object other) { + return other instanceof ComputeStepSyntaxElement && + normalizedSource().equals(((ComputeStepSyntaxElement) other).normalizedSource()); + } + + private String generateCode(final String name) { try { return REDUNDANT_SEMICOLON.matcher(new Formatter().formatSource( String.format( @@ -110,7 +110,7 @@ public String generateCode() { name, type.getName(), SyntaxFactory.join( - fields.inlineAssigned().generateCode(), fieldsAndCtor(), + fields.inlineAssigned().generateCode(), fieldsAndCtor(name), combine( StreamSupport.stream(methods.spliterator(), false) .toArray(SyntaxElement[]::new) @@ -123,22 +123,6 @@ public String generateCode() { } } - @Override - public int hashCode() { - return normalizedSource().hashCode(); - } - - @Override - public boolean equals(final Object other) { - return other instanceof ComputeStepSyntaxElement && - normalizedSource().equals(((ComputeStepSyntaxElement) other).normalizedSource()); - } - - @Override - public String toString() { - return generateCode(); - } - private static Path debugDir() { final Path sourceDir; try { @@ -181,8 +165,7 @@ private Object[] ctorArguments() { * @return Source of this class, with its name set to {@code CONSTANT}. */ private String normalizedSource() { - return new ComputeStepSyntaxElement<>("CONSTANT", methods, fields, type) - .generateCode(); + return this.generateCode("CONSTANT"); } /** @@ -190,7 +173,7 @@ private String normalizedSource() { * constructor for * @return Java Source String */ - private String fieldsAndCtor() { + private String fieldsAndCtor(final String name) { final Closure constructor = new Closure(); final FieldDeclarationGroup ctorFields = fields.ctorAssigned(); final Collection ctor = new ArrayList<>(); diff --git a/logstash-core/src/main/java/org/logstash/config/ir/compiler/DatasetCompiler.java b/logstash-core/src/main/java/org/logstash/config/ir/compiler/DatasetCompiler.java index 117b5689df3..c787fdef510 100644 --- a/logstash-core/src/main/java/org/logstash/config/ir/compiler/DatasetCompiler.java +++ b/logstash-core/src/main/java/org/logstash/config/ir/compiler/DatasetCompiler.java @@ -36,84 +36,61 @@ public final class DatasetCompiler { * the given set of {@link JrubyEventExtLibrary.RubyEvent} and have no state. */ public static final Collection ROOT_DATASETS = Collections.singleton( - prepare(Closure.wrap(SyntaxFactory.ret(BATCH_ARG)), Closure.EMPTY, new ClassFields()) - .instantiate() + prepare( + computeAndClear( + Closure.wrap(SyntaxFactory.ret(BATCH_ARG)), Closure.EMPTY, new ClassFields() + ) + ).instantiate() ); private DatasetCompiler() { // Utility Class } - /** - * Compiles and subsequently instantiates a {@link Dataset} from given code snippets and - * constructor arguments. - * This method must be {@code synchronized} to avoid compiling duplicate classes. - * @param compute Method body of {@link Dataset#compute(RubyArray, boolean, boolean)} - * @param clear Method body of {@link Dataset#clear()} - * @param fieldValues Constructor Arguments - * @return Dataset Instance - */ - public static synchronized ComputeStepSyntaxElement prepare(final Closure compute, final Closure clear, - final ClassFields fieldValues) { - return new ComputeStepSyntaxElement<>( - Arrays.asList(MethodSyntaxElement.compute(compute), MethodSyntaxElement.clear(clear)), - fieldValues, Dataset.class - ); - } - public static ComputeStepSyntaxElement splitDataset(final Collection parents, final EventCondition condition) { final ClassFields fields = new ClassFields(); final Collection parentFields = parents.stream().map(fields::add).collect(Collectors.toList()); - final SyntaxElement arrayInit = - SyntaxFactory.constant(RubyUtil.class, "RUBY").call("newArray"); - final ValueSyntaxElement ifData = fields.add(RubyArray.class, arrayInit); - final ValueSyntaxElement elseData = fields.add(RubyArray.class, arrayInit); - final ValueSyntaxElement buffer = fields.add(RubyArray.class, arrayInit); - final ValueSyntaxElement done = fields.add(boolean.class); + final ValueSyntaxElement ifData = fields.add(new ArrayList<>()); + final ValueSyntaxElement elseData = fields.add(new ArrayList<>()); + final ValueSyntaxElement buffer = fields.add(new ArrayList<>()); final ValueSyntaxElement right = fields.add(DatasetCompiler.Complement.class); final VariableDefinition event = new VariableDefinition(JrubyEventExtLibrary.RubyEvent.class, "event"); final ValueSyntaxElement eventVal = event.access(); fields.addAfterInit( Closure.wrap( - SyntaxFactory.assignment(right, + SyntaxFactory.assignment( + right, SyntaxFactory.cast( DatasetCompiler.Complement.class, SyntaxFactory.constant( DatasetCompiler.class, DatasetCompiler.Complement.class.getSimpleName() - ).call("from", SyntaxFactory.THIS, elseData) + ).call("from", SyntaxFactory.identifier("this"), elseData) ) ) ) ); - return new ComputeStepSyntaxElement<>( - Arrays.asList( - MethodSyntaxElement.compute( - returnIffBuffered(ifData, done) - .add(bufferParents(parentFields, buffer)) - .add( - SyntaxFactory.forLoop( - event, buffer, - Closure.wrap( - SyntaxFactory.ifCondition( - fields.add(condition).call("fulfilled", eventVal), - Closure.wrap(ifData.call("add", eventVal)), - Closure.wrap(elseData.call("add", eventVal)) - ) - ) + final DatasetCompiler.ComputeAndClear compute = withOutputBuffering( + withInputBuffering( + Closure.wrap( + SyntaxFactory.forLoop( + event, buffer, + Closure.wrap( + SyntaxFactory.ifCondition( + fields.add(condition).call("fulfilled", eventVal), + Closure.wrap(ifData.call("add", eventVal)), + Closure.wrap(elseData.call("add", eventVal)) ) - ).add(clear(buffer)) - .add(SyntaxFactory.assignment(done, SyntaxFactory.TRUE)) - .add(SyntaxFactory.ret(ifData)) - ), - MethodSyntaxElement.clear( - clearIfDone( - clearSyntax(parentFields).add(clear(ifData)).add(clear(elseData)), done + ) ) - ), - MethodSyntaxElement.right(right) - ), fields, SplitDataset.class + ), parentFields, buffer + ), + clearSyntax(parentFields).add(clear(elseData)), ifData, fields + ); + return ComputeStepSyntaxElement.create( + Arrays.asList(compute.compute(), compute.clear(), MethodSyntaxElement.right(right)), + compute.fields(), SplitDataset.class ); } @@ -129,27 +106,21 @@ public static ComputeStepSyntaxElement filterDataset(final Collection parentFields = parents.stream().map(fields::add).collect(Collectors.toList()); final RubyArray inputBuffer = RubyUtil.RUBY.newArray(); - final ValueSyntaxElement inputBufferField = fields.add(inputBuffer); final ValueSyntaxElement outputBuffer = fields.add(new ArrayList<>()); final IRubyObject filter = plugin.toRuby(); final ValueSyntaxElement filterField = fields.add(filter); - final ValueSyntaxElement done = fields.add(boolean.class); final String multiFilter = "multi_filter"; - final Closure body = returnIffBuffered(outputBuffer, done).add( - bufferParents(parentFields, inputBufferField) - .add( - buffer( - outputBuffer, - SyntaxFactory.cast( - RubyArray.class, - callRubyCallsite( - fields.add(rubyCallsite(filter, multiFilter)), - fields.add(new IRubyObject[]{inputBuffer}), filterField, - multiFilter - ) - ) + final Closure body = Closure.wrap( + buffer( + outputBuffer, + SyntaxFactory.cast( + RubyArray.class, + callRubyCallsite( + fields.add(rubyCallsite(filter, multiFilter)), + fields.add(new IRubyObject[]{inputBuffer}), filterField, multiFilter ) - ).add(clear(inputBufferField)) + ) + ) ); if (plugin.hasFlush()) { body.add( @@ -160,10 +131,10 @@ public static ComputeStepSyntaxElement filterDataset(final Collection outputDataset(final Collection parentFields = parents.stream().map(fields::add).collect(Collectors.toList()); final RubyArray buffer = RubyUtil.RUBY.newArray(); - final ValueSyntaxElement inputBuffer = fields.add(buffer); final Closure clearSyntax; final Closure inlineClear; if (terminal) { @@ -240,40 +210,90 @@ public static ComputeStepSyntaxElement outputDataset(final Collection prepare(final DatasetCompiler.ComputeAndClear compute) { + return ComputeStepSyntaxElement.create( + Arrays.asList(compute.compute(), compute.clear()), compute.fields(), Dataset.class ); } - private static Closure returnIffBuffered(final MethodLevelSyntaxElement ifData, - final MethodLevelSyntaxElement done) { + /** + * Generates code that buffers all events that aren't cancelled from a given set of parent + * {@link Dataset} to a given collection, executes the given closure and then clears the + * collection used for buffering. + * @param compute Closure to execute + * @param parents Parents to buffer results for + * @param inputBuffer Buffer to store results in + * @return Closure wrapped by buffering parent results and clearing them + */ + private static Closure withInputBuffering(final Closure compute, + final Collection parents, final ValueSyntaxElement inputBuffer) { + final VariableDefinition event = + new VariableDefinition(JrubyEventExtLibrary.RubyEvent.class, "e"); + final ValueSyntaxElement eventVar = event.access(); return Closure.wrap( - SyntaxFactory.ifCondition(done, Closure.wrap(SyntaxFactory.ret(ifData))) + parents.stream().map(par -> + SyntaxFactory.forLoop( + event, computeDataset(par), + Closure.wrap( + SyntaxFactory.ifCondition( + SyntaxFactory.not( + eventVar.call("getEvent").call("isCancelled") + ), Closure.wrap(inputBuffer.call("add", eventVar)) + ) + ) + ) + ).toArray(MethodLevelSyntaxElement[]::new) + ).add(compute).add(clear(inputBuffer)); + } + + /** + * Generates compute and clear actions with logic for setting a boolean {@code done} + * flag and caching the result of the computation in the {@code compute} closure. + * Wraps {@code clear} closure with condition to only execute the clear if the {@code done} + * flag is set to {@code true}. Also adds clearing the output buffer used for caching the + * {@code compute} result to the {@code clear} closure. + * @param compute Compute closure to execute + * @param clear Clear closure to execute + * @param outputBuffer Output buffer used for caching {@code compute} result + * @param fields Class fields + * @return ComputeAndClear with adjusted methods and {@code done} flag added to fields + */ + private static DatasetCompiler.ComputeAndClear withOutputBuffering(final Closure compute, + final Closure clear, final ValueSyntaxElement outputBuffer, final ClassFields fields) { + final ValueSyntaxElement done = fields.add(boolean.class); + return computeAndClear( + Closure.wrap( + SyntaxFactory.ifCondition(done, Closure.wrap(SyntaxFactory.ret(outputBuffer))) + ).add(compute) + .add(SyntaxFactory.assignment(done, SyntaxFactory.identifier("true"))) + .add(SyntaxFactory.ret(outputBuffer)), + Closure.wrap( + SyntaxFactory.ifCondition( + done, Closure.wrap( + clear.add(clear(outputBuffer)), + SyntaxFactory.assignment(done, SyntaxFactory.identifier("false")) + ) + ) + ), fields ); } @@ -320,27 +340,6 @@ private static IRubyObject[] flushOpts(final boolean fin) { return new IRubyObject[]{res}; } - private static Closure bufferParents(final Collection parents, - final ValueSyntaxElement buffer) { - final VariableDefinition event = - new VariableDefinition(JrubyEventExtLibrary.RubyEvent.class, "e"); - final ValueSyntaxElement eventVar = event.access(); - return Closure.wrap( - parents.stream().map(par -> - SyntaxFactory.forLoop( - event, computeDataset(par), - Closure.wrap( - SyntaxFactory.ifCondition( - SyntaxFactory.not( - eventVar.call("getEvent").call("isCancelled") - ), Closure.wrap(buffer.call("add", eventVar)) - ) - ) - ) - ).toArray(MethodLevelSyntaxElement[]::new) - ); - } - /** * Special case optimization for when the output plugin is directly connected to the Queue * without any filters or conditionals in between. This special case does not arise naturally @@ -366,7 +365,7 @@ private static ComputeStepSyntaxElement outputDatasetFromRoot(final IRu private static ComputeStepSyntaxElement compileOutput(final Closure syntax, final Closure clearSyntax, final ClassFields fields) { return prepare( - syntax.add(MethodLevelSyntaxElement.RETURN_NULL), clearSyntax, fields + computeAndClear(syntax.add(MethodLevelSyntaxElement.RETURN_NULL), clearSyntax, fields) ); } @@ -398,6 +397,11 @@ private static DynamicMethod rubyCallsite(final IRubyObject rubyObject, final St return rubyObject.getMetaClass().searchMethod(name); } + private static DatasetCompiler.ComputeAndClear computeAndClear(final Closure compute, final Closure clear, + final ClassFields fields) { + return new DatasetCompiler.ComputeAndClear(compute, clear, fields); + } + /** * Complementary {@link Dataset} to a {@link SplitDataset} representing the * negative branch of the {@code if} statement. @@ -454,4 +458,36 @@ public void clear() { } } } + + /** + * Represents the 3-tuple of {@code compute} method, {@code clear} method and + * {@link ClassFields} used by both methods. + */ + private static final class ComputeAndClear { + + private final MethodSyntaxElement compute; + + private final MethodSyntaxElement clear; + + private final ClassFields fields; + + private ComputeAndClear(final Closure compute, final Closure clear, + final ClassFields fields) { + this.compute = MethodSyntaxElement.compute(compute); + this.clear = MethodSyntaxElement.clear(clear); + this.fields = fields; + } + + public MethodSyntaxElement compute() { + return compute; + } + + public MethodSyntaxElement clear() { + return clear; + } + + public ClassFields fields() { + return fields; + } + } } diff --git a/logstash-core/src/main/java/org/logstash/config/ir/compiler/SyntaxFactory.java b/logstash-core/src/main/java/org/logstash/config/ir/compiler/SyntaxFactory.java index d1fbbf6aee7..9d8970a0802 100644 --- a/logstash-core/src/main/java/org/logstash/config/ir/compiler/SyntaxFactory.java +++ b/logstash-core/src/main/java/org/logstash/config/ir/compiler/SyntaxFactory.java @@ -9,12 +9,6 @@ */ final class SyntaxFactory { - public static final SyntaxFactory.IdentifierStatement THIS = identifier("this"); - - public static final SyntaxFactory.IdentifierStatement TRUE = identifier("true"); - - public static final SyntaxFactory.IdentifierStatement FALSE = identifier("false"); - /** * Joins given {@link String}s without delimiter. * @param parts Strings to join diff --git a/logstash-core/src/test/java/org/logstash/config/ir/compiler/DatasetCompilerTest.java b/logstash-core/src/test/java/org/logstash/config/ir/compiler/DatasetCompilerTest.java index f1342b845a4..37e6554ba4f 100644 --- a/logstash-core/src/test/java/org/logstash/config/ir/compiler/DatasetCompilerTest.java +++ b/logstash-core/src/test/java/org/logstash/config/ir/compiler/DatasetCompilerTest.java @@ -1,6 +1,5 @@ package org.logstash.config.ir.compiler; -import java.util.Collection; import org.jruby.RubyArray; import org.jruby.runtime.ThreadContext; import org.junit.Test; @@ -15,40 +14,6 @@ public final class DatasetCompilerTest { - @Test - public void compilesEmptyMethod() { - final Dataset func = DatasetCompiler.prepare( - Closure.wrap(SyntaxFactory.ret(DatasetCompiler.BATCH_ARG.call("to_a"))), - Closure.EMPTY, new ClassFields() - ).instantiate(); - final RubyArray batch = RubyUtil.RUBY.newArray(); - assertThat(func.compute(batch, false, false), is(batch)); - } - - @Test - public void compilesParametrizedMethod() { - final RubyArray batch = RubyUtil.RUBY.newArray( - JrubyEventExtLibrary.RubyEvent.newRubyEvent(RubyUtil.RUBY, new Event()) - ); - final VariableDefinition eventsDef = new VariableDefinition(Collection.class, "events"); - final ValueSyntaxElement events = eventsDef.access(); - final ClassFields fields = new ClassFields(); - final Dataset func = DatasetCompiler.prepare( - Closure.wrap( - SyntaxFactory.definition(eventsDef, DatasetCompiler.BATCH_ARG.call("to_a")), - events.call( - "add", - fields.add( - JrubyEventExtLibrary.RubyEvent.newRubyEvent(RubyUtil.RUBY, new Event()) - ) - ), - SyntaxFactory.ret(events) - ), - Closure.EMPTY, fields - ).instantiate(); - assertThat(func.compute(batch, false, false).size(), is(2)); - } - /** * Smoke test ensuring that output {@link Dataset} is compiled correctly. */