Skip to content
This repository has been archived by the owner on May 2, 2021. It is now read-only.

Commit

Permalink
* logstash -v sets debug log level, better passing of logger
Browse files Browse the repository at this point in the history
* fix grok filters to keep one set of @@grokpiles per event type
* use a LogStash::File::Manager for watching globs and missing files
  • Loading branch information
fetep committed Feb 21, 2011
1 parent f0fb7d5 commit f65d21b
Show file tree
Hide file tree
Showing 8 changed files with 188 additions and 130 deletions.
7 changes: 5 additions & 2 deletions bin/logstash
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ opts = OptionParser.new do |opts|
settings.logfile = path
end

opts.on("-v", "Increase verbosity (not yet used)") do
opts.on("-v", "Increase verbosity") do
settings.verbose += 1
end
end
Expand Down Expand Up @@ -79,4 +79,7 @@ elsif settings.daemonize
end

agent = LogStash::Agent.new(settings)
agent.run
if settings.verbose > 0
agent.logger.level = Logger::DEBUG
end
agent.run
15 changes: 14 additions & 1 deletion etc/agent.conf
Original file line number Diff line number Diff line change
@@ -1,6 +1,19 @@
input {
file {
path => [ "/var/log/messages", "/var/log/kern.log" ]
path => [ "/var/log/messages", "/var/log/*.log" ]
type => "linux-syslog"
}
}

filter {
grok {
type => "linux-syslog"
pattern => ["%{SYSLOG_SUDO}", "%{SYSLOG_KERNEL}", "%{SYSLOGLINE}"]
}
}

output {
stdout {
debug => true
}
}
83 changes: 34 additions & 49 deletions lib/logstash/agent.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ class LogStash::Agent
attr_reader :inputs
attr_reader :outputs
attr_reader :filters
attr_accessor :logger

public
def initialize(settings)
Expand All @@ -37,6 +38,8 @@ def log_to(target)

public
def run
JThread.currentThread().setName("agent")

# Load the config file
config = LogStash::Config::File.new(@settings.config_file)
config.parse do |plugin|
Expand All @@ -50,6 +53,7 @@ def run
# Create a new instance of a plugin, called like:
# -> LogStash::Inputs::File.new( params )
instance = klass.new(plugin[:parameters])
instance.logger = @logger

case type
when "input"
Expand All @@ -64,55 +68,31 @@ def run
end # case type
end # config.parse

exit 0

if @config["inputs"].length == 0 or @config["outputs"].length == 0
if @inputs.length == 0 or @outputs.length == 0
raise "Must have both inputs and outputs configured."
end

# XXX we should use a SizedQueue here (w/config params for size)
# NOTE(petef) we should use a SizedQueue here (w/config params for size)
filter_queue = Queue.new
output_queue = MultiQueue.new

# Register input and output stuff
input_configs = Hash.new { |h, k| h[k] = Hash.new { |h2, k2| h2[k2] = [] } }
@config["inputs"].each do |url_type, urls|
# url could be a string or an array.
urls = [urls] if !urls.is_a?(Array)
urls.each do |url_str|
url = URI.parse(url_str)
input_type = url.scheme
input_configs[input_type][url_type] = url
end
end # each input

input_configs.each do |input_type, config|
if @config.include?("filters")
queue = filter_queue
else
queue = output_queue
end
input = LogStash::Inputs.from_name(input_type, config, queue)
@threads["input/#{input_type}"] = Thread.new do
JThread.currentThread().setName("input/#{input_type}")
input.run
end
queue = @filters.length > 0 ? filter_queue : output_queue
# Start inputs
@inputs.each do |input|
@logger.info(["Starting input", input])
input.logger = @logger
@threads[input] = Thread.new { input.run(queue) }
end

# Create N filter-worker threads
if @config.include?("filters")
if @filters.length > 0
3.times do |n|
@threads["worker/filter/#{n}"] = Thread.new do
JThread.currentThread().setName("worker/filter/#{n}")
filters = []

@config["filters"].collect { |x| x.to_a[0] }.each do |filter_config|
name, value = filter_config
@logger.info("Using filter #{name} => #{value.inspect}")
filter = LogStash::Filters.from_name(name, value)
@logger.info("Starting filter worker thread #{n}")
@threads["filter|worker|#{n}"] = Thread.new do
JThread.currentThread().setName("filter|worker|#{n}")
@filters.each do |filter|
filter.logger = @logger
filter.register
filters << filter
end

while event = filter_queue.pop
Expand All @@ -129,26 +109,31 @@ def run

output_queue.push(event) unless event.cancelled?
end # event pop
end # Thread
end # Thread.new
end # N.times
end # if @config.include?("filters")
end # if @filters.length > 0


# Create output threads
@config["outputs"].each do |url|
@outputs.each do |output|
queue = Queue.new
@threads["outputs/#{url}"] = Thread.new do
JThread.currentThread().setName("output:#{url}")
output = LogStash::Outputs.from_url(url)
while event = queue.pop
output_queue.add_queue(queue)

@threads["outputs/#{output}"] = Thread.new do
JThread.currentThread().setName("output/#{output}")
output.register
output.logger = @logger

while event = queue.pop do
output.receive(event)
end
end # Thread
output_queue.add_queue(queue)
end
end # Thread.new
end # @outputs.each

# Register any signal handlers
#register_signal_handler

# # Register any signal handlers
# #register_signal_handler
#
while sleep 5
end
end # def register
Expand Down
88 changes: 88 additions & 0 deletions lib/logstash/file/manager.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
require "file/tail"
require "logstash/namespace"
require "set"
require "socket" # for Socket.gethostname

# Watches a set of path globs for new (or newly appearing) files and tails
# each one, turning every new line into a LogStash::Event pushed onto an
# output queue.
class LogStash::File::Manager
  attr_accessor :logger

  public
  # output_queue - queue that tailed lines (as LogStash::Event) are pushed
  # onto. May be replaced later by the queue passed to #run.
  def initialize(output_queue)
    @watching = Hash.new            # path glob => input config hash
    @watching_lock = Mutex.new      # guards @watching (written by #watch, read by watcher thread)
    @file_threads = {}              # expanded file path => tailing Thread
    @main_thread = nil
    # Fix: this previously assigned nil, silently discarding the argument
    # and leaving the manager broken unless #run was called first.
    @output_queue = output_queue
    @logger = Logger.new(STDOUT)
    @hostname = Socket.gethostname  # used to build the event "source" URL
  end # def initialize

  public
  # Start the background glob-watcher thread (at most once) and direct all
  # future events to 'queue'.
  def run(queue)
    @output_queue = queue
    @main_thread ||= Thread.new { watcher }
  end

  public
  # Register one or more path globs to watch. 'config' is the input config
  # hash ("type", optional "tag") applied to events read from those paths.
  # Raises ArgumentError if a path is already being watched.
  def watch(paths, config)
    @watching_lock.synchronize do
      paths.each do |path|
        if @watching[path]
          # Fix: was 'raise ValueError' -- ValueError is not a Ruby
          # constant, so this line would have raised NameError instead of
          # the intended error.
          raise ArgumentError, "cannot watch the same path #{path} more than once"
        end

        @watching[path] = config
      end
    end
  end

  private
  # Main loop: periodically expand each watched glob and spawn a tail
  # thread for any matched file not yet being followed.
  def watcher
    JThread.currentThread().setName("filemanager")
    begin
      loop do
        # Fix: snapshot under the lock -- the original iterated @watching
        # without synchronization while #watch mutates it under the lock.
        watching = @watching_lock.synchronize { @watching.dup }
        watching.each do |path, config|
          next if @file_threads[path]

          Dir.glob(path).each do |file|
            next if @file_threads[file]
            @file_threads[file] = Thread.new { file_watch(file, config) }
          end
        end

        # Poll slowly once something is being tailed; faster while still
        # waiting for the first glob match.
        sleep(@file_threads.length > 0 ? 30 : 5)
      end
    rescue StandardError => e
      # Fix: was 'rescue Exception', which also swallowed SignalException
      # and SystemExit, making the process hard to kill from this thread.
      @logger.warn(["Exception in #{self.class} thread, retrying", e])
      retry
    end
  end

  private
  # Tail a single file forever, emitting one LogStash::Event per new line.
  def file_watch(path, config)
    JThread.currentThread().setName("input|file|file:#{path}")
    @logger.debug(["watching file", {:path => path}])

    # Ensure the event type always appears in the tag list.
    config["tag"] ||= []
    if !config["tag"].member?(config["type"])
      config["tag"] << config["type"]
    end

    File.open(path, "r") do |f|
      f.extend(File::Tail)
      f.interval = 5
      f.backward(0)  # start at end-of-file; only lines written after this point are emitted
      f.tail do |line|
        e = LogStash::Event.new({
          "@message" => line,
          "@type" => config["type"],
          "@tags" => config["tag"],
        })
        e.source = "file://#{@hostname}/#{path}"
        # NOTE(review): dup presumably guards against downstream mutation
        # of the shared event -- confirm against consumers.
        @output_queue << e.dup
      end # f.tail
    end # File.open
  end
end # class LogStash::File::Manager
51 changes: 24 additions & 27 deletions lib/logstash/filters/grok.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,30 +10,33 @@ class LogStash::Filters::Grok < LogStash::Filters::Base
config :pattern => nil
config :patterns_dir => nil
config :drop_if_match => :boolean # googlecode/issue/26


@@grokpiles = Hash.new { |h, k| h[k] = [] }
@@grokpiles_lock = Mutex.new

public
def initialize(params)
super

@grokpiles = {}
end # def initialize

public
def register
# TODO(sissel): Make patterns files come from the config
@config.each do |type, typeconfig|
@logger.debug("Registering type with grok: #{type}")
pile = Grok::Pile.new
patterndir = "#{File.dirname(__FILE__)}/../../../patterns/*"
Dir.glob(patterndir).each do |path|
pile.add_patterns_from_file(path)
end
typeconfig["patterns"].each do |pattern|
groks = pile.compile(pattern)
@logger.debug(["Compiled pattern", pattern, groks[-1].expanded_pattern])
end
@grokpiles[type] = pile
end # @config.each
# TODO(2.0): support grok pattern discovery
@patterns_dir ||= "#{File.dirname(__FILE__)}/../../../patterns/*"
@pile = Grok::Pile.new
Dir.glob(@patterns_dir).each do |path|
@pile.add_patterns_from_file(path)
end

# TODO(petef, sissel): should not need .flatten here
@pattern.flatten.each do |pattern|
groks = @pile.compile(pattern)
@logger.debug(["Compiled pattern", pattern, groks[-1].expanded_pattern])
end

@@grokpiles_lock.synchronize do
@@grokpiles[@type] << @pile
end
end # def register

public
Expand All @@ -42,16 +45,10 @@ def filter(event)
message = event.message
match = false

if event.type
if @grokpiles.include?(event.type)
@logger.debug(["Running grok filter", event])
pile = @grokpiles[event.type]
grok, match = pile.match(message)
end # @grokpiles.include?(event.type)
# TODO(2.0): support grok pattern discovery
else
@logger.info("Unknown type for #{event.source} (type: #{event.type})")
@logger.debug(event.to_hash)
@logger.debug(["Running grok filter", event])
@@grokpiles[event.type].each do |pile|
grok, match = @pile.match(message)
break if match
end

if match
Expand Down
Loading

0 comments on commit f65d21b

Please sign in to comment.