Skip to content

Commit

Permalink
PERFORMANCE+JAVAFICATION: Java Filter Delegator
Browse files Browse the repository at this point in the history
  • Loading branch information
original-brownbear committed Apr 16, 2018
1 parent 5df5077 commit 4afc94f
Show file tree
Hide file tree
Showing 12 changed files with 305 additions and 312 deletions.
74 changes: 0 additions & 74 deletions logstash-core/lib/logstash/java_filter_delegator.rb

This file was deleted.

1 change: 0 additions & 1 deletion logstash-core/lib/logstash/java_pipeline.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
require "logstash/instrument/namespaced_null_metric"
require "logstash/instrument/collector"
require "logstash/util/dead_letter_queue_manager"
require "logstash/java_filter_delegator"
require "logstash/queue_factory"
require "logstash/compiler"
require "securerandom"
Expand Down
99 changes: 62 additions & 37 deletions logstash-core/spec/logstash/java_filter_delegator_spec.rb
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
# encoding: utf-8
require "spec_helper"
require "logstash/java_filter_delegator"
require "logstash/filter_delegator"
require "logstash/instrument/null_metric"
require "logstash/event"
require "logstash/execution_context"
require "support/shared_contexts"

java_import org.logstash.RubyUtil

describe LogStash::JavaFilterDelegator do

class MockGauge
Expand All @@ -15,23 +17,35 @@ def increment(_)

include_context "execution_context"

let(:filter_id) { "my-filter" }
let(:filter_id) { "my_filter" }
let(:config) do
{ "host" => "127.0.0.1", "id" => filter_id }
end
let(:collector) { [] }
let(:counter_in) { MockGauge.new }
let(:counter_out) { MockGauge.new }
let(:counter_time) { MockGauge.new }
let(:metric) { LogStash::Instrument::NamespacedNullMetric.new(collector, :null) }
let(:metric) {
LogStash::Instrument::NamespacedMetric.new(
LogStash::Instrument::Metric.new(LogStash::Instrument::Collector.new), [:filter]
)
}
let(:counter_in) {
counter = metric.counter(:in)
counter.increment(0)
counter
}
let(:counter_out) {
counter = metric.counter(:out)
counter.increment(0)
counter
}
let(:counter_time) {
counter = metric.counter(:duration_in_millis)
counter.increment(0)
counter
}
let(:events) { [LogStash::Event.new, LogStash::Event.new] }

before :each do
allow(pipeline).to receive(:id).and_return(pipeline_id)
allow(metric).to receive(:namespace).with(anything).and_return(metric)
allow(metric).to receive(:counter).with(:in).and_return(counter_in)
allow(metric).to receive(:counter).with(:out).and_return(counter_out)
allow(metric).to receive(:counter).with(:duration_in_millis).and_return(counter_time)
end

let(:plugin_klass) do
Expand Down Expand Up @@ -64,37 +78,48 @@ def filter(event)
end

it "defines a flush method" do
expect(subject.respond_to?(:flush)).to be_truthy
expect(subject.to_java.hasFlush).to be_truthy
end

context "when the flush return events" do
it "increments the out" do
subject.multi_filter([LogStash::Event.new])
expect(counter_out).to receive(:increment).with(1)
subject.flush({})
ruby_context = RubyUtil::RUBY.getCurrentContext
subject.to_java.multiFilter(ruby_context, [LogStash::Event.new])
event_metrics = metric.collector.snapshot_metric.metric_store.get_with_path(
"filter/my_filter"
)[:filter][:my_filter][:events]
expect(event_metrics[:out].value).to eq(0)
subject.to_java.flush(ruby_context, {})
expect(event_metrics[:out].value).to eq(1)
end
end

context "when the flush doesn't return anything" do
it "doesnt increment the out" do
expect(metric).not_to receive(:increment)
subject.flush({})
subject.to_java.flush(RubyUtil::RUBY.getCurrentContext, {})
expect(
metric.collector.snapshot_metric.metric_store.
get_with_path("filter/my_filter")[:filter][:my_filter][:events][:duration_in_millis].value
).to eq(0)
end
end

context "when the filter buffer events" do
before do
allow(metric).to receive(:increment).with(anything, anything)
end

it "has incremented :in" do
expect(counter_in).to receive(:increment).with(events.size)
subject.multi_filter(events)
subject.to_java.multiFilter(RubyUtil::RUBY.getCurrentContext, events)
expect(
metric.collector.snapshot_metric.metric_store.
get_with_path("filter/my_filter")[:filter][:my_filter][:events][:in].value
).to eq(events.size)
end

it "has not incremented :out" do
expect(counter_out).not_to receive(:increment).with(anything)
subject.multi_filter(events)
subject.to_java.multiFilter(RubyUtil::RUBY.getCurrentContext, events)
expect(
metric.collector.snapshot_metric.metric_store.
get_with_path("filter/my_filter")[:filter][:my_filter][:events][:out].value
).to eq(0)
end
end

Expand All @@ -114,15 +139,13 @@ def filter(event)
end
end

before do
allow(metric).to receive(:increment).with(anything, anything)
end

it "increments the in/out of the metric" do
expect(counter_in).to receive(:increment).with(events.size)
expect(counter_out).to receive(:increment).with(events.size * 2)

subject.multi_filter(events)
subject.to_java.multiFilter(RubyUtil::RUBY.getCurrentContext, events)
event_metrics = metric.collector.snapshot_metric.metric_store.get_with_path(
"filter/my_filter"
)[:filter][:my_filter][:events]
expect(event_metrics[:in].value).to eq(events.size)
expect(event_metrics[:out].value).to eq(events.size * 2)
end
end
end
Expand All @@ -144,14 +167,16 @@ def filter(event)
end

it "doesnt define a flush method" do
expect(subject.has_flush).to be_falsey
expect(subject.to_java.hasFlush).to be_falsey
end

it "increments the in/out of the metric" do
expect(counter_in).to receive(:increment).with(events.size)
expect(counter_out).to receive(:increment).with(events.size)

subject.multi_filter(events)
subject.to_java.multiFilter(RubyUtil::RUBY.getCurrentContext, events)
event_metrics = metric.collector.snapshot_metric.metric_store.get_with_path(
"filter/my_filter"
)[:filter][:my_filter][:events]
expect(event_metrics[:in].value).to eq(events.size)
expect(event_metrics[:out].value).to eq(events.size)
end
end

Expand All @@ -165,7 +190,7 @@ def filter(event)
# I am not testing the behavior of these methods
# this is done in the plugin tests. I just want to make sure
# the proxy delegates the methods.
LogStash::JavaFilterDelegator::DELEGATED_METHODS.each do |method|
LogStash::FilterDelegator::DELEGATED_METHODS.each do |method|
it "delegate method: `#{method}` to the filter" do
expect(subject.respond_to?(method))
end
Expand Down
6 changes: 6 additions & 0 deletions logstash-core/src/main/java/org/logstash/RubyUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.jruby.runtime.ObjectAllocator;
import org.logstash.ackedqueue.ext.JRubyAckedQueueExt;
import org.logstash.ackedqueue.ext.JRubyWrappedAckedQueueExt;
import org.logstash.config.ir.compiler.FilterDelegatorExt;
import org.logstash.config.ir.compiler.OutputDelegatorExt;
import org.logstash.execution.QueueReadClientBase;
import org.logstash.ext.JRubyWrappedWriteClientExt;
Expand Down Expand Up @@ -68,6 +69,8 @@ public final class RubyUtil {

public static final RubyClass OUTPUT_DELEGATOR_CLASS;

public static final RubyClass FILTER_DELEGATOR_CLASS;

static {
RUBY = Ruby.getGlobalRuntime();
LOGSTASH_MODULE = RUBY.getOrCreateModule("LogStash");
Expand Down Expand Up @@ -98,6 +101,9 @@ public final class RubyUtil {
OUTPUT_DELEGATOR_CLASS = setupLogstashClass(
OutputDelegatorExt::new, OutputDelegatorExt.class
);
FILTER_DELEGATOR_CLASS = setupLogstashClass(
FilterDelegatorExt::new, FilterDelegatorExt.class
);
final RubyModule json = LOGSTASH_MODULE.defineOrGetModuleUnder("Json");
final RubyClass stdErr = RUBY.getStandardError();
LOGSTASH_ERROR = LOGSTASH_MODULE.defineClassUnder(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.logstash.config.ir.compiler.Dataset;
import org.logstash.config.ir.compiler.DatasetCompiler;
import org.logstash.config.ir.compiler.EventCondition;
import org.logstash.config.ir.compiler.FilterDelegatorExt;
import org.logstash.config.ir.compiler.OutputDelegatorExt;
import org.logstash.config.ir.compiler.RubyIntegration;
import org.logstash.config.ir.compiler.SplitDataset;
Expand Down Expand Up @@ -52,7 +53,7 @@ public final class CompiledPipeline {
/**
* Configured Filters, indexed by their ID as returned by {@link PluginVertex#getId()}.
*/
private final Map<String, RubyIntegration.Filter> filters;
private final Map<String, FilterDelegatorExt> filters;

/**
* Configured outputs.
Expand Down Expand Up @@ -82,7 +83,7 @@ public Collection<IRubyObject> outputs() {
return Collections.unmodifiableCollection(outputs.values());
}

public Collection<RubyIntegration.Filter> filters() {
public Collection<FilterDelegatorExt> filters() {
return Collections.unmodifiableCollection(filters.values());
}

Expand Down Expand Up @@ -119,9 +120,9 @@ private Map<String, OutputDelegatorExt> setupOutputs() {
/**
* Sets up all Ruby filters learnt from {@link PipelineIR}.
*/
private Map<String, RubyIntegration.Filter> setupFilters() {
private Map<String, FilterDelegatorExt> setupFilters() {
final Collection<PluginVertex> filterPlugins = pipelineIR.getFilterPluginVertices();
final Map<String, RubyIntegration.Filter> res =
final Map<String, FilterDelegatorExt> res =
new HashMap<>(filterPlugins.size(), 1.0F);
for (final PluginVertex plugin : filterPlugins) {
res.put(plugin.getId(), buildFilter(plugin));
Expand Down Expand Up @@ -174,11 +175,11 @@ private RubyHash convertArgs(final PluginDefinition def) {
}

/**
* Compiles a {@link RubyIntegration.Filter} from a given {@link PluginVertex}.
* Compiles a {@link FilterDelegatorExt} from a given {@link PluginVertex}.
* @param vertex Filter {@link PluginVertex}
* @return Compiled {@link RubyIntegration.Filter}
* @return Compiled {@link FilterDelegatorExt}
*/
private RubyIntegration.Filter buildFilter(final PluginVertex vertex) {
private FilterDelegatorExt buildFilter(final PluginVertex vertex) {
final PluginDefinition def = vertex.getPluginDefinition();
final SourceWithMetadata source = vertex.getSourceWithMetadata();
return pluginFactory.buildFilter(
Expand All @@ -188,9 +189,9 @@ private RubyIntegration.Filter buildFilter(final PluginVertex vertex) {
}

/**
* Checks if a certain {@link Vertex} represents a {@link RubyIntegration.Filter}.
* Checks if a certain {@link Vertex} represents a {@link FilterDelegatorExt}.
* @param vertex Vertex to check
* @return True iff {@link Vertex} represents a {@link RubyIntegration.Filter}
* @return True iff {@link Vertex} represents a {@link FilterDelegatorExt}
*/
private boolean isFilter(final Vertex vertex) {
return filters.containsKey(vertex.getId());
Expand Down
Loading

0 comments on commit 4afc94f

Please sign in to comment.