Skip to content

Commit

Permalink
Merge branch 'LOGSTASH-238' of github.com:logstash/logstash
Browse files Browse the repository at this point in the history
Conflicts:
	Gemfile
	Gemfile.lock
  • Loading branch information
jordansissel committed Oct 16, 2011
2 parents 85f0222 + ca316b0 commit 52a3f5c
Show file tree
Hide file tree
Showing 23 changed files with 190 additions and 252 deletions.
2 changes: 1 addition & 1 deletion Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ def jruby?
return RUBY_ENGINE == "jruby"
end

gem "cabin" # for logging, Apache 2 license
gem "cabin", "0.1.3" # for logging. apache 2 license
gem "bunny" # for amqp support, MIT-style license
gem "uuidtools" # for naming amqp queues, License ???
gem "filewatch", "~> 0.3.0" # for file tailing, BSD License
Expand Down
19 changes: 11 additions & 8 deletions Gemfile.lock
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ GEM
bouncy-castle-java (1.5.0146.1)
bson (1.4.0)
bson (1.4.0-java)
bunny (0.7.6)
bunny (0.7.8)
cabin (0.1.3)
json
filewatch (0.3.0)
Expand All @@ -15,7 +15,7 @@ GEM
gmetric (0.1.3)
haml (3.1.3)
jls-grok (0.9.0)
jruby-elasticsearch (0.0.11)
jruby-elasticsearch (0.0.12)
jruby-openssl (0.7.4)
bouncy-castle-java
json (1.6.1)
Expand All @@ -25,13 +25,16 @@ GEM
rack (>= 1.0.0)
mongo (1.4.0)
bson (= 1.4.0)
rack (1.3.3)
rack (1.3.4)
rack-protection (1.1.4)
rack
rake (0.9.2)
redis (2.2.2)
sass (3.1.7)
sinatra (1.2.6)
rack (~> 1.1)
tilt (>= 1.2.2, < 2.0)
sass (3.1.10)
sinatra (1.3.1)
rack (~> 1.3, >= 1.3.4)
rack-protection (~> 1.1, >= 1.1.2)
tilt (~> 1.3, >= 1.3.3)
statsd-ruby (0.3.0)
stomp (1.1.9)
tilt (1.3.3)
Expand All @@ -45,7 +48,7 @@ PLATFORMS
DEPENDENCIES
awesome_print
bunny
cabin
cabin (= 0.1.3)
filewatch (~> 0.3.0)
gelf
gelfd (~> 0.1.0)
Expand Down
89 changes: 45 additions & 44 deletions lib/logstash/agent.rb
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ def parse_options(args)
# These are 'unknown' flags that begin --<plugin>-flag
# Put any plugin paths into the ruby library path for requiring later.
@plugin_paths.each do |p|
@logger.debug("Adding #{p.inspect} to ruby load path")
@logger.debug("Adding to ruby load path", :path => p)
$:.unshift p
end

Expand All @@ -158,16 +158,17 @@ def parse_options(args)
%w{inputs outputs filters}.each do |component|
@plugin_paths.each do |path|
plugin = File.join(path, component, name) + ".rb"
@logger.debug("Flag #{arg} found; trying to load #{plugin}")
@logger.debug("Plugin flag found; trying to load it",
:flag => arg, :plugin => plugin)
if File.file?(plugin)
@logger.info("Loading plugin #{plugin}")
@logger.info("Loading plugin", :plugin => plugin)
require plugin
[LogStash::Inputs, LogStash::Filters, LogStash::Outputs].each do |c|
# If we get flag --foo-bar, check for LogStash::Inputs::Foo
# and add any options to our option parser.
klass_name = name.capitalize
if c.const_defined?(klass_name)
@logger.debug("Found plugin class #{c}::#{klass_name})")
@logger.debug("Found plugin class", :class => "#{c}::#{klass_name})")
klass = c.const_get(klass_name)
# See LogStash::Config::Mixin::DSL#options
klass.options(@opts)
Expand All @@ -187,7 +188,7 @@ def parse_options(args)
begin
remainder = @opts.parse(args)
rescue OptionParser::InvalidOption => e
@logger.info e
@logger.info("Invalid option", :exception => e)
raise e
end

Expand All @@ -197,22 +198,22 @@ def parse_options(args)
private
def configure
if @config_path && @config_string
@logger.fatal "Can't use -f and -e at the same time"
@logger.fatal("Can't use -f and -e at the same time")
raise "Configuration problem"
elsif (@config_path.nil? || @config_path.empty?) && @config_string.nil?
@logger.fatal "No config file given. (missing -f or --config flag?)"
@logger.fatal @opts.help
@logger.fatal("No config file given. (missing -f or --config flag?)")
@logger.fatal(@opts.help)
raise "Configuration problem"
end

#if @config_path and !File.exist?(@config_path)
if @config_path and Dir.glob(@config_path).length == 0
@logger.fatal "Config file '#{@config_path}' does not exist."
@logger.fatal("Config file does not exist.", :path => @config_path)
raise "Configuration problem"
end

if @daemonize
@logger.fatal "Can't daemonize, no support yet in JRuby."
@logger.fatal("Can't daemonize, no support yet in JRuby.")
raise "Can't daemonize, no fork in JRuby."
end

Expand All @@ -227,14 +228,14 @@ def configure
end

if @verbose >= 3 # Uber debugging.
@logger.level = Logger::DEBUG
@logger.level = :debug
$DEBUG = true
elsif @verbose == 2 # logstash debug logs
@logger.level = Logger::DEBUG
@logger.level = :debug
elsif @verbose == 1 # logstash info logs
@logger.level = Logger::INFO
@logger.level = :info
else # Default log level
@logger.level = Logger::WARN
@logger.level = :warn
end
end # def configure

Expand All @@ -243,7 +244,8 @@ def read_config
# Support directory of config files.
# https://logstash.jira.com/browse/LOGSTASH-106
if File.directory?(@config_path)
@logger.debug("Loading '#{@config_path}' as directory")
@logger.debug("Config path is a directory, scanning files",
:path => @config_path)
paths = Dir.glob(File.join(@config_path, "*")).sort
else
# Get a list of files matching a glob. If the user specified a single
Expand Down Expand Up @@ -330,7 +332,7 @@ def wait

private
def start_input(input)
@logger.debug(["Starting input", input])
@logger.debug("Starting input", :plugin => input)
# inputs should write directly to output queue if there are no filters.
input_target = @filters.length > 0 ? @filter_queue : @output_queue
@plugins[input] = Thread.new(input, input_target) do |*args|
Expand All @@ -340,7 +342,7 @@ def start_input(input)

private
def start_output(output)
@logger.debug(["Starting output", output])
@logger.debug("Starting output", :plugin => output)
queue = SizedQueue.new(10)
@output_queue.add_queue(queue)
@output_plugin_queues[output] = queue
Expand Down Expand Up @@ -462,9 +464,9 @@ def shutdown_plugins(plugins)

finished_queue = Queue.new
# Tell everything to shutdown.
@logger.debug(plugins.keys.collect(&:to_s))
@logger.debug("Plugins to shutdown", :plugins => plugins.keys.collect(&:to_s))
plugins.each do |p, thread|
@logger.debug("Telling to shutdown: #{p.to_s}")
@logger.debug("Sending shutdown to: #{p.to_s}", :plugin => p)
p.shutdown(finished_queue)
end

Expand All @@ -474,7 +476,7 @@ def shutdown_plugins(plugins)
while remaining.size > 0
if (Time.now > force_shutdown_time)
@logger.warn("Time to quit, even if some plugins aren't finished yet.")
@logger.warn("Stuck plugins? #{remaining.map(&:first).join(", ")}")
@logger.warn("Stuck plugins?", :remaining => remaining.map(&:first))
break
end

Expand All @@ -485,9 +487,9 @@ def shutdown_plugins(plugins)
sleep(1)
else
remaining = plugins.select { |p, thread| plugin.running? }
@logger.debug("#{p.to_s} finished, waiting on " \
"#{remaining.size} plugins; " \
"#{remaining.map(&:first).join(", ")}")
@logger.debug("Plugin #{p.to_s} finished, waiting on the rest.",
:count => remaining.size,
:remaining => remaining.map(&:first))
end
end # while remaining.size > 0
end
Expand All @@ -506,7 +508,7 @@ def reload
config = read_config
reloaded_inputs, reloaded_filters, reloaded_outputs = parse_config(config)
rescue Exception => e
@logger.error "Aborting reload due to bad configuration: #{e}"
@logger.error("Aborting reload due to bad configuration", :exception => e)
return
end

Expand All @@ -526,7 +528,7 @@ def reload
obsolete_plugins[p] = @plugins[p]
@plugins.delete(p)
else
@logger.warn("Couldn't find input plugin to stop: #{p}")
@logger.warn("Couldn't find input plugin to stop", :plugin => p)
end
end

Expand All @@ -536,7 +538,7 @@ def reload
@plugins.delete(p)
@output_queue.remove_queue(@output_plugin_queues[p])
else
@logger.warn("Couldn't find output plugin to stop: #{p}")
@logger.warn("Couldn't find output plugin to stop", :plugin => p)
end
end

Expand All @@ -548,15 +550,15 @@ def reload
deleted_filters.each {|f| obsolete_plugins[f] = nil}

if obsolete_plugins.size > 0
@logger.info("Stopping removed plugins:\n\t" + obsolete_plugins.keys.join("\n\t"))
@logger.info("Stopping removed plugins:", :plugins => obsolete_plugins.keys)
shutdown_plugins(obsolete_plugins)
end
# require 'pry'; binding.pry()

# Start up filters
if new_filters.size > 0 || deleted_filters.size > 0
if new_filters.size > 0
@logger.info("Starting new filters: #{new_filters.join(', ')}")
@logger.info("Starting new filters", :plugins => new_filters)
new_filters.each do |f|
f.logger = @logger
f.register
Expand All @@ -569,13 +571,13 @@ def reload
end

if new_inputs.size > 0
@logger.info("Starting new inputs: #{new_inputs.join(', ')}")
@logger.info("Starting new inputs", :plugins => new_inputs)
new_inputs.each do |p|
start_input(p)
end
end
if new_outputs.size > 0
@logger.info("Starting new outputs: #{new_outputs.join(', ')}")
@logger.info("Starting new outputs", :plugins => new_outputs)
new_inputs.each do |p|
start_output(p)
end
Expand Down Expand Up @@ -637,7 +639,7 @@ def run_input(input, queue)
LogStash::Util::set_thread_name("input|#{input.to_s}")
input.logger = @logger
input.register
@logger.info("Input #{input.to_s} registered")
@logger.info("Input registered", :plugin => input)
@ready_queue << input
done = false

Expand All @@ -646,12 +648,11 @@ def run_input(input, queue)
input.run(queue)
done = true
rescue => e
@logger.warn(["Input #{input.to_s} thread exception", e])
@logger.debug(["Input #{input.to_s} thread exception backtrace",
e.backtrace])
@logger.error("Restarting input #{input.to_s} due to exception")
@logger.warn("Input thread exception", :plugin => input,
:exception => e, :backtrace => e.backtrace)
@logger.error("Restarting input due to exception", :plugin => input)
sleep(1)
retry # This jumps to the top of this proc (to the start of 'do')
retry # This jumps to the top of the 'begin'
end
end

Expand All @@ -668,7 +669,7 @@ def run_input(input, queue)
def run_filter(filterworker, index, output_queue)
LogStash::Util::set_thread_name("filter|worker|#{index}")
filterworker.run
@logger.warn("Filter worker ##{index} shutting down")
@logger.warn("Filter worker shutting down", :index => index)

# If we get here, the plugin finished, check if we need to shutdown.
shutdown_if_none_running(LogStash::FilterWorker, output_queue) unless @reloading
Expand All @@ -679,26 +680,25 @@ def run_output(output, queue)
LogStash::Util::set_thread_name("output|#{output.to_s}")
output.logger = @logger
output.register
@logger.info("Output #{output.to_s} registered")
@logger.info("Output registered", :plugin => output)
@ready_queue << output

# TODO(sissel): We need a 'reset' or 'restart' method to call on errors

begin
while event = queue.pop do
@logger.debug("Sending event to #{output.to_s}")
@logger.debug("Sending event", :target => output)
output.handle(event)
end
rescue Exception => e
@logger.warn(["Output #{output.to_s} thread exception", e])
@logger.debug(["Output #{output.to_s} thread exception backtrace",
e.backtrace])
@logger.warn("Output thread exception", :plugin => plugin,
:exception => e, :backtrace => e.backtrace)
# TODO(sissel): should we abort after too many failures?
sleep(1)
retry
end # begin/rescue

@logger.warn("Output #{input.to_s} shutting down")
@logger.warn("Output shutting down", :plugin => output)

# If we get here, the plugin finished, check if we need to shutdown.
shutdown_if_none_running(LogStash::Outputs::Base) unless @reloading
Expand All @@ -714,7 +714,8 @@ def shutdown_if_none_running(pluginclass, queue=nil)
remaining = @plugins.count do |plugin, thread|
plugin.is_a?(pluginclass) and plugin.running?
end
@logger.debug("#{pluginclass} still running: #{remaining}")
@logger.debug("Plugins still running", :type => pluginclass,
:remaining => remaining)

if remaining == 0
@logger.debug("All #{pluginclass} finished. Shutting down.")
Expand Down
2 changes: 1 addition & 1 deletion lib/logstash/config/file.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ class LogStash::Config::File
def initialize(path=nil, string=nil)
@path = path
@string = string
@logger = Logger.new(STDERR)
@logger = LogStash::Logger.new(STDERR)

if (path.nil? and string.nil?) or (!path.nil? and !string.nil?)
raise "Must give path or string, not both or neither"
Expand Down
4 changes: 2 additions & 2 deletions lib/logstash/config/mixin.rb
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ def config_init(params)
# Validation will modify the values inside params if necessary.
# For example: converting a string to a number, etc.
if !self.class.validate(params)
@logger.error "Config validation failed."
@logger.error("Config validation failed.")
exit 1
end

Expand All @@ -53,7 +53,7 @@ def config_init(params)
opts = self.class.get_config[name]
if opts && opts[:deprecated]
@logger.warn("Deprecated config item #{name.inspect} set " +
"in #{self.class.name}")
"in #{self.class.name}", :name => name, :plugin => self)
end
end

Expand Down
5 changes: 3 additions & 2 deletions lib/logstash/filters/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,12 @@ def filter_matched(event)
(@add_field or {}).each do |field, value|
event[field] ||= []
event[field] << event.sprintf(value)
@logger.debug("filters/#{self.class.name}: adding #{value} to field #{field}")
@logger.debug("filters/#{self.class.name}: adding value to field",
:field => field, :value => value)
end

(@add_tag or []).each do |tag|
@logger.debug("filters/#{self.class.name}: adding tag #{tag}")
@logger.debug("filters/#{self.class.name}: adding tag", :tag => tag)
event.tags << event.sprintf(tag)
#event.tags |= [ event.sprintf(tag) ]
end
Expand Down
Loading

0 comments on commit 52a3f5c

Please sign in to comment.