Skip to content

Commit

Permalink
Merge branch 'master' of github.com:logstash/logstash
Browse files Browse the repository at this point in the history
  • Loading branch information
jordansissel committed Jan 30, 2013
2 parents bf440f7 + f61611f commit b3bd90a
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 4 deletions.
66 changes: 63 additions & 3 deletions lib/logstash/inputs/tcp.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
require "logstash/util/socket_peer"
require "socket"
require "timeout"
require "openssl"

# Read events over a TCP socket.
#
Expand All @@ -12,7 +13,6 @@
# depending on `mode`.
class LogStash::Inputs::Tcp < LogStash::Inputs::Base
class Interrupted < StandardError; end

config_name "tcp"
plugin_status "beta"

Expand All @@ -34,12 +34,50 @@ class Interrupted < StandardError; end
# `client` connects to a server.
config :mode, :validate => ["server", "client"], :default => "server"

# Enable ssl (must be set for other `ssl_` options to take effect_
config :ssl_enable, :validate => :boolean, :default => false

# Verify the identity of the other end of the ssl connection against the CA
# For input, sets the `@field.sslsubject` to that of the client certificate
config :ssl_verify, :validate => :boolean, :default => false

# ssl CA certificate, chainfile or CA path
# The system CA path is automatically included
config :ssl_cacert, :validate => :string

# ssl certificate
config :ssl_cert, :validate => :string

# ssl key
config :ssl_key, :validate => :string

# ssl key passphrase
config :ssl_key_passphrase, :validate => :password, :default => nil

def initialize(*args)
super(*args)
end # def initialize

public
def register
if @ssl_enable
@ssl_context = OpenSSL::SSL::SSLContext.new
@ssl_context.cert = OpenSSL::X509::Certificate.new(File.read(@ssl_cert))
@ssl_context.key = OpenSSL::PKey::RSA.new(File.read(@ssl_key),@ssl_key_passphrase)
if @ssl_verify
@cert_store = OpenSSL::X509::Store.new
# Load the system default certificate path to the store
@cert_store.set_default_paths
if File.directory?(@ssl_cacert)
@cert_store.add_path(@ssl_cacert)
else
@cert_store.add_file(@ssl_cacert)
end
@ssl_context.cert_store = @cert_store
@ssl_context.verify_mode = OpenSSL::SSL::VERIFY_PEER|OpenSSL::SSL::VERIFY_FAIL_IF_NO_PEER_CERT
end
end # @ssl_enable

if server?
@logger.info("Starting tcp input listener", :address => "#{@host}:#{@port}")
begin
Expand All @@ -49,6 +87,9 @@ def register
:host => @host, :port => @port)
raise
end
if @ssl_enable
@server_socket = OpenSSL::SSL::SSLServer.new(@server_socket, @ssl_context)
end # @ssl_enable
end
end # def register

Expand All @@ -69,6 +110,9 @@ def handle_socket(socket, output_queue, event_source)
end
e = self.to_event(buf, event_source)
if e
if @ssl_enable && @ssl_verify
e.fields["sslsubject"] = socket.peer_cert.subject
end
output_queue << e
end
end # loop do
Expand All @@ -84,7 +128,7 @@ def handle_socket(socket, output_queue, event_source)
begin
socket.close
rescue IOError
pass
#pass
end # begin
end

Expand Down Expand Up @@ -113,12 +157,16 @@ def run(output_queue)
s.instance_eval { class << self; include ::LogStash::Util::SocketPeer end }
@logger.debug("Accepted connection", :client => s.peer,
:server => "#{@host}:#{@port}")
begin
begin
handle_socket(s, output_queue, "tcp://#{s.peer}/")
rescue Interrupted
s.close rescue nil
end
end # Thread.start
rescue OpenSSL::SSL::SSLError => ssle
# NOTE(mrichar1): This doesn't return a useful error message for some reason
@logger.error("SSL Error", :exception => ssle,
:backtrace => ssle.backtrace)
rescue IOError, Interrupted
if @interrupted
# Intended shutdown, get out of the loop
Expand All @@ -136,6 +184,18 @@ def run(output_queue)
else
loop do
client_socket = TCPSocket.new(@host, @port)
if @ssl_enable
client_socket = OpenSSL::SSL::SSLSocket.new(client_socket, @ssl_context)
begin
client_socket.connect
rescue OpenSSL::SSL::SSLError => ssle
@logger.error("SSL Error", :exception => ssle,
:backtrace => ssle.backtrace)
# NOTE(mrichar1): Hack to prevent hammering peer
sleep(5)
next
end
end
client_socket.instance_eval { class << self; include ::LogStash::Util::SocketPeer end }
@logger.debug("Opened connection", :client => "#{client_socket.peer}")
handle_socket(client_socket, output_queue, "tcp://#{client_socket.peer}/server")
Expand Down
19 changes: 18 additions & 1 deletion lib/logstash/outputs/http.rb
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,12 @@ class LogStash::Outputs::Http < LogStash::Outputs::Base
# If form, then the body will be the mapping (or whole event) converted
# into a query parameter string (foo=bar&baz=fizz...)
#
# If message, then the body will be the result of formatting the event according to message
#
# Otherwise, the event is sent as json.
config :format, :validate => ["json", "form"], :default => "json"
config :format, :validate => ["json", "form", "message"], :default => "json"

config :message, :validate => :string

public
def register
Expand All @@ -63,6 +67,17 @@ def register
when "json" ; @content_type = "application/json"
end
end
if @format == "message"
if @message.nil?
raise "message must be set if message format is used"
end
if @content_type.nil?
raise "content_type must be set if message format is used"
end
unless @mapping.nil?
@logger.warn "mapping is not supported and will be ignored if message format is used"
end
end
end # def register

public
Expand Down Expand Up @@ -98,6 +113,8 @@ def receive(event)
begin
if @format == "json"
request.body = evt.to_json
elsif @format == "message"
request.body = event.sprintf(@message)
else
request.body = encode(evt)
end
Expand Down

0 comments on commit b3bd90a

Please sign in to comment.