From aadd54465ed92fcae6d4ef4da5fab637f4e5d128 Mon Sep 17 00:00:00 2001 From: Jordan Sissel Date: Fri, 2 Nov 2012 23:41:07 -0700 Subject: [PATCH] - start hacking on a new pipeline implementation focusing on interruptable threads and clean shutdown semantics. (LOGSTASH-657) --- pl.rb | 91 ++++++++++++++++++++++++++++++++++++++++++++ pl2.rb | 118 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 209 insertions(+) create mode 100644 pl.rb create mode 100644 pl2.rb diff --git a/pl.rb b/pl.rb new file mode 100644 index 00000000000..768a4954e73 --- /dev/null +++ b/pl.rb @@ -0,0 +1,91 @@ +# pipeline tests + +$: << "lib" +require "logstash/config/file" +config = LogStash::Config::File.new(nil, ARGV[0]) +agent = LogStash::Agent.new +inputs, filters, outputs = agent.instance_eval { parse_config(config) } + +inputs.collect(&:register) +filters.collect(&:register) +outputs.collect(&:register) + +i2f = SizedQueue.new(16) +f2o = SizedQueue.new(16) +i2f = f2o if filters.empty? + +input_threads = inputs.collect do |i| + t = Thread.new do + begin + i.run(i2f) + rescue => e + puts :input => i.class, :exception => e + end + end + t[:name] = i.class + t +end + +#input_supervisor_thread = Thread.new do + #while true + #input_threads.collect(&:join) + #i2f << :shutdown + #end +#end + +filter_thread = Thread.new(filters) do |filters| + if filters.any? + event = i2f.pop + filters.each do |filter| + filter.filter(event) + end + f2o << event + end +end +filter_thread[:name] = "filterworker" + +output_thread = Thread.new do + begin + while true + event = f2o.pop + outputs.each do |output| + output.receive(event) + end + end + rescue => e + puts :output_thread => e + end +end +output_thread[:name] = "outputworker" + +def twait(thread) + begin + puts :waiting => thread[:name] + thread.join + puts :donewaiting => thread[:name] + rescue => e + puts thread => e + end +end + +def shutdown(input, filter, output) + input.each do |i| + i.raise("SHUTDOWN") + twait(i) + end + + #filter.raise("SHUTDOWN") + #twait(filter) + output.raise("SHUTDOWN") + twait(output) +end + +trap("INT") do + puts "SIGINT"; shutdown(input_threads, filter_thread, output_thread) + exit 1 +end + +#[*input_threads, filter_thread, output_thread].collect(&:join) +sleep 30 + + diff --git a/pl2.rb b/pl2.rb new file mode 100644 index 00000000000..34aa7a2c334 --- /dev/null +++ b/pl2.rb @@ -0,0 +1,118 @@ +$: << "lib" +require "logstash/config/file" + +class Pipeline + class ShutdownSignal; end + + def initialize(configstr) + # hacks for now to parse a config string + config = LogStash::Config::File.new(nil, configstr) + agent = LogStash::Agent.new + @inputs, @filters, @outputs = agent.instance_eval { parse_config(config) } + + @inputs.collect(&:register) + @filters.collect(&:register) + @outputs.collect(&:register) + + @input_to_filter = SizedQueue(16) + @filter_to_output = SizedQueue(16) + + # If no filters, pipe inputs to outputs + if @filters.empty? + input_to_filter = filter_to_output + end + end + + def run + # one thread per input + @input_threads = @inputs.collect do |input| + Thread.new(input) do |input| + inputworker(input) + end + end + + # one filterworker thread + #@filter_threads = @filters.collect do |input + # TODO(sissel): THIS IS WHERE I STOPPED WORKING + + # one outputworker thread + + # Now monitor input threads state + # if all inputs are terminated, send shutdown signal to @input_to_filter + end + + def inputworker(plugin) + begin + plugin.run(@input_to_filter) + rescue ShutdownSignal + plugin.teardown + rescue => e + @logger.error("Exception in plugin #{plugin.class}, restarting plugin.", + "plugin" => plugin.inspect, "exception" => e) + plugin.teardown + retry + end + end # def + + def filterworker + begin + while true + event << @input_to_filter + break if event == :shutdown + @filters.each do |filter| + filter.filter(event) + end + next if event.cancelled? + @filter_to_output << event + end + rescue => e + @logger.error("Exception in plugin #{plugin.class}", + "plugin" => plugin.inspect, "exception" => e) + end + @filters.each(&:teardown) + end # def filterworker + + def outputworker + begin + while true + event << @filter_to_output + break if event == :shutdown + @outputs.each do |output| + output.receive(event) + end + end + rescue => e + @logger.error("Exception in plugin #{plugin.class}", + "plugin" => plugin.inspect, "exception" => e) + end + @outputs.each(&:teardown) + end # def filterworker +end # class Pipeline + +def twait(thread) + begin + puts :waiting => thread[:name] + thread.join + puts :donewaiting => thread[:name] + rescue => e + puts thread => e + end +end + +def shutdown(input, filter, output) + input.each do |i| + i.raise("SHUTDOWN") + end + + #filter.raise("SHUTDOWN") + #twait(filter) + output.raise("SHUTDOWN") + twait(output) +end + +trap("INT") do + puts "SIGINT"; shutdown(input_threads, filter_thread, output_thread) + exit 1 +end + +