Skip to content

Commit

Permalink
Merge remote branch 'origin/master'
Browse files Browse the repository at this point in the history
  • Loading branch information
fetep committed Mar 28, 2011
2 parents 3cc1c1c + 78fa0b5 commit 1f86bdc
Show file tree
Hide file tree
Showing 6 changed files with 135 additions and 108 deletions.
18 changes: 10 additions & 8 deletions Gemfile
Original file line number Diff line number Diff line change
@@ -1,17 +1,19 @@
source :rubygems

#gem "async_sinatra"
gem "bunny"
gem "filewatch"
gem "jls-grok", ">= 0.4.3"
gem "jruby-elasticsearch"
gem "bunny" # for amqp support
gem "uuidtools" # for naming amqp queues
gem "filewatch" # for file tailing
gem "jls-grok", "~> 0.4.3"
gem "jruby-elasticsearch", "~> 0.0.6"
gem "stomp" # for stomp protocol
gem "json"

#gem "async_sinatra"
#gem "rack"
gem "stomp"
gem "stompserver"
#gem "thin"
gem "uuidtools"

# For testing/dev
group :development do
gem "stompserver"
gem "spoon"
end
19 changes: 9 additions & 10 deletions Rakefile
Original file line number Diff line number Diff line change
Expand Up @@ -126,19 +126,18 @@ namespace :package do
File.copy(file, target)
end

# Copy gems to the root of the build-jar dir
# TODO(sissel): Figure out how to package the gems. Maybe see how warbler
# does it.
#Dir.glob("vendor/bundle/jruby/1.8/gems/**/*") do |file|
#target = File.join("build-jar", file.gsub("build/", ""))
#mkdir_p File.dirname(target)
#puts "=> Copying #{file} => #{target}"
#File.copy(file, target)
#end

output = "logstash-#{LOGSTASH_VERSION}.jar"
sh "jar -cfe #{output} logstash.agent -C build-jar ."

# Learned how to do this mostly from here:
# http://blog.nicksieger.com/articles/2009/01/10/jruby-1-1-6-gems-in-a-jar
# Add bundled gems to the jar
sh "jar uf #{output} -C vendor/bundle/jruby/1.8 ."

# Add grok patterns
sh "jar -uf #{output} patterns/"

# Build jar index
sh "jar -i #{output}"
end # package:monolith:jar
end # monolith
Expand Down
76 changes: 55 additions & 21 deletions lib/logstash/search/elasticsearch.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ def initialize(settings={})

# See LogStash::Search;:Base#search
public
def search(q)
def search(q, async=false)
raise "No block given for search call." if !block_given?
if q.is_a?(String)
q = LogStash::Search::Query.parse(q)
Expand All @@ -36,36 +36,70 @@ def search(q)

@logger.info("ElasticSearch search: #{q.query_string}")
start_time = Time.now
searchreq.execute do |response|
result = LogStash::Search::Result.new
result.duration = Time.now - start_time

hits = response.hits rescue nil
# TODO(sissel): Dedup this into a method.
if async
searcreq.execute do |response|
result = search_response_to_result(response)
result.offset = q.offset
result.duration = Time.now - start_time
@logger.debug(["Got search results (async)",
{ :query => q.query_string, :duration => response.took.to_s,
:result_count => result.total }])

if hits.nil?
# return the whole object object as json as the error message for
# debugging later.
result.error_message = response
yield result
next # breaks from this callback
end

@logger.info(["Got search results",
return
else # not async
response = searchreq.execute!
result = search_response_to_result(response)
result.offset = q.offset
result.duration = Time.now - start_time
@logger.info(["Got search results (in blocking mode)",
{ :query => q.query_string, :duration => response.took.to_s,
:result_count => hits.totalHits}])
:result_count => result.total }])

# We want to yield a list of LogStash::Event objects.
hits.each do |hit|
result.events << LogStash::Event.new(hit.source)
if block_given?
yield result
else
return result
end
end # if async
return
end # def search

# Total hits this search could find if not limited
result.total = result.totalHits
result.offset = q.offset
private
def search_response_to_result(response)
result = LogStash::Search::Result.new

hits = response.hits rescue nil

if hits.nil?
# return the whole object object as json as the error message for
# debugging later.
result.error_message = response
yield result
next # breaks from this callback
end
end # def search

# We want to yield a list of LogStash::Event objects.
hits.each do |hit|
data = hit.getSource
# TODO(sissel): this conversion is only necessary because
# LogStash::Event#== invokes == on the data hash, and in in the
# test suite, we'll have a ruby array of tags compared against
# a java.util.ArrayList, which always fails.
# Possible fixes:
# - make Event#== smarter
# - or, convert in the test (not as awesome)
data["@tags"] = data["@tags"].to_a # convert java ArrayList to Ruby
result.events << LogStash::Event.new(data)
end

# Total hits this search could find if not limited
result.total = hits.totalHits
return result
end # def search_response_to_result

# See LogStash::Search;:Base#histogram
public
Expand All @@ -88,7 +122,7 @@ def histogram(q, field, interval=nil)
result = LogStash::Search::FacetResult.new
result.duration = Time.now - start_time

@logger.info(["Got search results",
@logger.debug(["Got search results",
{ :query => q.query_string, :duration => response.took.to_s }])
# TODO(sissel): Check for error.

Expand Down
4 changes: 3 additions & 1 deletion test/logstash/loadlibs.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
jarpath = File.join(File.dirname(__FILE__), "../../vendor/**/*.jar")
Dir[jarpath].each do |jar|
puts "Loading #{jar}"
if $DEBUG
puts "Loading #{jar}"
end
require jar
end

122 changes: 56 additions & 66 deletions test/logstash/outputs/test_elasticsearch.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,33 +6,24 @@
require "logstash/testcase"
require "logstash/agent"
require "logstash/logging"
require "logstash/outputs/elasticsearch"
require "logstash/search/elasticsearch"
require "logstash/search/query"

require "spoon" # rubygem 'spoon' - implements posix_spawn via FFI

# For checking elasticsearch health
#require "net/http"
#require "uri"
#require "json"

module LibC
extend FFI::Library
ffi_lib FFI::Library:LIBC

posix_spawn


class TestOutputElasticSearch < LogStash::TestCase
ELASTICSEARCH_VERSION = "0.14.4"
class TestOutputElasticSearch < Test::Unit::TestCase
ELASTICSEARCH_VERSION = "0.15.2"

def setup
start_elasticsearch
@output = LogStash::Outputs::ElasticSearch.new({
:host => "localhost",
:index => "test",
:type => "foo",
:cluster => @cluster_name,
#@cluster = "logstash-test-1234"

@output = LogStash::Outputs::Elasticsearch.new({
"host" => ["localhost"],
"index" => ["test"],
"type" => ["foo"],
"cluster" => [@cluster],
})
@output.register
end # def setup
Expand All @@ -42,42 +33,42 @@ def start_elasticsearch
version = self.class::ELASTICSEARCH_VERSION
system("make -C #{File.dirname(__FILE__)}/../../setup/elasticsearch/ init-elasticsearch-#{version} wipe-elasticsearch-#{version} #{$DEBUG ? "" : "> /dev/null 2>&1"}")

1.upto(30) do
#1.upto(30) do
# Pick a random port
teardown if @es_pid
@port_http = (rand * 30000 + 20000).to_i
@port_tcp = (rand * 30000 + 20000).to_i
@cluster_name = "logstash-test-#{$$}"
puts "Starting ElasticSearch #{version}"
ENV["ESFLAGS"] = "-Des.http.port=#{@port_http} -Des.transport.tcp.port=#{@port_tcp} -Des.cluster.name=#{@cluster_name}"
@es_pid = Spoon.spawnp("make", "-C", "#{File.dirname(__FILE__)}/../../setup/elasticsearch/", "run-elasticsearch-#{version}")

# Assume it's up and happy
return

# Wait for elasticsearch to be ready.
#1.upto(30) do
#begin
#Net::HTTP.get(URI.parse("http://localhost:#{@port}/_status"))
#puts "ElasticSearch is ready..."
#return
#rescue => e
## TODO(sissel): Need to waitpid to see if ES has died and
## should immediately retry if it has.
#puts "ElasticSearch not yet ready... sleeping."
#sleep 2
#end
#end

#puts "ES did not start properly, trying again."
end # try a few times to launch ES on a random port.

raise "ElasticSearch failed to start or was otherwise not running properly?"
#@port_http = (rand * 30000 + 20000).to_i
#@port_tcp = (rand * 30000 + 20000).to_i
#end # try a few times to launch ES on a random port.

# Listen on random ports, I don't need them anyway.
@port_http = 0
@port_tcp = 0

teardown if @es_pid
@cluster = "logstash-test-#{$$}"

puts "Starting ElasticSearch #{version}"
@clusterflags = "-Des.cluster.name=#{@cluster}"

ENV["ESFLAGS"] = "-Des.http.port=#{@port_http} -Des.transport.tcp.port=#{@port_tcp} "
ENV["ESFLAGS"] += @clusterflags
ENV["ESFLAGS"] += " > /dev/null 2>&1" if !$DEBUG
cmd = ["make", "-C", "#{File.dirname(__FILE__)}/../../setup/elasticsearch/",]
cmd << "-s" if !$DEBUG
cmd << "run-elasticsearch-#{version}"
@es_pid = Spoon.spawnp(*cmd)

# Assume it's up and happy, or will be.
#raise "ElasticSearch failed to start or was otherwise not running properly?"
end # def start_elasticsearch

def teardown
# Kill the whole process group for elasticsearch
Process.kill("KILL", -1 * @es_pid) if !@es_pid.nil?
Process.kill("KILL", -1 * @es_pid) rescue nil
Process.kill("KILL", @es_pid) rescue nil

# TODO(sissel): Until I fix the way elasticsearch server is run,
# we'll use pkill...
system("pkill -9 -f 'java.*#{@clusterflags}.*Bootstrap'")
end # def teardown

def test_elasticsearch_basic
Expand All @@ -89,33 +80,32 @@ def test_elasticsearch_basic
end

# TODO(sissel): Need a way to hook when the agent is ready?
EventMachine.next_tick do
events.each do |e|
@outputs.receive
end
end # next_tick, push our events
events.each do |e|
puts "Pushing event: #{e}" if $DEBUG
@output.receive(e)
end

tries = 30
es = LogStash::Search::ElasticSearch.new(:cluster => @cluster)
loop do
es = LogStash::Search::ElasticSearch.new(:cluster_name => @cluster_name)
puts "Tries left: #{tries}" if $DEBUG
query = LogStash::Search::Query.new(:query_string => "*", :count => 5)
es.search(query) do |result|
es.search(query, async=false) do |result|
if events.size == result.events.size
puts "Found #{result.events.size} events, ready to verify!"
expected = events.clone
assert_equal(events.size, result.events.size)
events.each { |e| p :expect => e }
#events.each { |e| p :expect => e }
result.events.each do |event|
p :got => event
assert(expected.include?(event), "Found event in results that was not expected: #{event.inspect}\n\nExpected: #{events.map{ |a| a.inspect }.join("\n")}")
end
EventMachine.stop_event_loop
next # break out

return
else
tries -= 1
if tries <= 0
assert(false, "Gave up trying to query elasticsearch. Maybe we aren't indexing properly?")
EventMachine.stop_event_loop
return
end
end # if events.size == hits.size
end # es.search
Expand All @@ -125,10 +115,10 @@ def test_elasticsearch_basic
end # def test_elasticsearch_basic
end # class TestOutputElasticSearch

#class TestOutputElasticSearch0_15_1 < TestOutputElasticSearch
#ELASTICSEARCH_VERSION = self.name[/[0-9_]+/].gsub("_", ".")
#end # class TestOutputElasticSearch0_15_1

#class TestOutputElasticSearch0_13_1 < TestOutputElasticSearch
#ELASTICSEARCH_VERSION = self.name[/[0-9_]+/].gsub("_", ".")
#end # class TestOutputElasticSearch0_13_1
#
#class TestOutputElasticSearch0_12_0 < TestOutputElasticSearch
#ELASTICSEARCH_VERSION = self.name[/[0-9_]+/].gsub("_", ".")
#end # class TestOutputElasticSearch0_12_0
4 changes: 2 additions & 2 deletions test/run.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,13 @@ def use(path)
end

use "logstash/test_syntax"
use "logstash/test_event"
use "logstash/filters/test_date"
use "logstash/filters/test_grep"
use "logstash/filters/test_multiline"
use "logstash/filters/test_grok"

use "logstash/outputs/test_elasticsearch"

skip "logstash/inputs/test_file"
skip "logstash/inputs/test_syslog"
skip "logstash/inputs/test_stomp"
skip "logstash/outputs/test_elasticsearch"

0 comments on commit 1f86bdc

Please sign in to comment.