Skip to content

Commit

Permalink
- start hacking on a new pipeline implementation focusing on
Browse files Browse the repository at this point in the history
  interruptable threads and clean shutdown semantics.
  (LOGSTASH-657)
  • Loading branch information
jordansissel committed Nov 3, 2012
1 parent 4b05a0b commit aadd544
Show file tree
Hide file tree
Showing 2 changed files with 209 additions and 0 deletions.
91 changes: 91 additions & 0 deletions pl.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
# pipeline tests

$: << "lib"
require "logstash/config/file"
config = LogStash::Config::File.new(nil, ARGV[0])
agent = LogStash::Agent.new
inputs, filters, outputs = agent.instance_eval { parse_config(config) }

inputs.collect(&:register)
filters.collect(&:register)
outputs.collect(&:register)

i2f = SizedQueue.new(16)
f2o = SizedQueue.new(16)
i2f = f2o if filters.empty?

input_threads = inputs.collect do |i|
t = Thread.new do
begin
i.run(i2f)
rescue => e
puts :input => i.class, :exception => e
end
end
t[:name] = i.class
t
end

#input_supervisor_thread = Thread.new do
#while true
#input_threads.collect(&:join)
#i2f << :shutdown
#end
#end

filter_thread = Thread.new(filters) do |filters|
if filters.any?
event = i2f.pop
filters.each do |filter|
filter.filter(event)
end
f2o << event
end
end
filter_thread[:name] = "filterworker"

output_thread = Thread.new do
begin
while true
event = f2o.pop
outputs.each do |output|
output.receive(event)
end
end
rescue => e
puts :output_thread => e
end
end
output_thread[:name] = "outputworker"

def twait(thread)
begin
puts :waiting => thread[:name]
thread.join
puts :donewaiting => thread[:name]
rescue => e
puts thread => e
end
end

def shutdown(input, filter, output)
input.each do |i|
i.raise("SHUTDOWN")
twait(i)
end

#filter.raise("SHUTDOWN")
#twait(filter)
output.raise("SHUTDOWN")
twait(output)
end

trap("INT") do
puts "SIGINT"; shutdown(input_threads, filter_thread, output_thread)
exit 1
end

#[*input_threads, filter_thread, output_thread].collect(&:join)
sleep 30


118 changes: 118 additions & 0 deletions pl2.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
$: << "lib"
require "logstash/config/file"

class Pipeline
class ShutdownSignal; end

def initialize(configstr)
# hacks for now to parse a config string
config = LogStash::Config::File.new(nil, configstr)
agent = LogStash::Agent.new
@inputs, @filters, @outputs = agent.instance_eval { parse_config(config) }

@inputs.collect(&:register)
@filters.collect(&:register)
@outputs.collect(&:register)

@input_to_filter = SizedQueue(16)
@filter_to_output = SizedQueue(16)

# If no filters, pipe inputs to outputs
if @filters.empty?
input_to_filter = filter_to_output
end
end

def run
# one thread per input
@input_threads = @inputs.collect do |input|
Thread.new(input) do |input|
inputworker(input)
end
end

# one filterworker thread
#@filter_threads = @filters.collect do |input
# TODO(sissel): THIS IS WHERE I STOPPED WORKING

# one outputworker thread

# Now monitor input threads state
# if all inputs are terminated, send shutdown signal to @input_to_filter
end

def inputworker(plugin)
begin
plugin.run(@input_to_filter)
rescue ShutdownSignal
plugin.teardown
rescue => e
@logger.error("Exception in plugin #{plugin.class}, restarting plugin.",
"plugin" => plugin.inspect, "exception" => e)
plugin.teardown
retry
end
end # def

def filterworker
begin
while true
event << @input_to_filter
break if event == :shutdown
@filters.each do |filter|
filter.filter(event)
end
next if event.cancelled?
@filter_to_output << event
end
rescue => e
@logger.error("Exception in plugin #{plugin.class}",
"plugin" => plugin.inspect, "exception" => e)
end
@filters.each(&:teardown)
end # def filterworker

def outputworker
begin
while true
event << @filter_to_output
break if event == :shutdown
@outputs.each do |output|
output.receive(event)
end
end
rescue => e
@logger.error("Exception in plugin #{plugin.class}",
"plugin" => plugin.inspect, "exception" => e)
end
@outputs.each(&:teardown)
end # def filterworker
end # class Pipeline

def twait(thread)
begin
puts :waiting => thread[:name]
thread.join
puts :donewaiting => thread[:name]
rescue => e
puts thread => e
end
end

def shutdown(input, filter, output)
input.each do |i|
i.raise("SHUTDOWN")
end

#filter.raise("SHUTDOWN")
#twait(filter)
output.raise("SHUTDOWN")
twait(output)
end

trap("INT") do
puts "SIGINT"; shutdown(input_threads, filter_thread, output_thread)
exit 1
end


0 comments on commit aadd544

Please sign in to comment.