Skip to content

Commit

Permalink
- Add the new refactored agent model.
Browse files Browse the repository at this point in the history
  An agent can read from any input, apply filters, and pass to any output.
    * Inputs are files, amqp, stomp, http server, syslog server, etc.
    * Outputs are similar.
    * Filters are for manipulating events (parsing, adding data, trimming
      private data, etc)
  Inputs so far: amqp, file.
  Outputs so far: amqp, stdout.
  Filters so far: grok (pattern discovery only)

  A sample custom agent is in examples/test.rb
  • Loading branch information
jordansissel committed Oct 17, 2010
1 parent 1575edc commit 791f15f
Show file tree
Hide file tree
Showing 13 changed files with 409 additions and 35 deletions.
35 changes: 26 additions & 9 deletions bin/logstash
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,31 @@ require "rubygems"
require "eventmachine"
require "lib/components/agent"

config = {
"logs" => [
"/var/log/messages",
],
}
case ARGV[0]
when "client"
config = {
"input" => [
"/var/log/messages",
"/var/log/apache2/access.log",
],
"output" => [
"amqp://localhost/topic/testing",
],
}
when "server"
config = {
"input" => [
"amqp://localhost/topic/testing",
],
"filter" => [
"grok",
],
"output" => [
"stdout:///",
"amqp://localhost/topic/parsed",
],
}
end

agent = LogStash::Components::Agent.new(config)

EventMachine.run do
agent.register
end
agent.run
28 changes: 28 additions & 0 deletions examples/test.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
#!/usr/bin/env ruby

require "rubygems"
require "eventmachine"
require "lib/components/agent"
require "ap"

class MyAgent < LogStash::Components::Agent
def initialize
super({
"input" => [
"amqp://localhost/topic/parsed",
]
})
end # def initialize

def receive(event)
return unless event["progname"][0] == "pantscon"
return unless event["message"] =~ /naughty host/
event["IP"].each do |ip|
next unless ip.length > 0
puts "Evil IP: #{ip}"
end
end # def receive
end # class MyAgent

agent = MyAgent.new
agent.run
78 changes: 52 additions & 26 deletions lib/components/agent.rb
Original file line number Diff line number Diff line change
@@ -1,29 +1,19 @@

require "eventmachine"
require "eventmachine-tail"

class Reader < EventMachine::FileTail
def initialize(path, agent)
super(path)
@agent = agent
@buffer = BufferedTokenizer.new # From eventmachine
end

def receive_data(data)
# TODO(sissel): Support multiline log data
@buffer.extract(data).each do |line|
# Package it up into an event object before passing it along.
@agent.process(path, line)
end
end # def receive_data
end # class Reader
require "logstash/namespace"
require "logstash/inputs"
require "logstash/outputs"
require "logstash/filters"

# Collect logs, ship them out.
module LogStash; module Components; class Agent
class LogStash::Components::Agent
attr_reader :config

def initialize(config)
@config = config
@outputs = []
@inputs = []
@filters = []
# Config should have:
# - list of logs to monitor
# - log config
Expand All @@ -33,15 +23,51 @@ def initialize(config)
# Register any event handlers with EventMachine
# Technically, this agent could listen for anything (files, sockets, amqp,
# stomp, etc).
protected
def register
@config["logs"].each do |path|
EventMachine::FileGlobWatchTail.new(path, Reader, interval=60,
exclude=[], agent=self)
end # each log
# Register input and output stuff
if @config.include?("input")
@config["input"].each do |url|
input = LogStash::Inputs.from_url(url) { |event| receive(event) }
input.register
@inputs << input
end # each input
end

if @config.include?("filter")
@config["filter"].each do |name|
filter = LogStash::Filters.from_name(name)
filter.register
@filters << filter
end # each filter
end

if @config.include?("output")
@config["output"].each do |url|
output = LogStash::Outputs.from_url(url)
output.register
@outputs << output
end # each output
end
end # def register

public
def run
EventMachine.run do
self.register
end # EventMachine.run
end # def run

protected
# Process a message
def process(source, message)
puts "#{source}: #{message}"
end # def process
end; end; end; # class LogStash::Components::Agent
def receive(event)
@filters.each do |filter|
# TODO(sissel): Add ability for a filter to cancel/drop a message
filter.filter(event)
end

@outputs.each do |output|
output.receive(event)
end # each output
end # def input
end # class LogStash::Components::Agent
50 changes: 50 additions & 0 deletions lib/event.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
require "json"
require "logstash/time"

# General event type. Will expand this in the future.
module LogStash; class Event
def initialize(data)
@data = data
if !@data.include?(:received_timestamp)
@data[:received_timestamp] = LogStash::Time.now.utc.to_iso8601
end
end # def initialize

def self.from_json(json)
return Event.new(JSON.parse(json))
end # def self.from_json

def to_json
return @data.to_json
end

def to_s
#require "ap" rescue nil
#if @data.respond_to?(:awesome_inspect)
#return "#{timestamp} #{source}: #{@data.awesome_inspect}"
#else
#return "#{timestamp} #{source}: #{@data.inspect}"
#end
return "#{timestamp} #{source}: #{message}"
end # def to_s

def [](key)
return @data[key]
end

def []=(key, value)
@data[key] = value
end

def timestamp
@data[:received_timestamp] or @data["received_timestamp"]
end

def source
@data[:source] or @data["source"]
end

def message
@data[:message] or @data["message"]
end
end; end # class LogStash::Event
17 changes: 17 additions & 0 deletions lib/filters.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@

require "logstash/namespace"

module LogStash::Filters
def self.from_name(name)
# TODO(sissel): Add error handling
# TODO(sissel): Allow plugin paths
klass = name.capitalize

# Load the class if we haven't already.
require "logstash/filters/#{name}"

# Get the class name from the Filters namespace and create a new instance.
# for name == 'foo' this will call LogStash::Filters::Foo.new
LogStash::Filters.const_get(klass).new
end # def from_url
end # module LogStash::Filters
37 changes: 37 additions & 0 deletions lib/filters/grok.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
require "logstash/namespace"
require "grok" # rubygem 'grok'

class LogStash::Filters::Grok
def initialize(config = {})
@config = config
@grok = Grok.new
end # def initialize

def register
@grok.add_patterns_from_file("patterns/grok-patterns")
end

def filter(event)
# parse it with grok
message = event.message
pattern = @grok.discover(message)
@grok.compile(pattern)
match = @grok.match(message)
match.each_capture do |key, value|
if key.include?(":")
key = key.split(":")[1]
end

if event[key].is_a? String
event[key] = [event[key]]
elsif event[key] == nil
event[key] = []
end

event[key] << value
end

# TODO(sissel): Flatten single-entry arrays into a single value?
return event
end
end # class LogStash::Filters::Grok
18 changes: 18 additions & 0 deletions lib/inputs.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@

require "logstash/namespace"
require "uri"

module LogStash::Inputs
def self.from_url(url, &block)
# Assume file paths if we start with "/"
url = "file://#{url}" if url.start_with?("/")

uri = URI.parse(url)
# TODO(sissel): Add error handling
# TODO(sissel): Allow plugin paths
klass = uri.scheme.capitalize
file = uri.scheme
require "logstash/inputs/#{file}"
LogStash::Inputs.const_get(klass).new(uri, &block)
end # def from_url
end # module LogStash::Inputs
54 changes: 54 additions & 0 deletions lib/inputs/amqp.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
require "logstash/namespace"
require "logstash/event"
require "uri"
require "amqp" # rubygem 'amqp'
require "mq" # rubygem 'amqp'
require "uuidtools" # rubygem 'uuidtools'

class LogStash::Inputs::Amqp
TYPES = [ "fanout", "queue", "topic" ]

def initialize(url, config={}, &block)
@url = url
@url = URI.parse(url) if url.is_a? String
@config = config
@callback = block
@mq = nil

# Handle path /<type>/<name>
unused, @type, @name = @url.path.split("/", 3)
if @type == nil or @name == nil
raise "amqp urls must have a path of /<type>/name where <type> is #{TYPES.join(", ")}"
end

if !TYPES.include?(@type)
raise "Invalid type '#{@type}' must be one 'fanout' or 'queue'"
end
end

def register
@amqp = AMQP.connect(:host => @url.host)
@mq = MQ.new(@amqp)
@target = nil

@target = @mq.queue(UUIDTools::UUID.timestamp_create)
case @type
when "fanout"
@target.bind(MQ.fanout(@url.path, :durable => true))
when "direct"
@target.bind(MQ.direct(@url.path, :durable => true))
when "topic"
@target.bind(MQ.topic(@url.path, :durable => true))
end # case @type

@target.subscribe(:ack => true) do |header, message|
event = LogStash::Event.from_json(message)
receive(event)
header.ack
end
end # def register

def receive(event)
@callback.call(event)
end # def event
end # class LogStash::Inputs::Amqp
42 changes: 42 additions & 0 deletions lib/inputs/file.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
require "logstash/namespace"
require "logstash/event"
require "eventmachine-tail"
require "uri"

class LogStash::Inputs::File
def initialize(url, config={}, &block)
@url = url
@url = URI.parse(url) if url.is_a? String
@config = config
@callback = block
end

def register
EventMachine::FileGlobWatchTail.new(@url.path, Reader, interval=60,
exclude=[], receiver=self)
end

def receive(event)
event = LogStash::Event.new({
:source => @url.to_s,
:message => event,
})
@callback.call(event)
end # def event

private
class Reader < EventMachine::FileTail
def initialize(path, receiver)
super(path)
@receiver = receiver
@buffer = BufferedTokenizer.new # From eventmachine
end

def receive_data(data)
# TODO(sissel): Support multiline log data
@buffer.extract(data).each do |line|
@receiver.receive(line)
end
end # def receive_data
end # class Reader
end # class LogStash::Inputs::File
7 changes: 7 additions & 0 deletions lib/namespace.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@

module LogStash
module Components; end
module Inputs; end
module Outputs; end
module Filters; end
end # module LogStash
Loading

0 comments on commit 791f15f

Please sign in to comment.