diff --git a/lib/logstash/filters/multiline.rb b/lib/logstash/filters/multiline.rb index 55ad67b0330..2d6bf06a814 100644 --- a/lib/logstash/filters/multiline.rb +++ b/lib/logstash/filters/multiline.rb @@ -4,7 +4,7 @@ require "set" # # This filter will collapse multiline messages from a single source into one Logstash event. -# +# # The original goal of this filter was to allow joining of multi-line messages # from files into a single event. For example - joining java exception and # stacktrace messages into a single event. @@ -19,20 +19,20 @@ # what => "previous" or "next" # } # } -# +# # The `pattern` should be a regexp which matches what you believe to be an indicator # that the field is part of an event consisting of multiple lines of log data. # # The `what` must be "previous" or "next" and indicates the relation # to the multi-line event. # -# The `negate` can be "true" or "false" (defaults to false). If "true", a +# The `negate` can be "true" or "false" (defaults to false). If "true", a # message not matching the pattern will constitute a match of the multiline # filter and the `what` will be applied. (vice-versa is also true) # # For example, Java stack traces are multiline and usually have the message # starting at the far-left, with each subsequent line indented. Do this: -# +# # filter { # multiline { # type => "somefiletype" @@ -52,7 +52,7 @@ # what => "next" # } # } -# +# # This says that any line ending with a backslash should be combined with the # following line. # @@ -69,7 +69,7 @@ class LogStash::Filters::Multiline < LogStash::Filters::Base # Negate the regexp pattern ('if not matched') config :negate, :validate => :boolean, :default => false - + # The stream identity is how the multiline filter determines which stream an # event belongs to. This is generally used for differentiating, say, events # coming from multiple files in the same file input, or multiple connections @@ -83,7 +83,7 @@ class LogStash::Filters::Multiline < LogStash::Filters::Base # may have occurred between the old and new connection. To solve this use # case, you can use "%{@source_host}.%{@type}" instead. config :stream_identity , :validate => :string, :default => "%{host}.%{path}.%{type}" - + # Logstash ships by default with a bunch of patterns, so you don't # necessarily need to define this yourself unless you are adding additional # patterns. @@ -97,6 +97,9 @@ class LogStash::Filters::Multiline < LogStash::Filters::Base # NUMBER \d+ config :patterns_dir, :validate => :array, :default => [] + # for debugging & testing purposes, do not use in production. allows periodic flushing of pending events + config :enable_flush, :validate => :boolean, :default => false + # Detect if we are running from a jarfile, pick the right path. @@patterns_path = Set.new if __FILE__ =~ /file:\/.*\.jar!.*/ @@ -215,8 +218,7 @@ def filter(event) end # case @what if !event.cancelled? - event["message"] = event["message"].join("\n") if event["message"].is_a?(Array) - event["@timestamp"] = event["@timestamp"].first if event["@timestamp"].is_a?(Array) + collapse_event!(event) filter_matched(event) if match end end # def filter @@ -225,13 +227,23 @@ def filter(event) # # Note: flush is disabled now; it is preferable to use the multiline codec. public - def __flush + def flush + return [] unless @enable_flush + events = [] @pending.each do |key, value| value.uncancel - events << value + events << collapse_event!(value) end @pending.clear return events end # def flush + + private + + def collapse_event!(event) + event["message"] = event["message"].join("\n") if event["message"].is_a?(Array) + event["@timestamp"] = event["@timestamp"].first if event["@timestamp"].is_a?(Array) + event + end end # class LogStash::Filters::Multiline diff --git a/spec/filters/multiline.rb b/spec/filters/multiline.rb index d4fb15b5314..25fd4a0eb99 100644 --- a/spec/filters/multiline.rb +++ b/spec/filters/multiline.rb @@ -1,8 +1,9 @@ +# encoding: utf-8 + require "test_utils" require "logstash/filters/multiline" -puts "MULTILINE FILTER TEST DISABLED" -describe LogStash::Filters::Multiline, :if => false do +describe LogStash::Filters::Multiline do extend LogStash::RSpec @@ -10,6 +11,7 @@ config <<-CONFIG filter { multiline { + enable_flush => true pattern => "^\\s" what => previous } @@ -17,8 +19,8 @@ CONFIG sample [ "hello world", " second line", "another first line" ] do - p subject.to_hash - insist { subject.length } == 2 + expect(subject).to be_a(Array) + insist { subject.size } == 2 insist { subject[0]["message"] } == "hello world\n second line" insist { subject[1]["message"] } == "another first line" end @@ -28,6 +30,7 @@ config <<-CONFIG filter { multiline { + enable_flush => true pattern => "^%{NUMBER} %{TIME}" negate => true what => previous @@ -36,8 +39,7 @@ CONFIG sample [ "120913 12:04:33 first line", "second line", "third line" ] do - insist { subject.length } == 1 - insist { subject[0]["message"] } == "120913 12:04:33 first line\nsecond line\nthird line" + insist { subject["message"] } == "120913 12:04:33 first line\nsecond line\nthird line" end end @@ -45,51 +47,46 @@ config <<-CONFIG filter { multiline { + enable_flush => true pattern => "^\\s" what => previous } } CONFIG - multiline_event = [ - "hello world", - ] - - count = 20 - stream_count = 2 - id = 0 - eventstream = count.times.collect do |i| - stream = "stream#{i % stream_count}" - ( - [ "hello world #{stream}" ] \ - + rand(5).times.collect { |n| id += 1; " extra line #{n} in #{stream} event #{id}" } - ) .collect do |line| - LogStash::Event.new("message" => line, - "host" => stream, "type" => stream, - "event" => i) + count = 50 + stream_count = 3 + + # first make sure to have starting lines for all streams + eventstream = stream_count.times.map do |i| + stream = "stream#{i}" + lines = [LogStash::Event.new("message" => "hello world #{stream}", "host" => stream, "type" => stream)] + lines += rand(5).times.map do |n| + LogStash::Event.new("message" => " extra line in #{stream}", "host" => stream, "type" => stream) end end - alllines = eventstream.flatten - - # Take whole events and mix them with other events (maintain order) - # This simulates a mixing of multiple streams being received - # and processed. It requires that the multiline filter correctly partition - # by stream_identity - concurrent_stream = eventstream.flatten.count.times.collect do - index = rand(eventstream.count) - event = eventstream[index].shift - eventstream.delete_at(index) if eventstream[index].empty? - event + # them add starting lines for random stream with sublines also for random stream + eventstream += (count - stream_count).times.map do |i| + stream = "stream#{rand(stream_count)}" + lines = [LogStash::Event.new("message" => "hello world #{stream}", "host" => stream, "type" => stream)] + lines += rand(5).times.map do |n| + stream = "stream#{rand(stream_count)}" + LogStash::Event.new("message" => " extra line in #{stream}", "host" => stream, "type" => stream) + end end - sample concurrent_stream do - insist { subject.count } == count + events = eventstream.flatten.map{|event| event.to_hash} + + sample events do + expect(subject).to be_a(Array) + insist { subject.size } == count + subject.each_with_index do |event, i| - #puts "#{i}/#{event["event"]}: #{event.to_json}" - #insist { event.type } == stream - #insist { event.source } == stream + insist { event["type"] == event["host"] } == true + stream = event["type"] insist { event["message"].split("\n").first } =~ /hello world / + insist { event["message"].scan(/stream\d/).all?{|word| word == stream} } == true end end end @@ -101,6 +98,7 @@ add_tag => "dummy" } multiline { + enable_flush => true add_tag => [ "nope" ] remove_tag => "dummy" add_field => [ "dummy2", "value" ] @@ -111,9 +109,14 @@ CONFIG sample [ "120913 12:04:33 first line", "120913 12:04:33 second line" ] do + expect(subject).to be_a(Array) + insist { subject.size } == 2 + subject.each do |s| - insist { s.tags.find_index("nope").nil? && s.tags.find_index("dummy") != nil && !s.fields.has_key?("dummy2") } == true + insist { s["tags"].include?("nope") } == false + insist { s["tags"].include?("dummy") } == true + insist { s.include?("dummy2") } == false end end - end + end end diff --git a/spec/test_utils.rb b/spec/test_utils.rb index 2469feffac4..830038ad044 100644 --- a/spec/test_utils.rb +++ b/spec/test_utils.rb @@ -87,6 +87,7 @@ def sample(sample_event, &block) results = [] count = 0 pipeline.instance_eval { @filters.each(&:register) } + event.each do |e| extra = [] pipeline.filter(e) do |new_event| @@ -96,8 +97,10 @@ def sample(sample_event, &block) results += extra.reject(&:cancelled?) end + pipeline.instance_eval {@filters.each {|f| results += f.flush if f.respond_to?(:flush)}} + # TODO(sissel): pipeline flush needs to be implemented. - #results += pipeline.flush + # results += pipeline.flush next results end