Skip to content

Commit

Permalink
- split Agent#run into #run and #run_with_config for easier testing.
Browse files Browse the repository at this point in the history
- yield in run_with_config if we get a block (this is necessary for
  testing so we can determine when the agent is done setting up
  inputs/outputs/etc)
- Allow LogStash::Config::File.new to take a string that is the value
  for the config text. Used for testing.
- Fix outputs/internal to work with the new jruby/thread model
- fix the file input tests in jrubyland

Test status for inputs/test_file.rb
  Finished in 2.3 seconds.
  1 tests, 32 assertions, 0 failures, 0 errors
  • Loading branch information
jordansissel committed Mar 28, 2011
1 parent 734bc3f commit b4ff4ce
Show file tree
Hide file tree
Showing 4 changed files with 96 additions and 56 deletions.
10 changes: 9 additions & 1 deletion lib/logstash/agent.rb
Original file line number Diff line number Diff line change
Expand Up @@ -221,11 +221,15 @@ def run
raise "Option parsing failed. See error log."
end


configure

# Load the config file
config = LogStash::Config::File.new(@config_file)

run_with_config(config)
end # def run

def run_with_config(config)
config.parse do |plugin|
# 'plugin' is a has containing:
# :type => the base class of the plugin (LogStash::Inputs::Base, etc)
Expand Down Expand Up @@ -324,6 +328,10 @@ def run
end # Thread.new
end # @outputs.each

# yield to a block in case someone's waiting for us to be done setting up
# like tests, etc.
yield if block_given?

while sleep 5
end
end # def run
Expand Down
19 changes: 15 additions & 4 deletions lib/logstash/config/file.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,25 @@

class LogStash::Config::File
public
def initialize(file)
@file = file
end
def initialize(path=nil, string=nil)
@path = path
@string = string

if (path.nil? and string.nil?) or (!path.nil? and !string.nil?)
raise "Must give path or string, not both or neither"
end
end # def initialize

public
def parse
grammar = LogStash::Config::Grammar.new
grammar.parse(File.new(@file).read)

if @string.nil?
grammar.parse(File.new(@path).read)
else
grammar.parse(@string)
end

@config = grammar.config

registry = LogStash::Config::Registry::registry
Expand Down
29 changes: 11 additions & 18 deletions lib/logstash/outputs/internal.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,38 +2,31 @@
require "logstash/outputs/base"

class LogStash::Outputs::Internal < LogStash::Outputs::Base

config_name "internal"

public
def initialize(url, config={}, &block)
super
@callback = block
end # def initialize
attr_accessor :callback

public
def register
@logger.info("Registering output #{@url}")
@logger.info("Registering internal output (for testing!)")
@callbacks ||= []
end # def register

public
def receive(event)
if !@callback
if @callbacks.empty?
@logger.error("No callback for output #{@url}, cannot receive")
return
end
@callback.call(event)
end # def event

# Set the callback by passing a block of code
public
def callback(&block)
@callback = block
end
@callbacks.each do |callback|
callback.call(event)
end
end # def event

# Set the callback by passing a proc object
public
def callback=(proc_block)
@callback = proc_block
def subscribe(&block)
@callbacks ||= []
@callbacks << block
end
end # class LogStash::Outputs::Internal
94 changes: 61 additions & 33 deletions test/logstash/inputs/test_file.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,54 +2,82 @@
require 'rubygems'
$:.unshift File.dirname(__FILE__) + "/../../../lib"
$:.unshift File.dirname(__FILE__) + "/../../"

require "test/unit"
require "tempfile"
require "logstash/testcase"
require "thread"
require "logstash/loadlibs"
require "logstash/agent"
require "logstash/logging"
require "logstash/util"
require "socket"


# TODO(sissel): refactor this so we can more easily specify tests.
class TestInputFile < LogStash::TestCase
def em_setup
class TestInputFile < Test::Unit::TestCase
def setup
@tmpfile = Tempfile.new(self.class.name)
@hostname = Socket.gethostname
@type = "logstash-test"

@agent = LogStash::Agent.new
config = LogStash::Config::File.new(path=nil, string=<<-CONFIG)
input {
file {
path => "#{@tmpfile.path}"
type => "#{@type}"
}
}
config = {
"inputs" => {
@type => [
"file://#{@tmpfile.path}"
],
},
"outputs" => [
"internal:///"
]
} # config
output {
internal { }
}
CONFIG

waitqueue = Queue.new

Thread.new do
@agent.run_with_config(config) do
waitqueue << :ready
end
end

super(config)
end
# Wait for the agent to be ready.
waitqueue.pop
@output = @agent.outputs.first
end # def setup

def test_simple
data = [ "hello", "world", "hello world 1 2 3 4", "1", "2", "3", "4", "5" ]
remaining = data.size
EventMachine.run do
em_setup
expect_data = data.clone
@output.subscribe do |event|
expect_message = expect_data.shift
assert_equal(expect_message, event.message)
assert_equal("file://#{@hostname}#{@tmpfile.path}", event.source)
assert_equal(@type, event.type, "type")
assert_equal([], event.tags, "tags should be empty")

# Done testing if we run out of data.
@agent.stop if expect_data.size == 0
end
expect_data = data.clone

queue = Queue.new
@output.subscribe do |event|
queue << event
end

# Write to the file periodically
timer = EM::PeriodicTimer.new(0.2) do
# Write to the file periodically
Thread.new do
LogStash::Util.set_thread_name("#{__FILE__} - periodic writer")
loop do
out = data.shift((rand * 3).to_i + 1).join("\n")
@tmpfile.puts out
@tmpfile.flush
timer.cancel if data.length == 0
break if data.length == 0
end # loop
end # timer thread

loop do
event = queue.pop
expect_message = expect_data.shift
assert_equal(expect_message, event.message)
assert_equal("file://#{@hostname}#{@tmpfile.path}", event.source)
assert_equal(@type, event.type, "type")
assert_equal([], event.tags, "tags should be empty")

# Done testing if we run out of data.
if expect_data.size == 0
@agent.stop
break
end
end
end # def test_simple
Expand Down

0 comments on commit b4ff4ce

Please sign in to comment.