Skip to content

Commit

Permalink
Add NaN support back in to collectd codec
Browse files Browse the repository at this point in the history
* Add spec tests for NaN support
* Catch warning on NaN with nan_handling set to warn
* Make lambdas ivars and initialize so they can reference other ivars
* Fixed reference to point to ivar
* Streamline ivar declarations in one method
* Improve tests to cover more cases
* Make lambdas not be ivars
* Change idiomatic calling of init_lambdas to reflect lack of return value

closes 1363
  • Loading branch information
untergeek committed May 14, 2014
1 parent b710536 commit ed2cdbd
Show file tree
Hide file tree
Showing 2 changed files with 227 additions and 93 deletions.
229 changes: 136 additions & 93 deletions lib/logstash/codecs/collectd.rb
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
class ProtocolError < LogStash::Error; end
class HeaderError < LogStash::Error; end
class EncryptionError < LogStash::Error; end
class NaNError < LogStash::Error; end

class LogStash::Codecs::Collectd < LogStash::Codecs::Base
config_name "collectd"
Expand Down Expand Up @@ -79,6 +80,7 @@ class LogStash::Codecs::Collectd < LogStash::Codecs::Base
'@timestamp' => true,
'plugin' => true,
'plugin_instance' => true,
'type_instance' => true,
}

INTERVAL_VALUES_FIELDS = {
Expand Down Expand Up @@ -116,6 +118,20 @@ class LogStash::Codecs::Collectd < LogStash::Codecs::Base
# collectd [Network plugin](https://collectd.org/wiki/index.php/Plugin:Network)
config :security_level, :validate => [SECURITY_NONE, SECURITY_SIGN, SECURITY_ENCR],
:default => "None"

# What to do when a value in the event is NaN (Not a Number)
# - change_value (default): Change the NaN to the value of the nan_value option and add nan_tag as a tag
# - warn: Change the NaN to the value of the nan_value option, print a warning to the log and add nan_tag as a tag
# - drop: Drop the event containing the NaN (this only drops the single event, not the whole packet)
config :nan_handling, :validate => ['change_value','warn','drop'], :default => 'change_value'

# Only relevant when nan_handeling is set to 'change_value'
# Change NaN to this configured value
config :nan_value, :validate => :number, :default => 0

# The tag to add to the event if a NaN value was found
# Set this to an empty string ('') if you don't want to tag
config :nan_tag, :validate => :string, :default => '_collectdNaN'

# Path to the authentication file. This file should have the same format as
# the [AuthFile](http://collectd.org/documentation/manpages/collectd.conf.5.shtml#authfile_filename)
Expand All @@ -126,6 +142,7 @@ class LogStash::Codecs::Collectd < LogStash::Codecs::Base
public
def register
@logger.info("Starting Collectd codec...")
init_lambdas!
if @typesdb.nil?
@typesdb = LogStash::Environment.vendor_path("collectd/types.db")
if !File.exists?(@typesdb)
Expand Down Expand Up @@ -168,103 +185,121 @@ def get_types(paths)
return types
end # def get_types

# Lambdas for hash + closure methodology
# This replaces when statements for fixed values and is much faster
string_decoder = lambda { |body| body.pack("C*")[0..-2] }
numeric_decoder = lambda { |body| body.slice!(0..7).pack("C*").unpack("E")[0] }
counter_decoder = lambda { |body| body.slice!(0..7).pack("C*").unpack("Q>")[0] }
gauge_decoder = lambda { |body| body.slice!(0..7).pack("C*").unpack("E")[0] }
derive_decoder = lambda { |body| body.slice!(0..7).pack("C*").unpack("q>")[0] }
# For Low-Resolution time
time_decoder = lambda do |body|
byte1, byte2 = body.pack("C*").unpack("NN")
Time.at(( ((byte1 << 32) + byte2))).utc
end
# Hi-Resolution time
hirestime_decoder = lambda do |body|
byte1, byte2 = body.pack("C*").unpack("NN")
Time.at(( ((byte1 << 32) + byte2) * (2**-30) )).utc
end
# Hi resolution intervals
hiresinterval_decoder = lambda do |body|
byte1, byte2 = body.pack("C*").unpack("NN")
Time.at(( ((byte1 << 32) + byte2) * (2**-30) )).to_i
end
# Values decoder
values_decoder = lambda do |body|
body.slice!(0..1) # Prune the header
if body.length % 9 == 0 # Should be 9 fields
count = 0
retval = []
# Iterate through and take a slice each time
types = body.slice!(0..((body.length/9)-1))
while body.length > 0
# Use another hash + closure here...
retval << VALUES_DECODER[types[count]].call(body)
count += 1
def init_lambdas!
# Lambdas for hash + closure methodology
# This replaces when statements for fixed values and is much faster
string_decoder = lambda { |body| body.pack("C*")[0..-2] }
numeric_decoder = lambda { |body| body.slice!(0..7).pack("C*").unpack("E")[0] }
counter_decoder = lambda { |body| body.slice!(0..7).pack("C*").unpack("Q>")[0] }
gauge_decoder = lambda { |body| body.slice!(0..7).pack("C*").unpack("E")[0] }
derive_decoder = lambda { |body| body.slice!(0..7).pack("C*").unpack("q>")[0] }
# For Low-Resolution time
time_decoder = lambda do |body|
byte1, byte2 = body.pack("C*").unpack("NN")
Time.at(( ((byte1 << 32) + byte2))).utc
end
# Hi-Resolution time
hirestime_decoder = lambda do |body|
byte1, byte2 = body.pack("C*").unpack("NN")
Time.at(( ((byte1 << 32) + byte2) * (2**-30) )).utc
end
# Hi resolution intervals
hiresinterval_decoder = lambda do |body|
byte1, byte2 = body.pack("C*").unpack("NN")
Time.at(( ((byte1 << 32) + byte2) * (2**-30) )).to_i
end
# Value type decoder
value_type_decoder = lambda do |body|
body.slice!(0..1) # Prune the header
if body.length % 9 == 0 # Should be 9 fields
count = 0
retval = []
# Iterate through and take a slice each time
types = body.slice!(0..((body.length/9)-1))
while body.length > 0
# Use another hash + closure here...
v = @values_decoder[types[count]].call(body)
if types[count] == 1 && v.nan?
case @nan_handling
when 'drop'; drop = true
else
v = @nan_value
add_nan_tag = true
@nan_handling == 'warn' && @logger.warn("NaN replaced by #{@nan_value}")
end
end
retval << v
count += 1
end
else
@logger.error("Incorrect number of data fields for collectd record", :body => body.to_s)
end
else
@logger.error("Incorrect number of data fields for collectd record", :body => body.to_s)
return retval, drop, add_nan_tag
end
return retval
end
# Signature
signature_decoder = lambda do |body|
if body.length < 32
@logger.warning("SHA256 signature too small (got #{body.length} bytes instead of 32)")
elsif body.length < 33
@logger.warning("Received signature without username")
else
# Signature
signature_decoder = lambda do |body|
if body.length < 32
@logger.warning("SHA256 signature too small (got #{body.length} bytes instead of 32)")
elsif body.length < 33
@logger.warning("Received signature without username")
else
retval = []
# Byte 32 till the end contains the username as chars (=unsigned ints)
retval << body[32..-1].pack('C*')
# Byte 0 till 31 contain the signature
retval << body[0..31].pack('C*')
end
return retval
end
# Encryption
encryption_decoder = lambda do |body|
retval = []
# Byte 32 till the end contains the username as chars (=unsigned ints)
retval << body[32..-1].pack('C*')
# Byte 0 till 31 contain the signature
retval << body[0..31].pack('C*')
user_length = (body.slice!(0) << 8) + body.slice!(0)
retval << body.slice!(0..user_length-1).pack('C*') # Username
retval << body.slice!(0..15).pack('C*') # IV
retval << body.pack('C*')
return retval
end
return retval
end
# Encryption
encryption_decoder = lambda do |body|
retval = []
user_length = (body.slice!(0) << 8) + body.slice!(0)
retval << body.slice!(0..user_length-1).pack('C*') # Username
retval << body.slice!(0..15).pack('C*') # IV
retval << body.pack('C*')
return retval
end
# Lambda Hashes
ID_DECODER = {
0 => string_decoder,
1 => time_decoder,
2 => string_decoder,
3 => string_decoder,
4 => string_decoder,
5 => string_decoder,
6 => values_decoder,
7 => numeric_decoder,
8 => hirestime_decoder,
9 => hiresinterval_decoder,
256 => string_decoder,
257 => numeric_decoder,
512 => signature_decoder,
528 => encryption_decoder
}
# TYPE VALUES:
# 0: COUNTER
# 1: GAUGE
# 2: DERIVE
# 3: ABSOLUTE
VALUES_DECODER = {
0 => counter_decoder,
1 => gauge_decoder,
2 => derive_decoder,
3 => counter_decoder
}
@id_decoder = {
0 => string_decoder,
1 => time_decoder,
2 => string_decoder,
3 => string_decoder,
4 => string_decoder,
5 => string_decoder,
6 => value_type_decoder,
7 => numeric_decoder,
8 => hirestime_decoder,
9 => hiresinterval_decoder,
256 => string_decoder,
257 => numeric_decoder,
512 => signature_decoder,
528 => encryption_decoder
}
# TYPE VALUES:
# 0: COUNTER
# 1: GAUGE
# 2: DERIVE
# 3: ABSOLUTE
@values_decoder = {
0 => counter_decoder,
1 => gauge_decoder,
2 => derive_decoder,
3 => counter_decoder
}
end # def init_lambdas!

public
def get_values(id, body)
drop = false
add_tag = false
if id == 6
retval, drop, add_nan_tag = @id_decoder[id].call(body)
# Use hash + closure/lambda to speed operations
ID_DECODER[id].call(body)
else
retval = @id_decoder[id].call(body)
end
return retval, drop, add_nan_tag
end

private
Expand Down Expand Up @@ -361,7 +396,7 @@ def decode(payload)
length = ((payload.slice!(0) << 8) + payload.slice!(0)) - 4
# Validate that the part length is correct
raise(HeaderError) if length > payload.length

body = payload.slice!(0..length-1)

field = TYPEMAP[typenum]
Expand All @@ -370,7 +405,7 @@ def decode(payload)
next
end

values = get_values(typenum, body)
values, drop, add_nan_tag = get_values(typenum, body)

case typenum
when SIGNATURE_TYPE
Expand Down Expand Up @@ -425,17 +460,25 @@ def decode(payload)
# This is better than looping over all keys every time.
collectd.delete('type_instance') if collectd['type_instance'] == ""
collectd.delete('plugin_instance') if collectd['plugin_instance'] == ""
if add_nan_tag
collectd['tags'] ||= []
collectd['tags'] << @nan_tag
end
# This ugly little shallow-copy hack keeps the new event from getting munged by the cleanup
# With pass-by-reference we get hosed (if we pass collectd, then clean it up rapidly, values can disappear)
yield LogStash::Event.new(collectd.dup)
if !drop # Drop the event if it's flagged true
yield LogStash::Event.new(collectd.dup)
else
raise(NaNError)
end
end
# Clean up the event
collectd.each_key do |k|
collectd.delete(k) if !INTERVAL_BASE_FIELDS.has_key?(k)
end
end
end # while payload.length > 0 do
rescue EncryptionError, ProtocolError, HeaderError
rescue EncryptionError, ProtocolError, HeaderError, NaNError
# basically do nothing, we just want out
end # def decode

Expand Down
Loading

0 comments on commit ed2cdbd

Please sign in to comment.