Skip to content

Commit

Permalink
Merge branch 'petef/no_metrics'
Browse files Browse the repository at this point in the history
  • Loading branch information
fetep committed Jul 7, 2012
2 parents 86e4a81 + 564a18e commit 996c081
Show file tree
Hide file tree
Showing 11 changed files with 16 additions and 86 deletions.
11 changes: 0 additions & 11 deletions lib/logstash/agent.rb
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,6 @@ def run_with_config(config)
filter.logger = @logger
@plugin_setup_mutex.synchronize do
filter.register
filter.prepare_metrics
end
end

Expand Down Expand Up @@ -465,16 +464,6 @@ def run_with_config(config)
# like tests, etc.
yield if block_given?

Thread.new do
while true
@logger.info("metrics dump")
@logger.metrics.each do |identifier, metric|
@logger.info("metric #{identifier}", metric.to_hash)
end
sleep 5
end
end

while sleep(2)
if @plugins.values.count { |p| p.alive? } == 0
@logger.warn("no plugins running, shutting down")
Expand Down
9 changes: 1 addition & 8 deletions lib/logstash/filters/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -78,21 +78,14 @@ def register
raise "#{self.class}#register must be overidden"
end # def register

public
def prepare_metrics
@filter_metric = @logger.metrics.timer(self)
end # def prepare_metrics

public
def filter(event)
raise "#{self.class}#filter must be overidden"
end # def filter

public
def execute(event, &block)
@filter_metric.time do
filter(event, &block)
end
filter(event, &block)
end # def execute

public
Expand Down
9 changes: 1 addition & 8 deletions lib/logstash/inputs/amqp.rb
Original file line number Diff line number Diff line change
Expand Up @@ -106,9 +106,6 @@ def register
amqp_credentials << ":#{@password}" if @password
@amqpurl += amqp_credentials unless amqp_credentials.nil?
@amqpurl += "#{@host}:#{@port}#{@vhost}/#{@name}"

@metric_amqp_read = @logger.metrics.timer(self, "amqp-read")
@metric_queue_write = @logger.metrics.timer(self, "internal-queue-write")
end # def register

def run(queue)
Expand All @@ -124,16 +121,12 @@ def run(queue)
@queue = @bunny.queue(@name, {:durable => @durable, :auto_delete => @auto_delete, :exclusive => @exclusive, :arguments => @arguments_hash })
@queue.bind(@exchange, :key => @key)

timer = @metric_amqp_read.time
@queue.subscribe({:ack => @ack}) do |data|
timer.stop
e = to_event(data[:payload], @amqpurl)
if e
@metric_queue_write.time do
queue << e
end
queue << e
end
timer = @metric_amqp_read.time
end # @queue.subscribe

rescue *[Bunny::ConnectionError, Bunny::ServerDownError] => e
Expand Down
15 changes: 4 additions & 11 deletions lib/logstash/inputs/generator.rb
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@ class LogStash::Inputs::Generator < LogStash::Inputs::Threadable
public
def register
@host = Socket.gethostname
@metric_generate = @logger.metrics.timer(self, "event-generation")
@metric_queue_write = @logger.metrics.timer(self, "queue-write-time")

if @count.is_a?(Array)
@count = @count.first
Expand All @@ -46,15 +44,10 @@ def run(queue)
end

while !finished? && (@count <= 0 || number < @count)
@metric_generate.time do
event = to_event(@message, source)
event["sequence"] = number
# Time how long each queue push takes.
number += 1
@metric_queue_write.time do
queue << event
end
end
event = to_event(@message, source)
event["sequence"] = number
number += 1
queue << event
end # loop
end # def run

Expand Down
4 changes: 1 addition & 3 deletions lib/logstash/inputs/log4j.rb
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,7 @@ def register
@logger.info("Starting Log4j input listener", :address => "#{@host}:#{@port}")
@server_socket = TCPServer.new(@host, @port)
end
@event_meter = @logger.metrics.meter(self, "events")
@logger.info("Log4j input", :meter => @event_meter)
@logger.info("Log4j input")
end # def register

private
Expand Down Expand Up @@ -98,7 +97,6 @@ def server?

private
def readline(socket)
@event_meter.mark
line = socket.readline
end # def readline

Expand Down
3 changes: 0 additions & 3 deletions lib/logstash/inputs/tcp.rb
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,6 @@ def register
@logger.info("Starting tcp input listener", :address => "#{@host}:#{@port}")
@server_socket = TCPServer.new(@host, @port)
end
@event_meter = @logger.metrics.meter(self, "events")
@logger.info("tcp input", :meter => @event_meter)
end # def register

private
Expand Down Expand Up @@ -89,7 +87,6 @@ def server?

private
def readline(socket)
@event_meter.mark
line = socket.readline
end # def readline

Expand Down
11 changes: 1 addition & 10 deletions lib/logstash/multiqueue.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,6 @@ def initialize(*queues)
public
def logger=(_logger)
@logger = _logger
@metric_queue_write = @logger.metrics.timer(self, "multiqueue-write")
@metric_queue_count = @logger.metrics.counter(self, "multiqueue-queues")

# TODO(sissel): gauge not implemented yet.
#@metric_queue_items = @logger.metrics.gauge(self, "multiqueue-items") { size }

# Set the logger for all known queues, too.
@queues.each do |q|
Expand All @@ -30,9 +25,7 @@ def logger=(_logger)
# Push an object to all queues.
public
def push(object)
@metric_queue_write.time do
@queues.each { |q| q.push(object) }
end
@queues.each { |q| q.push(object) }
end # def push
alias :<< :push

Expand All @@ -42,15 +35,13 @@ def push(object)
public
def add_queue(queue)
@mutex.synchronize do
@metric_queue_count.incr
@queues << queue
end
end # def add_queue

public
def remove_queue(queue)
@mutex.synchronize do
@metric_queue_count.decr
@queues.delete(queue)
end
end # def remove_queue
Expand Down
16 changes: 3 additions & 13 deletions lib/logstash/outputs/file.rb
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,6 @@ def register
@last_stale_cleanup_cycle = now
flush_interval = @flush_interval.to_i
@stale_cleanup_interval = 10
@metric_flushes = @logger.metrics.timer(self, "flushes")
@metric_write_delay = @logger.metrics.timer(self, "write-delay")
@metric_write_bytes = @logger.metrics.histogram(self, "write-bytes")
end # def register

public
Expand All @@ -66,10 +63,7 @@ def receive(event)
output = event.to_json
end

@metric_write_delay.time do
fd.puts(output)
end
@metric_write_bytes.record(output.size)
fd.puts(output)

flush(fd)
close_stale_files
Expand All @@ -93,9 +87,7 @@ def flush(fd)
if flush_interval > 0
flush_pending_files
else
@metric_flushes.time do
fd.flush
end
fd.flush
end
end

Expand All @@ -105,9 +97,7 @@ def flush_pending_files
@logger.debug("Starting flush cycle")
@files.each do |path, fd|
@logger.debug("Flushing file", :path => path, :fd => fd)
@metric_flushes.time do
fd.flush
end
fd.flush
end
@last_flush_cycle = Time.now
end
Expand Down
2 changes: 0 additions & 2 deletions lib/logstash/outputs/null.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,9 @@ class LogStash::Outputs::Null < LogStash::Outputs::Base

public
def register
@metric_hits = @logger.metrics.meter(self, "events")
end # def register

public
def receive(event)
@metric_hits.mark
end # def event
end # class LogStash::Outputs::Null
15 changes: 5 additions & 10 deletions lib/logstash/outputs/pipe.rb
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@ class LogStash::Outputs::Pipe < LogStash::Outputs::Base
def register
@pipes = {}
@last_stale_cleanup_cycle = Time.now
@metric_write_delay = @logger.metrics.timer(self, "write-delay")
@metric_write_bytes = @logger.metrics.histogram(self, "write-bytes")
end # def register

public
Expand All @@ -45,15 +43,12 @@ def receive(event)
output = event.to_json
end

@metric_write_delay.time do
begin
pipe.puts(output)
rescue IOError, Errno::EPIPE => e
@logger.error("Error writing to pipe, closing pipe.", :command => command, :pipe => pipe)
drop_pipe(command)
end
begin
pipe.puts(output)
rescue IOError, Errno::EPIPE => e
@logger.error("Error writing to pipe, closing pipe.", :command => command, :pipe => pipe)
drop_pipe(command)
end
@metric_write_bytes.record(output.size)

close_stale_pipes
end # def receive
Expand Down
7 changes: 0 additions & 7 deletions lib/logstash/sized_queue.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,7 @@ class LogStash::SizedQueue < SizedQueue
public
def logger=(_logger)
@logger = _logger
@metric_queue_write = @logger.metrics.timer(self, "queue-write")
end # def logger=

# Wrap SizedQueue#push with a timer metric.
def push(*args)
@metric_queue_write.time do
super(*args)
end
end # def push
alias_method :<<, :push
end

0 comments on commit 996c081

Please sign in to comment.