Skip to content

Commit

Permalink
Merge branch 'master' of github.com:logstash/logstash
Browse files Browse the repository at this point in the history
  • Loading branch information
jordansissel committed Jun 28, 2012
2 parents ee977c6 + e8be7e7 commit 69215d8
Show file tree
Hide file tree
Showing 8 changed files with 41 additions and 10 deletions.
1 change: 1 addition & 0 deletions CONTRIBUTORS
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ Contributors:
* Philippe Weber
* Marc Huffnagle (mhuffnagle)
* Oliver Gorwits (ollyg)
* Rashid Khan (rashidkpc)


Note: If you've sent me patches, bug reports, or otherwise contributed to
Expand Down
8 changes: 8 additions & 0 deletions Gemfile.lock
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ GEM
bouncy-castle-java (1.5.0146.1)
bson (1.6.4)
bson (1.6.4-java)
builder (3.0.0)
bunny (0.7.9)
cabin (0.4.4)
json
Expand Down Expand Up @@ -53,6 +54,7 @@ GEM
jruby-openssl (0.7.7)
bouncy-castle-java (>= 1.5.0146.1)
jruby-win32ole (0.8.5)
json (1.6.5)
json (1.6.5-java)
launchy (2.1.0)
addressable (~> 2.2.6)
Expand Down Expand Up @@ -95,6 +97,11 @@ GEM
redis (3.0.1)
rest-client (1.6.7)
mime-types (>= 1.16)
riak-client (1.0.3)
beefcake (~> 0.3.7)
builder (>= 2.1.2)
i18n (>= 0.4.0)
multi_json (~> 1.0)
riemann-client (0.0.6)
beefcake (>= 0.3.5)
mtrc (>= 0.0.4)
Expand Down Expand Up @@ -151,6 +158,7 @@ DEPENDENCIES
pry
rack
redis
riak-client (= 1.0.3)
riemann-client (= 0.0.6)
sass
shoulda
Expand Down
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ PLUGIN_FILES=$(shell git ls-files | egrep '^lib/logstash/(inputs|outputs|filters
GEM_HOME=build/gems
QUIET=@

WGET=$(shell command -v wget)
CURL=$(shell command -v curl)
WGET=$(shell which wget 2>/dev/null)
CURL=$(shell which curl 2>/dev/null)

# OS-specific options
TARCHECK=$(shell tar --help|grep wildcard|wc -l|tr -d ' ')
Expand Down
24 changes: 21 additions & 3 deletions lib/logstash/agent.rb
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ def initialize

# flag/config defaults
@verbose = 0
@filterworker_count = 1

@plugins = {}
@plugins_mutex = Mutex.new
Expand Down Expand Up @@ -84,6 +85,14 @@ def options(opts)
@config_string = arg
end # -e

opts.on("-w COUNT", "--filterworkers COUNT", Integer,
"Run COUNT filter workers (default: 1)") do |arg|
@filterworker_count = arg
if @filterworker_count <= 0
raise ArgumentError, "filter worker count must be > 0"
end
end # -w

opts.on("-l", "--log FILE", "Log to a given path. Default is stdout.") do |path|
@logfile = path
end
Expand Down Expand Up @@ -345,7 +354,7 @@ def start_input(input)
private
def start_output(output)
@logger.debug("Starting output", :plugin => output)
queue = LogStash::SizedQueue.new(10)
queue = LogStash::SizedQueue.new(10 * @filterworker_count)
queue.logger = @logger
@output_queue.add_queue(queue)
@output_plugin_queues[output] = queue
Expand Down Expand Up @@ -384,7 +393,7 @@ def run_with_config(config)
raise "Must have both inputs and outputs configured."
end

# NOTE(petef) we should use a SizedQueue here (w/config params for size)
# NOTE(petef) we should have config params for queue size
@filter_queue = LogStash::SizedQueue.new(10)
@filter_queue.logger = @logger
@output_queue = LogStash::MultiQueue.new
Expand All @@ -407,7 +416,16 @@ def run_with_config(config)
filter.prepare_metrics
end
end
1.times do |n|

if @filterworker_count > 1
@filters.each do |filter|
if ! filter.threadsafe?
raise "fail"
end
end
end

@filterworker_count.times do |n|
# TODO(sissel): facter this out into a 'filterworker' that accepts
# 'shutdown'
# Start a filter worker
Expand Down
6 changes: 6 additions & 0 deletions lib/logstash/filters/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ class LogStash::Filters::Base < LogStash::Plugin
def initialize(params)
super
config_init(params)
@threadsafe = true
end # def initialize

public
Expand All @@ -90,6 +91,11 @@ def execute(event, &block)
end
end # def execute

public
def threadsafe?
@threadsafe
end

# a filter instance should call filter_matched from filter if the event
# matches the filter's conditions (right type, etc)
protected
Expand Down
2 changes: 2 additions & 0 deletions lib/logstash/filters/multiline.rb
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@ class LogStash::Filters::Multiline < LogStash::Filters::Base
def initialize(config = {})
super

@threadsafe = false

# This filter needs to keep state.
@types = Hash.new { |h,k| h[k] = [] }
@pending = Hash.new
Expand Down
1 change: 1 addition & 0 deletions lib/logstash/filterworker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ def run
while !@shutdown_requested && event = @input_queue.pop
if event == LogStash::SHUTDOWN
finished
@input_queue << LogStash::SHUTDOWN # for the next filter thread
return
end

Expand Down
5 changes: 0 additions & 5 deletions lib/logstash/runner.rb
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ def main(args)

def run(args)
command = args.shift
p :run => command
commands = {
"-v" => lambda { emit_version(args) },
"-V" => lambda { emit_version(args) },
Expand All @@ -71,13 +70,9 @@ def run(args)
return agent.run(args)
end,
"web" => lambda do
p :web
require "logstash/web/runner"
p :web2
web = LogStash::Web::Runner.new
p :web3
@runners << web
p :web4
return web.run(args)
end,
"test" => lambda do
Expand Down

0 comments on commit 69215d8

Please sign in to comment.