Add Logstash config source as comments to generated code
This change is purely to aid debugging: it logs the config source (as written in the configuration) together with the generated class it maps to. The mapping appears as comments in the generated code.

Fixes elastic#8691
jakelandis authored and original-brownbear committed Dec 22, 2017
1 parent e9ba86e commit e5f3593
Showing 4 changed files with 75 additions and 36 deletions.
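To illustrate the effect before the diffs: `ComputeStepSyntaxElement.getGeneratedSource()` now prefixes the dumped source with a banner mapping each generated class to the configuration snippet it was compiled from (whitespace stripped, per the `replaceAll` in the diff below). A rough sketch of that output, using a hypothetical class name and config snippet rather than a real run:

    /******************************************************************************************
    * Generated3_FilterDataset <==> grok{match=>{"message"=>"%{WORD:word}"}}
    ******************************************************************************************/
    class Generated3_FilterDataset {
        ...
    }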
@@ -248,21 +248,22 @@ private Dataset compile() {
         } else {
             return DatasetCompiler.terminalDataset(outputs.stream().map(
                 leaf ->
-                    outputDataset(leaf.getId(), flatten(DatasetCompiler.ROOT_DATASETS, leaf))
+                    outputDataset(leaf.getId(), getConfigSource(leaf), flatten(DatasetCompiler.ROOT_DATASETS, leaf))
             ).collect(Collectors.toList()));
         }
     }

     /**
      * Build a {@link Dataset} representing the {@link JrubyEventExtLibrary.RubyEvent}s after
      * the application of the given filter.
-     * @param vertex Vertex Id of the filter to create this {@link Dataset} for
+     * @param vertexId Vertex Id of the filter to create this {@link Dataset}
+     * @param configSource The Logstash configuration that maps to the returned Dataset
      * @param datasets All the datasets that pass through this filter
      * @return Filter {@link Dataset}
      */
-    private Dataset filterDataset(final String vertex, final Collection<Dataset> datasets) {
+    private Dataset filterDataset(final String vertexId, String configSource, final Collection<Dataset> datasets) {
         return plugins.computeIfAbsent(
-            vertex, v -> DatasetCompiler.filterDataset(datasets, filters.get(v))
+            vertexId, v -> DatasetCompiler.filterDataset(datasets, filters.get(v), configSource)
         );
     }

@@ -271,13 +272,14 @@ private Dataset filterDataset(final String vertex, final Collection<Dataset> dat
      * the application of the given output.
      * @param vertexId Vertex Id of the filter to create this {@link Dataset} for
      * filter node in the topology once
+     * @param configSource The Logstash configuration that maps to the returned Dataset
      * @param datasets All the datasets that are passed into this output
      * @return Output {@link Dataset}
      */
-    private Dataset outputDataset(final String vertexId, final Collection<Dataset> datasets) {
+    private Dataset outputDataset(final String vertexId, String configSource, final Collection<Dataset> datasets) {
         return plugins.computeIfAbsent(
             vertexId, v -> DatasetCompiler.outputDataset(
-                datasets, outputs.get(v), outputs.size() == 1
+                datasets, outputs.get(v), configSource, outputs.size() == 1
             )
         );
     }
@@ -288,12 +290,13 @@ private Dataset outputDataset(final String vertexId, final Collection<Dataset> d
      * @param datasets Datasets to split
      * @param condition Condition that must be fulfilled
      * @param index Vertex id to cache the resulting {@link Dataset} under
+     * @param configSource The Logstash configuration that maps to the returned Dataset
      * @return The half of the datasets contents that fulfils the condition
      */
     private SplitDataset split(final Collection<Dataset> datasets,
-        final EventCondition condition, final String index) {
+        final EventCondition condition, final String index, String configSource) {
         return iffs.computeIfAbsent(
-            index, ind -> DatasetCompiler.splitDataset(datasets, condition)
+            index, ind -> DatasetCompiler.splitDataset(datasets, condition, configSource)
         );
     }

@@ -329,17 +332,17 @@ private Collection<Dataset> compileDependencies(final Vertex start,
         final Collection<Dataset> transientDependencies = flatten(datasets, dependency);
         final String id = dependency.getId();
         if (isFilter(dependency)) {
-            return filterDataset(id, transientDependencies);
+            return filterDataset(id, getConfigSource(dependency), transientDependencies);
         } else if (isOutput(dependency)) {
-            return outputDataset(id, transientDependencies);
+            return outputDataset(id, getConfigSource(dependency), transientDependencies);
         } else {
             // We know that it's an if vertex since the input children are either
             // output, filter or if in type.
             final IfVertex ifvert = (IfVertex) dependency;
             final SplitDataset ifDataset = split(
                 transientDependencies,
                 EventCondition.Compiler.buildCondition(ifvert.getBooleanExpression()),
-                id
+                id, getConfigSource(dependency)
             );
             // It is important that we double check that we are actually dealing with the
             // positive/left branch of the if condition
@@ -353,4 +356,21 @@ private Collection<Dataset> compileDependencies(final Vertex start,
         }).collect(Collectors.toList());
     }
-}
+
+    /**
+     * Gets the configuration source for debugging purposes. Uses the metadata text if it exists, else the vertex's toString method.
+     * @param vertex The {@link Vertex} to read the Logstash configuration source from
+     * @return A String that is useful for debugging the mapping from Logstash configuration to generated Dataset/code
+     */
+    private String getConfigSource(Vertex vertex) {
+        if (vertex == null) {
+            return "(vertex is null, this is likely a bug)";
+        }
+        // conditionals will use this
+        if (vertex.getSourceWithMetadata() == null) {
+            return vertex.toString();
+        }
+        String text = vertex.getSourceWithMetadata().getText();
+        return text == null ? "(vertex.getSourceWithMetadata().getText() is null, this is likely a bug)" : text;
+    }
+}
@@ -36,6 +36,8 @@ final class ComputeStepSyntaxElement implements SyntaxElement {

     private static final Map<String, String> SOURCE_CACHE = new HashMap<>();

+    private static final Map<String, String> CONFIG_SOURCE_CACHE = new HashMap<>();
+
     /**
      * Sequence number to ensure unique naming for runtime compiled classes.
      */
@@ -49,18 +51,28 @@ final class ComputeStepSyntaxElement implements SyntaxElement {

     private final ClassFields fields;

+    private final String configSource;
+
     /**
      * Get the generated source
      * @return sorted and formatted lines of generated code
      */
     public static List<String> getGeneratedSource() {
         List<String> output = new ArrayList<>();

+        output.add("/******************************************************************************************");
+        CONFIG_SOURCE_CACHE.forEach((k, v) -> {
+            output.add("* " + v + " <==> " + k.replaceAll("[\\t\\n\\r\\s]+",""));
+        });
+        output.add("******************************************************************************************/");
+
         SOURCE_CACHE.forEach((k, v) -> {
             output.add(String.format("class %s {", k));
             LOGGER.trace("{}:{}", k, v);
             getFormattedLines(v, output, INDENT_WIDTH);
             output.add("}");
         });

         return output;
     }

@@ -104,19 +116,21 @@ private static List<String> getFormattedLines(String input, List<String> output,
     }

     ComputeStepSyntaxElement(final Iterable<MethodSyntaxElement> methods,
-        final ClassFields fields, DatasetCompiler.DatasetFlavor datasetFlavor) {
-        this(String.format("Generated%d_" + datasetFlavor.getDisplay() + "Dataset", SEQUENCE.incrementAndGet()), methods, fields);
+        final ClassFields fields, DatasetCompiler.DatasetFlavor datasetFlavor, String configSource) {
+        this(String.format("Generated%d_" + datasetFlavor.getDisplay() + "Dataset", SEQUENCE.incrementAndGet()), methods, fields, configSource);
     }

     private ComputeStepSyntaxElement(final String name, final Iterable<MethodSyntaxElement> methods,
-        final ClassFields fields) {
+        final ClassFields fields, String configSource) {
         this.name = name;
         this.methods = methods;
         this.fields = fields;
+        this.configSource = configSource;
     }

     public <T extends Dataset> T instantiate(final Class<T> interfce) {
         try {
+
             final Class<? extends Dataset> clazz;
             if (CLASS_CACHE.containsKey(this)) {
                 clazz = CLASS_CACHE.get(this);
@@ -133,6 +147,7 @@ public <T extends Dataset> T instantiate(final Class<T> interfce) {
                 CLASS_LOADER.addClass(clazz);
                 CLASS_CACHE.put(this, clazz);
             }
+            CONFIG_SOURCE_CACHE.putIfAbsent(configSource, clazz.getName());
             return (T) clazz.<T>getConstructor(ctorTypes()).newInstance(ctorArguments());
         } catch (final CompileException | IOException | NoSuchMethodException
             | InvocationTargetException | InstantiationException | IllegalAccessException ex) {
@@ -185,7 +200,7 @@ public boolean equals(final Object other) {
      * @return Source of this class, with its name set to {@code CONSTANT}.
      */
     private String normalizedSource() {
-        return new ComputeStepSyntaxElement("CONSTANT", methods, fields)
+        return new ComputeStepSyntaxElement("CONSTANT", methods, fields, "")
             .generateCode();
     }

@@ -63,7 +63,7 @@ String getDisplay() {
      * the given set of {@link JrubyEventExtLibrary.RubyEvent} and have no state.
      */
     public static final Collection<Dataset> ROOT_DATASETS = Collections.singleton(
-        compile(Closure.wrap(SyntaxFactory.ret(BATCH_ARG)), Closure.EMPTY, new ClassFields(), DatasetFlavor.ROOT)
+        compile(Closure.wrap(SyntaxFactory.ret(BATCH_ARG)), Closure.EMPTY, new ClassFields(), DatasetFlavor.ROOT, "(root)")
     );

     private DatasetCompiler() {
@@ -78,19 +78,20 @@ private DatasetCompiler() {
      * @param clear Method body of {@link Dataset#clear()}
      * @param fieldValues Constructor Arguments
      * @param datasetFlavor The flavor of {@link Dataset} to compile.
+     * @param configSource The Logstash configuration that maps to the returned Dataset.
+     *                     This is only helpful for human debugging, to differentiate between the intended usages of the {@link Dataset}
      * @return Dataset Instance
      */
     public static synchronized Dataset compile(final Closure compute, final Closure clear,
-        final ClassFields fieldValues, final DatasetFlavor datasetFlavor) {
+        final ClassFields fieldValues, final DatasetFlavor datasetFlavor, String configSource) {
         return new ComputeStepSyntaxElement(
             Arrays.asList(MethodSyntaxElement.compute(compute), MethodSyntaxElement.clear(clear)),
-            fieldValues, datasetFlavor
+            fieldValues, datasetFlavor, configSource
         ).instantiate(Dataset.class);
     }

     public static SplitDataset splitDataset(final Collection<Dataset> parents,
-        final EventCondition condition) {
+        final EventCondition condition, String configSource) {
         final ClassFields fields = new ClassFields();
         final Collection<ValueSyntaxElement> parentFields =
             parents.stream().map(fields::add).collect(Collectors.toList());
@@ -127,18 +128,19 @@ public static SplitDataset splitDataset(final Collection<Dataset> parents,
                 .add(SyntaxFactory.assignment(done, SyntaxFactory.FALSE))
             ),
             MethodSyntaxElement.right(elseData)
-        ), fields, DatasetFlavor.CONDITIONAL
+        ), fields, DatasetFlavor.CONDITIONAL, configSource
         ).instantiate(SplitDataset.class);
     }

     /**
      * Compiles a {@link Dataset} representing a filter plugin without flush behaviour.
      * @param parents Parent {@link Dataset} to aggregate for this filter
      * @param plugin Filter Plugin
+     * @param configSource The Logstash configuration that maps to the returned Dataset
      * @return Dataset representing the filter plugin
      */
     public static Dataset filterDataset(final Collection<Dataset> parents,
-        final RubyIntegration.Filter plugin) {
+        final RubyIntegration.Filter plugin, String configSource) {
         final ClassFields fields = new ClassFields();
         final Collection<ValueSyntaxElement> parentFields =
             parents.stream().map(fields::add).collect(Collectors.toList());
@@ -179,7 +181,7 @@ public static Dataset filterDataset(final Collection<Dataset> parents,
             Closure.wrap(
                 clearSyntax(parentFields), clear(outputBuffer),
                 SyntaxFactory.assignment(done, SyntaxFactory.FALSE)
-            ), fields, DatasetFlavor.FILTER
+            ), fields, DatasetFlavor.FILTER, configSource
         );
     }
@@ -203,7 +205,7 @@ public static Dataset terminalDataset(final Collection<Dataset> parents) {
             Closure.wrap(
                 parentFields.stream().map(DatasetCompiler::computeDataset)
                     .toArray(MethodLevelSyntaxElement[]::new)
-            ).add(clearSyntax(parentFields)), Closure.EMPTY, fields
+            ).add(clearSyntax(parentFields)), Closure.EMPTY, fields, "(terminal)"
             );
         } else if (count == 1) {
             // No need for a terminal dataset here, if there is only a single parent node we can
@@ -230,15 +232,16 @@ public static Dataset terminalDataset(final Collection<Dataset> parents) {
      * every call to {@code compute}.
      * @param parents Parent Datasets
      * @param output Output Plugin (of Ruby type OutputDelegator)
+     * @param configSource The Logstash configuration that maps to the output Dataset
      * @param terminal Set to true if this output is the only output in the pipeline
      * @return Output Dataset
      */
-    public static Dataset outputDataset(final Collection<Dataset> parents, final IRubyObject output,
+    public static Dataset outputDataset(final Collection<Dataset> parents, final IRubyObject output, String configSource,
         final boolean terminal) {
         final DynamicMethod method = rubyCallsite(output, MULTI_RECEIVE);
         // Short-circuit trivial case of only output(s) in the pipeline
         if (parents == ROOT_DATASETS) {
-            return outputDatasetFromRoot(output, method);
+            return outputDatasetFromRoot(output, method, configSource);
         }
         final ClassFields fields = new ClassFields();
         final Collection<ValueSyntaxElement> parentFields =
@@ -264,7 +267,7 @@ public static Dataset outputDataset(final Collection<Dataset> parents, final IRu
                 clear(inputBuffer),
                 inlineClear
             ),
-            clearSyntax, fields
+            clearSyntax, fields, configSource
         );
     }
@@ -344,22 +347,23 @@ event, computeDataset(par),
     /**
      * Special case optimization for when the output plugin is directly connected to the Queue
      * without any filters or conditionals in between. This special case does not arise naturally
-     * from {@link DatasetCompiler#outputDataset(Collection, IRubyObject, boolean)} since it saves
+     * from {@link DatasetCompiler#outputDataset(Collection, IRubyObject, String, boolean)} since it saves
      * the internal buffering of events and instead forwards events directly from the batch to the
      * Output plugin.
      * @param output Output Plugin
+     * @param configSource The Logstash configuration that maps to the returned Dataset
      * @return Dataset representing the Output
      */
     private static Dataset outputDatasetFromRoot(final IRubyObject output,
-        final DynamicMethod method) {
+        final DynamicMethod method, String configSource) {
         final ClassFields fields = new ClassFields();
         final ValueSyntaxElement args = fields.add(new IRubyObject[1]);
         return compileOutput(
             Closure.wrap(
                 SyntaxFactory.assignment(SyntaxFactory.arrayField(args, 0), BATCH_ARG),
                 callRubyCallsite(fields.add(method), args, fields.add(output), MULTI_RECEIVE)
             ),
-            Closure.EMPTY, fields
+            Closure.EMPTY, fields, configSource
         );
     }

@@ -372,9 +376,9 @@ public static List<String> getGeneratedSource() {
     }

     private static Dataset compileOutput(final Closure syntax, final Closure clearSyntax,
-        final ClassFields fields) {
+        final ClassFields fields, String configSource) {
         return compile(
-            syntax.add(MethodLevelSyntaxElement.RETURN_NULL), clearSyntax, fields, DatasetFlavor.OUTPUT
+            syntax.add(MethodLevelSyntaxElement.RETURN_NULL), clearSyntax, fields, DatasetFlavor.OUTPUT, configSource
        );
    }
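Before the test changes, a minimal sketch of how this debugging output might be retrieved, using the public `DatasetCompiler.getGeneratedSource()` accessor visible in the hunk above (hypothetical usage, not part of this commit):

    // Hypothetical debugging aid: print every generated class, preceded by the
    // config-source banner introduced in this commit.
    for (final String line : DatasetCompiler.getGeneratedSource()) {
        System.out.println(line);
    }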
@@ -19,7 +19,7 @@ public final class DatasetCompilerTest {
     public void compilesEmptyMethod() {
         final Dataset func = DatasetCompiler.compile(
             Closure.wrap(SyntaxFactory.ret(DatasetCompiler.BATCH_ARG.call("to_a"))),
-            Closure.EMPTY, new ClassFields(), DatasetCompiler.DatasetFlavor.ROOT
+            Closure.EMPTY, new ClassFields(), DatasetCompiler.DatasetFlavor.ROOT, "foo"
         );
         final RubyArray batch = RubyUtil.RUBY.newArray();
         assertThat(func.compute(batch, false, false), is(batch));
@@ -44,7 +44,7 @@ public void compilesParametrizedMethod() {
             ),
             SyntaxFactory.ret(events)
         ),
-        Closure.EMPTY, fields, DatasetCompiler.DatasetFlavor.ROOT
+        Closure.EMPTY, fields, DatasetCompiler.DatasetFlavor.ROOT, "foo"
         );
         assertThat(func.compute(batch, false, false).size(), is(2));
     }
@@ -60,7 +60,7 @@ public void compilesOutputDataset() {
                 RubyUtil.RUBY.evalScriptlet(
                     "output = Object.new\noutput.define_singleton_method(:multi_receive) do |batch|\nend\noutput"
                 ),
-                true
+                "foo", true
             ).compute(RubyUtil.RUBY.newArray(), false, false),
             nullValue()
         );
@@ -71,7 +71,7 @@ public void compilesSplitDataset() {
         final FieldReference key = FieldReference.from("foo");
         final EventCondition condition = event -> event.getEvent().includes(key);
         final SplitDataset left =
-            DatasetCompiler.splitDataset(DatasetCompiler.ROOT_DATASETS, condition);
+            DatasetCompiler.splitDataset(DatasetCompiler.ROOT_DATASETS, condition, "foo");
         final Event trueEvent = new Event();
         trueEvent.setField(key, "val");
         final JrubyEventExtLibrary.RubyEvent falseEvent =
