diff --git a/CONTRIBUTORS b/CONTRIBUTORS index e832fa5bb09..e777eff4d74 100644 --- a/CONTRIBUTORS +++ b/CONTRIBUTORS @@ -46,6 +46,7 @@ Contributors: * Philippe Weber * Marc Huffnagle (mhuffnagle) * Oliver Gorwits (ollyg) +* Rashid Khan (rashidkpc) Note: If you've sent me patches, bug reports, or otherwise contributed to diff --git a/Gemfile.lock b/Gemfile.lock index e8da05539a0..033de43de47 100755 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -12,6 +12,7 @@ GEM bouncy-castle-java (1.5.0146.1) bson (1.6.4) bson (1.6.4-java) + builder (3.0.0) bunny (0.7.9) cabin (0.4.4) json @@ -53,6 +54,7 @@ GEM jruby-openssl (0.7.7) bouncy-castle-java (>= 1.5.0146.1) jruby-win32ole (0.8.5) + json (1.6.5) json (1.6.5-java) launchy (2.1.0) addressable (~> 2.2.6) @@ -95,6 +97,11 @@ GEM redis (3.0.1) rest-client (1.6.7) mime-types (>= 1.16) + riak-client (1.0.3) + beefcake (~> 0.3.7) + builder (>= 2.1.2) + i18n (>= 0.4.0) + multi_json (~> 1.0) riemann-client (0.0.6) beefcake (>= 0.3.5) mtrc (>= 0.0.4) @@ -151,6 +158,7 @@ DEPENDENCIES pry rack redis + riak-client (= 1.0.3) riemann-client (= 0.0.6) sass shoulda diff --git a/Makefile b/Makefile index 06d361bd029..3286d66e517 100644 --- a/Makefile +++ b/Makefile @@ -20,8 +20,8 @@ PLUGIN_FILES=$(shell git ls-files | egrep '^lib/logstash/(inputs|outputs|filters GEM_HOME=build/gems QUIET=@ -WGET=$(shell command -v wget) -CURL=$(shell command -v curl) +WGET=$(shell which wget 2>/dev/null) +CURL=$(shell which curl 2>/dev/null) # OS-specific options TARCHECK=$(shell tar --help|grep wildcard|wc -l|tr -d ' ') diff --git a/lib/logstash/agent.rb b/lib/logstash/agent.rb index 8824379ee1c..36a8e138cc7 100644 --- a/lib/logstash/agent.rb +++ b/lib/logstash/agent.rb @@ -41,6 +41,7 @@ def initialize # flag/config defaults @verbose = 0 + @filterworker_count = 1 @plugins = {} @plugins_mutex = Mutex.new @@ -84,6 +85,14 @@ def options(opts) @config_string = arg end # -e + opts.on("-w COUNT", "--filterworkers COUNT", Integer, + "Run COUNT filter workers (default: 1)") do |arg| + @filterworker_count = arg + if @filterworker_count <= 0 + raise ArgumentError, "filter worker count must be > 0" + end + end # -w + opts.on("-l", "--log FILE", "Log to a given path. Default is stdout.") do |path| @logfile = path end @@ -345,7 +354,7 @@ def start_input(input) private def start_output(output) @logger.debug("Starting output", :plugin => output) - queue = LogStash::SizedQueue.new(10) + queue = LogStash::SizedQueue.new(10 * @filterworker_count) queue.logger = @logger @output_queue.add_queue(queue) @output_plugin_queues[output] = queue @@ -384,7 +393,7 @@ def run_with_config(config) raise "Must have both inputs and outputs configured." end - # NOTE(petef) we should use a SizedQueue here (w/config params for size) + # NOTE(petef) we should have config params for queue size @filter_queue = LogStash::SizedQueue.new(10) @filter_queue.logger = @logger @output_queue = LogStash::MultiQueue.new @@ -407,7 +416,16 @@ def run_with_config(config) filter.prepare_metrics end end - 1.times do |n| + + if @filterworker_count > 1 + @filters.each do |filter| + if ! filter.threadsafe? + raise "fail" + end + end + end + + @filterworker_count.times do |n| # TODO(sissel): facter this out into a 'filterworker' that accepts # 'shutdown' # Start a filter worker diff --git a/lib/logstash/filters/base.rb b/lib/logstash/filters/base.rb index 1d08ecfdc85..a021492f7e0 100755 --- a/lib/logstash/filters/base.rb +++ b/lib/logstash/filters/base.rb @@ -66,6 +66,7 @@ class LogStash::Filters::Base < LogStash::Plugin def initialize(params) super config_init(params) + @threadsafe = true end # def initialize public @@ -90,6 +91,11 @@ def execute(event, &block) end end # def execute + public + def threadsafe? + @threadsafe + end + # a filter instance should call filter_matched from filter if the event # matches the filter's conditions (right type, etc) protected diff --git a/lib/logstash/filters/multiline.rb b/lib/logstash/filters/multiline.rb index e26b26c85c8..d337d4b47e4 100644 --- a/lib/logstash/filters/multiline.rb +++ b/lib/logstash/filters/multiline.rb @@ -90,6 +90,8 @@ class LogStash::Filters::Multiline < LogStash::Filters::Base def initialize(config = {}) super + @threadsafe = false + # This filter needs to keep state. @types = Hash.new { |h,k| h[k] = [] } @pending = Hash.new diff --git a/lib/logstash/filterworker.rb b/lib/logstash/filterworker.rb index 30b20f1cbe4..1679be16d4a 100644 --- a/lib/logstash/filterworker.rb +++ b/lib/logstash/filterworker.rb @@ -25,6 +25,7 @@ def run while !@shutdown_requested && event = @input_queue.pop if event == LogStash::SHUTDOWN finished + @input_queue << LogStash::SHUTDOWN # for the next filter thread return end diff --git a/lib/logstash/runner.rb b/lib/logstash/runner.rb index 4ac4c9da16b..54ca7e3603e 100644 --- a/lib/logstash/runner.rb +++ b/lib/logstash/runner.rb @@ -53,7 +53,6 @@ def main(args) def run(args) command = args.shift - p :run => command commands = { "-v" => lambda { emit_version(args) }, "-V" => lambda { emit_version(args) }, @@ -71,13 +70,9 @@ def run(args) return agent.run(args) end, "web" => lambda do - p :web require "logstash/web/runner" - p :web2 web = LogStash::Web::Runner.new - p :web3 @runners << web - p :web4 return web.run(args) end, "test" => lambda do