Skip to content

Commit

Permalink
Merge pull request elastic#1254 from colinsurprenant/multiline_specs
Browse files Browse the repository at this point in the history
multiline specs
  • Loading branch information
jordansissel committed Apr 11, 2014
2 parents 3a95a44 + 94a1c41 commit f1cf98b
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 52 deletions.
34 changes: 23 additions & 11 deletions lib/logstash/filters/multiline.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
require "set"
#
# This filter will collapse multiline messages from a single source into one Logstash event.
#
#
# The original goal of this filter was to allow joining of multi-line messages
# from files into a single event. For example - joining java exception and
# stacktrace messages into a single event.
Expand All @@ -19,20 +19,20 @@
# what => "previous" or "next"
# }
# }
#
#
# The `pattern` should be a regexp which matches what you believe to be an indicator
# that the field is part of an event consisting of multiple lines of log data.
#
# The `what` must be "previous" or "next" and indicates the relation
# to the multi-line event.
#
# The `negate` can be "true" or "false" (defaults to false). If "true", a
# The `negate` can be "true" or "false" (defaults to false). If "true", a
# message not matching the pattern will constitute a match of the multiline
# filter and the `what` will be applied. (vice-versa is also true)
#
# For example, Java stack traces are multiline and usually have the message
# starting at the far-left, with each subsequent line indented. Do this:
#
#
# filter {
# multiline {
# type => "somefiletype"
Expand All @@ -52,7 +52,7 @@
# what => "next"
# }
# }
#
#
# This says that any line ending with a backslash should be combined with the
# following line.
#
Expand All @@ -69,7 +69,7 @@ class LogStash::Filters::Multiline < LogStash::Filters::Base

# Negate the regexp pattern ('if not matched')
config :negate, :validate => :boolean, :default => false

# The stream identity is how the multiline filter determines which stream an
# event belongs to. This is generally used for differentiating, say, events
# coming from multiple files in the same file input, or multiple connections
Expand All @@ -83,7 +83,7 @@ class LogStash::Filters::Multiline < LogStash::Filters::Base
# may have occurred between the old and new connection. To solve this use
# case, you can use "%{@source_host}.%{@type}" instead.
config :stream_identity , :validate => :string, :default => "%{host}.%{path}.%{type}"

# Logstash ships by default with a bunch of patterns, so you don't
# necessarily need to define this yourself unless you are adding additional
# patterns.
Expand All @@ -97,6 +97,9 @@ class LogStash::Filters::Multiline < LogStash::Filters::Base
# NUMBER \d+
config :patterns_dir, :validate => :array, :default => []

# for debugging & testing purposes, do not use in production. allows periodic flushing of pending events
config :enable_flush, :validate => :boolean, :default => false

# Detect if we are running from a jarfile, pick the right path.
@@patterns_path = Set.new
if __FILE__ =~ /file:\/.*\.jar!.*/
Expand Down Expand Up @@ -215,8 +218,7 @@ def filter(event)
end # case @what

if !event.cancelled?
event["message"] = event["message"].join("\n") if event["message"].is_a?(Array)
event["@timestamp"] = event["@timestamp"].first if event["@timestamp"].is_a?(Array)
collapse_event!(event)
filter_matched(event) if match
end
end # def filter
Expand All @@ -225,13 +227,23 @@ def filter(event)
#
# Note: flush is disabled now; it is preferable to use the multiline codec.
public
def __flush
def flush
return [] unless @enable_flush

events = []
@pending.each do |key, value|
value.uncancel
events << value
events << collapse_event!(value)
end
@pending.clear
return events
end # def flush

private

def collapse_event!(event)
event["message"] = event["message"].join("\n") if event["message"].is_a?(Array)
event["@timestamp"] = event["@timestamp"].first if event["@timestamp"].is_a?(Array)
event
end
end # class LogStash::Filters::Multiline
83 changes: 43 additions & 40 deletions spec/filters/multiline.rb
Original file line number Diff line number Diff line change
@@ -1,24 +1,26 @@
# encoding: utf-8

require "test_utils"
require "logstash/filters/multiline"

puts "MULTILINE FILTER TEST DISABLED"
describe LogStash::Filters::Multiline, :if => false do
describe LogStash::Filters::Multiline do

extend LogStash::RSpec

describe "simple multiline" do
config <<-CONFIG
filter {
multiline {
enable_flush => true
pattern => "^\\s"
what => previous
}
}
CONFIG

sample [ "hello world", " second line", "another first line" ] do
p subject.to_hash
insist { subject.length } == 2
expect(subject).to be_a(Array)
insist { subject.size } == 2
insist { subject[0]["message"] } == "hello world\n second line"
insist { subject[1]["message"] } == "another first line"
end
Expand All @@ -28,6 +30,7 @@
config <<-CONFIG
filter {
multiline {
enable_flush => true
pattern => "^%{NUMBER} %{TIME}"
negate => true
what => previous
Expand All @@ -36,60 +39,54 @@
CONFIG

sample [ "120913 12:04:33 first line", "second line", "third line" ] do
insist { subject.length } == 1
insist { subject[0]["message"] } == "120913 12:04:33 first line\nsecond line\nthird line"
insist { subject["message"] } == "120913 12:04:33 first line\nsecond line\nthird line"
end
end

describe "multiline safety among multiple concurrent streams" do
config <<-CONFIG
filter {
multiline {
enable_flush => true
pattern => "^\\s"
what => previous
}
}
CONFIG

multiline_event = [
"hello world",
]

count = 20
stream_count = 2
id = 0
eventstream = count.times.collect do |i|
stream = "stream#{i % stream_count}"
(
[ "hello world #{stream}" ] \
+ rand(5).times.collect { |n| id += 1; " extra line #{n} in #{stream} event #{id}" }
) .collect do |line|
LogStash::Event.new("message" => line,
"host" => stream, "type" => stream,
"event" => i)
count = 50
stream_count = 3

# first make sure to have starting lines for all streams
eventstream = stream_count.times.map do |i|
stream = "stream#{i}"
lines = [LogStash::Event.new("message" => "hello world #{stream}", "host" => stream, "type" => stream)]
lines += rand(5).times.map do |n|
LogStash::Event.new("message" => " extra line in #{stream}", "host" => stream, "type" => stream)
end
end

alllines = eventstream.flatten

# Take whole events and mix them with other events (maintain order)
# This simulates a mixing of multiple streams being received
# and processed. It requires that the multiline filter correctly partition
# by stream_identity
concurrent_stream = eventstream.flatten.count.times.collect do
index = rand(eventstream.count)
event = eventstream[index].shift
eventstream.delete_at(index) if eventstream[index].empty?
event
# them add starting lines for random stream with sublines also for random stream
eventstream += (count - stream_count).times.map do |i|
stream = "stream#{rand(stream_count)}"
lines = [LogStash::Event.new("message" => "hello world #{stream}", "host" => stream, "type" => stream)]
lines += rand(5).times.map do |n|
stream = "stream#{rand(stream_count)}"
LogStash::Event.new("message" => " extra line in #{stream}", "host" => stream, "type" => stream)
end
end

sample concurrent_stream do
insist { subject.count } == count
events = eventstream.flatten.map{|event| event.to_hash}

sample events do
expect(subject).to be_a(Array)
insist { subject.size } == count

subject.each_with_index do |event, i|
#puts "#{i}/#{event["event"]}: #{event.to_json}"
#insist { event.type } == stream
#insist { event.source } == stream
insist { event["type"] == event["host"] } == true
stream = event["type"]
insist { event["message"].split("\n").first } =~ /hello world /
insist { event["message"].scan(/stream\d/).all?{|word| word == stream} } == true
end
end
end
Expand All @@ -101,6 +98,7 @@
add_tag => "dummy"
}
multiline {
enable_flush => true
add_tag => [ "nope" ]
remove_tag => "dummy"
add_field => [ "dummy2", "value" ]
Expand All @@ -111,9 +109,14 @@
CONFIG

sample [ "120913 12:04:33 first line", "120913 12:04:33 second line" ] do
expect(subject).to be_a(Array)
insist { subject.size } == 2

subject.each do |s|
insist { s.tags.find_index("nope").nil? && s.tags.find_index("dummy") != nil && !s.fields.has_key?("dummy2") } == true
insist { s["tags"].include?("nope") } == false
insist { s["tags"].include?("dummy") } == true
insist { s.include?("dummy2") } == false
end
end
end
end
end
5 changes: 4 additions & 1 deletion spec/test_utils.rb
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ def sample(sample_event, &block)
results = []
count = 0
pipeline.instance_eval { @filters.each(&:register) }

event.each do |e|
extra = []
pipeline.filter(e) do |new_event|
Expand All @@ -96,8 +97,10 @@ def sample(sample_event, &block)
results += extra.reject(&:cancelled?)
end

pipeline.instance_eval {@filters.each {|f| results += f.flush if f.respond_to?(:flush)}}

# TODO(sissel): pipeline flush needs to be implemented.
#results += pipeline.flush
# results += pipeline.flush
next results
end

Expand Down

0 comments on commit f1cf98b

Please sign in to comment.