Skip to content

Commit

Permalink
- Allow setting your own custom 'stream identity' to help the multiline
Browse files Browse the repository at this point in the history
  filter decide which events are parts of the same event stream.
  LOGSTASH-159
  • Loading branch information
jordansissel committed Oct 18, 2011
1 parent a57b902 commit cc974ae
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 7 deletions.
22 changes: 18 additions & 4 deletions lib/logstash/filters/multiline.rb
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,20 @@ class LogStash::Filters::Multiline < LogStash::Filters::Base

# Negate the regexp pattern ('if not matched')
config :negate, :validate => :boolean, :default => false

# The stream identity is how the multiline filter determines which stream an
# event belongs. This is generally used for differentiating, say, events
# coming from multiple files in the same file input, or multiple connections
# coming from a tcp input.
#
# The default value here is usually what you want, but there are some cases
# where you want to change it. One such example is if you are using a tcp
# input with only one client connecting at any time. If that client
# reconnects (due to error or client restart), then logstash will identify
# the new connection as a new stream and break any multiline goodness that
# may have occurred between the old and new connection. To solve this use
# case, you can use "%{@source_host}.%{@type}" instead.
config :stream_identity , :validate => :string, :default => "%{@source}.%{@type}"

public
def initialize(config = {})
Expand All @@ -96,7 +110,7 @@ def filter(event)
return unless event.type == @type

match = @pattern.match(event.message)
key = [event.source, event.type]
key = event.sprintf(@stream_identity)
pending = @pending[key]

@logger.debug("Multiline", :pattern => @pattern, :message => event.message,
Expand Down Expand Up @@ -162,14 +176,14 @@ def filter(event)
filter_matched(event) if !event.cancelled?
end # def filter

# flush any pending messages
# Flush any pending messages. This is generally used for unit testing only.
public
def flush(source, type)
key = [source, type]
def flush(key)
if @pending[key]
event = @pending[key]
@pending.delete(key)
end
return event
end # def flush

end # class LogStash::Filters::Date
45 changes: 42 additions & 3 deletions test/logstash/filters/test_multiline.rb
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ def config(cfg)
outputs << event.message
end
end
last = @filter.flush("unknown", @typename)
last = @filter.flush("unknown.#{@typename}")
if last
outputs << last.message
end
Expand Down Expand Up @@ -137,7 +137,7 @@ def config(cfg)
outputs << event.message
end
end
last = @filter.flush("unknown", @typename)
last = @filter.flush("unknown.#{@typename}")
if last
outputs << last.message
end
Expand Down Expand Up @@ -181,7 +181,7 @@ def config(cfg)
outputs << event.message
end
end
last = @filter.flush("unknown", @typename)
last = @filter.flush("unknown.#{@typename}")
if last
outputs << last.message
end
Expand All @@ -191,4 +191,43 @@ def config(cfg)
assert_equal(expected, actual)
end
end # negate false

test "with custom stream identity" do
config "pattern" => ".*", "what" => "next",
"stream_identity" => "%{key}"

inputs = [
"one",
"two",
"one",
"two",
"one",
]

expected_outputs = [
"one\none\none",
"two\ntwo",
]

outputs = []

inputs.each_with_index do |input, i|
event = LogStash::Event.new
event.type = @typename
# even/odd keying to fake multiple streams.
event["key"] = ["odd", "even"][i % 2]
event.message = input
@filter.filter(event)
if !event.cancelled?
outputs << event.message
end
end
outputs << @filter.flush("odd").message
outputs << @filter.flush("even").message
assert_equal(expected_outputs.length, outputs.length,
"Incorrect number of output events")
expected_outputs.zip(outputs).each do |expected, actual|
assert_equal(expected, actual)
end
end
end # tests for LogStash::Filters::Multiline

0 comments on commit cc974ae

Please sign in to comment.