Skip to content
This repository has been archived by the owner on Nov 4, 2020. It is now read-only.

Commit

Permalink
refactor test tasks, test:plugins now correctly run specs of installe…
Browse files Browse the repository at this point in the history
…d plugins, including local ones

robustness in shutdown handling and other thread safeties
  • Loading branch information
colinsurprenant authored and ph committed Apr 8, 2015
1 parent fb88eef commit 380ea33
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 31 deletions.
75 changes: 52 additions & 23 deletions lib/logstash/pipeline.rb
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,19 @@ def initialize(configstr)
@settings = {
"filter-workers" => 1,
}

@run_mutex = Mutex.new
@ready = false
@started = false
@input_threads = []
end # def initialize

def ready?
return @ready
@run_mutex.synchronize{@ready}
end

def started?
return @started
@run_mutex.synchronize{@started}
end

def configure(setting, value)
Expand All @@ -69,14 +74,14 @@ def filters?
end

def run
@started = true
@input_threads = []
@run_mutex.synchronize{@started = true}

start_inputs
# synchronize @input_threads between run and shutdown
@run_mutex.synchronize{start_inputs}
start_filters if filters?
start_outputs

@ready = true
@run_mutex.synchronize{@ready = true}

@logger.info("Pipeline started")
@logger.terminal("Logstash startup completed")
Expand Down Expand Up @@ -132,7 +137,7 @@ def start_inputs
moreinputs = []
@inputs.each do |input|
if input.threadable && input.threads > 1
(input.threads-1).times do |i|
(input.threads - 1).times do |i|
moreinputs << input.clone
end
end
Expand Down Expand Up @@ -171,7 +176,7 @@ def inputworker(plugin)
begin
plugin.run(@input_to_filter)
rescue LogStash::ShutdownSignal
return
# ignore and quit
rescue => e
if @logger.debug?
@logger.error(I18n.t("logstash.pipeline.worker-error-debug",
Expand All @@ -183,14 +188,23 @@ def inputworker(plugin)
:plugin => plugin.inspect, :error => e))
end
puts e.backtrace if @logger.debug?
plugin.teardown
# input teardown must be synchronized since is can be called concurrently by
# the input worker thread and from the pipeline thread shutdown method.
# this means that input teardown methods must support multiple calls.
@run_mutex.synchronize{plugin.teardown}
sleep 1
retry
end
rescue LogStash::ShutdownSignal
# nothing
ensure
plugin.teardown
begin
# input teardown must be synchronized since is can be called concurrently by
# the input worker thread and from the pipeline thread shutdown method.
# this means that input teardown methods must support multiple calls.
@run_mutex.synchronize{plugin.teardown}
rescue LogStash::ShutdownSignal
# teardown could receive the ShutdownSignal, retry it
retry
end
end # def inputworker

def filterworker
Expand All @@ -201,6 +215,7 @@ def filterworker

case event
when LogStash::Event
# filter_func returns all filtered events, including cancelled ones
filter_func(event).each { |e| @filter_to_output.push(e) unless e.cancelled? }
when LogStash::FlushEvent
# handle filter flushing here so that non threadsafe filters (thus only running one filterworker)
Expand Down Expand Up @@ -240,18 +255,30 @@ def outputworker
def shutdown
@input_threads.each do |thread|
# Interrupt all inputs
@logger.info("Sending shutdown signal to input thread",
:thread => thread)
thread.raise(LogStash::ShutdownSignal)
begin
thread.wakeup # in case it's in blocked IO or sleeping
rescue ThreadError
@logger.info("Sending shutdown signal to input thread", :thread => thread)

# synchronize both ShutdownSignal and teardown below. by synchronizing both
# we will avoid potentially sending a shutdown signal when the inputworker is
# executing the teardown method.
@run_mutex.synchronize do
thread.raise(LogStash::ShutdownSignal)
begin
thread.wakeup # in case it's in blocked IO or sleeping
rescue ThreadError
end
end
end

# Sometimes an input is stuck in a blocking I/O
# so we need to tell it to teardown directly
@inputs.each do |input|
input.teardown
# sometimes an input is stuck in a blocking I/O so we need to tell it to teardown directly
@inputs.each do |input|
begin
# input teardown must be synchronized since is can be called concurrently by
# the input worker thread and from the pipeline thread shutdown method.
# this means that input teardown methods must support multiple calls.
@run_mutex.synchronize{input.teardown}
rescue LogStash::ShutdownSignal
# teardown could receive the ShutdownSignal, retry it
retry
end
end

Expand All @@ -265,8 +292,10 @@ def plugin(plugin_type, name, *args)
return klass.new(*args)
end

# for backward compatibility in devutils for the rspec helpers
# for backward compatibility in devutils for the rspec helpers, this method is not used
# in the pipeline anymore.
def filter(event, &block)
# filter_func returns all filtered events, including cancelled ones
filter_func(event).each { |e| block.call(e) }
end

Expand Down
28 changes: 20 additions & 8 deletions rakelib/test.rake
Original file line number Diff line number Diff line change
Expand Up @@ -5,25 +5,37 @@
# In general this is not a problem, because the most common rspec usage
# is throw the rake task, where rspec sets this himself internally.
##
require "logstash/pluginmanager/util"

namespace "test" do
def run_rspec(*args)
task "setup" do
require "logstash/environment"
LogStash::Environment.bundler_setup!({:without => []})
require "rspec/core/runner"
require "rspec"
RSpec::Core::Runner.run([*args])
end

task "core" do
exit run_rspec(Rake::FileList["spec/**/*_spec.rb"])
desc "run core specs"
task "core" => ["setup"] do
exit(RSpec::Core::Runner.run([Rake::FileList["spec/**/*_spec.rb"]]))
end

task "core-fail-fast" do
exit run_rspec("--fail-fast", Rake::FileList["spec/**/*_spec.rb"])
desc "run core specs in fail-fast mode"
task "core-fail-fast" => ["setup"] do
exit(Spec::Core::Runner.run(["--fail-fast", Rake::FileList["spec/**/*_spec.rb"]]))
end

task "plugins" do
exit run_rspec("--order", "rand", Rake::FileList[File.join(ENV["GEM_HOME"], "gems/logstash-*/spec/{input,filter,codec,output}s/*_spec.rb")])
desc "run all installed plugins specs"
task "plugins" => ["setup"] do
# grab all spec files using the live plugins gem specs. this allows correclty also running the specs
# of a local plugin dir added using the Gemfile :path option. before this, any local plugin spec would
# not be run because they were not under the vendor/bundle/jruby/1.9/gems path
test_files = LogStash::PluginManager.find_plugins_gem_specs.map do |spec|
Rake::FileList[File.join(spec.gem_dir, "spec/{input,filter,codec,output}s/*_spec.rb")]
end.flatten

# "--format=documentation"
exit(RSpec::Core::Runner.run(["--order", "rand", test_files]))
end

task "install-core" => ["bootstrap", "plugin:install-core", "plugin:install-development-dependencies"]
Expand Down

0 comments on commit 380ea33

Please sign in to comment.