Skip to content

Commit

Permalink
Improve OutputDelegator implementation and specs.
Browse files Browse the repository at this point in the history
Backwards compatibility is now implemented for existing workers_not_supported uses.

This clears up a few bugs in the initial pass as well.

Fixes elastic#4391
  • Loading branch information
andrewvc committed Jan 7, 2016
1 parent 57f8b65 commit f6ff16e
Show file tree
Hide file tree
Showing 4 changed files with 168 additions and 48 deletions.
60 changes: 45 additions & 15 deletions logstash-core/lib/logstash/output_delegator.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,44 +17,62 @@ def initialize(logger, klass, default_worker_count, *args)
@threadsafe = klass.threadsafe?
@config = args.reduce({}, :merge)
@klass = klass
@worker_count = calculate_worker_count(default_worker_count)

# We define this as an array regardless of threadsafety
# to make reporting simpler, even though a threadsafe plugin will just have
# a single instance
#
# Older plugins invoke the instance method Outputs::Base#workers_not_supported
# To detect these we need an instance to be created first :()
# TODO: In the next major version after 2.x remove support for this
@workers = [@klass.new(*args)]
@workers.first.register # Needed in case register calls `workers_not_supported`

# DO NOT move this statement before the instantiation of the first single instance
# Read the note above to understand why
@worker_count = calculate_worker_count(default_worker_count)
warn_on_worker_override!

# This queue is used to manage sharing across threads
@worker_queue = SizedQueue.new(@worker_count)

# We define this as an array regardless of threadsafety
# to make reporting simpler
@workers = @worker_count.times.map do
w = @klass.new(*args)
w.register
@worker_queue << w
w
@workers += (@worker_count - 1).times.map do
inst = @klass.new(*args)
inst.register
inst
end

@workers.each { |w| @worker_queue << w }

@events_received = Concurrent::AtomicFixnum.new(0)

if threadsafe
if threadsafe?
@threadsafe_worker = @workers.first
self.define_singleton_method(:multi_receive, method(:threadsafe_multi_receive))
else
self.define_singleton_method(:multi_receive, method(:worker_multi_receive))
end
end

def threadsafe?
!!@threadsafe
end

def warn_on_worker_override!
# The user has configured extra workers, but this plugin doesn't support it :(
if @config["workers"] && @config["workers"] > 1 && @klass.workers_not_supported?
message = @workers_not_supported_message
if worker_limits_overriden?
message = @klass.workers_not_supported_message
if message
@logger.warn(I18n.t("logstash.pipeline.output-worker-unsupported-with-message", :plugin => self.class.config_name, :worker_count => @workers, :message => message))
@logger.warn(I18n.t("logstash.pipeline.output-worker-unsupported-with-message", :plugin => @klass.config_name, :worker_count => @config["workers"], :message => message))
else
@logger.warn(I18n.t("logstash.pipeline.output-worker-unsupported", :plugin => self.class.config_name, :worker_count => @workers))
@logger.warn(I18n.t("logstash.pipeline.output-worker-unsupported", :plugin => @klass.config_name, :worker_count => @config["workers"], :message => message))
end
end
end

def worker_limits_overriden?
@config["workers"] && @config["workers"] > 1 && @klass.workers_not_supported?
end

def calculate_worker_count(default_worker_count)
if @threadsafe || @klass.workers_not_supported?
1
Expand All @@ -71,7 +89,6 @@ def register
@workers.each {|w| w.register}
end

# Threadsafe outputs have a much simpler
def threadsafe_multi_receive(events)
@events_received.increment(events.length)

Expand All @@ -81,6 +98,7 @@ def threadsafe_multi_receive(events)
def worker_multi_receive(events)
@events_received.increment(events.length)

@logger.debug("worker queue pop")
worker = @worker_queue.pop
begin
worker.multi_receive(events)
Expand Down Expand Up @@ -110,4 +128,16 @@ def busy_workers
@workers.size - @worker_queue.size
end
end

private

# Needed for tests
def threadsafe_worker
@threadsafe_worker
end

# Needed for tests
def worker_queue
@worker_queue
end
end end
12 changes: 9 additions & 3 deletions logstash-core/lib/logstash/outputs/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ class LogStash::Outputs::Base < LogStash::Plugin
# Note that this setting may not be useful for all outputs.
config :workers, :validate => :number, :default => 1

attr_reader :worker_plugins, :available_workers, :workers, :worker_plugins
attr_reader :worker_plugins, :available_workers, :workers, :worker_plugins, :workers_not_supported

def self.declare_threadsafe!
declare_workers_not_supported!
Expand All @@ -41,14 +41,20 @@ def self.declare_workers_not_supported!(message=nil)
@workers_not_supported = true
end

def self.workers_not_supported_message
@workers_not_supported_message
end

def self.workers_not_supported?
@workers_not_supported == true
!!@workers_not_supported
end

public
# TODO: Remove this in the next major version after Logstash 2.x
# Post 2.x it should raise an error and tell people to use the class level
# declaration
def workers_not_supported(message=nil)
raise ArgumentError, "Outputs::Base#workers_not_supported is no longer a valid part of the logstash API, please use the class method Outputs::Base.declare_workers_not_supported!"
self.class.declare_workers_not_supported!(message)
end

public
Expand Down
131 changes: 101 additions & 30 deletions logstash-core/spec/logstash/output_delegator_spec.rb
Original file line number Diff line number Diff line change
@@ -1,51 +1,122 @@
# encoding: utf-8
require 'spec_helper'



describe LogStash::OutputDelegator do
let(:logger) { double("logger") }
let(:out_klass) { double("output klass") }
let(:out_inst) { double("output instance") }
let(:events) { 7.times.map { LogStash::Event.new }}
let(:default_worker_count) { 1 }

subject { described_class.new(logger, out_klass, 1) }
subject { described_class.new(logger, out_klass, default_worker_count) }

before do
allow(out_klass).to receive(:new).with(any_args).and_return(out_inst)
allow(out_klass).to receive(:threadsafe?).and_return(false)
allow(out_klass).to receive(:workers_not_supported?).and_return(false)
allow(out_inst).to receive(:register)
allow(logger).to receive(:debug).with(any_args)
end

it "should initialize cleanly" do
expect { subject }.not_to raise_error
end

context "after having received a batch of events" do
let(:events) { 7.times.map { LogStash::Event.new }}
context "with a plain output plugin" do
let(:out_klass) { double("output klass") }
let(:out_inst) { double("output instance") }

before do
allow(out_klass).to receive(:new).with(any_args).and_return(out_inst)
allow(out_klass).to receive(:threadsafe?).and_return(false)
allow(out_klass).to receive(:workers_not_supported?).and_return(false)
allow(out_inst).to receive(:register)
allow(out_inst).to receive(:multi_receive)
subject.multi_receive(events)
allow(logger).to receive(:debug).with(any_args)
end

it "should initialize cleanly" do
expect { subject }.not_to raise_error
end

it "should pass the events through" do
expect(out_inst).to have_received(:multi_receive).with(events)
context "after having received a batch of events" do
before do
subject.multi_receive(events)
end

it "should pass the events through" do
expect(out_inst).to have_received(:multi_receive).with(events)
end

it "should increment the number of events received" do
expect(subject.events_received).to eql(events.length)
end
end

it "should increment the number of events received" do
expect(subject.events_received).to eql(events.length)
it "should register all workers on register" do
expect(out_inst).to receive(:register)
subject.register
end

it "should close all workers when closing" do
expect(out_inst).to receive(:do_close)
subject.do_close
end

describe "concurrency and worker support" do
describe "non-threadsafe outputs that allow workers" do
let(:default_worker_count) { 3 }

before do
allow(out_klass).to receive(:threadsafe?).and_return(false)
allow(out_klass).to receive(:workers_not_supported?).and_return(false)
end

it "should instantiate multiple workers" do
expect(subject.workers.length).to eql(default_worker_count)
end

it "should send received events to the worker" do
expect(out_inst).to receive(:multi_receive).with(events)
subject.multi_receive(events)
end
end

describe "threadsafe outputs" do
before do
allow(out_klass).to receive(:threadsafe?).and_return(true)
allow(out_klass).to receive(:workers_not_supported?).and_return(false)
end

it "should return true when threadsafe? is invoked" do
expect(subject.threadsafe?).to eql(true)
end

it "should define a threadsafe_worker" do
expect(subject.send(:threadsafe_worker)).to eql(out_inst)
end

it "should utilize threadsafe_multi_receive" do
expect(subject.send(:threadsafe_worker)).to receive(:multi_receive).with(events)
subject.multi_receive(events)
end

it "should not utilize the worker queue" do
expect(subject.send(:worker_queue)).not_to receive(:pop)
subject.multi_receive(events)
end

it "should send received events to the worker" do
expect(out_inst).to receive(:multi_receive).with(events)
subject.multi_receive(events)
end
end
end
end

it "should register all workers on register" do
expect(out_inst).to receive(:register)
subject.register
# This may seem suspiciously similar to the class in outputs/base_spec
# but, in fact, we need a whole new class because using this even once
# will immutably modify the base class
class LogStash::Outputs::NOOPDelLegacyNoWorkers < ::LogStash::Outputs::Base
LEGACY_WORKERS_NOT_SUPPORTED_REASON = "legacy reason"

def register
workers_not_supported(LEGACY_WORKERS_NOT_SUPPORTED_REASON)
end
end

it "should close all workers when closing" do
expect(out_inst).to receive(:do_close)
subject.do_close
describe "legacy output workers_not_supported" do
let(:default_worker_count) { 2 }
let(:out_klass) { LogStash::Outputs::NOOPDelLegacyNoWorkers }

it "should only setup one worker" do
expect(subject.worker_count).to eql(1)
end
end
end
13 changes: 13 additions & 0 deletions logstash-core/spec/logstash/outputs/base_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,14 @@ def receive(event)
end
end

class LogStash::Outputs::NOOPLegacyNoWorkers < ::LogStash::Outputs::Base
LEGACY_WORKERS_NOT_SUPPORTED_REASON = "legacy reason"

def register
workers_not_supported(LEGACY_WORKERS_NOT_SUPPORTED_REASON)
end
end

describe "LogStash::Outputs::Base#new" do
it "should instantiate cleanly" do
params = { "dummy_option" => "potatoes", "codec" => "json", "workers" => 2 }
Expand All @@ -24,4 +32,9 @@ def receive(event)
LogStash::Outputs::NOOP.new(params.dup)
end.not_to raise_error
end

it "should move workers_not_supported declarations up to the class level" do
LogStash::Outputs::NOOPLegacyNoWorkers.new.register
expect(LogStash::Outputs::NOOPLegacyNoWorkers.workers_not_supported?).to eql(true)
end
end

0 comments on commit f6ff16e

Please sign in to comment.