Skip to content

Commit

Permalink
Merge branch 'codec-hacking'
Browse files Browse the repository at this point in the history
  • Loading branch information
nickethier committed Jun 4, 2013
2 parents cdcbbe7 + 047aab5 commit 42ed117
Show file tree
Hide file tree
Showing 20 changed files with 374 additions and 80 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
.*.swp
*.gem
*.class
.rbx
Gemfile.lock
.rbx
*.tar.gz
Expand Down
47 changes: 47 additions & 0 deletions lib/logstash/codecs/base.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
require "logstash/namespace"
require "logstash/event"
require "logstash/plugin"
require "logstash/logging"
require "extlib"

# This is the base class for logstash codecs.
module LogStash::Codecs
public
def self.for(codec)
return codec unless codec.is_a? String

#TODO: codec paths or just use plugin paths
plugin = File.join('logstash', 'codecs', codec) + ".rb"
#@logger.info "Loading codec", :codec => plugin
require plugin
klass_name = codec.camel_case
if LogStash::Codecs.const_defined?(klass_name)
return LogStash::Codecs.const_get(klass_name)
end
nil
end

class Base < LogStash::Plugin

attr_reader :on_event
attr_accessor :charset

public
def decode(data)
raise "#{self.class}#decode must be overidden"
end # def decode

alias_method :<<, :decode

public
def encode(data)
raise "#{self.class}#encode must be overidden"
end # def encode

public
def on_event(&block)
@on_event = block
end

end # class LogStash::Codecs::Base
end
16 changes: 16 additions & 0 deletions lib/logstash/codecs/json.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
require "logstash/codecs/base"
require "json"

# This is the base class for logstash codecs.
class LogStash::Codecs::Json < LogStash::Codecs::Base
public
def decode(data)
yield LogStash::Event.new(JSON.parse(data.force_encoding("UTF-8")))
end # def decode

public
def encode(data)
@on_event.call data.to_json
end # def encode

end # class LogStash::Codecs::Json
25 changes: 25 additions & 0 deletions lib/logstash/codecs/json_spooler.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
require "logstash/codecs/base"
require "logstash/codecs/spool"

# This is the base class for logstash codecs.
class LogStash::Codecs::JsonSpooler < LogStash::Codecs::Base
public
def initialize
@spooler = LogStash::Codecs::Spool.new
@spooler.on_event do |data|
@on_event.call data.to_json
end
end
public
def decode(data)
@spooler.decode(JSON.parse(data.force_encoding("UTF-8"))) do |event|
yield event
end
end # def decode

public
def encode(data)
@spooler.encode(data)
end # def encode

end # class LogStash::Codecs::Json
14 changes: 14 additions & 0 deletions lib/logstash/codecs/noop.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
require "logstash/codecs/base"

class LogStash::Codecs::Noop < LogStash::Codecs::Base
public
def decode(data)
yield data
end # def decode

public
def encode(data)
@on_event.call data
end # def encode

end # class LogStash::Codecs::Noop
26 changes: 26 additions & 0 deletions lib/logstash/codecs/plain.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
require "logstash/codecs/base"

# This is the base class for logstash codecs.
class LogStash::Codecs::Plain < LogStash::Codecs::Base
attr_accessor :format

public
def decode(data)
data.force_encoding(@charset)
if @charset != "UTF-8"
# Convert to UTF-8 if not in that character set.
data = data.encode("UTF-8", :invalid => :replace, :undef => :replace)
end
yield LogStash::Event.new({"message" => data})
end # def decode

public
def encode(data)
if data.is_a? LogStash::Event and !@format.nil?
@on_event.call data.sprintf(@format)
else
@on_event.call data.to_s
end
end # def encode

end # class LogStash::Codecs::Plain
27 changes: 27 additions & 0 deletions lib/logstash/codecs/spool.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
require "logstash/codecs/base"

class LogStash::Codecs::Spool < LogStash::Codecs::Base

attr_reader :buffer

public
def decode(data)
data.each do |event|
yield event
end
end # def decode

public
def encode(data)
@buffer = [] if @buffer.nil?
#buffer size is hard coded for now until a
#better way to pass args into codecs is implemented
if @buffer.length >= 50
@on_event.call @buffer
@buffer = []
else
@buffer << data
end
end # def encode

end # class LogStash::Codecs::Spool
12 changes: 11 additions & 1 deletion lib/logstash/inputs/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
require "logstash/plugin"
require "logstash/logging"
require "logstash/config/mixin"
require "logstash/codecs/base"

# This is the base class for logstash inputs.
class LogStash::Inputs::Base < LogStash::Plugin
Expand All @@ -29,7 +30,10 @@ class LogStash::Inputs::Base < LogStash::Plugin
config :debug, :validate => :boolean, :default => false

# The format of input data (plain, json, json_event)
config :format, :validate => ["plain", "json", "json_event", "msgpack_event"]
config :format, :validate => ["plain", "json", "json_event", "msgpack_event"], :deprecated => true

# The codec used for input data
config :codec, :validate => :string, :default => 'plain'

# The character encoding used in this input. Examples include "UTF-8"
# and "cp1252"
Expand Down Expand Up @@ -78,6 +82,12 @@ def tag(newtag)
@tags << newtag
end # def tag

protected
def enable_codecs
@codec = LogStash::Codecs.for(@codec).new
@codec.charset = @charset
end

protected
def to_event(raw, source)
@format ||= "plain"
Expand Down
11 changes: 7 additions & 4 deletions lib/logstash/inputs/exec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ class LogStash::Inputs::Exec < LogStash::Inputs::Base

public
def register
enable_codecs
@logger.info("Registering Exec Input", :type => @type,
:command => @command, :interval => @interval)
end # def register
Expand All @@ -37,10 +38,12 @@ def run(queue)
@logger.info("Running exec", :command => @command) if @debug
out = IO.popen(@command)
# out.read will block until the process finishes.
e = to_event(out.read, "exec://#{Socket.gethostname}/")
e["command"] = @command
queue << e

@codec.decode(out.read) do |event|
event["source"] => "exec://#{Socket.gethostname}"
event["command"] => @command
queue << event
end

duration = Time.now - start
if @debug
@logger.info("Command completed", :command => @command,
Expand Down
6 changes: 3 additions & 3 deletions lib/logstash/inputs/file.rb
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ def register
require "addressable/uri"
require "filewatch/tail"
require "digest/md5"
enable_codecs
LogStash::Util::set_thread_name("input|file|#{path.join(":")}")
@logger.info("Registering file input", :path => @path)

Expand Down Expand Up @@ -123,9 +124,8 @@ def run(queue)
#source = Addressable::URI.new(:scheme => "file", :host => hostname, :path => path).to_s
source = "file://#{hostname}/#{path.gsub("\\","/")}"
@logger.debug? && @logger.debug("Received line", :path => path, :line => line)
e = to_event(line, source)
if e
queue << e
@codec.decode(line) do |event|
event["source"] = source
end
end
finished
Expand Down
16 changes: 11 additions & 5 deletions lib/logstash/inputs/generator.rb
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ class LogStash::Inputs::Generator < LogStash::Inputs::Threadable

public
def register
enable_codecs
@host = Socket.gethostname

if @count.is_a?(Array)
Expand All @@ -55,6 +56,7 @@ def register
end # def register

def run(queue)

number = 0
source = "generator://#{@host}/"

Expand All @@ -67,14 +69,18 @@ def run(queue)
while !finished? && (@count <= 0 || number < @count)
if @lines
@lines.each do |line|
event = to_event(line, source)
@codec.decode(line) do |event|
event["source"] = source
event["sequence"] = number
queue << event
end
end
else
@codec.decode(@message.clone) do |event|
event["source"] = source
event["sequence"] = number
queue << event
end
else
event = to_event(@message.clone, source)
event["sequence"] = number
queue << event
end
number += 1
end # loop
Expand Down
10 changes: 5 additions & 5 deletions lib/logstash/inputs/rabbitmq.rb
Original file line number Diff line number Diff line change
Expand Up @@ -83,12 +83,12 @@ def initialize(params)
super

@format ||= "json_event"

@codec = "json"
end # def initialize

public
def register

enable_codecs
@logger.info("Registering input #{@url}")
require "bunny" # rubygem 'bunny'
@vhost ||= "/"
Expand Down Expand Up @@ -127,9 +127,9 @@ def run(queue)
@bunnyqueue.bind(@exchange, :key => @key)

@bunnyqueue.subscribe({:ack => @ack}) do |data|
e = to_event(data[:payload], @amqpurl)
if e
queue << e
@codec.decode(data[:payload]) do |event|
event["source"] = @amqpurl
queue << event
end
end # @bunnyqueue.subscribe

Expand Down
9 changes: 6 additions & 3 deletions lib/logstash/inputs/stdin.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,21 @@ class LogStash::Inputs::Stdin < LogStash::Inputs::Base

public
def register
enable_codecs
@host = Socket.gethostname
end # def register

def run(queue)
def run(queue)
while true
begin
e = to_event($stdin.readline.chomp, "stdin://#{@host}/")
@codec.decode($stdin.readline.chomp) do |event|
event["source"] = "stdin://#{@host}/"
queue << event
end
rescue EOFError => ex
# stdin closed, finish
break
end
queue << e if e
end # while true
finished
end # def run
Expand Down
17 changes: 8 additions & 9 deletions lib/logstash/inputs/tcp.rb
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ def initialize(*args)

public
def register
enable_codecs
require "socket"
require "timeout"
if @ssl_enable
Expand Down Expand Up @@ -94,7 +95,7 @@ def register
end # def register

private
def handle_socket(socket, output_queue, event_source)
def handle_socket(socket, event_source, output_queue)
begin
loop do
buf = nil
Expand All @@ -108,12 +109,10 @@ def handle_socket(socket, output_queue, event_source)
buf = readline(socket)
end
end
e = self.to_event(buf, event_source)
if e
if @ssl_enable && @ssl_verify
e.fields["sslsubject"] = socket.peer_cert.subject
end
output_queue << e
@codec.decode(buf) do |event|
event["source"] = event_source
event["sslsubject"] = socket.peer_cert.subject if @ssl_enable && @ssl_verify
output_queue << event
end
end # loop do
rescue => e
Expand Down Expand Up @@ -158,7 +157,7 @@ def run(output_queue)
@logger.debug("Accepted connection", :client => s.peer,
:server => "#{@host}:#{@port}")
begin
handle_socket(s, output_queue, "tcp://#{s.peer}/")
handle_socket(s, "tcp://#{s.peer}/", output_queue)
rescue Interrupted
s.close rescue nil
end
Expand Down Expand Up @@ -198,7 +197,7 @@ def run(output_queue)
end
client_socket.instance_eval { class << self; include ::LogStash::Util::SocketPeer end }
@logger.debug("Opened connection", :client => "#{client_socket.peer}")
handle_socket(client_socket, output_queue, "tcp://#{client_socket.peer}/server")
handle_socket(client_socket, "tcp://#{client_socket.peer}/server", output_queue)
end # loop
end
end # def run
Expand Down
Loading

0 comments on commit 42ed117

Please sign in to comment.