Skip to content

Commit

Permalink
JAVAFICATION: Further dry up ruby and java pipeline and move more of …
Browse files Browse the repository at this point in the history
…their logic to Java

Fixes elastic#9687
  • Loading branch information
original-brownbear committed Jun 1, 2018
1 parent 0a72df8 commit 060a8e3
Show file tree
Hide file tree
Showing 7 changed files with 333 additions and 275 deletions.
60 changes: 8 additions & 52 deletions logstash-core/lib/logstash/java_pipeline.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,21 +13,20 @@
java_import org.logstash.config.ir.CompiledPipeline
java_import org.logstash.config.ir.ConfigCompiler

module LogStash; class JavaBasePipeline < LogstashPipeline
module LogStash; class JavaBasePipeline < AbstractPipeline
include LogStash::Util::Loggable

attr_reader :inputs, :filters, :outputs

def initialize(pipeline_config, namespaced_metric = nil, agent = nil)
super pipeline_config, namespaced_metric
@logger = self.logger
@dlq_writer = dlq_writer
super pipeline_config, namespaced_metric, @logger, @queue
@lir_execution = CompiledPipeline.new(
lir,
LogStash::Plugins::PluginFactory.new(
# use NullMetric if called in the BasePipeline context otherwise use the @metric value
lir, LogStash::Plugins::PluginMetricFactory.new(pipeline_id, metric),
LogStash::Plugins::ExecutionContextFactory.new(agent, self, @dlq_writer),
LogStash::Plugins::ExecutionContextFactory.new(agent, self, dlq_writer),
JavaFilterDelegator
)
)
Expand All @@ -43,10 +42,6 @@ def reloadable?
configured_as_reloadable? && reloadable_plugins?
end

def configured_as_reloadable?
settings.get("pipeline.reloadable")
end

def reloadable_plugins?
non_reloadable_plugins.empty?
end
Expand All @@ -67,26 +62,22 @@ module LogStash; class JavaPipeline < JavaBasePipeline
:worker_threads,
:events_consumed,
:events_filtered,
:reporter,
:started_at,
:thread,
:filter_queue_client,
:input_queue_client,
:queue
:input_queue_client

MAX_INFLIGHT_WARN_THRESHOLD = 10_000

def initialize(pipeline_config, namespaced_metric = nil, agent = nil)
super
@reporter = PipelineReporter.new(@logger, self)
@worker_threads = []

begin
@queue = LogStash::QueueFactory.create(settings)
@queue = LogStash::QueueFactory.create(pipeline_config.settings)
rescue => e
@logger.error("Logstash failed to create queue", default_logging_keys("exception" => e.message, "backtrace" => e.backtrace))
raise e
end
super
@worker_threads = []

@input_queue_client = @queue.write_client
@filter_queue_client = @queue.read_client
Expand Down Expand Up @@ -244,10 +235,6 @@ def stopped?
@running.false?
end

def system?
settings.get_value("pipeline.system")
end

# register_plugins calls #register_plugin on the plugins list and upon exception will call Plugin#do_close on all registered plugins
# @param plugins [Array[Plugin]] the list of plugins to register
def register_plugins(plugins)
Expand Down Expand Up @@ -280,7 +267,7 @@ def start_workers
config_metric.gauge(:config_reload_automatic, settings.get("config.reload.automatic"))
config_metric.gauge(:config_reload_interval, settings.get("config.reload.interval"))
config_metric.gauge(:dead_letter_queue_enabled, dlq_enabled?)
config_metric.gauge(:dead_letter_queue_path, @dlq_writer.get_path.to_absolute_path.to_s) if dlq_enabled?
config_metric.gauge(:dead_letter_queue_path, dlq_writer.get_path.to_absolute_path.to_s) if dlq_enabled?


@logger.info("Starting pipeline", default_logging_keys(
Expand Down Expand Up @@ -466,37 +453,6 @@ def stalling_threads_info
.each {|t| t.delete("status") }
end

def collect_dlq_stats
if dlq_enabled?
dlq_metric = metric.namespace([:stats, :pipelines, pipeline_id.to_s.to_sym, :dlq])
dlq_metric.gauge(:queue_size_in_bytes, @dlq_writer.get_current_queue_size)
end
end

def collect_stats
pipeline_metric = metric.namespace([:stats, :pipelines, pipeline_id.to_s.to_sym, :queue])
pipeline_metric.gauge(:type, settings.get("queue.type"))
if @queue.is_a?(LogStash::WrappedAckedQueue) && @queue.queue.is_a?(LogStash::AckedQueue)
queue = @queue.queue
dir_path = queue.dir_path
file_store = Files.get_file_store(Paths.get(dir_path))

pipeline_metric.namespace([:capacity]).tap do |n|
n.gauge(:page_capacity_in_bytes, queue.page_capacity)
n.gauge(:max_queue_size_in_bytes, queue.max_size_in_bytes)
n.gauge(:max_unread_events, queue.max_unread_events)
n.gauge(:queue_size_in_bytes, queue.persisted_size_in_bytes)
end
pipeline_metric.namespace([:data]).tap do |n|
n.gauge(:free_space_in_bytes, file_store.get_unallocated_space)
n.gauge(:storage_type, file_store.type)
n.gauge(:path, dir_path)
end

pipeline_metric.gauge(:events, queue.unread_count)
end
end

def clear_pipeline_metrics
# TODO(ph): I think the metric should also proxy that call correctly to the collector
# this will simplify everything since the null metric would simply just do a noop
Expand Down
63 changes: 9 additions & 54 deletions logstash-core/lib/logstash/pipeline.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,27 +17,25 @@
java_import org.logstash.common.io.DeadLetterQueueWriter
java_import org.logstash.config.ir.ConfigCompiler

module LogStash; class BasePipeline < LogstashPipeline
module LogStash; class BasePipeline < AbstractPipeline
include LogStash::Util::Loggable

attr_reader :inputs, :filters, :outputs

def initialize(pipeline_config, namespaced_metric = nil, agent = nil)
super pipeline_config, namespaced_metric
@logger = self.logger
super pipeline_config, namespaced_metric, @logger, @queue
@mutex = Mutex.new

@inputs = nil
@filters = nil
@outputs = nil
@agent = agent

@dlq_writer = dlq_writer

@plugin_factory = LogStash::Plugins::PluginFactory.new(
# use NullMetric if called in the BasePipeline context otherwise use the @metric value
lir, LogStash::Plugins::PluginMetricFactory.new(pipeline_id, metric),
LogStash::Plugins::ExecutionContextFactory.new(@agent, self, @dlq_writer),
LogStash::Plugins::ExecutionContextFactory.new(@agent, self, dlq_writer),
FilterDelegator
)
grammar = LogStashConfigParser.new
Expand All @@ -64,10 +62,6 @@ def reloadable?
configured_as_reloadable? && reloadable_plugins?
end

def configured_as_reloadable?
settings.get("pipeline.reloadable")
end

def reloadable_plugins?
non_reloadable_plugins.empty?
end
Expand All @@ -93,27 +87,23 @@ module LogStash; class Pipeline < BasePipeline
:worker_threads,
:events_consumed,
:events_filtered,
:reporter,
:started_at,
:thread,
:filter_queue_client,
:input_queue_client,
:queue
:input_queue_client

MAX_INFLIGHT_WARN_THRESHOLD = 10_000

def initialize(pipeline_config, namespaced_metric = nil, agent = nil)
super

@reporter = PipelineReporter.new(@logger, self)
@worker_threads = []

begin
@queue = LogStash::QueueFactory.create(settings)
@queue = LogStash::QueueFactory.create(pipeline_config.settings)
rescue => e
@logger.error("Logstash failed to create queue", default_logging_keys("exception" => e.message, "backtrace" => e.backtrace))
raise e
end
super

@worker_threads = []

@input_queue_client = @queue.write_client
@filter_queue_client = @queue.read_client
Expand Down Expand Up @@ -271,10 +261,6 @@ def stopped?
@running.false?
end

def system?
settings.get_value("pipeline.system")
end

# register_plugin simply calls the plugin #register method and catches & logs any error
# @param plugin [Plugin] the plugin to register
# @return [Plugin] the registered plugin
Expand Down Expand Up @@ -315,7 +301,7 @@ def start_workers
config_metric.gauge(:config_reload_automatic, settings.get("config.reload.automatic"))
config_metric.gauge(:config_reload_interval, settings.get("config.reload.interval"))
config_metric.gauge(:dead_letter_queue_enabled, dlq_enabled?)
config_metric.gauge(:dead_letter_queue_path, @dlq_writer.get_path.to_absolute_path.to_s) if dlq_enabled?
config_metric.gauge(:dead_letter_queue_path, dlq_writer.get_path.to_absolute_path.to_s) if dlq_enabled?

if max_inflight > MAX_INFLIGHT_WARN_THRESHOLD
@logger.warn("CAUTION: Recommended inflight events max exceeded! Logstash will run with up to #{max_inflight} events in memory in your current configuration. If your message sizes are large this may cause instability with the default heap size. Please consider setting a non-standard heap size, changing the batch size (currently #{batch_size}), or changing the number of pipeline workers (currently #{pipeline_workers})", default_logging_keys)
Expand Down Expand Up @@ -601,37 +587,6 @@ def stalling_threads_info
.each {|t| t.delete("status") }
end

def collect_dlq_stats
if dlq_enabled?
dlq_metric = metric.namespace([:stats, :pipelines, pipeline_id.to_s.to_sym, :dlq])
dlq_metric.gauge(:queue_size_in_bytes, @dlq_writer.get_current_queue_size)
end
end

def collect_stats
pipeline_metric = metric.namespace([:stats, :pipelines, pipeline_id.to_s.to_sym, :queue])
pipeline_metric.gauge(:type, settings.get("queue.type"))
if @queue.is_a?(LogStash::WrappedAckedQueue) && @queue.queue.is_a?(LogStash::AckedQueue)
queue = @queue.queue
dir_path = queue.dir_path
file_store = Files.get_file_store(Paths.get(dir_path))

pipeline_metric.namespace([:capacity]).tap do |n|
n.gauge(:page_capacity_in_bytes, queue.page_capacity)
n.gauge(:max_queue_size_in_bytes, queue.max_size_in_bytes)
n.gauge(:max_unread_events, queue.max_unread_events)
n.gauge(:queue_size_in_bytes, queue.persisted_size_in_bytes)
end
pipeline_metric.namespace([:data]).tap do |n|
n.gauge(:free_space_in_bytes, file_store.get_unallocated_space)
n.gauge(:storage_type, file_store.type)
n.gauge(:path, dir_path)
end

pipeline_metric.gauge(:events, queue.unread_count)
end
end

def clear_pipeline_metrics
# TODO(ph): I think the metric should also proxy that call correctly to the collector
# this will simplify everything since the null metric would simply just do a noop
Expand Down
4 changes: 2 additions & 2 deletions logstash-core/src/main/java/org/logstash/RubyUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import org.logstash.execution.AbstractWrappedQueueExt;
import org.logstash.execution.EventDispatcherExt;
import org.logstash.execution.ExecutionContextExt;
import org.logstash.execution.LogstashPipelineExt;
import org.logstash.execution.AbstractPipelineExt;
import org.logstash.execution.PipelineReporterExt;
import org.logstash.execution.QueueReadClientBase;
import org.logstash.execution.ShutdownWatcherExt;
Expand Down Expand Up @@ -390,7 +390,7 @@ public final class RubyUtil {
LOGGABLE_MODULE = UTIL_MODULE.defineModuleUnder("Loggable");
LOGGABLE_MODULE.defineAnnotatedMethods(LoggableExt.class);
LOGSTASH_PIPELINE_CLASS =
setupLogstashClass(LogstashPipelineExt::new, LogstashPipelineExt.class);
setupLogstashClass(AbstractPipelineExt::new, AbstractPipelineExt.class);
final RubyModule json = LOGSTASH_MODULE.defineOrGetModuleUnder("Json");
final RubyClass stdErr = RUBY.getStandardError();
LOGSTASH_ERROR = LOGSTASH_MODULE.defineClassUnder(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import org.jruby.RubyClass;
import org.jruby.RubyFixnum;
import org.jruby.RubyObject;
import org.jruby.RubyString;
import org.jruby.anno.JRubyClass;
import org.jruby.anno.JRubyMethod;
import org.jruby.javasupport.JavaObject;
Expand Down Expand Up @@ -68,7 +69,7 @@ public IRubyObject ruby_page_capacity(ThreadContext context) {
}

@JRubyMethod(name = "dir_path")
public IRubyObject ruby_dir_path(ThreadContext context) {
public RubyString ruby_dir_path(ThreadContext context) {
return context.runtime.newString(queue.getDirPath());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public JRubyWrappedAckedQueueExt(final Ruby runtime, final RubyClass metaClass)
}

@JRubyMethod(name = "queue")
public IRubyObject rubyGetQueue(ThreadContext context) {
public JRubyAckedQueueExt rubyGetQueue() {
return queue;
}

Expand Down
Loading

0 comments on commit 060a8e3

Please sign in to comment.