diff --git a/lib/logstash/outputs/redis.rb b/lib/logstash/outputs/redis.rb index 851e74793ea..fd4a8c7bf1a 100644 --- a/lib/logstash/outputs/redis.rb +++ b/lib/logstash/outputs/redis.rb @@ -89,7 +89,16 @@ def register ) end + if @batch + @flush_thread = Thread.new do + while sleep(@batch_timeout) do + process_pending(true) + end + end + end + @redis = nil + @pending_mutex = Mutex.new end # def register private @@ -137,14 +146,18 @@ def receive(event) private def process_pending(force=false) + if !@pending_mutex.try_lock # failed to get lock + return + end + pending_count = 0 @pending.each { |k, v| pending_count += v.length } time_since_last_flush = Time.now.to_f - @last_pending_flush - if force || + if (force && pending_count > 0) || (pending_count >= @batch_events) || - (time_since_last_flush >= @batch_timeout) - @logger.debug("Flushing redis output", + (time_since_last_flush >= @batch_timeout && pending_count > 0) + @logger.warn("Flushing redis output", :pending_count => pending_count, :time_since_last_flush => time_since_last_flush, :batch_events => @batch_events, @@ -158,6 +171,7 @@ def process_pending(force=false) end @last_pending_flush = Time.now.to_f rescue => e + @pending_mutex.unlock @logger.warn("Failed to send backlog of events to redis", :pending => pending, :identity => identity, :exception => e, @@ -165,6 +179,8 @@ def process_pending(force=false) raise e end end + + @pending_mutex.unlock end public