diff --git a/logstash-core/lib/logstash/java_filter_delegator.rb b/logstash-core/lib/logstash/java_filter_delegator.rb deleted file mode 100644 index b25ad219d4c..00000000000 --- a/logstash-core/lib/logstash/java_filter_delegator.rb +++ /dev/null @@ -1,74 +0,0 @@ -# encoding: utf-8 -# -module LogStash - class JavaFilterDelegator - include org.logstash.config.ir.compiler.RubyIntegration::Filter - extend Forwardable - DELEGATED_METHODS = [ - :register, - :close, - :threadsafe?, - :do_close, - :do_stop, - :periodic_flush, - :reloadable? - ] - def_delegators :@filter, *DELEGATED_METHODS - - attr_reader :id - - def initialize(filter, id) - @klass = filter.class - @id = id - @filter = filter - - # Scope the metrics to the plugin - namespaced_metric = filter.metric - @metric_events = namespaced_metric.namespace(:events) - @metric_events_in = @metric_events.counter(:in) - @metric_events_out = @metric_events.counter(:out) - @metric_events_time = @metric_events.counter(:duration_in_millis) - namespaced_metric.gauge(:name, config_name) - - # Not all the filters will do bufferings - @flushes = @filter.respond_to?(:flush) - end - - def toRuby - self - end - - def config_name - @klass.config_name - end - - def multi_filter(events) - @metric_events_in.increment(events.size) - - start_time = java.lang.System.nano_time - new_events = @filter.multi_filter(events) - @metric_events_time.increment((java.lang.System.nano_time - start_time) / 1_000_000) - - # There is no guarantee in the context of filter - # that EVENTS_IN == EVENTS_OUT, see the aggregates and - # the split filter - c = new_events.count { |event| !event.cancelled? } - @metric_events_out.increment(c) if c > 0 - new_events - end - - def has_flush - @flushes - end - - def flush(options = {}) - # we also need to trace the number of events - # coming from a specific filters. - # Filter plugins that does buffering or spooling of events like the - # `Logstash-filter-aggregates` can return `NIL` and will flush on the next flush ticks. 
- new_events = @filter.flush(options) || [] - @metric_events_out.increment(new_events.size) - new_events - end - end -end diff --git a/logstash-core/lib/logstash/java_pipeline.rb b/logstash-core/lib/logstash/java_pipeline.rb index c9c8460b8c3..ed1edea8956 100644 --- a/logstash-core/lib/logstash/java_pipeline.rb +++ b/logstash-core/lib/logstash/java_pipeline.rb @@ -15,7 +15,6 @@ require "logstash/instrument/namespaced_null_metric" require "logstash/instrument/collector" require "logstash/util/dead_letter_queue_manager" -require "logstash/java_filter_delegator" require "logstash/queue_factory" require "logstash/compiler" require "securerandom" diff --git a/logstash-core/spec/logstash/java_filter_delegator_spec.rb b/logstash-core/spec/logstash/java_filter_delegator_spec.rb index faf88667a00..931afa965fc 100644 --- a/logstash-core/spec/logstash/java_filter_delegator_spec.rb +++ b/logstash-core/spec/logstash/java_filter_delegator_spec.rb @@ -1,11 +1,13 @@ # encoding: utf-8 require "spec_helper" -require "logstash/java_filter_delegator" +require "logstash/filter_delegator" require "logstash/instrument/null_metric" require "logstash/event" require "logstash/execution_context" require "support/shared_contexts" +java_import org.logstash.RubyUtil + describe LogStash::JavaFilterDelegator do class MockGauge @@ -15,23 +17,35 @@ def increment(_) include_context "execution_context" - let(:filter_id) { "my-filter" } + let(:filter_id) { "my_filter" } let(:config) do { "host" => "127.0.0.1", "id" => filter_id } end let(:collector) { [] } - let(:counter_in) { MockGauge.new } - let(:counter_out) { MockGauge.new } - let(:counter_time) { MockGauge.new } - let(:metric) { LogStash::Instrument::NamespacedNullMetric.new(collector, :null) } + let(:metric) { + LogStash::Instrument::NamespacedMetric.new( + LogStash::Instrument::Metric.new(LogStash::Instrument::Collector.new), [:filter] + ) + } + let(:counter_in) { + counter = metric.counter(:in) + counter.increment(0) + counter + } + let(:counter_out) { + counter = metric.counter(:out) + counter.increment(0) + counter + } + let(:counter_time) { + counter = metric.counter(:duration_in_millis) + counter.increment(0) + counter + } let(:events) { [LogStash::Event.new, LogStash::Event.new] } before :each do allow(pipeline).to receive(:id).and_return(pipeline_id) - allow(metric).to receive(:namespace).with(anything).and_return(metric) - allow(metric).to receive(:counter).with(:in).and_return(counter_in) - allow(metric).to receive(:counter).with(:out).and_return(counter_out) - allow(metric).to receive(:counter).with(:duration_in_millis).and_return(counter_time) end let(:plugin_klass) do @@ -64,37 +78,48 @@ def filter(event) end it "defines a flush method" do - expect(subject.respond_to?(:flush)).to be_truthy + expect(subject.to_java.hasFlush).to be_truthy end context "when the flush return events" do it "increments the out" do - subject.multi_filter([LogStash::Event.new]) - expect(counter_out).to receive(:increment).with(1) - subject.flush({}) + ruby_context = RubyUtil::RUBY.getCurrentContext + subject.to_java.multiFilter(ruby_context, [LogStash::Event.new]) + event_metrics = metric.collector.snapshot_metric.metric_store.get_with_path( + "filter/my_filter" + )[:filter][:my_filter][:events] + expect(event_metrics[:out].value).to eq(0) + subject.to_java.flush(ruby_context, {}) + expect(event_metrics[:out].value).to eq(1) end end context "when the flush doesn't return anything" do it "doesnt increment the out" do - expect(metric).not_to receive(:increment) - subject.flush({}) + 
subject.to_java.flush(RubyUtil::RUBY.getCurrentContext, {}) + expect( + metric.collector.snapshot_metric.metric_store. + get_with_path("filter/my_filter")[:filter][:my_filter][:events][:duration_in_millis].value + ).to eq(0) end end context "when the filter buffer events" do - before do - allow(metric).to receive(:increment).with(anything, anything) - end it "has incremented :in" do - expect(counter_in).to receive(:increment).with(events.size) - subject.multi_filter(events) + subject.to_java.multiFilter(RubyUtil::RUBY.getCurrentContext, events) + expect( + metric.collector.snapshot_metric.metric_store. + get_with_path("filter/my_filter")[:filter][:my_filter][:events][:in].value + ).to eq(events.size) end it "has not incremented :out" do - expect(counter_out).not_to receive(:increment).with(anything) - subject.multi_filter(events) + subject.to_java.multiFilter(RubyUtil::RUBY.getCurrentContext, events) + expect( + metric.collector.snapshot_metric.metric_store. + get_with_path("filter/my_filter")[:filter][:my_filter][:events][:out].value + ).to eq(0) end end @@ -114,15 +139,13 @@ def filter(event) end end - before do - allow(metric).to receive(:increment).with(anything, anything) - end - it "increments the in/out of the metric" do - expect(counter_in).to receive(:increment).with(events.size) - expect(counter_out).to receive(:increment).with(events.size * 2) - - subject.multi_filter(events) + subject.to_java.multiFilter(RubyUtil::RUBY.getCurrentContext, events) + event_metrics = metric.collector.snapshot_metric.metric_store.get_with_path( + "filter/my_filter" + )[:filter][:my_filter][:events] + expect(event_metrics[:in].value).to eq(events.size) + expect(event_metrics[:out].value).to eq(events.size * 2) end end end @@ -144,14 +167,16 @@ def filter(event) end it "doesnt define a flush method" do - expect(subject.has_flush).to be_falsey + expect(subject.to_java.hasFlush).to be_falsey end it "increments the in/out of the metric" do - expect(counter_in).to receive(:increment).with(events.size) - expect(counter_out).to receive(:increment).with(events.size) - - subject.multi_filter(events) + subject.to_java.multiFilter(RubyUtil::RUBY.getCurrentContext, events) + event_metrics = metric.collector.snapshot_metric.metric_store.get_with_path( + "filter/my_filter" + )[:filter][:my_filter][:events] + expect(event_metrics[:in].value).to eq(events.size) + expect(event_metrics[:out].value).to eq(events.size) end end @@ -165,7 +190,7 @@ def filter(event) # I am not testing the behavior of these methods # this is done in the plugin tests. I just want to make sure # the proxy delegates the methods. 
- LogStash::JavaFilterDelegator::DELEGATED_METHODS.each do |method| + LogStash::FilterDelegator::DELEGATED_METHODS.each do |method| it "delegate method: `#{method}` to the filter" do expect(subject.respond_to?(method)) end diff --git a/logstash-core/src/main/java/org/logstash/RubyUtil.java b/logstash-core/src/main/java/org/logstash/RubyUtil.java index 365cd9965af..44f993cf88c 100644 --- a/logstash-core/src/main/java/org/logstash/RubyUtil.java +++ b/logstash-core/src/main/java/org/logstash/RubyUtil.java @@ -10,6 +10,7 @@ import org.jruby.runtime.ObjectAllocator; import org.logstash.ackedqueue.ext.JRubyAckedQueueExt; import org.logstash.ackedqueue.ext.JRubyWrappedAckedQueueExt; +import org.logstash.config.ir.compiler.FilterDelegatorExt; import org.logstash.config.ir.compiler.OutputDelegatorExt; import org.logstash.execution.QueueReadClientBase; import org.logstash.ext.JRubyWrappedWriteClientExt; @@ -68,6 +69,8 @@ public final class RubyUtil { public static final RubyClass OUTPUT_DELEGATOR_CLASS; + public static final RubyClass FILTER_DELEGATOR_CLASS; + static { RUBY = Ruby.getGlobalRuntime(); LOGSTASH_MODULE = RUBY.getOrCreateModule("LogStash"); @@ -98,6 +101,9 @@ public final class RubyUtil { OUTPUT_DELEGATOR_CLASS = setupLogstashClass( OutputDelegatorExt::new, OutputDelegatorExt.class ); + FILTER_DELEGATOR_CLASS = setupLogstashClass( + FilterDelegatorExt::new, FilterDelegatorExt.class + ); final RubyModule json = LOGSTASH_MODULE.defineOrGetModuleUnder("Json"); final RubyClass stdErr = RUBY.getStandardError(); LOGSTASH_ERROR = LOGSTASH_MODULE.defineClassUnder( diff --git a/logstash-core/src/main/java/org/logstash/config/ir/CompiledPipeline.java b/logstash-core/src/main/java/org/logstash/config/ir/CompiledPipeline.java index 65bac55539b..7790a4acb70 100644 --- a/logstash-core/src/main/java/org/logstash/config/ir/CompiledPipeline.java +++ b/logstash-core/src/main/java/org/logstash/config/ir/CompiledPipeline.java @@ -19,6 +19,7 @@ import org.logstash.config.ir.compiler.Dataset; import org.logstash.config.ir.compiler.DatasetCompiler; import org.logstash.config.ir.compiler.EventCondition; +import org.logstash.config.ir.compiler.FilterDelegatorExt; import org.logstash.config.ir.compiler.OutputDelegatorExt; import org.logstash.config.ir.compiler.RubyIntegration; import org.logstash.config.ir.compiler.SplitDataset; @@ -52,7 +53,7 @@ public final class CompiledPipeline { /** * Configured Filters, indexed by their ID as returned by {@link PluginVertex#getId()}. */ - private final Map filters; + private final Map filters; /** * Configured outputs. @@ -82,7 +83,7 @@ public Collection outputs() { return Collections.unmodifiableCollection(outputs.values()); } - public Collection filters() { + public Collection filters() { return Collections.unmodifiableCollection(filters.values()); } @@ -119,9 +120,9 @@ private Map setupOutputs() { /** * Sets up all Ruby filters learnt from {@link PipelineIR}. */ - private Map setupFilters() { + private Map setupFilters() { final Collection filterPlugins = pipelineIR.getFilterPluginVertices(); - final Map res = + final Map res = new HashMap<>(filterPlugins.size(), 1.0F); for (final PluginVertex plugin : filterPlugins) { res.put(plugin.getId(), buildFilter(plugin)); @@ -174,11 +175,11 @@ private RubyHash convertArgs(final PluginDefinition def) { } /** - * Compiles a {@link RubyIntegration.Filter} from a given {@link PluginVertex}. + * Compiles a {@link FilterDelegatorExt} from a given {@link PluginVertex}. 
* @param vertex Filter {@link PluginVertex} - * @return Compiled {@link RubyIntegration.Filter} + * @return Compiled {@link FilterDelegatorExt} */ - private RubyIntegration.Filter buildFilter(final PluginVertex vertex) { + private FilterDelegatorExt buildFilter(final PluginVertex vertex) { final PluginDefinition def = vertex.getPluginDefinition(); final SourceWithMetadata source = vertex.getSourceWithMetadata(); return pluginFactory.buildFilter( @@ -188,9 +189,9 @@ private RubyIntegration.Filter buildFilter(final PluginVertex vertex) { } /** - * Checks if a certain {@link Vertex} represents a {@link RubyIntegration.Filter}. + * Checks if a certain {@link Vertex} represents a {@link FilterDelegatorExt}. * @param vertex Vertex to check - * @return True iff {@link Vertex} represents a {@link RubyIntegration.Filter} + * @return True iff {@link Vertex} represents a {@link FilterDelegatorExt} */ private boolean isFilter(final Vertex vertex) { return filters.containsKey(vertex.getId()); diff --git a/logstash-core/src/main/java/org/logstash/config/ir/compiler/DatasetCompiler.java b/logstash-core/src/main/java/org/logstash/config/ir/compiler/DatasetCompiler.java index 74407754457..aafbd93d1cd 100644 --- a/logstash-core/src/main/java/org/logstash/config/ir/compiler/DatasetCompiler.java +++ b/logstash-core/src/main/java/org/logstash/config/ir/compiler/DatasetCompiler.java @@ -85,30 +85,22 @@ public static ComputeStepSyntaxElement splitDataset(final Collecti * @return Dataset representing the filter plugin */ public static ComputeStepSyntaxElement filterDataset(final Collection parents, - final RubyIntegration.Filter plugin) { + final FilterDelegatorExt plugin) { final ClassFields fields = new ClassFields(); final ValueSyntaxElement outputBuffer = fields.add(new ArrayList<>()); final Closure clear = Closure.wrap(); final Closure compute; if (parents.isEmpty()) { - final ValueSyntaxElement inputBufferHolder = fields.add(new IRubyObject[1]); - compute = filterBody( - Closure.wrap( - SyntaxFactory.assignment( - SyntaxFactory.arrayField(inputBufferHolder, 0), BATCH_ARG - ) - ), outputBuffer, inputBufferHolder, fields, plugin - ); + compute = filterBody(outputBuffer, BATCH_ARG, fields, plugin); } else { final Collection parentFields = parents.stream().map(fields::add).collect(Collectors.toList()); final RubyArray inputBuffer = RubyUtil.RUBY.newArray(); clear.add(clearSyntax(parentFields)); + final ValueSyntaxElement inputBufferField = fields.add(inputBuffer); compute = withInputBuffering( - filterBody( - Closure.wrap(), outputBuffer, fields.add(new IRubyObject[]{inputBuffer}), - fields, plugin - ), parentFields, fields.add(inputBuffer) + filterBody(outputBuffer, inputBufferField, fields, plugin), + parentFields, inputBufferField ); } return prepare(withOutputBuffering(compute, clear, outputBuffer, fields)); @@ -198,31 +190,20 @@ private static ValueSyntaxElement invokeOutput(final ValueSyntaxElement output, return output.call("multiReceive", ValueSyntaxElement.GET_RUBY_THREAD_CONTEXT, events); } - private static Closure filterBody(final Closure body, final ValueSyntaxElement outputBuffer, - final ValueSyntaxElement inputBufferHolder, final ClassFields fields, - final RubyIntegration.Filter plugin) { - final String multiFilter = "multi_filter"; - final IRubyObject filter = plugin.toRuby(); - final ValueSyntaxElement filterField = fields.add(filter); - body.add( + private static Closure filterBody(final ValueSyntaxElement outputBuffer, + final ValueSyntaxElement inputBuffer, final ClassFields fields, + final 
FilterDelegatorExt plugin) { + final ValueSyntaxElement filterField = fields.add(plugin); + final Closure body = Closure.wrap( buffer( outputBuffer, - SyntaxFactory.cast( - RubyArray.class, - callRubyCallsite( - fields.add(rubyCallsite(filter, multiFilter)), inputBufferHolder - , filterField, multiFilter - ) + filterField.call( + "multiFilter", ValueSyntaxElement.GET_RUBY_THREAD_CONTEXT, inputBuffer ) ) ); if (plugin.hasFlush()) { - body.add( - callFilterFlush( - fields, outputBuffer, fields.add(rubyCallsite(filter, FLUSH)), filterField, - !plugin.periodicFlush() - ) - ); + body.add(callFilterFlush(fields, outputBuffer, filterField, !plugin.periodicFlush())); } return body; } @@ -321,8 +302,8 @@ private static DatasetCompiler.ComputeAndClear withOutputBuffering(final Closure } private static MethodLevelSyntaxElement callFilterFlush(final ClassFields fields, - final ValueSyntaxElement resultBuffer, final ValueSyntaxElement flushMethod, - final ValueSyntaxElement filterPlugin, final boolean shutdownOnly) { + final ValueSyntaxElement resultBuffer, final ValueSyntaxElement filterPlugin, + final boolean shutdownOnly) { final MethodLevelSyntaxElement condition; final ValueSyntaxElement flushArgs; final ValueSyntaxElement flushFinal = fields.add(flushOpts(true)); @@ -340,10 +321,7 @@ private static MethodLevelSyntaxElement callFilterFlush(final ClassFields fields Closure.wrap( buffer( resultBuffer, - SyntaxFactory.cast( - RubyArray.class, - callRubyCallsite(flushMethod, flushArgs, filterPlugin, FLUSH) - ) + filterPlugin.call(FLUSH, ValueSyntaxElement.GET_RUBY_THREAD_CONTEXT, flushArgs) ) ) ); @@ -357,10 +335,10 @@ private static ValueSyntaxElement computeDataset(final ValueSyntaxElement parent return parent.call("compute", BATCH_ARG, FLUSH_ARG, SHUTDOWN_ARG); } - private static IRubyObject[] flushOpts(final boolean fin) { + private static RubyHash flushOpts(final boolean fin) { final RubyHash res = RubyHash.newHash(RubyUtil.RUBY); res.put(RubyUtil.RUBY.newSymbol("final"), RubyUtil.RUBY.newBoolean(fin)); - return new IRubyObject[]{res}; + return res; } private static ComputeStepSyntaxElement compileOutput(final Closure syntax, @@ -375,29 +353,12 @@ private static MethodLevelSyntaxElement buffer(final ValueSyntaxElement resultBu return resultBuffer.call("addAll", argument); } - private static ValueSyntaxElement callRubyCallsite(final ValueSyntaxElement callsite, - final ValueSyntaxElement argument, final ValueSyntaxElement plugin, final String method) { - return callsite.call( - "call", - ValueSyntaxElement.GET_RUBY_THREAD_CONTEXT, - plugin, - SyntaxFactory.constant(RubyUtil.class, "LOGSTASH_MODULE"), - SyntaxFactory.value(SyntaxFactory.join("\"", method, "\"")), - argument, - SyntaxFactory.constant(Block.class, "NULL_BLOCK") - ); - } - private static Closure clearSyntax(final Collection toClear) { return Closure.wrap( toClear.stream().map(DatasetCompiler::clear).toArray(MethodLevelSyntaxElement[]::new) ); } - private static DynamicMethod rubyCallsite(final IRubyObject rubyObject, final String name) { - return rubyObject.getMetaClass().searchMethod(name); - } - private static DatasetCompiler.ComputeAndClear computeAndClear(final Closure compute, final Closure clear, final ClassFields fields) { return new DatasetCompiler.ComputeAndClear(compute, clear, fields); diff --git a/logstash-core/src/main/java/org/logstash/config/ir/compiler/FieldDefinition.java b/logstash-core/src/main/java/org/logstash/config/ir/compiler/FieldDefinition.java index b0088e737ee..06a3619083d 100644 --- 
a/logstash-core/src/main/java/org/logstash/config/ir/compiler/FieldDefinition.java +++ b/logstash-core/src/main/java/org/logstash/config/ir/compiler/FieldDefinition.java @@ -38,21 +38,6 @@ public static FieldDefinition mutableUnassigned(final int index, final Class ); } - /** - * Creates an immutable field that is assigned its value inline in the class body by the given - * syntax element. - * @param index Index for naming - * @param type Type of the field - * @param initializer Initializer syntax - * @return Field definition - */ - public static FieldDefinition withInitializer(final int index, final Class type, - final SyntaxElement initializer) { - return new FieldDefinition( - variableDefinition(type, index), false, initializer, null - ); - } - private FieldDefinition(final VariableDefinition typeDef, final boolean mutable, final SyntaxElement initializer, final Object ctorArgument) { this.def = typeDef; diff --git a/logstash-core/src/main/java/org/logstash/config/ir/compiler/FilterDelegatorExt.java b/logstash-core/src/main/java/org/logstash/config/ir/compiler/FilterDelegatorExt.java new file mode 100644 index 00000000000..9b04572e1e4 --- /dev/null +++ b/logstash-core/src/main/java/org/logstash/config/ir/compiler/FilterDelegatorExt.java @@ -0,0 +1,166 @@ +package org.logstash.config.ir.compiler; + +import com.google.common.annotations.VisibleForTesting; +import java.util.Collection; +import java.util.concurrent.TimeUnit; +import org.jruby.Ruby; +import org.jruby.RubyArray; +import org.jruby.RubyClass; +import org.jruby.RubyHash; +import org.jruby.RubyObject; +import org.jruby.RubyString; +import org.jruby.RubySymbol; +import org.jruby.anno.JRubyClass; +import org.jruby.anno.JRubyMethod; +import org.jruby.runtime.ThreadContext; +import org.jruby.runtime.builtin.IRubyObject; +import org.logstash.RubyUtil; +import org.logstash.ext.JrubyEventExtLibrary; +import org.logstash.instrument.metrics.MetricKeys; +import org.logstash.instrument.metrics.counter.LongCounter; + +@JRubyClass(name = "JavaFilterDelegator") +public final class FilterDelegatorExt extends RubyObject { + + private static final long serialVersionUID = 1L; + + private IRubyObject filterClass; + + private IRubyObject filter; + + private IRubyObject metricEvents; + + private RubyString id; + + private LongCounter eventMetricOut; + + private LongCounter eventMetricIn; + + private LongCounter eventMetricTime; + + private boolean flushes; + + @JRubyMethod(name = "initialize", required = 2) + public IRubyObject init(final ThreadContext context, final IRubyObject filter, final IRubyObject id) { + this.id = (RubyString) id; + this.filter = filter; + this.filterClass = filter.getMetaClass(); + final IRubyObject namespacedMetric = filter.callMethod(context, "metric"); + metricEvents = namespacedMetric.callMethod(context, "namespace", RubyUtil.RUBY.newSymbol("events")); + eventMetricOut = LongCounter.fromRubyBase(metricEvents, MetricKeys.OUT_KEY); + eventMetricIn = LongCounter.fromRubyBase(metricEvents, MetricKeys.IN_KEY); + eventMetricTime = LongCounter.fromRubyBase( + metricEvents, MetricKeys.DURATION_IN_MILLIS_KEY + ); + namespacedMetric.callMethod( + context, "gauge", + new IRubyObject[]{ + RubySymbol.newSymbol(context.runtime, "name"), configName(context) + } + ); + flushes = filter.respondsTo("flush"); + return this; + } + + @VisibleForTesting + public FilterDelegatorExt initForTesting(final IRubyObject filter) { + eventMetricOut = LongCounter.DUMMY_COUNTER; + eventMetricIn = LongCounter.DUMMY_COUNTER; + eventMetricTime = 
LongCounter.DUMMY_COUNTER; + this.filter = filter; + flushes = filter.respondsTo("flush"); + return this; + } + + public FilterDelegatorExt(final Ruby runtime, final RubyClass metaClass) { + super(runtime, metaClass); + } + + @JRubyMethod + public IRubyObject register(final ThreadContext context) { + return filter.callMethod(context, "register"); + } + + @JRubyMethod + public IRubyObject close(final ThreadContext context) { + return filter.callMethod(context, "close"); + } + + @JRubyMethod(name = "do_close") + public IRubyObject doClose(final ThreadContext context) { + return filter.callMethod(context, "do_close"); + } + + @JRubyMethod(name = "do_stop") + public IRubyObject doStop(final ThreadContext context) { + return filter.callMethod(context, "do_stop"); + } + + @JRubyMethod(name = "reloadable?") + public IRubyObject isReloadable(final ThreadContext context) { + return filter.callMethod(context, "reloadable?"); + } + + @JRubyMethod(name = "threadsafe?") + public IRubyObject concurrency(final ThreadContext context) { + return filter.callMethod(context, "threadsafe?"); + } + + @JRubyMethod(name = "config_name") + public IRubyObject configName(final ThreadContext context) { + return filterClass.callMethod(context, "config_name"); + } + + @JRubyMethod + public IRubyObject id(final ThreadContext context) { + return id; + } + + @JRubyMethod(name = "metric_events") + public IRubyObject metricEvents(final ThreadContext context) { + return metricEvents; + } + + @JRubyMethod + public IRubyObject strategy(final ThreadContext context) { + return filter; + } + + @SuppressWarnings("unchecked") + public RubyArray multiFilter(final ThreadContext context, final RubyArray batch) { + eventMetricIn.increment((long) batch.size()); + final long start = System.nanoTime(); + final RubyArray result = (RubyArray) filter.callMethod(context, "multi_filter", batch); + eventMetricTime.increment( + TimeUnit.MILLISECONDS.convert(System.nanoTime() - start, TimeUnit.NANOSECONDS) + ); + int count = 0; + for (final JrubyEventExtLibrary.RubyEvent event : (Collection) result) { + if (!event.getEvent().isCancelled()) { + ++count; + } + } + eventMetricOut.increment((long) count); + return result; + } + + public RubyArray flush(final ThreadContext context, final RubyHash options) { + final IRubyObject newEvents = filter.callMethod(context, "flush", options); + final RubyArray result; + if (newEvents.isNil()) { + result = RubyArray.newEmptyArray(context.runtime); + } else { + result = (RubyArray) newEvents; + eventMetricOut.increment((long) result.size()); + } + return result; + } + + public boolean hasFlush() { + return flushes; + } + + public boolean periodicFlush() { + return filter.callMethod(RubyUtil.RUBY.getCurrentContext(), "periodic_flush").isTrue(); + } +} diff --git a/logstash-core/src/main/java/org/logstash/config/ir/compiler/RubyIntegration.java b/logstash-core/src/main/java/org/logstash/config/ir/compiler/RubyIntegration.java index 6aa350f969c..b4417b039a2 100644 --- a/logstash-core/src/main/java/org/logstash/config/ir/compiler/RubyIntegration.java +++ b/logstash-core/src/main/java/org/logstash/config/ir/compiler/RubyIntegration.java @@ -13,31 +13,6 @@ private RubyIntegration() { //Utility Class. } - /** - * A Ruby Filter. Currently, this interface is implemented only by the Ruby class - * {@code FilterDelegator}. - */ - public interface Filter { - - /** - * Returns the underlying {@link IRubyObject} for this filter instance. 
- * @return Underlying {@link IRubyObject} - */ - IRubyObject toRuby(); - - /** - * Checks if this filter has a flush method. - * @return True iff this filter has a flush method - */ - boolean hasFlush(); - - /** - * Checks if this filter does periodic flushing. - * @return True iff this filter uses periodic flushing - */ - boolean periodicFlush(); - } - /** * Plugin Factory that instantiates Ruby plugins and is implemented in Ruby. */ @@ -49,7 +24,7 @@ IRubyObject buildInput(RubyString name, RubyInteger line, RubyInteger column, OutputDelegatorExt buildOutput(RubyString name, RubyInteger line, RubyInteger column, IRubyObject args); - RubyIntegration.Filter buildFilter(RubyString name, RubyInteger line, RubyInteger column, + FilterDelegatorExt buildFilter(RubyString name, RubyInteger line, RubyInteger column, IRubyObject args); IRubyObject buildCodec(RubyString name, IRubyObject args); diff --git a/logstash-core/src/main/java/org/logstash/config/ir/compiler/SyntaxFactory.java b/logstash-core/src/main/java/org/logstash/config/ir/compiler/SyntaxFactory.java index 9d8970a0802..5ddd7f3e43f 100644 --- a/logstash-core/src/main/java/org/logstash/config/ir/compiler/SyntaxFactory.java +++ b/logstash-core/src/main/java/org/logstash/config/ir/compiler/SyntaxFactory.java @@ -36,27 +36,6 @@ public static ValueSyntaxElement constant(final Class clazz, join(clazz.getName(), ".", name)); } - public static ValueSyntaxElement arrayField(final MethodLevelSyntaxElement array, - final int index) { - return new ValueSyntaxElement() { - @Override - public MethodLevelSyntaxElement replace(final MethodLevelSyntaxElement search, - final MethodLevelSyntaxElement replacement) { - return arrayField(array.replace(search, replacement), index); - } - - @Override - public int count(final MethodLevelSyntaxElement search) { - return array.count(search); - } - - @Override - public String generateCode() { - return join(array.generateCode(), String.format("[%d]", index)); - } - }; - } - public static MethodLevelSyntaxElement assignment(final SyntaxElement target, final MethodLevelSyntaxElement value) { return new SyntaxFactory.Assignment(target, value); diff --git a/logstash-core/src/main/java/org/logstash/execution/QueueReadClientBase.java b/logstash-core/src/main/java/org/logstash/execution/QueueReadClientBase.java index 2679e852b43..e8501e5edbd 100644 --- a/logstash-core/src/main/java/org/logstash/execution/QueueReadClientBase.java +++ b/logstash-core/src/main/java/org/logstash/execution/QueueReadClientBase.java @@ -5,7 +5,6 @@ import org.jruby.RubyHash; import org.jruby.RubyNumeric; import org.jruby.RubyObject; -import org.jruby.RubySymbol; import org.jruby.anno.JRubyClass; import org.jruby.anno.JRubyMethod; import org.jruby.java.proxies.JavaProxy; diff --git a/logstash-core/src/test/java/org/logstash/config/ir/CompiledPipelineTest.java b/logstash-core/src/test/java/org/logstash/config/ir/CompiledPipelineTest.java index d3c7de3747d..77b522a942b 100644 --- a/logstash-core/src/test/java/org/logstash/config/ir/CompiledPipelineTest.java +++ b/logstash-core/src/test/java/org/logstash/config/ir/CompiledPipelineTest.java @@ -18,6 +18,7 @@ import org.junit.Test; import org.logstash.Event; import org.logstash.RubyUtil; +import org.logstash.config.ir.compiler.FilterDelegatorExt; import org.logstash.config.ir.compiler.OutputDelegatorExt; import org.logstash.config.ir.compiler.RubyIntegration; import org.logstash.ext.JrubyEventExtLibrary; @@ -34,6 +35,34 @@ public final class CompiledPipelineTest extends RubyEnvTestCase { public 
static final Map> EVENT_SINKS = new ConcurrentHashMap<>(); + /** + * Mock filter that does not modify the batch. + */ + private static final IRubyObject IDENTITY_FILTER = RubyUtil.RUBY.evalScriptlet( + String.join( + "\n", + "output = Object.new", + "output.define_singleton_method(:multi_filter) do |batch|", + "batch", + "end", + "output" + ) + ); + + /** + * Mock filter that adds the value 'bar' to the field 'foo' for every event in the batch. + */ + private static final IRubyObject ADD_FIELD_FILTER = RubyUtil.RUBY.evalScriptlet( + String.join( + "\n", + "output = Object.new", + "output.define_singleton_method(:multi_filter) do |batch|", + "batch.each { |e| e.set('foo', 'bar')}", + "end", + "output" + ) + ); + private static final AtomicLong TEST_RUN = new AtomicLong(); /** @@ -84,7 +113,7 @@ public void buildsStraightPipeline() throws Exception { pipelineIR, new CompiledPipelineTest.MockPluginFactory( Collections.singletonMap("mockinput", () -> null), - Collections.singletonMap("mockfilter", CompiledPipelineTest.IdentityFilter::new), + Collections.singletonMap("mockfilter", () -> IDENTITY_FILTER), Collections.singletonMap("mockoutput", mockOutputSupplier()) ) ).buildExecution().compute(RubyUtil.RUBY.newArray(testEvent), false, false); @@ -108,9 +137,9 @@ public void buildsForkedPipeline() throws Exception { ); final JrubyEventExtLibrary.RubyEvent testEvent = JrubyEventExtLibrary.RubyEvent.newRubyEvent(RubyUtil.RUBY, new Event()); - final Map> filters = new HashMap<>(); - filters.put("mockfilter", CompiledPipelineTest.IdentityFilter::new); - filters.put("mockaddfilter", CompiledPipelineTest.AddFieldFilter::new); + final Map> filters = new HashMap<>(); + filters.put("mockfilter", () -> IDENTITY_FILTER); + filters.put("mockaddfilter", () -> ADD_FIELD_FILTER); new CompiledPipeline( pipelineIR, new CompiledPipelineTest.MockPluginFactory( @@ -132,9 +161,9 @@ public void conditionalNestedMetaFieldPipeline() throws Exception { ); final JrubyEventExtLibrary.RubyEvent testEvent = JrubyEventExtLibrary.RubyEvent.newRubyEvent(RubyUtil.RUBY, new Event()); - final Map> filters = new HashMap<>(); - filters.put("mockfilter", CompiledPipelineTest.IdentityFilter::new); - filters.put("mockaddfilter", CompiledPipelineTest.AddFieldFilter::new); + final Map> filters = new HashMap<>(); + filters.put("mockfilter", () -> IDENTITY_FILTER); + filters.put("mockaddfilter", () -> ADD_FIELD_FILTER); new CompiledPipeline( pipelineIR, new CompiledPipelineTest.MockPluginFactory( @@ -172,12 +201,12 @@ private static final class MockPluginFactory implements RubyIntegration.PluginFa private final Map> inputs; - private final Map> filters; + private final Map> filters; private final Map> outputs; MockPluginFactory(final Map> inputs, - final Map> filters, + final Map> filters, final Map> outputs) { this.inputs = inputs; this.filters = filters; @@ -199,9 +228,11 @@ public OutputDelegatorExt buildOutput(final RubyString name, final RubyInteger l } @Override - public RubyIntegration.Filter buildFilter(final RubyString name, final RubyInteger line, + public FilterDelegatorExt buildFilter(final RubyString name, final RubyInteger line, final RubyInteger column, final IRubyObject args) { - return setupPlugin(name, filters); + return new FilterDelegatorExt( + RubyUtil.RUBY, RubyUtil.OUTPUT_DELEGATOR_CLASS) + .initForTesting(setupPlugin(name, filters)); } @Override @@ -220,64 +251,4 @@ private static T setupPlugin(final RubyString name, return suppliers.get(name.asJavaString()).get(); } } - - /** - * Mock filter that adds the value 
'bar' to the field 'foo' for every event in the batch. - */ - private static final class AddFieldFilter implements RubyIntegration.Filter { - @Override - public IRubyObject toRuby() { - return RubyUtil.RUBY.evalScriptlet( - String.join( - "\n", - "output = Object.new", - "output.define_singleton_method(:multi_filter) do |batch|", - "batch.each { |e| e.set('foo', 'bar')}", - "end", - "output" - ) - ); - } - - @Override - public boolean hasFlush() { - return false; - } - - @Override - public boolean periodicFlush() { - return false; - } - - } - - /** - * Mock filter that does not modify the batch. - */ - private static final class IdentityFilter implements RubyIntegration.Filter { - @Override - public IRubyObject toRuby() { - return RubyUtil.RUBY.evalScriptlet( - String.join( - "\n", - "output = Object.new", - "output.define_singleton_method(:multi_filter) do |batch|", - "batch", - "end", - "output" - ) - ); - } - - @Override - public boolean hasFlush() { - return false; - } - - @Override - public boolean periodicFlush() { - return false; - } - - } }
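
Notes on the change (editorial sketches, not part of the patch):

The patch makes the compiled pipeline call the new Java FilterDelegatorExt directly instead of routing multi_filter and flush through cached DynamicMethod call sites. The class below is a hand-written approximation of the compute() body that DatasetCompiler.filterDataset emits after this change; the class and field names (FilterStepSketch, delegator, inputBuffer, outputBuffer, flushFinal, flushNonFinal) are illustrative, and the flush condition is decided at code-generation time in the real compiler rather than at runtime as written here. Only the FilterDelegatorExt calls (multiFilter, flush, hasFlush, periodicFlush) and the RubyHash flush options mirror the patch.

import java.util.ArrayList;
import java.util.Collection;
import org.jruby.RubyArray;
import org.jruby.RubyHash;
import org.jruby.runtime.ThreadContext;
import org.jruby.runtime.builtin.IRubyObject;
import org.logstash.RubyUtil;
import org.logstash.config.ir.compiler.FilterDelegatorExt;

// Hand-written approximation of the generated filter step: the delegator field
// is invoked directly instead of going through a cached DynamicMethod call site.
final class FilterStepSketch {

    private final FilterDelegatorExt delegator;   // corresponds to fields.add(plugin)
    private final RubyArray inputBuffer;          // parent results buffered here
    private final Collection<IRubyObject> outputBuffer = new ArrayList<>();
    private final RubyHash flushFinal;            // flushOpts(true)  => {:final => true}
    private final RubyHash flushNonFinal;         // flushOpts(false) => {:final => false}

    FilterStepSketch(final FilterDelegatorExt delegator, final RubyArray inputBuffer) {
        this.delegator = delegator;
        this.inputBuffer = inputBuffer;
        this.flushFinal = flushOpts(true);
        this.flushNonFinal = flushOpts(false);
    }

    Collection<IRubyObject> compute(final boolean flush, final boolean shutdown) {
        final ThreadContext context = RubyUtil.RUBY.getCurrentContext();
        // buffer(outputBuffer, filterField.call("multiFilter", context, inputBuffer))
        outputBuffer.addAll(delegator.multiFilter(context, inputBuffer));
        // callFilterFlush: non-periodic filters flush only at shutdown, and the
        // :final option reflects whether this is the shutdown flush
        if (delegator.hasFlush() && flush && (delegator.periodicFlush() || shutdown)) {
            outputBuffer.addAll(delegator.flush(context, shutdown ? flushFinal : flushNonFinal));
        }
        return outputBuffer;
    }

    // Same shape as the patched DatasetCompiler.flushOpts: a single RubyHash
    // instead of the previous IRubyObject[] wrapper.
    private static RubyHash flushOpts(final boolean fin) {
        final RubyHash res = RubyHash.newHash(RubyUtil.RUBY);
        res.put(RubyUtil.RUBY.newSymbol("final"), RubyUtil.RUBY.newBoolean(fin));
        return res;
    }
}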
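
The metric bookkeeping that FilterDelegatorExt.multiFilter and flush perform can be read independently of JRuby. The following dependency-free sketch mirrors that logic with simplified stand-in types (Event, the AtomicLong counters, and these method signatures are not the Logstash classes): events in are counted before the call, the wall-clock time of the underlying multi_filter call is added to duration_in_millis, only un-cancelled results count toward out, and a nil flush result becomes an empty batch without touching out.

import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.UnaryOperator;

// Stand-alone model of the counter updates in FilterDelegatorExt; Event and the
// AtomicLong fields replace Logstash's RubyEvent and LongCounter.
final class FilterMetricsSketch {

    static final class Event {
        boolean cancelled;
    }

    final AtomicLong in = new AtomicLong();
    final AtomicLong out = new AtomicLong();
    final AtomicLong durationInMillis = new AtomicLong();

    List<Event> multiFilter(final UnaryOperator<List<Event>> filter, final List<Event> batch) {
        in.addAndGet(batch.size());
        final long start = System.nanoTime();
        final List<Event> result = filter.apply(batch);
        // duration_in_millis measures only the underlying multi_filter call
        durationInMillis.addAndGet(
            TimeUnit.MILLISECONDS.convert(System.nanoTime() - start, TimeUnit.NANOSECONDS)
        );
        // cancelled events (e.g. dropped by the filter) are excluded from `out`
        out.addAndGet(result.stream().filter(e -> !e.cancelled).count());
        return result;
    }

    List<Event> flush(final List<Event> flushed) {
        // a nil return from the plugin's flush (buffering filters such as the
        // aggregate filter) maps to an empty batch and does not increment `out`
        if (flushed == null) {
            return Collections.emptyList();
        }
        out.addAndGet(flushed.size());
        return flushed;
    }
}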
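
For unit tests, the delegator can now be built directly around a plain Ruby object, as the updated CompiledPipelineTest does via initForTesting, which wires LongCounter.DUMMY_COUNTER in place of real metrics. A minimal sketch, assuming a JRuby runtime with Logstash's extension classes already registered; FILTER_DELEGATOR_CLASS is the meta class RubyUtil now registers for the delegator, the scriptlet matches the test's identity filter, and the helper name is illustrative.

import org.jruby.runtime.builtin.IRubyObject;
import org.logstash.RubyUtil;
import org.logstash.config.ir.compiler.FilterDelegatorExt;

final class FilterDelegatorTestSketch {

    static FilterDelegatorExt identityDelegator() {
        // Same shape as CompiledPipelineTest's IDENTITY_FILTER: a bare Ruby object
        // answering multi_filter by returning the batch unchanged.
        final IRubyObject mockFilter = RubyUtil.RUBY.evalScriptlet(
            String.join(
                "\n",
                "output = Object.new",
                "output.define_singleton_method(:multi_filter) do |batch|",
                "batch",
                "end",
                "output"
            )
        );
        // initForTesting skips the metric wiring done by the Ruby-visible initialize
        return new FilterDelegatorExt(RubyUtil.RUBY, RubyUtil.FILTER_DELEGATOR_CLASS)
            .initForTesting(mockFilter);
    }
}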