From c6710cdbae7d7b019b53a08f641286f0f3fe4100 Mon Sep 17 00:00:00 2001 From: Colin Surprenant Date: Wed, 1 Feb 2017 14:48:00 -0500 Subject: [PATCH] support exclusive locking of PQ dir access fix agent and pipeline and specs for queue exclusive access added comments and swapped all sleep 0.01 to 0.1 revert explicit pipeline close in specs using sample helper fix multiple pipelines specs use BasePipeline for config validation which does not instantiate a new queue review modifications improve queue exception message --- logstash-core/lib/logstash/agent.rb | 40 ++++-- logstash-core/lib/logstash/pipeline.rb | 24 +++- logstash-core/lib/logstash/runner.rb | 2 +- logstash-core/spec/conditionals_spec.rb | 2 +- logstash-core/spec/logstash/agent_spec.rb | 129 +++++++++++------ logstash-core/spec/logstash/pipeline_spec.rb | 84 +++++++++-- .../spec/logstash/queue_factory_spec.rb | 13 +- .../java/org/logstash/FileLockFactory.java | 4 +- .../java/org/logstash/ackedqueue/Queue.java | 132 +++++++++++------- .../org/logstash/ackedqueue/HeadPageTest.java | 8 ++ .../org/logstash/ackedqueue/QueueTest.java | 23 +++ .../java/org/logstash/stress/Concurent.java | 1 + 12 files changed, 335 insertions(+), 127 deletions(-) diff --git a/logstash-core/lib/logstash/agent.rb b/logstash-core/lib/logstash/agent.rb index 86406007cba..8d01d238941 100644 --- a/logstash-core/lib/logstash/agent.rb +++ b/logstash-core/lib/logstash/agent.rb @@ -189,7 +189,22 @@ def running_pipelines? end end + def close_pipeline(id) + pipeline = @pipelines[id] + if pipeline + @logger.warn("closing pipeline", :id => id) + pipeline.close + end + end + + def close_pipelines + @pipelines.each do |id, _| + close_pipeline(id) + end + end + private + def start_webserver options = {:http_host => @http_host, :http_ports => @http_port, :http_environment => @http_environment } @webserver = LogStash::WebServer.new(@logger, self, options) @@ -229,17 +244,17 @@ def collect_metrics? @collect_metric end - def increment_reload_failures_metrics(id, config, exception) + def increment_reload_failures_metrics(id, message, backtrace = nil) @instance_reload_metric.increment(:failures) @pipeline_reload_metric.namespace([id.to_sym, :reloads]).tap do |n| n.increment(:failures) - n.gauge(:last_error, { :message => exception.message, :backtrace => exception.backtrace}) + n.gauge(:last_error, { :message => message, :backtrace => backtrace}) n.gauge(:last_failure_timestamp, LogStash::Timestamp.now) end if @logger.debug?
- @logger.error("Cannot load an invalid configuration.", :reason => exception.message, :backtrace => exception.backtrace) + @logger.error("Cannot load an invalid configuration", :reason => message, :backtrace => backtrace) else - @logger.error("Cannot load an invalid configuration.", :reason => exception.message) + @logger.error("Cannot load an invalid configuration", :reason => message) end end @@ -261,7 +276,7 @@ def create_pipeline(settings, config = nil) begin LogStash::Pipeline.new(config, settings, metric) rescue => e - increment_reload_failures_metrics(settings.get("pipeline.id"), config, e) + increment_reload_failures_metrics(settings.get("pipeline.id"), e.message, e.backtrace) return nil end end @@ -294,15 +309,14 @@ def reload_pipeline!(id) begin pipeline_validator = LogStash::BasePipeline.new(new_config, old_pipeline.settings) rescue => e - increment_reload_failures_metrics(id, new_config, e) + increment_reload_failures_metrics(id, e.message, e.backtrace) return end # check if the new pipeline will be reloadable in which case we want to log that as an error and abort if !pipeline_validator.reloadable? @logger.error(I18n.t("logstash.agent.non_reloadable_config_reload"), :pipeline_id => id, :plugins => pipeline_validator.non_reloadable_plugins.map(&:class)) - # TODO: in the original code the failure metrics were not incremented, should we do it here? - # increment_reload_failures_metrics(id, new_config, e) + increment_reload_failures_metrics(id, "non reloadable pipeline") return end @@ -331,20 +345,28 @@ def upgrade_pipeline(pipeline_id, settings, new_config) # this is a scenario where the configuration is valid (compilable) but the new pipeline refused to start # and at this point NO pipeline is running @logger.error("failed to create the reloaded pipeline and no pipeline is currently running", :pipeline => pipeline_id) + increment_reload_failures_metrics(pipeline_id, "failed to create the reloaded pipeline") return end + ### at this point pipeline#close must be called if upgrade_pipeline does not succeed + # check if the new pipeline will be reloadable in which case we want to log that as an error and abort. this should normally not # happen since the check should be done in reload_pipeline! prior to get here. if !new_pipeline.reloadable? @logger.error(I18n.t("logstash.agent.non_reloadable_config_reload"), :pipeline_id => pipeline_id, :plugins => new_pipeline.non_reloadable_plugins.map(&:class)) + increment_reload_failures_metrics(pipeline_id, "non reloadable pipeline") + new_pipeline.close return end + # @pipelines[pipeline_id] must be initialized before #start_pipeline below which uses it @pipelines[pipeline_id] = new_pipeline if !start_pipeline(pipeline_id) @logger.error("failed to start the reloaded pipeline and no pipeline is currently running", :pipeline => pipeline_id) + # do not call increment_reload_failures_metrics here since #start_pipeline already does it on failure + new_pipeline.close return end @@ -373,6 +395,8 @@ def start_pipeline(id) n.gauge(:last_failure_timestamp, LogStash::Timestamp.now) end @logger.error("Pipeline aborted due to error", :exception => e, :backtrace => e.backtrace) + + # TODO: this is weird, why dont we return directly here? any reason we need to enter the while true loop below?! 
end end while true do diff --git a/logstash-core/lib/logstash/pipeline.rb b/logstash-core/lib/logstash/pipeline.rb index e68a0c7ac8d..a802371a136 100644 --- a/logstash-core/lib/logstash/pipeline.rb +++ b/logstash-core/lib/logstash/pipeline.rb @@ -26,7 +26,7 @@ module LogStash; class BasePipeline attr_reader :config_str, :config_hash, :inputs, :filters, :outputs, :pipeline_id - def initialize(config_str, settings) + def initialize(config_str, settings = SETTINGS) @logger = self.logger @config_str = config_str @config_hash = Digest::SHA1.hexdigest(@config_str) @@ -59,8 +59,7 @@ def initialize(config_str, settings) begin eval(config_code) rescue => e - # TODO: the original code rescue e but does nothing with it, should we re-raise to have original exception details!? - raise + raise e end end @@ -139,7 +138,13 @@ def initialize(config_str, settings = SETTINGS, namespaced_metric = nil) super(config_str, settings) - @queue = LogStash::QueueFactory.create(settings) + begin + @queue = LogStash::QueueFactory.create(settings) + rescue => e + @logger.error("Logstash failed to create queue", "exception" => e, "backtrace" => e.backtrace) + raise e + end + @input_queue_client = @queue.write_client @filter_queue_client = @queue.read_client @signal_queue = Queue.new @@ -216,8 +221,7 @@ def run shutdown_flusher shutdown_workers - @filter_queue_client.close - @queue.close + close @logger.debug("Pipeline #{@pipeline_id} has been shutdown") @@ -225,6 +229,11 @@ def run return 0 end # def run + def close + @filter_queue_client.close + @queue.close + end + def transition_to_running @running.make_true end @@ -327,7 +336,8 @@ def filter_batch(batch) # plugin. @logger.error("Exception in pipelineworker, the pipeline stopped processing new events, please check your filter configuration and restart Logstash.", "exception" => e, "backtrace" => e.backtrace) - raise + + raise e end # Take an array of events and send them to the correct output diff --git a/logstash-core/lib/logstash/runner.rb b/logstash-core/lib/logstash/runner.rb index 924d42a8f48..94b79e35c75 100644 --- a/logstash-core/lib/logstash/runner.rb +++ b/logstash-core/lib/logstash/runner.rb @@ -249,7 +249,7 @@ def execute config_loader = LogStash::Config::Loader.new(logger) config_str = config_loader.format_config(setting("path.config"), setting("config.string")) begin - LogStash::Pipeline.new(config_str) + LogStash::BasePipeline.new(config_str) puts "Configuration OK" logger.info "Using config.test_and_exit mode. Config Validation Result: OK. 
Exiting Logstash" return 0 diff --git a/logstash-core/spec/conditionals_spec.rb b/logstash-core/spec/conditionals_spec.rb index 52b2b4db53d..80a4bd7bf25 100644 --- a/logstash-core/spec/conditionals_spec.rb +++ b/logstash-core/spec/conditionals_spec.rb @@ -287,7 +287,7 @@ def multi_receive(events) conditional "!([message] <= 'sample')" do sample("apple") { expect(subject.get("tags")).not_to include("success") } sample("zebra") { expect(subject.get("tags")).not_to include("failure") } - sample("sample") { expect(subject.get("tags")).not_to include("success") } + sample("sample") { expect(subject.get("tags")).not_to include("success")} end conditional "!([message] >= 'sample')" do diff --git a/logstash-core/spec/logstash/agent_spec.rb b/logstash-core/spec/logstash/agent_spec.rb index 5d1b840292f..7c40baab865 100644 --- a/logstash-core/spec/logstash/agent_spec.rb +++ b/logstash-core/spec/logstash/agent_spec.rb @@ -16,7 +16,7 @@ let(:config_file) { Stud::Temporary.pathname } let(:config_file_txt) { "input { generator { count => 100000 } } output { }" } - subject { LogStash::Agent.new(agent_settings) } + subject { LogStash::Agent.new(agent_settings) } before :each do # This MUST run first, before `subject` is invoked to ensure clean state @@ -52,6 +52,10 @@ } end + after(:each) do + subject.close_pipelines + end + it "should delegate settings to new pipeline" do expect(LogStash::Pipeline).to receive(:new) do |arg1, arg2| expect(arg1).to eq(config_string) @@ -75,7 +79,7 @@ end end - describe "#execute" do + describe "#execute" do let(:config_file_txt) { "input { generator { count => 100000 } } output { }" } before :each do @@ -105,7 +109,10 @@ it "should not reload_state!" do expect(subject).to_not receive(:reload_state!) t = Thread.new { subject.execute } - sleep 0.1 + + # TODO: refactor this. forcing an arbitrary fixed delay for thread concurrency issues is an indication of + # a bad test design or missing class functionality. + sleep(0.1) Stud.stop!(t) t.join subject.shutdown @@ -118,11 +125,14 @@ it "does not upgrade the new config" do t = Thread.new { subject.execute } - sleep 0.01 until subject.running_pipelines? && subject.pipelines.values.first.ready? + sleep(0.1) until subject.running_pipelines? && subject.pipelines.values.first.ready? expect(subject).to_not receive(:upgrade_pipeline) File.open(config_file, "w") { |f| f.puts second_pipeline_config } subject.send(:"reload_pipeline!", "main") - sleep 0.1 + + # TODO: refactor this. forcing an arbitrary fixed delay for thread concurrency issues is an indication of + # a bad test design or missing class functionality. + sleep(0.1) Stud.stop!(t) t.join subject.shutdown @@ -134,14 +144,16 @@ it "does upgrade the new config" do t = Thread.new { subject.execute } - sleep 0.01 until subject.running_pipelines? && subject.pipelines.values.first.ready? + sleep(0.1) until subject.running_pipelines? && subject.pipelines.values.first.ready? expect(subject).to receive(:upgrade_pipeline).once.and_call_original File.open(config_file, "w") { |f| f.puts second_pipeline_config } subject.send(:"reload_pipeline!", "main") - sleep 0.1 + + # TODO: refactor this. forcing an arbitrary fixed delay for thread concurrency issues is an indication of + # a bad test design or missing class functionality. + sleep(0.1) Stud.stop!(t) t.join - subject.shutdown end end @@ -154,14 +166,16 @@ it "does not try to reload the pipeline" do t = Thread.new { subject.execute } - sleep 0.01 until subject.running_pipelines? && subject.pipelines.values.first.running? 
+ sleep(0.01) until subject.running_pipelines? && subject.pipelines.values.first.running? expect(subject).to_not receive(:reload_pipeline!) File.open(config_file, "w") { |f| f.puts second_pipeline_config } subject.reload_state! - sleep 0.1 + + # TODO: refactor this. forcing an arbitrary fixed delay for thread concurrency issues is an indication of + # a bad test design or missing class functionality. + sleep(0.1) Stud.stop!(t) t.join - subject.shutdown end end @@ -172,14 +186,16 @@ it "tries to reload the pipeline" do t = Thread.new { subject.execute } - sleep 0.01 until subject.running_pipelines? && subject.pipelines.values.first.running? + sleep(0.01) until subject.running_pipelines? && subject.pipelines.values.first.running? expect(subject).to receive(:reload_pipeline!).once.and_call_original File.open(config_file, "w") { |f| f.puts second_pipeline_config } subject.reload_state! - sleep 0.1 + + # TODO: refactor this. forcing an arbitrary fixed delay for thread concurrency issues is an indication of + # a bad test design or missing class functionality. + sleep(0.1) Stud.stop!(t) t.join - subject.shutdown end end @@ -203,9 +219,12 @@ it "should periodically reload_state" do allow(subject).to receive(:clean_state?).and_return(false) t = Thread.new { subject.execute } - sleep 0.01 until subject.running_pipelines? && subject.pipelines.values.first.running? + sleep(0.01) until subject.running_pipelines? && subject.pipelines.values.first.running? expect(subject).to receive(:reload_state!).at_least(2).times - sleep 0.1 + + # TODO: refactor this. forcing an arbitrary fixed delay for thread concurrency issues is an indication of + # a bad test design or missing class functionality. + sleep(0.1) Stud.stop!(t) t.join subject.shutdown @@ -218,10 +237,13 @@ it "does not upgrade the new config" do t = Thread.new { subject.execute } - sleep 0.01 until subject.running_pipelines? && subject.pipelines.values.first.running? + sleep(0.01) until subject.running_pipelines? && subject.pipelines.values.first.running? expect(subject).to_not receive(:upgrade_pipeline) File.open(config_file, "w") { |f| f.puts second_pipeline_config } - sleep 0.1 + + # TODO: refactor this. forcing an arbitrary fixed delay for thread concurrency issues is an indication of + # a bad test design or missing class functionality. + sleep(0.1) Stud.stop!(t) t.join subject.shutdown @@ -233,10 +255,13 @@ it "does upgrade the new config" do t = Thread.new { subject.execute } - sleep 0.01 until subject.running_pipelines? && subject.pipelines.values.first.running? + sleep(0.01) until subject.running_pipelines? && subject.pipelines.values.first.running? expect(subject).to receive(:upgrade_pipeline).once.and_call_original File.open(config_file, "w") { |f| f.puts second_pipeline_config } - sleep 0.1 + + # TODO: refactor this. forcing an arbitrary fixed delay for thread concurrency issues is an indication of + # a bad test design or missing class functionality. 
+ sleep(0.1) Stud.stop!(t) t.join subject.shutdown @@ -259,6 +284,10 @@ subject.register_pipeline(pipeline_settings) end + after(:each) do + subject.close_pipelines + end + context "when fetching a new state" do it "upgrades the state" do expect(subject).to receive(:fetch_config).and_return(second_pipeline_config) @@ -291,6 +320,7 @@ after :each do ENV["FOO"] = @foo_content + subject.close_pipelines end it "doesn't upgrade the state" do @@ -314,14 +344,20 @@ end after(:each) do - subject.shutdown + # new pipelines will be created as part of the upgrade process so we need + # to close any initialized pipelines + subject.close_pipelines end context "when the upgrade fails" do before :each do allow(subject).to receive(:fetch_config).and_return(new_pipeline_config) allow(subject).to receive(:create_pipeline).and_return(nil) - allow(subject).to receive(:stop_pipeline) + allow(subject).to receive(:stop_pipeline) do |id| + # we register_pipeline but we never execute the pipelines so we have to mock #stop_pipeline to + # call Pipeline#close instead of Pipeline#shutdown + subject.close_pipeline(id) + end end it "leaves the state untouched" do @@ -339,18 +375,29 @@ context "when the upgrade succeeds" do let(:new_config) { "input { generator { count => 1 } } output { }" } + before :each do allow(subject).to receive(:fetch_config).and_return(new_config) - allow(subject).to receive(:stop_pipeline) - allow(subject).to receive(:start_pipeline) + allow(subject).to receive(:start_pipeline).and_return(true) + allow(subject).to receive(:stop_pipeline) do |id| + # we register_pipeline but we never execute the pipelines so we have to mock #stop_pipeline to + # call Pipeline#close instead of Pipeline#shutdown + subject.close_pipeline(id) + end end + it "updates the state" do subject.send(:"reload_pipeline!", default_pipeline_id) expect(subject.pipelines[default_pipeline_id].config_str).to eq(new_config) end + it "starts the pipeline" do - expect(subject).to receive(:stop_pipeline) - expect(subject).to receive(:start_pipeline) + expect(subject).to receive(:start_pipeline).and_return(true) + expect(subject).to receive(:stop_pipeline) do |id| + # we register_pipeline but we never execute the pipelines so we have to mock #stop_pipeline to + # call Pipeline#close instead of Pipeline#shutdown + subject.close_pipeline(id) + end subject.send(:"reload_pipeline!", default_pipeline_id) end end @@ -378,9 +425,9 @@ end end - context "metrics after config reloading" do let!(:config) { "input { generator { } } output { dummyoutput { } }" } + let!(:config_path) do f = Stud::Temporary.file f.write(config) @@ -388,6 +435,7 @@ f.close f.path end + let(:pipeline_args) do { "pipeline.workers" => 2, @@ -411,6 +459,13 @@ class DummyOutput2 < LogStash::Outputs::DroppingDummyOutput; end let!(:dummy_output) { LogStash::Outputs::DroppingDummyOutput.new } let!(:dummy_output2) { DummyOutput2.new } let(:initial_generator_threshold) { 1000 } + let(:pipeline_thread) do + Thread.new do + subject.register_pipeline(pipeline_settings) + subject.execute + end + end + before :each do allow(LogStash::Outputs::DroppingDummyOutput).to receive(:new).at_least(:once).with(anything).and_return(dummy_output) @@ -424,20 +479,17 @@ class DummyOutput2 < LogStash::Outputs::DroppingDummyOutput; end @abort_on_exception = Thread.abort_on_exception Thread.abort_on_exception = true - @t = Thread.new do - subject.register_pipeline(pipeline_settings) - subject.execute - end + pipeline_thread # wait for some events to reach the dummy_output - sleep(0.01) until dummy_output.events_received >
initial_generator_threshold + sleep(0.1) until dummy_output.events_received > initial_generator_threshold end after :each do begin subject.shutdown - Stud.stop!(@t) - @t.join + Stud.stop!(pipeline_thread) + pipeline_thread.join ensure Thread.abort_on_exception = @abort_on_exception end @@ -446,8 +498,8 @@ class DummyOutput2 < LogStash::Outputs::DroppingDummyOutput; end context "when reloading a good config" do let(:new_config_generator_counter) { 500 } let(:new_config) { "input { generator { count => #{new_config_generator_counter} } } output { dummyoutput2 {} }" } - before :each do + before :each do File.open(config_path, "w") do |f| f.write(new_config) f.fsync @@ -496,12 +548,10 @@ class DummyOutput2 < LogStash::Outputs::DroppingDummyOutput; end value = snapshot.metric_store.get_with_path("/stats/pipelines")[:stats][:pipelines][:main][:reloads][:last_error].value expect(value).to be(nil) end - end context "when reloading a bad config" do let(:new_config) { "input { generator { count => " } - let(:new_config_generator_counter) { 500 } before :each do File.open(config_path, "w") do |f| @@ -546,7 +596,6 @@ class DummyOutput2 < LogStash::Outputs::DroppingDummyOutput; end context "when reloading a config that raises exception on pipeline.run" do let(:new_config) { "input { generator { count => 10000 } }" } - let(:new_config_generator_counter) { 500 } class BrokenGenerator < LogStash::Inputs::Generator def register @@ -555,14 +604,12 @@ def register end before :each do - allow(LogStash::Plugin).to receive(:lookup).with("input", "generator").and_return(BrokenGenerator) File.open(config_path, "w") do |f| f.write(new_config) f.fsync end - end it "does not increase the successful reload count" do @@ -573,7 +620,7 @@ def register } end - it "increases the failured reload count" do + it "increases the failures reload count" do expect { subject.send(:"reload_pipeline!", "main") }.to change { snapshot = subject.metric.collector.snapshot_metric reload_metrics = snapshot.metric_store.get_with_path("/stats/pipelines")[:stats][:pipelines][:main][:reloads] diff --git a/logstash-core/spec/logstash/pipeline_spec.rb b/logstash-core/spec/logstash/pipeline_spec.rb index 791ca1f8e87..dd9b9ae980b 100644 --- a/logstash-core/spec/logstash/pipeline_spec.rb +++ b/logstash-core/spec/logstash/pipeline_spec.rb @@ -138,8 +138,8 @@ class TestPipeline < LogStash::Pipeline Thread.abort_on_exception = true pipeline = LogStash::Pipeline.new(config, pipeline_settings_obj) - Thread.new { pipeline.run } - sleep 0.1 while !pipeline.ready? + t = Thread.new { pipeline.run } + sleep(0.1) until pipeline.ready? 
wait(3).for do # give us a bit of time to flush the events # puts("*****" + output.events.map{|e| e.message}.to_s) @@ -149,6 +149,7 @@ class TestPipeline < LogStash::Pipeline expect(output.events[0].get("tags")).to eq(["notdropped"]) expect(output.events[1].get("tags")).to eq(["notdropped"]) pipeline.shutdown + t.join Thread.abort_on_exception = abort_on_exception_state end @@ -192,12 +193,14 @@ class TestPipeline < LogStash::Pipeline pipeline_settings_obj.set("config.debug", false) expect(logger).not_to receive(:debug).with(/Compiled pipeline/, anything) pipeline = TestPipeline.new(test_config_with_filters) + pipeline.close end it "should print the compiled code if config.debug is set to true" do pipeline_settings_obj.set("config.debug", true) expect(logger).to receive(:debug).with(/Compiled pipeline/, anything) pipeline = TestPipeline.new(test_config_with_filters, pipeline_settings_obj) + pipeline.close end end @@ -363,7 +366,6 @@ class TestPipeline < LogStash::Pipeline sample(["foo", "bar"]) do expect(subject.size).to eq(2) - expect(subject[0].get("message")).to eq("foo\nbar") expect(subject[0].get("type")).to be_nil expect(subject[1].get("message")).to eq("foo\nbar") @@ -385,9 +387,14 @@ class TestPipeline < LogStash::Pipeline allow(LogStash::Plugin).to receive(:lookup).with("codec", "plain").and_return(DummyCodec) allow(LogStash::Plugin).to receive(:lookup).with("output", "dummyoutput").and_return(::LogStash::Outputs::DummyOutput) allow(logger).to receive(:warn) - thread = Thread.new { pipeline.run } + + # pipeline must be first called outside the thread context because it lazyly initialize and will create a + # race condition if called in the thread + p = pipeline + t = Thread.new { p.run } + sleep(0.1) until pipeline.ready? pipeline.shutdown - thread.join + t.join end it "should not raise a max inflight warning if the max_inflight count isn't exceeded" do @@ -440,6 +447,10 @@ class TestPipeline < LogStash::Pipeline let(:settings) { LogStash::SETTINGS.clone } subject { LogStash::Pipeline.new(config, settings, metric) } + after :each do + subject.close + end + context "when metric.collect is disabled" do before :each do settings.set("metric.collect", false) @@ -528,9 +539,21 @@ class TestPipeline < LogStash::Pipeline allow(LogStash::Plugin).to receive(:lookup).with("output", "dummyoutputmore").and_return(DummyOutputMore) end + # multiple pipelines cannot be instantiated using the same PQ settings, force memory queue + before :each do + pipeline_workers_setting = LogStash::SETTINGS.get_setting("queue.type") + allow(pipeline_workers_setting).to receive(:value).and_return("memory") + pipeline_settings.each {|k, v| pipeline_settings_obj.set(k, v) } + end + let(:pipeline1) { LogStash::Pipeline.new("input { dummyinputgenerator {} } filter { dummyfilter {} } output { dummyoutput {}}") } let(:pipeline2) { LogStash::Pipeline.new("input { dummyinputgenerator {} } filter { dummyfilter {} } output { dummyoutputmore {}}") } + after do + pipeline1.close + pipeline2.close + end + it "should handle evaluating different config" do expect(pipeline1.output_func(LogStash::Event.new)).not_to include(nil) expect(pipeline1.filter_func(LogStash::Event.new)).not_to include(nil) @@ -573,8 +596,8 @@ class TestPipeline < LogStash::Pipeline it "flushes the buffered contents of the filter" do Thread.abort_on_exception = true pipeline = LogStash::Pipeline.new(config, pipeline_settings_obj) - Thread.new { pipeline.run } - sleep 0.1 while !pipeline.ready? 
+ t = Thread.new { pipeline.run } + sleep(0.1) until pipeline.ready? wait(3).for do # give us a bit of time to flush the events output.events.empty? end event = output.events.pop expect(event.get("message").count("\n")).to eq(99) pipeline.shutdown + t.join end end @@ -596,6 +620,13 @@ class TestPipeline < LogStash::Pipeline let(:pipeline1) { LogStash::Pipeline.new("input { generator {} } filter { dummyfilter {} } output { dummyoutput {}}") } let(:pipeline2) { LogStash::Pipeline.new("input { generator {} } filter { dummyfilter {} } output { dummyoutput {}}") } + # multiple pipelines cannot be instantiated using the same PQ settings, force memory queue + before :each do + pipeline_workers_setting = LogStash::SETTINGS.get_setting("queue.type") + allow(pipeline_workers_setting).to receive(:value).and_return("memory") + pipeline_settings.each {|k, v| pipeline_settings_obj.set(k, v) } + end + it "should handle evaluating different config" do # When the functions are compiled from the AST it will generate instance # variables that are unique to the actual config, the instances are pointing @@ -626,8 +657,14 @@ class TestPipeline < LogStash::Pipeline subject { described_class.new(config) } - it "returns nil when the pipeline isnt started" do - expect(subject.started_at).to be_nil + context "when the pipeline is not started" do + after :each do + subject.close + end + + it "returns nil when the pipeline isn't started" do + expect(subject.started_at).to be_nil + end end it "return when the pipeline started working" do @@ -648,6 +685,10 @@ class TestPipeline < LogStash::Pipeline subject { described_class.new(config) } context "when the pipeline is not started" do + after :each do + subject.close + end + it "returns 0" do expect(subject.uptime).to eq(0) end @@ -655,10 +696,15 @@ class TestPipeline < LogStash::Pipeline context "when the pipeline is started" do it "return the duration in milliseconds" do - t = Thread.new { subject.run } + # subject must first be called outside the thread context because of lazy initialization + s = subject + t = Thread.new { s.run } + sleep(0.1) until subject.ready? + sleep(0.1) expect(subject.uptime).to be > 0 subject.shutdown + t.join end end end @@ -704,6 +750,12 @@ class TestPipeline < LogStash::Pipeline end let(:dummyoutput) { ::LogStash::Outputs::DummyOutput.new({ "id" => dummy_output_id }) } let(:metric_store) { subject.metric.collector.snapshot_metric.metric_store } + let(:pipeline_thread) do + # subject has to be called for the first time outside the thread because it will create a race condition + # with the subject.ready? call since subject is lazily initialized + s = subject + Thread.new { s.run } + end before :each do allow(::LogStash::Outputs::DummyOutput).to receive(:new).with(any_args).and_return(dummyoutput) @@ -712,7 +764,9 @@ class TestPipeline < LogStash::Pipeline allow(LogStash::Plugin).to receive(:lookup).with("filter", "multiline").and_return(LogStash::Filters::Multiline) allow(LogStash::Plugin).to receive(:lookup).with("output", "dummyoutput").and_return(::LogStash::Outputs::DummyOutput) - Thread.new { subject.run } + pipeline_thread + sleep(0.1) until subject.ready?
+ # make sure we have received all the generated events wait(3).for do # give us a bit of time to flush the events @@ -722,6 +776,7 @@ class TestPipeline < LogStash::Pipeline after :each do subject.shutdown + pipeline_thread.join end context "global metric" do @@ -787,6 +842,13 @@ class TestPipeline < LogStash::Pipeline let(:pipeline1) { LogStash::Pipeline.new("input { generator {} } filter { dummyfilter {} } output { dummyoutput {}}") } let(:pipeline2) { LogStash::Pipeline.new("input { generator {} } filter { dummyfilter {} } output { dummyoutput {}}") } + # multiple pipelines cannot be instantiated using the same PQ settings, force memory queue + before :each do + pipeline_workers_setting = LogStash::SETTINGS.get_setting("queue.type") + allow(pipeline_workers_setting).to receive(:value).and_return("memory") + pipeline_settings.each {|k, v| pipeline_settings_obj.set(k, v) } + end + it "should not add ivars" do expect(pipeline1.instance_variables).to eq(pipeline2.instance_variables) end diff --git a/logstash-core/spec/logstash/queue_factory_spec.rb b/logstash-core/spec/logstash/queue_factory_spec.rb index fa0e2160d11..9182c9c95be 100644 --- a/logstash-core/spec/logstash/queue_factory_spec.rb +++ b/logstash-core/spec/logstash/queue_factory_spec.rb @@ -36,7 +36,9 @@ end it "returns a `WrappedAckedQueue`" do - expect(subject.create(settings)).to be_kind_of(LogStash::Util::WrappedAckedQueue) + queue = subject.create(settings) + expect(queue).to be_kind_of(LogStash::Util::WrappedAckedQueue) + queue.close end describe "per pipeline id subdirectory creation" do @@ -50,6 +52,7 @@ expect(Dir.exist?(queue_path)).to be_falsey queue = subject.create(settings) expect(Dir.exist?(queue_path)).to be_truthy + queue.close end end end @@ -60,7 +63,9 @@ end it "returns a `WrappedAckedQueue`" do - expect(subject.create(settings)).to be_kind_of(LogStash::Util::WrappedAckedQueue) + queue = subject.create(settings) + expect(queue).to be_kind_of(LogStash::Util::WrappedAckedQueue) + queue.close end end @@ -70,7 +75,9 @@ end it "returns a `WrappedAckedQueue`" do - expect(subject.create(settings)).to be_kind_of(LogStash::Util::WrappedSynchronousQueue) + queue = subject.create(settings) + expect(queue).to be_kind_of(LogStash::Util::WrappedSynchronousQueue) + queue.close end end end diff --git a/logstash-core/src/main/java/org/logstash/FileLockFactory.java b/logstash-core/src/main/java/org/logstash/FileLockFactory.java index 58f7fd75746..b553ab9e6e7 100644 --- a/logstash-core/src/main/java/org/logstash/FileLockFactory.java +++ b/logstash-core/src/main/java/org/logstash/FileLockFactory.java @@ -87,7 +87,7 @@ public FileLock obtainLock(String lockDir, String lockName) throws IOException { LOCK_MAP.put(lock, realLockPath.toString()); return lock; } else { - throw new LockException("Lock held by another program: " + realLockPath); + throw new LockException("Lock held by another program on lock path: " + realLockPath); } } finally { if (lock == null) { // not successful - clear up and move out @@ -106,7 +106,7 @@ public FileLock obtainLock(String lockDir, String lockName) throws IOException { } } } else { - throw new LockException("Lock held by this virtual machine: " + realLockPath); + throw new LockException("Lock held by this virtual machine on lock path: " + realLockPath); } } diff --git a/logstash-core/src/main/java/org/logstash/ackedqueue/Queue.java b/logstash-core/src/main/java/org/logstash/ackedqueue/Queue.java index 981e9af00d5..3a6a4e77a46 100644 --- a/logstash-core/src/main/java/org/logstash/ackedqueue/Queue.java 
+++ b/logstash-core/src/main/java/org/logstash/ackedqueue/Queue.java @@ -2,6 +2,8 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.logstash.FileLockFactory; +import org.logstash.LockException; import org.logstash.common.io.CheckpointIO; import org.logstash.common.io.PageIO; import org.logstash.common.io.PageIOFactory; @@ -10,6 +12,7 @@ import java.io.IOException; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; +import java.nio.channels.FileLock; import java.nio.file.NoSuchFileException; import java.util.ArrayList; import java.util.List; @@ -66,6 +69,10 @@ public class Queue implements Closeable { private final Condition notFull = lock.newCondition(); private final Condition notEmpty = lock.newCondition(); + // exclusive dir access + private FileLock dirLock; + private final static String LOCK_NAME = ".lock"; + private static final Logger logger = LogManager.getLogger(Queue.class); public Queue(Settings settings) { @@ -142,82 +149,94 @@ public void open() throws IOException { if (!this.closed.get()) { throw new IOException("queue already opened"); } - Checkpoint headCheckpoint; + lock.lock(); try { - headCheckpoint = this.checkpointIO.read(checkpointIO.headFileName()); - } catch (NoSuchFileException e) { - // if there is no head checkpoint, create a new headpage and checkpoint it and exit method + // verify exclusive access to the dirPath + this.dirLock = FileLockFactory.getDefault().obtainLock(this.dirPath, LOCK_NAME); - this.seqNum = 0; - headPageNum = 0; + Checkpoint headCheckpoint; + try { + headCheckpoint = this.checkpointIO.read(checkpointIO.headFileName()); + } catch (NoSuchFileException e) { + // if there is no head checkpoint, create a new headpage and checkpoint it and exit method - newCheckpointedHeadpage(headPageNum); - this.closed.set(false); + this.seqNum = 0; + headPageNum = 0; - return; - } + newCheckpointedHeadpage(headPageNum); + this.closed.set(false); - // at this point we have a head checkpoint to figure queue recovery + return; + } - // reconstruct all tail pages state upto but excluding the head page - for (int pageNum = headCheckpoint.getFirstUnackedPageNum(); pageNum < headCheckpoint.getPageNum(); pageNum++) { + // at this point we have a head checkpoint to figure out queue recovery - // all tail checkpoints in the sequence should exist, if not abort mission with a NoSuchFileException - Checkpoint cp = this.checkpointIO.read(this.checkpointIO.tailFileName(pageNum)); + // reconstruct all tail pages state up to but excluding the head page + for (int pageNum = headCheckpoint.getFirstUnackedPageNum(); pageNum < headCheckpoint.getPageNum(); pageNum++) { - PageIO pageIO = this.pageIOFactory.build(pageNum, this.pageCapacity, this.dirPath); - pageIO.open(cp.getMinSeqNum(), cp.getElementCount()); + // all tail checkpoints in the sequence should exist, if not abort mission with a NoSuchFileException + Checkpoint cp = this.checkpointIO.read(this.checkpointIO.tailFileName(pageNum)); - add(cp, new TailPage(cp, this, pageIO)); - } + PageIO pageIO = this.pageIOFactory.build(pageNum, this.pageCapacity, this.dirPath); + pageIO.open(cp.getMinSeqNum(), cp.getElementCount()); - // transform the head page into a tail page only if the headpage is non-empty - // in both cases it will be checkpointed to track any changes in the firstUnackedPageNum when reconstructing the tail pages + add(cp, new TailPage(cp, this, pageIO)); + } + + // transform the head page into a tail page only if the headpage is non-empty
+ // in both cases it will be checkpointed to track any changes in the firstUnackedPageNum when reconstructing the tail pages - PageIO pageIO = this.pageIOFactory.build(headCheckpoint.getPageNum(), this.pageCapacity, this.dirPath); - pageIO.recover(); // optimistically recovers the head page data file and set minSeqNum and elementCount to the actual read/recovered data + PageIO pageIO = this.pageIOFactory.build(headCheckpoint.getPageNum(), this.pageCapacity, this.dirPath); + pageIO.recover(); // optimistically recovers the head page data file and sets minSeqNum and elementCount to the actual read/recovered data - if (pageIO.getMinSeqNum() != headCheckpoint.getMinSeqNum() || pageIO.getElementCount() != headCheckpoint.getElementCount()) { - // the recovered page IO shows different minSeqNum or elementCount than the checkpoint, use the page IO attributes + if (pageIO.getMinSeqNum() != headCheckpoint.getMinSeqNum() || pageIO.getElementCount() != headCheckpoint.getElementCount()) { + // the recovered page IO shows different minSeqNum or elementCount than the checkpoint, use the page IO attributes - logger.warn("recovered head data page {} is different than checkpoint, using recovered page information", headCheckpoint.getPageNum()); - logger.debug("head checkpoint minSeqNum={} or elementCount={} is different than head pageIO minSeqNum={} or elementCount={}", headCheckpoint.getMinSeqNum(), headCheckpoint.getElementCount(), pageIO.getMinSeqNum(), pageIO.getElementCount()); + logger.warn("recovered head data page {} is different than checkpoint, using recovered page information", headCheckpoint.getPageNum()); + logger.debug("head checkpoint minSeqNum={} or elementCount={} is different than head pageIO minSeqNum={} or elementCount={}", headCheckpoint.getMinSeqNum(), headCheckpoint.getElementCount(), pageIO.getMinSeqNum(), pageIO.getElementCount()); - long firstUnackedSeqNum = headCheckpoint.getFirstUnackedSeqNum(); - if (firstUnackedSeqNum < pageIO.getMinSeqNum()) { - logger.debug("head checkpoint firstUnackedSeqNum={} is < head pageIO minSeqNum={}, using pageIO minSeqNum", firstUnackedSeqNum, pageIO.getMinSeqNum()); - firstUnackedSeqNum = pageIO.getMinSeqNum(); + long firstUnackedSeqNum = headCheckpoint.getFirstUnackedSeqNum(); + if (firstUnackedSeqNum < pageIO.getMinSeqNum()) { + logger.debug("head checkpoint firstUnackedSeqNum={} is < head pageIO minSeqNum={}, using pageIO minSeqNum", firstUnackedSeqNum, pageIO.getMinSeqNum()); + firstUnackedSeqNum = pageIO.getMinSeqNum(); + } + headCheckpoint = new Checkpoint(headCheckpoint.getPageNum(), headCheckpoint.getFirstUnackedPageNum(), firstUnackedSeqNum, pageIO.getMinSeqNum(), pageIO.getElementCount()); } - headCheckpoint = new Checkpoint(headCheckpoint.getPageNum(), headCheckpoint.getFirstUnackedPageNum(), firstUnackedSeqNum, pageIO.getMinSeqNum(), pageIO.getElementCount()); - } - this.headPage = new HeadPage(headCheckpoint, this, pageIO); + this.headPage = new HeadPage(headCheckpoint, this, pageIO); - if (this.headPage.getMinSeqNum() <= 0 && this.headPage.getElementCount() <= 0) { - // head page is empty, let's keep it as-is + if (this.headPage.getMinSeqNum() <= 0 && this.headPage.getElementCount() <= 0) { + // head page is empty, let's keep it as-is - this.currentByteSize += pageIO.getCapacity(); + this.currentByteSize += pageIO.getCapacity(); - // but checkpoint it to update the firstUnackedPageNum if it changed - this.headPage.checkpoint(); - } else { - // head page is non-empty, transform it into a tail page and create a new empty head page -
add(headCheckpoint, this.headPage.behead()); + // but checkpoint it to update the firstUnackedPageNum if it changed + this.headPage.checkpoint(); + } else { + // head page is non-empty, transform it into a tail page and create a new empty head page + add(headCheckpoint, this.headPage.behead()); - headPageNum = headCheckpoint.getPageNum() + 1; - newCheckpointedHeadpage(headPageNum); + headPageNum = headCheckpoint.getPageNum() + 1; + newCheckpointedHeadpage(headPageNum); - // track the seqNum as we add this new tail page, prevent empty tailPage with a minSeqNum of 0 to reset seqNum - if (headCheckpoint.maxSeqNum() > this.seqNum) { this.seqNum = headCheckpoint.maxSeqNum(); } - } + // track the seqNum as we add this new tail page, prevent empty tailPage with a minSeqNum of 0 to reset seqNum + if (headCheckpoint.maxSeqNum() > this.seqNum) { + this.seqNum = headCheckpoint.maxSeqNum(); + } + } - // only activate the first tail page - if (tailPages.size() > 0) { - this.tailPages.get(0).getPageIO().activate(); - } + // only activate the first tail page + if (tailPages.size() > 0) { + this.tailPages.get(0).getPageIO().activate(); + } - // TODO: here do directory traversal and cleanup lingering pages? could be a background operations to not delay queue start? + // TODO: here do directory traversal and clean up lingering pages? could be a background operation to not delay queue start? - this.closed.set(false); + this.closed.set(false); + } catch (LockException e) { + throw new LockException("The queue failed to obtain exclusive access, cause: " + e.getMessage()); + } finally { + lock.unlock(); + } } // add a read tail page into this queue structures but also verify that this tail page @@ -597,7 +616,14 @@ public void close() throws IOException { // unblock blocked writes.
a write is blocked *after* the write has been performed so // unblocking is safe and will return from the write call notFull.signalAll(); + } finally { + try { + FileLockFactory.getDefault().releaseLock(this.dirLock); + } catch (IOException e) { + // log error and ignore + logger.error("Queue close releaseLock failed, error={}", e.getMessage()); + } lock.unlock(); } } diff --git a/logstash-core/src/test/java/org/logstash/ackedqueue/HeadPageTest.java b/logstash-core/src/test/java/org/logstash/ackedqueue/HeadPageTest.java index efdc1229faa..270664fab8c 100644 --- a/logstash-core/src/test/java/org/logstash/ackedqueue/HeadPageTest.java +++ b/logstash-core/src/test/java/org/logstash/ackedqueue/HeadPageTest.java @@ -25,6 +25,8 @@ public void newHeadPage() throws IOException { assertThat(p.isFullyAcked(), is(false)); assertThat(p.hasSpace(10), is(true)); assertThat(p.hasSpace(100), is(false)); + + q.close(); } @Test @@ -43,6 +45,8 @@ public void pageWrite() throws IOException { assertThat(p.hasSpace(element.serialize().length), is(false)); assertThat(p.isFullyRead(), is(false)); assertThat(p.isFullyAcked(), is(false)); + + q.close(); } @Test @@ -67,6 +71,8 @@ public void pageWriteAndReadSingle() throws IOException { assertThat(p.hasSpace(element.serialize().length), is(false)); assertThat(p.isFullyRead(), is(true)); assertThat(p.isFullyAcked(), is(false)); + + q.close(); } @Test @@ -91,6 +97,8 @@ public void pageWriteAndReadMulti() throws IOException { assertThat(p.hasSpace(element.serialize().length), is(false)); assertThat(p.isFullyRead(), is(true)); assertThat(p.isFullyAcked(), is(false)); + + q.close(); } // disabled test until we figure what to do in this condition diff --git a/logstash-core/src/test/java/org/logstash/ackedqueue/QueueTest.java b/logstash-core/src/test/java/org/logstash/ackedqueue/QueueTest.java index a2965758ba6..c3bdd49607c 100644 --- a/logstash-core/src/test/java/org/logstash/ackedqueue/QueueTest.java +++ b/logstash-core/src/test/java/org/logstash/ackedqueue/QueueTest.java @@ -42,6 +42,8 @@ public void newQueue() throws IOException { q.open(); assertThat(q.nonBlockReadBatch(1), is(equalTo(null))); + + q.close(); } @Test @@ -57,6 +59,8 @@ public void singleWriteRead() throws IOException { assertThat(b.getElements().size(), is(equalTo(1))); assertThat(b.getElements().get(0).toString(), is(equalTo(element.toString()))); assertThat(q.nonBlockReadBatch(1), is(equalTo(null))); + + q.close(); } @Test @@ -72,6 +76,8 @@ public void singleWriteMultiRead() throws IOException { assertThat(b.getElements().size(), is(equalTo(1))); assertThat(b.getElements().get(0).toString(), is(equalTo(element.toString()))); assertThat(q.nonBlockReadBatch(2), is(equalTo(null))); + + q.close(); } @Test @@ -95,6 +101,8 @@ public void multiWriteSamePage() throws IOException { assertThat(b.getElements().size(), is(equalTo(1))); assertThat(b.getElements().get(0).toString(), is(equalTo(elements.get(2).toString()))); + + q.close(); } @Test @@ -137,6 +145,8 @@ public void writeMultiPage() throws IOException { b = q.nonBlockReadBatch(10); assertThat(b, is(equalTo(null))); + + q.close(); } @@ -178,6 +188,8 @@ public void writeMultiPageWithInOrderAcking() throws IOException { b.close(); assertThat(q.getHeadPage().isFullyAcked(), is(equalTo(true))); + + q.close(); } @Test @@ -263,6 +275,8 @@ public void writeMultiPageWithInOrderAckingCheckpoints() throws IOException { assertThat(c.getMinSeqNum(), is(equalTo(3L))); assertThat(c.getFirstUnackedSeqNum(), is(equalTo(5L))); assertThat(c.getFirstUnackedPageNum(), 
is(equalTo(1))); + + q.close(); } @Test @@ -304,6 +318,8 @@ public void randomAcking() throws IOException { } assertThat(q.getTailPages().size(), is(equalTo(0))); + + q.close(); } } @@ -353,6 +369,8 @@ public void reachMaxUnread() throws IOException, InterruptedException, Execution // since we did not ack and pages hold a single item assertThat(q.getTailPages().size(), is(equalTo(ELEMENT_COUNT))); + + q.close(); } @Test @@ -406,6 +424,8 @@ public void reachMaxUnreadWithAcking() throws IOException, InterruptedException, assertThat(q.getHeadPage().getElementCount() > 0L, is(true)); assertThat(q.getHeadPage().unreadCount(), is(equalTo(1L))); assertThat(q.unreadCount, is(equalTo(1L))); + + q.close(); } @Test(timeout = 5000) @@ -440,6 +460,7 @@ public void reachMaxSizeTest() throws IOException, InterruptedException, Executi assertThat(q.isFull(), is(true)); executor.shutdown(); + q.close(); } @Test(timeout = 5000) @@ -483,6 +504,7 @@ public void resumeWriteOnNoLongerFullQueueTest() throws IOException, Interrupted assertThat(q.isFull(), is(false)); executor.shutdown(); + q.close(); } @Test(timeout = 5000) @@ -523,6 +545,7 @@ public void queueStillFullAfterPartialPageAckTest() throws IOException, Interrup assertThat(q.isFull(), is(true)); // queue should still be full executor.shutdown(); + q.close(); } @Test diff --git a/logstash-core/src/test/java/org/logstash/stress/Concurent.java b/logstash-core/src/test/java/org/logstash/stress/Concurent.java index 48a4a5385a6..cf53720cd0d 100644 --- a/logstash-core/src/test/java/org/logstash/stress/Concurent.java +++ b/logstash-core/src/test/java/org/logstash/stress/Concurent.java @@ -151,6 +151,7 @@ public static void oneProducersOneMultipleConsumer() throws IOException, Interru // gotta hate exception handling in lambdas consumers.forEach(c -> {try{c.join();} catch(InterruptedException e) {throw new RuntimeException(e);}}); + q.close(); Instant end = Instant.now();
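
Reviewer note on the locking pattern: the heart of this patch is the handshake in Queue#open()/Queue#close(). open() first obtains an exclusive lock on a ".lock" file inside the queue's data directory (via FileLockFactory.getDefault().obtainLock(this.dirPath, LOCK_NAME)) and close() releases it in a finally block, so a crashed or reloaded pipeline cannot strand the directory. The sketch below illustrates the underlying java.nio mechanism in isolation under stated assumptions; it is not the FileLockFactory code from this patch, and the DirLockSketch class name and the /tmp/pq-demo path are hypothetical. Note that FileChannel#tryLock() returns null when another process holds the lock but throws OverlappingFileLockException when the same JVM already holds it, which is why FileLockFactory additionally tracks held locks in its own in-VM LOCK_MAP.

import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.channels.OverlappingFileLockException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;

public class DirLockSketch {
    // Take exclusive ownership of a directory by locking a ".lock" file inside it.
    // The caller must release the returned lock and close its channel when done,
    // mirroring what Queue#close() does in this patch.
    public static FileLock lockDir(Path dir) throws IOException {
        FileChannel channel = FileChannel.open(dir.resolve(".lock"),
                StandardOpenOption.CREATE, StandardOpenOption.WRITE);
        try {
            FileLock lock = channel.tryLock();
            if (lock == null) {
                // lock held by another process
                channel.close();
                throw new IOException("Lock held by another program on lock path: " + dir);
            }
            return lock;
        } catch (OverlappingFileLockException e) {
            // lock already held by this JVM, e.g. two pipelines sharing one queue dir
            channel.close();
            throw new IOException("Lock held by this virtual machine on lock path: " + dir);
        }
    }

    public static void main(String[] args) throws IOException {
        FileLock lock = lockDir(Paths.get("/tmp/pq-demo")); // hypothetical dir, must exist
        try {
            // ... exclusive access to the directory contents ...
        } finally {
            lock.release();          // what Queue#close() does via releaseLock()
            lock.channel().close();
        }
    }
}

This release-on-close discipline is also why the specs above switch from shutdown to close_pipeline/close_pipelines: any test that instantiates a pipeline backed by a persisted queue must close it, otherwise the next test reusing the same queue directory fails with the "Lock held by this virtual machine" LockException.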