Skip to content

Commit

Permalink
Merge pull request elastic#338 from mzaccari/master
Browse files Browse the repository at this point in the history
LOGSTASH-691: move amqp plugin to be an external plugin
  • Loading branch information
jordansissel committed Feb 12, 2013
2 parents c284b33 + 6735ee4 commit 3146d3c
Show file tree
Hide file tree
Showing 15 changed files with 160 additions and 163 deletions.
2 changes: 1 addition & 1 deletion docs/index.html.erb
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ layout: content_right
<li> <a href="tutorials/getting-started-centralized"> getting started (centralized) </a> </li>
<li> <a href="tutorials/10-minute-walkthrough"> 10-minute walkthrough</a> - a simple walkthrough to show you how to configure the logstash agent to process events and even old logs. </li>
<li> <a href="tutorials/metrics-from-logs"> Gathering metrics from logs </a> - take metrics from logs and ship them to graphite, ganglia, and more. </li>
<li> <a href="tutorials/just-enough-amqp-for-logstash">Just enough AMQP for Logstash </a> - Get a quick primer on AMQP and how to use it in Logstash! </li>
<li> <a href="tutorials/just-enough-rabbitmq-for-logstash">Just enough RabbitMQ for Logstash </a> - Get a quick primer on RabbitMQ and how to use it in Logstash! </li>
</ul>

<h3> books and articles </h3>
Expand Down
2 changes: 1 addition & 1 deletion docs/logging-tool-comparisons.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ It provides you a simple event pipeline for taking events and logs from any
input, manipulating them with filters, and sending them to any output. Inputs
can be files, network, message brokers, etc. Filters are date and string
parsers, grep-like, etc. Outputs are data stores (elasticsearch, mongodb, etc),
message systems (amqp, stomp, etc), network (tcp, syslog), etc.
message systems (rabbitmq, stomp, etc), network (tcp, syslog), etc.

It also provides a web interface for doing search and analytics on your
logs.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
---
title: Just Enough AMQP - logstash
title: Just Enough RabbitMQ - logstash
layout: content_right
---

While configuring your AMQP broker is out of scope for logstash, it's important
to understand how logstash uses AMQP. To do that, we need to understand a
While configuring your RabbitMQ broker is out of scope for logstash, it's important
to understand how logstash uses RabbitMQ. To do that, we need to understand a
little about AMQP.

You should also consider reading
Expand Down Expand Up @@ -35,9 +35,8 @@ routing key. Routing keys are discussed below.

## Broker

A broker is simply the AMQP server software. There are several brokers but the
most common (and arguably popular) is [RabbitMQ](http://www.rabbitmq.com).
Some others are Apache Qpid (and the commercial version - RedHat MRG)
A broker is simply the AMQP server software. There are several brokers, but this
tutorial will cover the most common (and arguably popular), [RabbitMQ](http://www.rabbitmq.com).

# Routing Keys

Expand Down Expand Up @@ -112,19 +111,19 @@ support routing keys. Topic exchanges do support them. Just like a fanout
exchange, all bound queues see all messages with the additional filter of the
routing key.

# AMQP in logstash
# RabbitMQ in logstash

As stated earlier, in Logstash, Outputs publish to Exchanges. Inputs read from
Queues that are bound to Exchanges. Logstash uses the `bunny` AMQP library for
Queues that are bound to Exchanges. Logstash uses the `bunny` RabbitMQ library for
interaction with a broker. Logstash endeavors to expose as much of the
configuration for both exchanges and queues. There are many different tunables
that you might be concerned with setting - including things like message
durability or persistence of declared queues/exchanges. See the relevant input
and output documentation for AMQP for a full list of tunables.
and output documentation for RabbitMQ for a full list of tunables.

# Sample configurations, tips, tricks and gotchas

There are several examples in the logstash source directory of AMQP usage,
There are several examples in the logstash source directory of RabbitMQ usage,
however a few general rules might help eliminate any issues.

## Check your bindings
Expand All @@ -136,19 +135,19 @@ sender agent

input { stdin { type = "test" } }
output {
amqp {
name => "test_exchange"
host => "my_amqp_server"
rabbitmq {
exchange => "test_exchange"
host => "my_rabbitmq_server"
exchange_type => "fanout"
}
}

receiver agent

input {
amqp {
name => "test_queue"
host => "my_amqp_server"
rabbitmq {
queue => "test_queue"
host => "my_rabbitmq_server"
exchange => "test_exchange" # This matches the exchange declared above
}
}
Expand All @@ -157,15 +156,15 @@ receiver agent
## Message persistence

By default, logstash will attempt to ensure that you don't lose any messages.
This is reflected in the AMQP default settings as well. However there are
cases where you might not want this. A good example is where AMQP is not your
This is reflected in the RabbitMQ default settings as well. However there are
cases where you might not want this. A good example is where RabbitMQ is not your
primary method of shipping.

In the following example, we use AMQP as a sniffing interface. Our primary
destination is the embedded ElasticSearch instance. We have a secondary AMQP
In the following example, we use RabbitMQ as a sniffing interface. Our primary
destination is the embedded ElasticSearch instance. We have a secondary RabbitMQ
output that we use for duplicating messages. However we disable persistence and
durability on this interface so that messages don't pile up waiting for
delivery. We only use AMQP when we want to watch messages in realtime.
delivery. We only use RabbitMQ when we want to watch messages in realtime.
Additionally, we're going to leverage routing keys so that we can optionally
filter incoming messages to subsets of hosts. The exercise of getting messages
to this logstash agent are left up to the user.
Expand All @@ -176,9 +175,9 @@ to this logstash agent are left up to the user.

output {
elasticsearch { embedded => true }
amqp {
name => "logtail"
host => "my_amqp_server"
rabbitmq {
exchange => "logtail"
host => "my_rabbitmq_server"
exchange_type => "topic" # We use topic here to enable pub/sub with routing keys
key => "logs.%{host}"
durable => false # If rabbitmq restarts, the exchange disappears.
Expand Down
10 changes: 5 additions & 5 deletions etc/agent.lgtm.conf
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,17 @@ input {

output {
# This will be your durable shipping mechanism
amqp {
host => "myamqpserver"
rabbitmq {
host => "myrabbitmqserver"
exchange_type => "fanout"
name => "rawlogs"
exchange => "rawlogs"
}
# This is an optional non-durable shipping mechanism
# With this, you can sniff logs from your own code
amqp {
rabbitmq {
host => "127.0.0.1"
exchange_type => "topic"
name => "logsniff"
exchange => "logsniff"
durable => false
persistent => false
# The following is optional
Expand Down
2 changes: 1 addition & 1 deletion etc/examples/esriver.conf
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,6 @@ output {
stdout { }
elasticsearch_river {
es_host => "localhost"
amqp_host => "localhost"
rabbitmq_host => "localhost"
}
}
4 changes: 2 additions & 2 deletions etc/examples/indexer.conf
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
input {
amqp {
rabbitmq {
host => "127.0.0.1"
user => "guest"
pass => "guest"
exchange => "logstash"
name => "testing"
queue => "testing"
type => "all"
}

Expand Down
4 changes: 2 additions & 2 deletions lib/logstash/agent.rb
Original file line number Diff line number Diff line change
Expand Up @@ -152,8 +152,8 @@ def parse_options(args)
# Load any plugins that we have flags for.
# TODO(sissel): The --<plugin> flag support currently will load
# any matching plugins input, output, or filter. This means, for example,
# that the 'amqp' input *and* output plugin will be loaded if you pass
# --amqp-foo flag. This might cause confusion, but it seems reasonable for
# that the 'rabbitmq' input *and* output plugin will be loaded if you pass
# --rabbitmq-foo flag. This might cause confusion, but it seems reasonable for
# now that any same-named component will have the same flags.
plugins = []
args.each do |arg|
Expand Down
4 changes: 2 additions & 2 deletions lib/logstash/config/file.rb
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ def parse
tryload o[:type], :base
type = registry[o[:type]]

# Load the plugin itself (inputs/file, outputs/amqp, etc)
# Load the plugin itself (inputs/file, outputs/rabbitmq, etc)
# TODO(sissel): Error handling
tryload o[:type], o[:plugin]
plugin = registry[o[:plugin]]
Expand Down Expand Up @@ -83,7 +83,7 @@ def each(&block)
@config.each do |type, plugin_config_array|
# plugin_config_array has arrays of each component config:
# input {
# amqp { ... }
# rabbitmq { ... }
# file { ... }
# file { ... }
# }
Expand Down
2 changes: 1 addition & 1 deletion lib/logstash/config/test.conf
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
input {
amqp {
rabbitmq {
port => 12345
tag => [ a, b, c ]
}
Expand Down
2 changes: 1 addition & 1 deletion lib/logstash/inputs/gemfire.rb
Original file line number Diff line number Diff line change
Expand Up @@ -234,4 +234,4 @@ def afterRegionDestroy(event)
def afterRegionInvalidate(event)
@logger.debug("afterRegionInvalidate #{event}")
end
end # class LogStash::Inputs::Amqp
end # class LogStash::Inputs::Gemfire
89 changes: 44 additions & 45 deletions lib/logstash/inputs/amqp.rb → lib/logstash/inputs/rabbitmq.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,44 +2,42 @@
require "logstash/namespace"
require "cgi" # for CGI.escape

# Pull events from an AMQP exchange.
#
# <b> NOTE: THIS IS ONLY KNOWN TO WORK WITH RECENT RELEASES OF RABBITMQ. Any
# other amqp broker will not work with this plugin. I do not know why. If you
# need support for brokers other than rabbitmq, please file bugs here:
# <https://github.com/ruby-amqp/bunny> </b>
# Pull events from a RabbitMQ exchange.
#
# The default settings will create an entirely transient queue and listen for all messages by default.
# If you need durability or any other advanced settings, please set the appropriate options
class LogStash::Inputs::Amqp < LogStash::Inputs::Threadable
#
# This has been tested with Bunny 0.9.x, which supports RabbitMQ 2.x and 3.x. You can
# find links to both here:
#
# * RabbitMQ - <http://www.rabbitmq.com/>
# * Bunny - <https://github.com/ruby-amqp/bunny>
class LogStash::Inputs::RabbitMQ < LogStash::Inputs::Threadable

config_name "amqp"
plugin_status "unsupported"
config_name "rabbitmq"
plugin_status "beta"

# Your amqp broker's custom arguments. For mirrored queues in RabbitMQ: [ "x-ha-policy", "all" ]
# Custom arguments. For example, mirrored queues in RabbitMQ 2.x: [ "x-ha-policy", "all" ]
# RabbitMQ 3.x mirrored queues are set by policy. More information can be found
# here: http://www.rabbitmq.com/blog/2012/11/19/breaking-things-with-rabbitmq-3-0/
config :arguments, :validate => :array, :default => []

# Your amqp server address
# Your rabbitmq server address
config :host, :validate => :string, :required => true

# The AMQP port to connect on
# The rabbitmq port to connect on
config :port, :validate => :number, :default => 5672

# Your amqp username
# Your rabbitmq username
config :user, :validate => :string, :default => "guest"

# Your amqp password
# Your rabbitmq password
config :password, :validate => :password, :default => "guest"

# The name of the queue. Depricated due to conflicts with puppet naming convention.
# Replaced by 'queue' variable. See LOGSTASH-755
config :name, :validate => :string, :deprecated => true

# The name of the queue.
config :queue, :validate => :string, :default => ""

# The name of the exchange to bind the queue. This is analogous to the 'amqp
# output' [config 'name'](../outputs/amqp)
# The name of the exchange to bind the queue.
config :exchange, :validate => :string, :required => true

# The routing key to use. This is only valid for direct or fanout exchanges
Expand Down Expand Up @@ -80,6 +78,9 @@ class LogStash::Inputs::Amqp < LogStash::Inputs::Threadable

# Validate SSL certificate
config :verify_ssl, :validate => :boolean, :default => false

# Maximum permissible size of a frame (in bytes) to negotiate with clients
config :frame_max, :validate => :number, :default => 131072

public
def initialize(params)
Expand All @@ -90,43 +91,41 @@ def initialize(params)
end # def initialize

public
def register

if @name
if @queue
@logger.error("'name' and 'queue' are the same setting, but 'name' is deprecated. Please use only 'queue'")
end
@queue = @name
end
def register

@logger.info("Registering input #{@url}")
require "bunny" # rubygem 'bunny'
require "bunny"

@vhost ||= "/"
@port ||= 5672
@key ||= "#"
@amqpsettings = {

@rabbitmq_settings = {
:vhost => @vhost,
:host => @host,
:port => @port,
}
@amqpsettings[:user] = @user if @user
@amqpsettings[:pass] = @password.value if @password
@amqpsettings[:logging] = @debug
@amqpsettings[:ssl] = @ssl if @ssl
@amqpsettings[:verify_ssl] = @verify_ssl if @verify_ssl
@amqpurl = "amqp://"

@rabbitmq_settings[:user] = @user if @user
@rabbitmq_settings[:pass] = @password.value if @password
@rabbitmq_settings[:logging] = @debug
@rabbitmq_settings[:ssl] = @ssl if @ssl
@rabbitmq_settings[:verify_ssl] = @verify_ssl if @verify_ssl
@rabbitmq_settings[:frame_max] = @frame_max if @frame_max

@rabbitmq_url = "amqp://"
if @user
@amqpurl << @user if @user
@amqpurl << ":#{CGI.escape(@password.to_s)}" if @password
@amqpurl << "@"
@rabbitmq_url << @user if @user
@rabbitmq_url << ":#{CGI.escape(@password.to_s)}" if @password
@rabbitmq_url << "@"
end
@amqpurl += "#{@host}:#{@port}#{@vhost}/#{@queue}"
@rabbitmq_url += "#{@host}:#{@port}#{@vhost}/#{@queue}"
end # def register

def run(queue)
begin
@logger.debug("Connecting with AMQP settings #{@amqpsettings.inspect} to set up queue #{@queue.inspect}")
@bunny = Bunny.new(@amqpsettings)
@logger.debug("Connecting with RabbitMQ settings #{@rabbitmq_settings.inspect} to set up queue #{@queue.inspect}")
@bunny = Bunny.new(@rabbitmq_settings)
return if terminating?
@bunny.start
@bunny.qos({:prefetch_count => @prefetch_count})
Expand All @@ -137,14 +136,14 @@ def run(queue)
@bunnyqueue.bind(@exchange, :key => @key)

@bunnyqueue.subscribe({:ack => @ack}) do |data|
e = to_event(data[:payload], @amqpurl)
e = to_event(data[:payload], @rabbitmq_url)
if e
queue << e
end
end # @bunnyqueue.subscribe

rescue *[Bunny::ConnectionError, Bunny::ServerDownError] => e
@logger.error("AMQP connection error, will reconnect: #{e}")
@logger.error("RabbitMQ connection error, will reconnect: #{e}")
# Sleep for a bit before retrying.
# TODO(sissel): Write 'backoff' method?
sleep(1)
Expand All @@ -158,4 +157,4 @@ def teardown
@bunny.close if @bunny
finished
end # def teardown
end # class LogStash::Inputs::Amqp
end # class LogStash::Inputs::RabbitMQ
Loading

0 comments on commit 3146d3c

Please sign in to comment.