Skip to content

Commit

Permalink
Merge pull request elastic#731 from logstash/json-streaming
Browse files Browse the repository at this point in the history
Break out json/json_lines codec.
  • Loading branch information
nickethier committed Oct 20, 2013
2 parents fe75a5d + 3628e24 commit efe138b
Show file tree
Hide file tree
Showing 4 changed files with 112 additions and 32 deletions.
29 changes: 10 additions & 19 deletions lib/logstash/codecs/json.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@
require "logstash/codecs/line"
require "json"

# This codec will encode and decode JSON.
# The codec should be used to decode full json messages.
# If you are streaming JSON messages delimited by '\n' then
# see the json_lines codec.
# Encoding will result in a single json string.
class LogStash::Codecs::JSON < LogStash::Codecs::Base
config_name "json"

Expand All @@ -18,32 +21,20 @@ class LogStash::Codecs::JSON < LogStash::Codecs::Base
#
# For nxlog users, you'll want to set this to "CP1252"
config :charset, :validate => ::Encoding.name_list, :default => "UTF-8"

public
def initialize(params={})
super(params)
@lines = LogStash::Codecs::Line.new
@lines.charset = @charset
end

public
def decode(data)

@lines.decode(data) do |event|
begin
yield LogStash::Event.new(JSON.parse(event["message"]))
rescue JSON::ParserError => e
@logger.info("JSON parse failure. Falling back to plain-text", :error => e, :data => data)
yield LogStash::Event.new("message" => data)
end
begin
yield LogStash::Event.new(JSON.parse(data))
rescue JSON::ParserError => e
@logger.info("JSON parse failure. Falling back to plain-text", :error => e, :data => data)
yield LogStash::Event.new("message" => data)
end
end # def decode

public
def encode(data)
# Tack on a \n for now because previously most of logstash's JSON
# outputs emitted one per line, and whitespace is OK in json.
@on_event.call(data.to_json + "\n")
@on_event.call(data.to_json)
end # def encode

end # class LogStash::Codecs::JSON
51 changes: 51 additions & 0 deletions lib/logstash/codecs/json_lines.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
require "logstash/codecs/base"
require "logstash/codecs/line"
require "json"

# This codec will decode streamed JSON that is newline delimited.
# For decoding JSON payload in the redis input for example, use the json codec instead.
# Encoding will emit a single JSON string ending in a '\n'
class LogStash::Codecs::JSONLines < LogStash::Codecs::Base
config_name "json_lines"

milestone 1

# The character encoding used in this codec. Examples include "UTF-8" and
# "CP1252"
#
# JSON requires valid UTF-8 strings, but in some cases, software that
# emits JSON does so in another encoding (nxlog, for example). In
# weird cases like this, you can set the charset setting to the
# actual encoding of the text and logstash will convert it for you.
#
# For nxlog users, you'll want to set this to "CP1252"
config :charset, :validate => ::Encoding.name_list, :default => "UTF-8"

public
def initialize(params={})
super(params)
@lines = LogStash::Codecs::Line.new
@lines.charset = @charset
end

public
def decode(data)

@lines.decode(data) do |event|
begin
yield LogStash::Event.new(JSON.parse(event["message"]))
rescue JSON::ParserError => e
@logger.info("JSON parse failure. Falling back to plain-text", :error => e, :data => data)
yield LogStash::Event.new("message" => data)
end
end
end # def decode

public
def encode(data)
# Tack on a \n for now because previously most of logstash's JSON
# outputs emitted one per line, and whitespace is OK in json.
@on_event.call(data.to_json + "\n")
end # def encode

end # class LogStash::Codecs::JSON
13 changes: 0 additions & 13 deletions spec/codecs/json.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,21 +9,8 @@

context "#decode" do
it "should return an event from json data" do
data = {"foo" => "bar", "baz" => {"bah" => ["a","b","c"]}}
subject.decode(data.to_json+"\n") do |event|
insist { event.is_a? LogStash::Event }
insist { event["foo"] } == data["foo"]
insist { event["baz"] } == data["baz"]
insist { event["bah"] } == data["bah"]
end
end

it "should return an event from json data when a newline is recieved" do
data = {"foo" => "bar", "baz" => {"bah" => ["a","b","c"]}}
subject.decode(data.to_json) do |event|
insist {false}
end
subject.decode("\n") do |event|
insist { event.is_a? LogStash::Event }
insist { event["foo"] } == data["foo"]
insist { event["baz"] } == data["baz"]
Expand Down
51 changes: 51 additions & 0 deletions spec/codecs/json_lines.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
require "logstash/codecs/json_lines"
require "logstash/event"
require "insist"

describe LogStash::Codecs::JSONLines do
subject do
next LogStash::Codecs::JSONLines.new
end

context "#decode" do
it "should return an event from json data" do
data = {"foo" => "bar", "baz" => {"bah" => ["a","b","c"]}}
subject.decode(data.to_json+"\n") do |event|
insist { event.is_a? LogStash::Event }
insist { event["foo"] } == data["foo"]
insist { event["baz"] } == data["baz"]
insist { event["bah"] } == data["bah"]
end
end

it "should return an event from json data when a newline is recieved" do
data = {"foo" => "bar", "baz" => {"bah" => ["a","b","c"]}}
subject.decode(data.to_json) do |event|
insist {false}
end
subject.decode("\n") do |event|
insist { event.is_a? LogStash::Event }
insist { event["foo"] } == data["foo"]
insist { event["baz"] } == data["baz"]
insist { event["bah"] } == data["bah"]
end
end
end

context "#encode" do
it "should return json data" do
data = {"foo" => "bar", "baz" => {"bah" => ["a","b","c"]}}
event = LogStash::Event.new(data)
got_event = false
subject.on_event do |d|
insist { d.chomp } == LogStash::Event.new(data).to_json
insist { JSON.parse(d)["foo"] } == data["foo"]
insist { JSON.parse(d)["baz"] } == data["baz"]
insist { JSON.parse(d)["bah"] } == data["bah"]
got_event = true
end
subject.encode(event)
insist { got_event }
end
end
end

0 comments on commit efe138b

Please sign in to comment.