Skip to content

Commit

Permalink
Add new pipeline WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
jordansissel committed Feb 5, 2013
1 parent 120c5d3 commit 41e41fe
Showing 1 changed file with 123 additions and 0 deletions.
123 changes: 123 additions & 0 deletions lib/logstash/pipeline.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
require "logstash/config/file"
require "logstash/agent" # only needed for now for parse_config
require "logstash/namespace"

class LogStash::Pipeline
class ShutdownSignal; end

def initialize(configstr)
# hacks for now to parse a config string
config = LogStash::Config::File.new(nil, configstr)
agent = LogStash::Agent.new
@inputs, @filters, @outputs = agent.instance_eval { parse_config(config) }

@inputs.each(&:register)
@filters.each(&:register)
@outputs.each(&:register)

@input_to_filter = SizedQueue(16)
@filter_to_output = SizedQueue(16)

# If no filters, pipe inputs directly to outputs
if @filters.empty?
input_to_filter = filter_to_output
end
end

def run
# one thread per input
@input_threads = @inputs.collect do |input|
Thread.new(input) do |input|
inputworker(input)
end
end

# one filterworker thread
#@filter_threads = @filters.collect do |input
# TODO(sissel): THIS IS WHERE I STOPPED WORKING

# one outputworker thread

# Now monitor input threads state
# if all inputs are terminated, send shutdown signal to @input_to_filter
end

def inputworker(plugin)
begin
plugin.run(@input_to_filter)
rescue ShutdownSignal
plugin.teardown
rescue => e
@logger.error("Exception in plugin #{plugin.class}, restarting plugin.",
"plugin" => plugin.inspect, "exception" => e)
plugin.teardown
retry
end
end # def

def filterworker
begin
while true
event << @input_to_filter
break if event == ShutdownSignal

# Apply filters, in order, to the event.
@filters.each do |filter|
filter.execute(event)
end
next if event.cancelled?

@filter_to_output << event
end
rescue => e
@logger.error("Exception in plugin #{plugin.class}",
"plugin" => plugin.inspect, "exception" => e)
end
@filters.each(&:teardown)
end # def filterworker

def outputworker
begin
while true
event << @filter_to_output
break if event == ShutdownSignal

@outputs.each do |output|
output.receive(event)
end
end
rescue => e
@logger.error("Exception in plugin #{plugin.class}",
"plugin" => plugin.inspect, "exception" => e)
end
@outputs.each(&:teardown)
end # def filterworker
end # class Pipeline

def twait(thread)
begin
puts :waiting => thread[:name]
thread.join
puts :donewaiting => thread[:name]
rescue => e
puts thread => e
end
end

def shutdown(input, filter, output)
input.each do |i|
i.raise("SHUTDOWN")
end

#filter.raise("SHUTDOWN")
#twait(filter)
output.raise("SHUTDOWN")
twait(output)
end

trap("INT") do
puts "SIGINT"; shutdown(input_threads, filter_thread, output_thread)
exit 1
end


0 comments on commit 41e41fe

Please sign in to comment.