Skip to content

Commit

Permalink
rework the shutdown system a bit; end stdin input on eof
Browse files Browse the repository at this point in the history
  • Loading branch information
fetep committed Jun 22, 2012
1 parent 6f3cc64 commit 6d2287d
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 9 deletions.
20 changes: 12 additions & 8 deletions lib/logstash/agent.rb
Original file line number Diff line number Diff line change
Expand Up @@ -457,9 +457,12 @@ def run_with_config(config)
end
end

# TODO(sissel): Monitor what's going on? Sleep forever? what?
while sleep 5
@logger.info("heartbeat")
while sleep(2)
if @plugins.values.count { |p| p.alive? } == 0
@logger.warn("no plugins running, shutting down")
shutdown
end
@logger.debug("heartbeat")
end
end # def run_with_config

Expand Down Expand Up @@ -673,6 +676,7 @@ def run_input(input, queue)
begin
input.run(queue)
done = true
input.finished
rescue => e
@logger.warn("Input thread exception", :plugin => input,
:exception => e, :backtrace => e.backtrace)
Expand Down Expand Up @@ -715,6 +719,7 @@ def run_output(output, queue)
while event = queue.pop do
@logger.debug("Sending event", :target => output)
output.handle(event)
break if output.finished?
end
rescue Exception => e
@logger.warn("Output thread exception", :plugin => output,
Expand All @@ -738,17 +743,16 @@ def shutdown_if_none_running(pluginclass, queue=nil)
# If none are running, start the shutdown sequence and
# send the 'shutdown' event down the pipeline.
remaining = @plugins.count do |plugin, thread|
plugin.is_a?(pluginclass) and plugin.running?
plugin.is_a?(pluginclass) and plugin.running? and thread.alive?
end
@logger.debug("Plugins still running", :type => pluginclass,
:remaining => remaining)

if remaining == 0
@logger.debug("All #{pluginclass} finished. Shutting down.")
@logger.warn("All #{pluginclass} finished. Shutting down.")

# Send 'shutdown' to the filters.
queue << LogStash::SHUTDOWN if !queue.nil?
shutdown
# Send 'shutdown' event to other running plugins
queue << LogStash::SHUTDOWN unless queue.nil?
end # if remaining == 0
end # @plugins_mutex.synchronize
end # def shutdown_if_none_running
Expand Down
6 changes: 5 additions & 1 deletion lib/logstash/inputs/stdin.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,11 @@ def register

def run(queue)
loop do
e = to_event($stdin.readline.chomp, "stdin://#{@host}/")
begin
e = to_event($stdin.readline.chomp, "stdin://#{@host}/")
rescue EOFError => ex
break
end
if e
queue << e
end
Expand Down

0 comments on commit 6d2287d

Please sign in to comment.