Skip to content

Commit

Permalink
reworking the plugin a bit - ensure the code is written for coders :).
Browse files Browse the repository at this point in the history
Thanks for the input Jordan!
  • Loading branch information
NickPadilla committed Mar 29, 2012
1 parent 1ae76f9 commit 061caf2
Showing 1 changed file with 62 additions and 40 deletions.
102 changes: 62 additions & 40 deletions lib/logstash/outputs/graphtastic.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,16 @@ class LogStash::Outputs::GraphTastic < LogStash::Outputs::Base
config_name "graphtastic"
plugin_status "beta"

# options are UDP(fastest - default) - RMI(faster) - REST(fast) - TCP (don't use TCP yet - some problems - errors out)
config :integration, :validate => :string, :default => "udp"
# options are udp(fastest - default) - rmi(faster) - rest(fast) - tcp(don't use TCP yet - some problems - errors out on linux)
config :integration, :validate => ["udp","tcp","rmi","rest"], :default => "udp"

# if using rest as your end point you need to also provide the application url
# it defaults to localhost/graphtastic - if you change it make sure you
# include the context root - graphtastic - or if you changed it make sure it
# matches what is in the web.xml of the graphtastic application
config :applicationContext, :validate => :string, :default => "graphtastic"
# it defaults to localhost/graphtastic. You can customize the application url
# by changing the name of the .war file. There are other ways to change the
# application context, but they vary depending on the Application Server in use.
# Please consult your application server documentation for more on application
# contexts.
config :context, :validate => :string, :default => "graphtastic"

# metrics hash - you will provide a name for your metric and the metric
# data as key value pairs. so for example:
Expand All @@ -42,15 +44,16 @@ class LogStash::Outputs::GraphTastic < LogStash::Outputs::Base
config :host, :validate => :string, :default => "127.0.0.1"

# port for the graphtastic instance - defaults to 1199 for RMI, 1299 for TCP, 1399 for UDP, and 8080 for REST
config :port, :validate => :number, :default => 0
config :port, :validate => :number

# number of attempted retry after send error - currently only way to integrate
# errored transactions - should try and save to a file or later consumption
# either by graphtastic utility or by this program after connectivity is
# ensured to be established.
config :retries, :validate => :number, :default => 1

# the number of metrics to send to GraphTastic at one time.
# the number of metrics to send to GraphTastic at one time. 60 seems to be the perfect
# amount for UDP, with default packet size.
config :batch_number, :validate => :number, :default => 60

# setting allows you to specify where we save errored transactions
Expand All @@ -68,14 +71,14 @@ def register
raise Exception.new("LogStash::Outputs::GraphTastic# JRuby is needed for RMI to work!")
end
require "java"
if @port == 0
if @port.nil?
@port = 1199
end
registry = java.rmi.registry.LocateRegistry.getRegistry(@host, @port);
@remote = registry.lookup("RmiMetricService")
elsif @integration.downcase == "rest"
require "net/http"
if @port == 0
if @port.nil?
@port = 8080
gem "mail" #outputs/email, # License: MIT License
end
Expand All @@ -99,50 +102,29 @@ def receive(event)
@retry = 1
@logger.debug("Event found for GraphTastic!", :tags => @tags, :event => event)
@metrics.each do |name, metric|
send(event.sprintf(name),event.sprintf(metric),(event.unix_timestamp*1000))# unix_timestamp is what I need in seconds - multiply by 1000 to make milliseconds.
postMetric(event.sprintf(name),event.sprintf(metric),(event.unix_timestamp*1000))# unix_timestamp is what I need in seconds - multiply by 1000 to make milliseconds.
end
end

def send(name, metric, timestamp)
def postMetric(name, metric, timestamp)
message = name+","+metric+","+timestamp.to_s
if @batch.length < @batch_number
@batch.push(message)
else
sendMessage()

flushMetrics()
end
end

def sendMessage()
def flushMetrics()
begin
if @integration.downcase == "tcp"
# to correctly read the line we need to ensure we send \r\n at the end of every message.
if @port == 0
@port = 1299
end
tcpsocket = TCPSocket.open(@host, @port)
tcpsocket.send(@batch.join(',')+"\r\n", 0)
tcpsocket.close
@logger.debug("GraphTastic Sent Message Using TCP : #{@batch.join(',')}")
flushViaTCP()
elsif @integration.downcase == "rmi"
if RUBY_ENGINE != "jruby"
raise Exception.new("LogStash::Outputs::GraphTastic# JRuby is needed for RMI to work!")
end
@remote.insertMetrics(@batch.join(','))
@logger.debug("GraphTastic Sent Message Using RMI : #{@batch.join(',')}")
flushViaRMI()
elsif @integration.downcase == "udp"
if @port == 0
@port = 1399
end
udpsocket.send(@batch.join(','), 0, @host, @port)
@logger.debug("GraphTastic Sent Message Using UDP : #{@batch.join(',')}")
flushViaUDP()
elsif @integration.downcase == "rest"
request = Net::HTTP::Put.new("/#{@applicationContext}/addMetric/#{@batch.join(',')}")
response = @http.request(request)
@logger.debug("GraphTastic Sent Message Using REST : #{@batch.join(',')}", :response => response.inspect)
if response == 'ERROR'
raise 'Error happend when sending metric to GraphTastic using REST!'
end
flushViaREST()
else
@logger.error("GraphTastic Not Able To Find Correct Integration - Nothing Sent - Integration Type : ", :@integration => @integration)
end
Expand All @@ -152,10 +134,50 @@ def sendMessage()
@logger.info("*******Attempting #{@retry} out of #{@retries}")
while @retry < @retries
@retry = @retry + 1
sendMessage()
flushMetrics()
end
end
end

# send metrics via udp
def flushViaUDP()
if @port.nil?
@port = 1399
end
udpsocket.send(@batch.join(','), 0, @host, @port)
@logger.debug("GraphTastic Sent Message Using UDP : #{@batch.join(',')}")
end

# send metrics via REST
def flushViaREST()
request = Net::HTTP::Put.new("/#{@context}/addMetric/#{@batch.join(',')}")
response = @http.request(request)
if response == 'ERROR'
raise 'Error happend when sending metric to GraphTastic using REST!'
end
@logger.debug("GraphTastic Sent Message Using REST : #{@batch.join(',')}", :response => response.inspect)
end

# send metrics via RMI
def flushViaRMI()
if RUBY_ENGINE != "jruby"
raise Exception.new("LogStash::Outputs::GraphTastic# JRuby is needed for RMI to work!")
end
@remote.insertMetrics(@batch.join(','))
@logger.debug("GraphTastic Sent Message Using RMI : #{@batch.join(',')}")
end

# send metrics via tcp
def flushViaTCP()
# to correctly read the line we need to ensure we send \r\n at the end of every message.
if @port.nil?
@port = 1299
end
tcpsocket = TCPSocket.open(@host, @port)
tcpsocket.send(@batch.join(',')+"\r\n", 0)
tcpsocket.close
@logger.debug("GraphTastic Sent Message Using TCP : #{@batch.join(',')}")
end

def udpsocket; @socket ||= UDPSocket.new end

Expand Down

0 comments on commit 061caf2

Please sign in to comment.