Skip to content

Commit

Permalink
cleanup partially initialized pipeline
Browse files Browse the repository at this point in the history
DRY the plugin registration

typo

log exception message
  • Loading branch information
colinsurprenant committed Feb 14, 2017
1 parent 47767a7 commit 31693dd
Showing 1 changed file with 43 additions and 12 deletions.
55 changes: 43 additions & 12 deletions logstash-core/lib/logstash/pipeline.rb
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ def initialize(config_str, settings = SETTINGS, namespaced_metric = nil)
begin
@queue = LogStash::QueueFactory.create(settings)
rescue => e
@logger.error("Logstash failed to create queue", "exception" => e, "backtrace" => e.backtrace)
@logger.error("Logstash failed to create queue", "exception" => e.message, "backtrace" => e.backtrace)
raise e
end

Expand Down Expand Up @@ -250,12 +250,32 @@ def stopped?
@running.false?
end

# register_plugin simply calls the plugin #register method and catches & logs any error
# @param plugin [Plugin] the plugin to register
# @return [Plugin] the registered plugin
def register_plugin(plugin)
plugin.register
plugin
rescue => e
@logger.error("Error registering plugin", :plugin => plugin.inspect, :error => e.message)
raise e
end

# register_plugins calls #register_plugin on the plugins list and upon exception will call Plugin#do_close on all registered plugins
# @param plugins [Array[Plugin]] the list of plugins to register
def register_plugins(plugins)
registered = []
plugins.each { |plugin| registered << register_plugin(plugin) }
rescue => e
registered.each(&:do_close)
raise e
end

def start_workers
@worker_threads.clear # In case we're restarting the pipeline
begin
start_inputs
@outputs.each {|o| o.register }
@filters.each {|f| f.register }
register_plugins(@outputs)
register_plugins(@filters)

pipeline_workers = safe_pipeline_worker_count
batch_size = @settings.get("pipeline.batch.size")
Expand Down Expand Up @@ -286,6 +306,16 @@ def start_workers
worker_loop(batch_size, batch_delay)
end
end

# inputs should be started last, after all workers
begin
start_inputs
rescue => e
# if there is any exception in starting inputs, make sure we shutdown workers.
# exception will already by logged in start_inputs
shutdown_workers
raise e
end
ensure
# it is important to guarantee @ready to be true after the startup sequence has been completed
# to potentially unblock the shutdown method which may be waiting on @ready to proceed
Expand Down Expand Up @@ -335,7 +365,7 @@ def filter_batch(batch)
# Users need to check their configuration or see if there is a bug in the
# plugin.
@logger.error("Exception in pipelineworker, the pipeline stopped processing new events, please check your filter configuration and restart Logstash.",
"exception" => e, "backtrace" => e.backtrace)
"exception" => e.message, "backtrace" => e.backtrace)

raise e
end
Expand Down Expand Up @@ -378,10 +408,11 @@ def start_inputs
end
@inputs += moreinputs

@inputs.each do |input|
input.register
start_input(input)
end
# first make sure we can register all input plugins
register_plugins(@inputs)

# then after all input plugins are sucessfully registered, start them
@inputs.each { |input| start_input(input) }
end

def start_input(plugin)
Expand All @@ -395,20 +426,20 @@ def inputworker(plugin)
rescue => e
if plugin.stop?
@logger.debug("Input plugin raised exception during shutdown, ignoring it.",
:plugin => plugin.class.config_name, :exception => e,
:plugin => plugin.class.config_name, :exception => e.message,
:backtrace => e.backtrace)
return
end

# otherwise, report error and restart
if @logger.debug?
@logger.error(I18n.t("logstash.pipeline.worker-error-debug",
:plugin => plugin.inspect, :error => e.to_s,
:plugin => plugin.inspect, :error => e.message,
:exception => e.class,
:stacktrace => e.backtrace.join("\n")))
else
@logger.error(I18n.t("logstash.pipeline.worker-error",
:plugin => plugin.inspect, :error => e))
:plugin => plugin.inspect, :error => e.message))
end

# Assuming the failure that caused this exception is transient,
Expand Down

0 comments on commit 31693dd

Please sign in to comment.