Skip to content

Commit

Permalink
updating riak output with better support
Browse files Browse the repository at this point in the history
  • Loading branch information
lusis committed Jul 1, 2012
1 parent bdb2c90 commit 409611a
Show file tree
Hide file tree
Showing 10 changed files with 63 additions and 9 deletions.
2 changes: 2 additions & 0 deletions Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ gem "gmetric", "0.1.3" # outputs/ganglia, # License: MIT
gem "xmpp4r", "0.5" # outputs/xmpp, # License: As-Is
gem "gelfd", "0.2.0" #inputs/gelf, # License: Apache 2.0
gem "jruby-win32ole", :platforms => :jruby # inputs/eventlog, # License: JRuby
gem "jruby-httpclient", :platforms => :jruby #outputs/http, # License: Apache 2.0
gem "excon", :platforms => :ruby #outputs/http, # License: MIT License
gem "pry"

gem "ffi-rzmq", "0.9.0"
Expand Down
Empty file modified lib/logstash/filters/base.rb
100755 → 100644
Empty file.
Empty file modified lib/logstash/inputs/base.rb
100755 → 100644
Empty file.
Empty file modified lib/logstash/inputs/threadable.rb
100755 → 100644
Empty file.
Empty file modified lib/logstash/inputs/zeromq.rb
100755 → 100644
Empty file.
Empty file modified lib/logstash/outputs/elasticsearch_http.rb
100755 → 100644
Empty file.
Empty file modified lib/logstash/outputs/email.rb
100755 → 100644
Empty file.
70 changes: 61 additions & 9 deletions lib/logstash/outputs/riak.rb
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,16 @@ class LogStash::Outputs::Riak < LogStash::Outputs::Base
# Choose this carefully. Best to let riak decide....
config :key_name, :validate => :string


# Quorum options (NYI)
# Logstash hash of options for various quorum parameters
# Bucket properties (NYI)
# Logstash hash of properties for the bucket
# i.e.
# `quorum => ["r", "1", "w", "1", "dw", "1"]`
config :quorum, :validate => :array, :default => {"r" => 1, "w" => 1, "dw" => 1}
# `bucket_props => ["r", "one", "w", "one", "dw", "one"]`
# or
# `bucket_props => ["n_val", "3"]`
# Note that the Logstash config language cannot support
# hash or array values
# Properties will be passed as-is
config :bucket_props, :validate => :hash

# Indices
# Array of fields to add 2i on
Expand All @@ -46,13 +50,34 @@ class LogStash::Outputs::Riak < LogStash::Outputs::Base
# Off by default as not everyone runs eleveldb
config :indices, :validate => :array

# Search (NYI)
# Search
# Enable search on the bucket defined above
config :enable_search, :validate => :boolean, :default => false

# SSL
# Enable SSL
config :enable_ssl, :validate => :boolean, :default => false

# SSL Options
# Options for SSL connections
# Only applied if SSL is enabled
# Logstash hash that maps to the riak-client options
# here: https://github.com/basho/riak-ruby-client/wiki/Connecting-to-Riak
# You'll likely want something like this:
# `ssl_opts => ["pem", "/etc/riak.pem", "ca_path", "/usr/share/certificates"]
# Per the riak client docs, the above sample options
# will turn on SSL `VERIFY_PEER`
config :ssl_opts, :validate => :hash

# Metadata (NYI)
# Allow the user to set custom metadata on the object
# Should consider converting logstash data to metadata as well
#

public
def register
require 'riak'
riak_opts = {}
cluster_nodes = Array.new
@logger.debug("Setting protocol", :protocol => @proto)
proto_type = "#{@proto}_port".to_sym
Expand All @@ -61,7 +86,19 @@ def register
cluster_nodes << {:host => node, proto_type => port}
end
@logger.debug("Cluster nodes", :nodes => cluster_nodes)
@client = Riak::Client.new(:nodes => cluster_nodes)
if @enable_ssl
@logger.debug("SSL requested")
if @ssl_opts
@logger.debug("SSL options provided", @ssl_opts)
riak_opts.merge!(@ssl_opts.inject({}) {|h,(k,v)| h[k.to_sym] = v; h})
else
riak_opts.merge!({:ssl => true})
end
@logger.debug("Riak options:", :riak_opts => riak_opts)
end
riak_opts.merge!({:nodes => cluster_nodes})
@logger.debug("Riak options:", :riak_opts => riak_opts)
@client = Riak::Client.new(riak_opts)
end # def register

public
Expand All @@ -70,8 +107,23 @@ def receive(event)

# setup our bucket
bukkit = @client.bucket(event.sprintf(@bucket))
@logger.debug("Bucket", :bukkit => bukkit.to_s)

# Disable bucket props for now
# Need to detect params passed that should be converted to int
# otherwise setting props fails =(
# Logstash syntax only supports strings and bools
# likely fix is to either hack in is_numeric?
# or whitelist certain params and call to_i
##@logger.debug("Setting bucket props", :props => @bucket_props)
##bukkit.props = @bucket_props if @bucket_props
##@logger.debug("Bucket", :bukkit => bukkit.inspect)

if @enable_search
@logger.debug("Enable search requested", :bucket => bukkit.inspect)
# Check if search is enabled
@logger.debug("Checking bucket status", :search_enabled => bukkit.is_indexed?)
bukkit.enable_index! unless bukkit.is_indexed?
@logger.debug("Rechecking bucket status", :search_enabled => bukkit.is_indexed?)
end
@key_name.nil? ? evt_key=nil : evt_key=event.sprintf(@key_name)
evt = Riak::RObject.new(bukkit, evt_key)
@logger.debug("RObject", :robject => evt.to_s)
Expand Down
Empty file modified lib/logstash/outputs/sns.rb
100755 → 100644
Empty file.
Empty file modified lib/logstash/outputs/zeromq.rb
100755 → 100644
Empty file.

0 comments on commit 409611a

Please sign in to comment.