Skip to content

Commit

Permalink
More pipeline fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
jordansissel committed Feb 7, 2013
1 parent 5ef7896 commit 8c6b94d
Showing 1 changed file with 57 additions and 51 deletions.
108 changes: 57 additions & 51 deletions lib/logstash/pipeline.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
require "logstash/config/file"
require "logstash/agent" # only needed for now for parse_config
require "logstash/namespace"
require "thread" # stdlib
require "stud/trap"

class LogStash::Pipeline
class ShutdownSignal; end
Expand All @@ -11,20 +13,27 @@ def initialize(configstr)
agent = LogStash::Agent.new
@inputs, @filters, @outputs = agent.instance_eval { parse_config(config) }

@inputs.each(&:register)
@filters.each(&:register)
@outputs.each(&:register)

@input_to_filter = SizedQueue(16)
@filter_to_output = SizedQueue(16)
@input_to_filter = SizedQueue.new(20)
@filter_to_output = SizedQueue.new(20)

# If no filters, pipe inputs directly to outputs
if @filters.empty?
input_to_filter = filter_to_output
@input_to_filter = @filter_to_output
end

logger = Cabin::Channel.get
(@inputs + @filters + @outputs).each do |plugin|
plugin.logger = logger
end

@inputs.each(&:register)
@filters.each(&:register)
@outputs.each(&:register)
end

def run
start = Time.now

# one thread per input
@input_threads = @inputs.collect do |input|
Thread.new(input) do |input|
Expand All @@ -38,9 +47,22 @@ def run

# one outputworker thread

# Now monitor input threads state
# if all inputs are terminated, send shutdown signal to @input_to_filter
end
@output_thread = Thread.new do
outputworker
end

@input_threads.each(&:join)

# All input plugins have completed, send a shutdown signal.
duration = Time.now - start
puts "Duration: #{duration}"

@input_to_filter.push(ShutdownSignal)
@output_thread.join

# exit code
return 0
end # def run

def inputworker(plugin)
begin
Expand All @@ -53,12 +75,12 @@ def inputworker(plugin)
plugin.teardown
retry
end
end # def
end # def inputworker

def filterworker
begin
while true
event << @input_to_filter
event = @input_to_filter.pop
break if event == ShutdownSignal

# Apply filters, in order, to the event.
Expand All @@ -67,57 +89,41 @@ def filterworker
end
next if event.cancelled?

@filter_to_output << event
@filter_to_output.push(event)
end
rescue => e
@logger.error("Exception in plugin #{plugin.class}",
"plugin" => plugin.inspect, "exception" => e)
end

@filters.each(&:teardown)
end # def filterworker

def outputworker
begin
while true
event << @filter_to_output
break if event == ShutdownSignal
while true
event = @filter_to_output.pop
break if event == ShutdownSignal

@outputs.each do |output|
@outputs.each do |output|
begin
output.receive(event)
rescue => e
@logger.error("Exception in plugin #{plugin.class}",
"plugin" => plugin.inspect, "exception" => e)
end
end
rescue => e
@logger.error("Exception in plugin #{plugin.class}",
"plugin" => plugin.inspect, "exception" => e)
end
end # @outputs.each
end # while true
@outputs.each(&:teardown)
end # def filterworker
end # class Pipeline

def twait(thread)
begin
puts :waiting => thread[:name]
thread.join
puts :donewaiting => thread[:name]
rescue => e
puts thread => e
end
end

def shutdown(input, filter, output)
input.each do |i|
i.raise("SHUTDOWN")
end

#filter.raise("SHUTDOWN")
#twait(filter)
output.raise("SHUTDOWN")
twait(output)
end

trap("INT") do
puts "SIGINT"; shutdown(input_threads, filter_thread, output_thread)
exit 1
end


# Shutdown this pipeline.
#
# This method is intended to be called from another thread
def shutdown
@input_threads.each do |thread|
# Interrupt all inputs
thread.raise(ShutdownSignal.new)
end
@filter_to_output.push(ShutdownSignal)
end # def shutdown
end # class Pipeline

0 comments on commit 8c6b94d

Please sign in to comment.