Skip to content

Commit

Permalink
allow inputs to specify a data format
Browse files Browse the repository at this point in the history
  • Loading branch information
fetep committed May 21, 2011
1 parent cbae31f commit 569af86
Show file tree
Hide file tree
Showing 10 changed files with 162 additions and 106 deletions.
19 changes: 9 additions & 10 deletions lib/logstash/file/manager.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ class LogStash::File::Manager
def initialize(output_queue)
@tail = FileWatch::TailGlob.new
@watching = Hash.new
@to_event = Hash.new
@watching_lock = Mutex.new
@file_threads = {}
@main_thread = nil
Expand All @@ -36,11 +37,11 @@ def run(queue)
end

public
def watch(paths, config)
def watch(paths, config, to_event)
@watching_lock.synchronize do
paths.each do |path|
if @watching[path]
raise ValueError, "cannot watch the same path #{path} more than once"
raise "cannot watch the same path #{path} more than once"
end
@logger.debug(["watching file", {:path => path, :config => config}])

Expand All @@ -61,6 +62,7 @@ def watch(paths, config)
@tail.tail(path, tailconf) do |fullpath|
@logger.info("New file found: #{fullpath}")
@watching[fullpath] = config
@to_event[fullpath] = to_event
end
# TODO(sissel): Make FileWatch emit real exceptions
rescue RuntimeError
Expand All @@ -83,16 +85,13 @@ def watcher
# Maybe extend @tail.tail to accept a extra args that it will
# pass to subscribe's callback?
config = @watching[path]
to_event = @to_event[path]
@logger.debug(["Event from tail", { :path => path, :config => config }])
@buffers[path].extract(data).each do |line|
e = LogStash::Event.new({
"@message" => line,
"@type" => config["type"],
"@tags" => config["tag"].dup,
})
e.source = "file://#{@hostname}#{path}"
@logger.debug(["New event from file input", path, e])
@output_queue << e
e = to_event.call(line, "file://#{@hostname}#{path}")
if e
@output_queue << e
end
end
end
rescue Exception => e
Expand Down
23 changes: 14 additions & 9 deletions lib/logstash/inputs/amqp.rb
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ class LogStash::Inputs::Amqp < LogStash::Inputs::Base
def initialize(params)
super

@format ||= ["json_event"]

if !MQTYPES.include?(@exchange_type)
raise "Invalid type '#{@exchange_type}' must be one of #{MQTYPES.join(", ")}"
end
Expand All @@ -50,14 +52,21 @@ def initialize(params)
def register
@logger.info("Registering input #{@url}")
require "bunny" # rubygem 'bunny'
@vhost ||= "/"
@port ||= 5672
@amqpsettings = {
:vhost => (@vhost or "/"),
:vhost => @vhost,
:host => @host,
:port => (@port or 5672),
:port => @port,
}
@amqpsettings[:user] = @user if @user
@amqpsettings[:pass] = @password.value if @password
@amqpsettings[:logging] = @debug
@amqpurl = "amqp://"
if @user or @password
@amqpurl += "#{@user}:#{@password}@"
end
@amqpurl += "#{@host}:#{@port}#{@vhost}/#{@name}"
end # def register

def run(queue)
Expand All @@ -73,14 +82,10 @@ def run(queue)
@queue.bind(exchange)

@queue.subscribe do |data|
begin
obj = JSON.parse(data[:payload])
rescue => e
@logger.error(["json parse error", { :exception => e }])
raise e
e = to_event(data[:payload], @amqpurl)
if e
queue << e
end

queue << LogStash::Event.new(obj)
end # @queue.subscribe
rescue *[Bunny::ConnectionError, Bunny::ServerDownError] => e
@logger.error("AMQP connection error, will reconnect: #{e}")
Expand Down
53 changes: 53 additions & 0 deletions lib/logstash/inputs/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,16 @@ class LogStash::Inputs::Base < LogStash::Plugin
# Set this to true to enable debugging on an input.
config :debug, :validate => :boolean, :default => false

# The format of input data (plain, json, json_event)
config :format, :validate => (lambda do |value|
valid_formats = ["plain", "json", "json_event"]
if value.length != 1
false
else
valid_formats.member?(value.first)
end
end) # config :format

# Add any number of arbitrary tags to your event.
#
# This can help with processing later.
Expand Down Expand Up @@ -48,4 +58,47 @@ def register
def tag(newtag)
@tags << newtag
end # def tag

protected
def to_event(raw, source)
@format ||= ["plain"]

event = LogStash::Event.new
event.type = @type
event.tags = @tags.clone rescue []
event.source = source

case @format.first
when "plain":
event.message = raw
when "json":
# TODO(petef): format string to generate @message
event.message = "RAW JSON: #{raw}"
begin
fields = JSON.parse(raw)
fields.each { |k, v| event[k] = v }
rescue
@logger.warn({:message => "Trouble parsing json input",
:input => raw,
:source => source,
})
return nil
end
when "json_event":
begin
event = LogStash::Event.from_json(raw)
rescue
@logger.warn({:message => "Trouble parsing json_event input",
:input => raw,
:source => source,
})
return nil
end
else
raise "unknown event format #{@format.first}, this should never happen"
end

logger.debug(["Received new event", {:source => source, :event => event}])
return event
end
end # class LogStash::Inputs::Base
2 changes: 1 addition & 1 deletion lib/logstash/inputs/file.rb
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,6 @@ def run(queue)
end
end

@@filemanager.watch(@path, @config)
@@filemanager.watch(@path, @config, method(:to_event))
end # def run
end # class LogStash::Inputs::File
34 changes: 22 additions & 12 deletions lib/logstash/inputs/redis.rb
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,21 @@ class LogStash::Inputs::Redis < LogStash::Inputs::Base
# Maximum number of retries on a read before we give up.
config :retries, :validate => :number, :default => 5

public
def initialize(params)
super

@format ||= ["json_event"]
end # def initialize

public
def register
require 'redis'
@redis = nil
end
@redis_url = "redis://#{@password}@#{@host}:#{@port}/#{@db}"
end # def register

private
def connect
Redis.new(
:host => @host,
Expand All @@ -45,29 +55,29 @@ def connect
:db => @db,
:password => @password
)
end
end # def connect

public
def run(output_queue)
retries = @retries
loop do
begin
@redis ||= connect
response = @redis.blpop @queue, 0
retries = @retries
begin
output_queue << LogStash::Event.new(JSON.parse(response[1]))
rescue # parse or event creation error
@logger.error "failed to create event with '#{response[1]}'"
@logger.error $!
e = to_event(response[1], @redis_url)
if e
output_queue << e
end
rescue # redis error
raise RuntimeError.new "Redis connection failed too many times" if retries <= 0
if retries <= 0
raise RuntimeError, "Redis connection failed too many times"
end
@redis = nil
@logger.warn "Failed to get event from redis #{@name}. "+
"Will retry #{retries} times."
@logger.warn $!
@logger.warn(["Failed to get event from redis #{@name}. " +
"Will retry #{retries} times.", $!])
retries -= 1
sleep 1
sleep(1)
end
end # loop
end # def run
Expand Down
15 changes: 3 additions & 12 deletions lib/logstash/inputs/stdin.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,10 @@ def register

def run(queue)
loop do
event = LogStash::Event.new
begin
event.message = $stdin.readline.chomp
rescue *[EOFError, IOError] => e
@logger.info("Got EOF from stdin input. Ending")
finished
return
e = to_event($stdin.readline.chomp, "stdin://#{@host}/")
if e
queue << e
end
event.type = @type
event.tags = @tags.clone rescue []
event.source = "stdin://#{@host}/"
@logger.debug(["Got event", event])
queue << event
end # loop
end # def run

Expand Down
26 changes: 15 additions & 11 deletions lib/logstash/inputs/stomp.rb
Original file line number Diff line number Diff line change
Expand Up @@ -25,33 +25,37 @@ class LogStash::Inputs::Stomp < LogStash::Inputs::Base
# The destination to read events from.
#
# Example: "/topic/logstash"
config :destination, :validate => :string
config :destination, :validate => :string, :required => true

# Enable debugging output?
config :debug, :validate => :boolean, :default => false

public
def initialize(params)
super

@format ||= "json_event"
end

public
def register
require "stomp"

if @destination == "" or @destination.nil?
@logger.error("No destination path given for stomp")
return
end

begin
@client = Stomp::Client.new(@user, @password.value, @host, @port)
rescue Errno::ECONNREFUSED
@stomp_url = "stomp://#{@user}:#{@password}@#{@host}:#{@port}/#{@destination}"
rescue Errno::ECONNREFUSED => e
@logger.error("Connection refused to #{@host}:#{@port}...")
# TODO(sissel): Retry?
raise e
end
end # def register

def run(queue)
@client.subscribe(@destination) do |msg|
@logger.debug(["Got message from stomp", { :msg => msg }])
#event = LogStash::Event.from_json(message.body)
#queue << event
e = to_event(message.body, @stomp_url)
if e
queue << e
end
end
end # def run
end # class LogStash::Inputs::Stomp
50 changes: 25 additions & 25 deletions lib/logstash/inputs/syslog.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,14 @@ class LogStash::Inputs::Syslog < LogStash::Inputs::Base
# The port to listen on
config :port, :validate => :number, :default => 514

public
def initialize(params)
super

# force "plain" format. others don't make sense here.
@format = ["plain"]
end # def initialize

public
def register
# This comes from RFC3164, mostly.
Expand Down Expand Up @@ -64,20 +72,13 @@ def udp_listener(output_queue)

loop do
line, client = server.recvfrom(9000)
p :client => client
p :line => line
begin
event = LogStash::Event.new({
"@message" => line.chomp,
"@type" => @type,
"@tags" => @tags.clone,
})
source = URI::Generic.new("syslog", nil, client[3], nil, nil, nil, nil, nil, nil, nil)
syslog_relay(event, source)
rescue => e
p :exception => e
source = URI::Generic.new("syslog", nil, client[3], nil, nil, nil, nil,
nil, nil, nil)
e = to_event(line.chomp, source.to_s)
if e
syslog_relay(e, source)
output_queue << e
end
output_queue << event
end
ensure
if server
Expand All @@ -96,19 +97,18 @@ def tcp_listener(output_queue)
ip, port = client.peeraddr[3], client.peeraddr[1]
@logger.warn("got connection from #{ip}:#{port}")
LogStash::Util::set_thread_name("input|syslog|tcp|#{ip}:#{port}}")
source_base = URI::Generic.new("syslog", nil, ip, nil, nil, nil, nil, nil, nil, nil)
source_base = URI::Generic.new("syslog", nil, ip, nil, nil, nil, nil,
nil, nil, nil)
client.each do |line|
event = LogStash::Event.new({
"@message" => line.chomp,
"@type" => @type,
"@tags" => @tags.clone,
})
source = source_base.dup
syslog_relay(event, source)
output_queue << event
end
end
end
e = to_event(line.chomp, source_base.to_s)
if e
source = source_base.dup
syslog_relay(e, source)
output_queue << e
end # e
end # client.each
end # Thread.new
end # loop do
ensure
server.close if server
end # def tcp_listener
Expand Down
Loading

0 comments on commit 569af86

Please sign in to comment.