Skip to content

Commit

Permalink
update amqp output
Browse files Browse the repository at this point in the history
  • Loading branch information
fetep committed Feb 21, 2011
1 parent 0039975 commit fdfeec6
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 23 deletions.
8 changes: 8 additions & 0 deletions etc/agent.conf
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,12 @@ output {
stdout {
debug => true
}

amqp {
host => "127.0.0.1"
user => "guest"
pass => "guest"
exchange_type => "topic"
name => "testing"
}
}
50 changes: 27 additions & 23 deletions lib/logstash/outputs/amqp.rb
Original file line number Diff line number Diff line change
@@ -1,68 +1,72 @@
require "amqp" # rubygem 'amqp'
require "bunny" # rubygem 'bunny'
require "logstash/outputs/base"
require "logstash/namespace"
require "mq" # rubygem 'amqp'
require "cgi"

class LogStash::Outputs::Amqp < LogStash::Outputs::Base
MQTYPES = [ "fanout", "queue", "topic" ]

config_name "amqp"
config :host => :string
config :user => :string
config :pass => :string
config :exchange_type => :string
config :name => :string
config :vhost => :string
config :durable => :boolean
config :debug => :boolean

public
def initialize(params)
super

p @exchange_type => MQTYPES
@debug ||= false
@durable ||= false

if !MQTYPES.include?(@exchange_type)
raise "Invalid exchange_type, #{@exchange_type.inspect}, must be one of #{MQTYPES.join(", ")}"
end
end # def initialize

public
def register
@logger.info("Registering output #{@url}")
query_args = @url.query ? CGI.parse(@url.query) : {}
@logger.info("Registering output #{to_s}")
amqpsettings = {
:vhost => (@vhost or "/"),
:host => @url.host,
:port => (@url.port or 5672),
:host => @host,
:port => (@port or 5672),
}
amqpsettings[:user] = @url.user if @url.user
amqpsettings[:pass] = @url.password if @url.password
amqpsettings[:logging] = query_args.include? "debug"
@logger.debug("Connecting with AMQP settings #{amqpsettings.inspect} to set up #{@exchange_type.inspect} queue #{@name.inspect}")
@amqp = AMQP.connect(amqpsettings)
@mq = MQ.new(@amqp)
@target = nil
amqpsettings[:user] = @user if @user
amqpsettings[:pass] = @pass if @pass
amqpsettings[:logging] = @debug
@logger.debug(["Connecting to AMQP", amqpsettings, @exchange_type, @name])
@bunny = Bunny.new(amqpsettings)
@bunny.start

@target = nil
case @exchange_type
when "fanout"
@target = @mq.fanout(@name)
@target = @bunny.exchange(@name, :type => :fanout)
when "queue"
@target = @mq.queue(@name, :durable => @urlopts["durable"] ? true : false)
@target = @bunny.queue(@name, :durable => @durable)
when "topic"
@target = @mq.topic(@name)
@target = @bunny.exchange(@name, :type => :topic)
end # case @exchange_type
end # def register

public
def receive(event)
@logger.debug(["Sending event", { :url => @url, :event => event }])
@logger.debug(["Sending event", { :destination => to_s, :event => event }])
@target.publish(event.to_json)
end # def receive

# This is used by the ElasticSearch AMQP/River output.
public
def receive_raw(raw)
if @target == nil
raise "had trouble registering AMQP URL #{@url.to_s}, @target is nil"
end

@target.publish(raw)
end # def receive_raw

public
def to_s
return "amqp://#{@user}@#{@host}:#{@port}#{@vhost}/#{@exchange_type}/#{@name}"
end
end # class LogStash::Outputs::Amqp

0 comments on commit fdfeec6

Please sign in to comment.