Skip to content

Commit

Permalink
Added some more tests and tweaked things so that they pass
Browse files Browse the repository at this point in the history
I've added some tests that ensure the Relp class handles some relp
errors properly; it checks that it raises the correct warnings but
allows continued execution of the server. There are still a few
things that need cleaning up, but on the whole it seems to be
working.
  • Loading branch information
MikeWorth committed Aug 3, 2012
1 parent 69930ea commit 58ab7ea
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 17 deletions.
4 changes: 3 additions & 1 deletion lib/logstash/inputs/relp.rb
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ def run(output_queue)
#TODO: Still not happy with this, are they all warn level?
#Will this catch everything I want it to?
#Relp spec says to close connection on error, ensure this is the case
rs.serverclose
#rs.serverclose
end
end # Thread.start
rescue Relp::InvalidCommand,Relp::InappropriateCommand => e
Expand All @@ -75,3 +75,5 @@ def teardown
@relp_server.shutdown
end
end # class LogStash::Inputs::Relp

#TODO: structured error logging
23 changes: 11 additions & 12 deletions lib/logstash/util/relp.rb
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,12 @@ def frame_write(frame)
frame['txnr']=frame['txnr'].to_s
frame['message']='' if frame['message'].nil?
frame['datalen']=frame['message'].length.to_s
#Ending each frame with a newline is required in the specifications
wiredata=[frame['txnr'],frame['command'],frame['datalen'],frame['message']].join(' ').strip+"\n"
wiredata=[frame['txnr'],frame['command'],frame['datalen'],frame['message']].join(' ').strip
begin
@socket.write(wiredata)
rescue Errno::EPIPE,IOError#TODO: is this sufficient to catch all broken connections?
#Ending each frame with a newline is required in the specifications, doing it a separately is useful (but a bit of a bodge) because for some reason it seems to take 2 writes after the server closes the connection before we get an exception
@socket.write("\n")
rescue Errno::EPIPE,IOError,Errno::ECONNRESET#TODO: is this sufficient to catch all broken connections?
raise ConnectionClosed
end
frame['txnr'].to_i
Expand All @@ -66,7 +67,7 @@ def frame_read
frame['datalen']=(leading_digit + @socket.readline(' ')).strip.to_i
frame['message']=@socket.read(frame['datalen'])
end
rescue EOFError,Errno::ECONNRESET
rescue EOFError,Errno::ECONNRESET,IOError
raise ConnectionClosed
end
if ! self.valid_command?(frame['command'])#TODO: is this enough to catch framing errors?
Expand Down Expand Up @@ -126,7 +127,7 @@ def accept
response_frame=Hash.new
response_frame['txnr']=frame['txnr']
response_frame['command']='rsp'
response_frame['message']='500 Required commands '+(@required_relp_commands - offer['commands'].split(',')).join(',')+' not offered'
response_frame['message']='500 Required command(s) '+(@required_relp_commands - offer['commands'].split(',')).join(',')+' not offered'
self.frame_write(response_frame)

self.serverclose
Expand Down Expand Up @@ -185,9 +186,7 @@ def serverclose
def shutdown
begin
@server.shutdown
#@logger.debug('pre')
@server.close
#@logger.debug('post')
rescue Exception#@server might already be down
end
end
Expand Down Expand Up @@ -246,7 +245,7 @@ def initialize(host,port,required_commands=[],buffer_size=128,retransmission_tim


#This thread deals with responses that come back
Thread.start do
reader=Thread.start do |parent|
loop do
f=self.frame_read
if f['command']=='rsp' && f['message']=='200 OK'
Expand All @@ -256,11 +255,11 @@ def initialize(host,port,required_commands=[],buffer_size=128,retransmission_tim
new_txnr=self.frame_write(@buffer[f['txnr']])
@buffer[new_txnr]=@buffer[f['txnr']]
@buffer.delete(f['txnr'])
elsif f['command']=='serverclose'
raise ConnectionClosed#TODO: this doesn't raise the exception anywhere sensible
elsif f['command']=='serverclose' || f['txnr']==@close_txnr
parent.raise ConnectionClosed#TODO: raising errors like this makes no sense
else
#Don't know what's going on if we get here, but it can't be good
raise RelpError#TODO: this exception will disappear as well
parent.raise RelpError#TODO: raising errors like this makes no sense
end
end
end
Expand All @@ -285,7 +284,7 @@ def initialize(host,port,required_commands=[],buffer_size=128,retransmission_tim
def close
frame=Hash.new
frame['command']='close'
txnr=self.frame_write(frame)
@close_txnr=self.frame_write(frame)
#TODO: ought to properly wait for a reply etc. The serverclose will make it work though
sleep @retransmission_timeout
@socket.close#TODO: shutdown?
Expand Down
59 changes: 55 additions & 4 deletions test/logstash/inputs/test_relp.rb
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@

after do
@input.teardown
# This plugin has no proper teardown yet.
end # after

test "Basic handshaking/message transmission" do
Expand Down Expand Up @@ -72,12 +71,64 @@
end
end

test "RelpServer rejects invalid/innapropriate commands" do
#TODO:
test "RelpServer rejects invalid commands" do
#Need it to close the connection, but not bring down the whole server
queue = Queue.new
thread = Thread.new { @input.run(queue) }

logger=Queue.new
(@input.instance_eval { @logger }).subscribe(logger)

assert_raises(Relp::ConnectionClosed) do#TODO: I think these exceptions are being raised wrong somehow
rc=RelpClient.new('127.0.0.1',15515,['syslog'])
badframe=Hash.new
badframe['command']='badcommand'
rc.frame_write(badframe)
#We can't detect that it's closed until we try to write to it again (delay is required for connection to be closed)
sleep 1
rc.frame_write(badframe)
end
assert_equal("Relp error: Relp::InvalidCommand badcommand",logger.pop[:message])
end

test "RelpServer rejects inappropriate commands" do
#Need it to close the connection, but not bring down the whole server
queue = Queue.new
thread = Thread.new { @input.run(queue) }

logger=Queue.new
(@input.instance_eval { @logger }).subscribe(logger)

assert_raises(Relp::ConnectionClosed) do #TODO: I think these exceptions are being raised wrong somehow
rc=RelpClient.new('127.0.0.1',15515,['syslog'])
badframe=Hash.new
badframe['command']='open'#it's not expecting open again
rc.frame_write(badframe)
#We can't detect that it's closed until we try to write to it again(but with something other than an open)
sleep 1
badframe['command']='syslog'
rc.frame_write(badframe)
end
assert_equal("Relp error: Relp::InappropriateCommand open expecting syslog",logger.pop[:message])

end

test "RelpServer refuses to connect if no syslog command available" do
#TODO:

logger=Queue.new
(@input.instance_eval { @logger }).subscribe(logger)

assert_raises(Relp::RelpError) do
queue = Queue.new
thread = Thread.new { @input.run(queue) }
rc=RelpClient.new('127.0.0.1',15515)
end

assert_equal("Relp client incapable of syslog",logger.pop[:message])


end

end # testing for LogStash::Inputs::File

#TODO: structured error logging

0 comments on commit 58ab7ea

Please sign in to comment.