Introduce DeadLetterQueue to the Execution Context (elastic#6894)
* Introduce a DeadLetterQueueFactory

DeadLetterQueueFactory is a static class that maintains
a static collection of DeadLetterQueueWriteManagers, one
per pipeline whose plugins request to use it.

* DeadLetterQueue was added as a first-class field in the execution context that input/filter/output plugins can leverage
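The per-pipeline bookkeeping described above can be sketched in plain Ruby (an illustrative stand-in, not the real Java `org.logstash.common.DeadLetterQueueFactory`; names and fields here are hypothetical):

```ruby
# Illustrative stand-in for the Java DeadLetterQueueFactory: one writer is
# created lazily per pipeline id and reused on subsequent requests.
class SketchDeadLetterQueueFactory
  @writers = {}      # pipeline_id => writer (class-level state)
  @lock = Mutex.new  # guard concurrent pipeline startups

  def self.get_writer(pipeline_id, base_path)
    @lock.synchronize do
      @writers[pipeline_id] ||= { path: File.join(base_path, pipeline_id.to_s), open: true }
    end
  end

  def self.close(pipeline_id)
    @lock.synchronize do
      writer = @writers.delete(pipeline_id)
      writer[:open] = false if writer
    end
  end
end

a = SketchDeadLetterQueueFactory.get_writer(:main, "/tmp/dlq")
b = SketchDeadLetterQueueFactory.get_writer(:main, "/tmp/dlq")
a.equal?(b)  # the same writer object is handed back per pipeline
```

Because the collection is static, every plugin in a pipeline ends up sharing one underlying writer, which is what makes the per-plugin wrapping in the execution context necessary.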
talevy authored May 9, 2017
1 parent 69f5e46 commit 458910b
Showing 23 changed files with 460 additions and 68 deletions.
10 changes: 10 additions & 0 deletions config/logstash.yml
Original file line number Diff line number Diff line change
@@ -131,6 +131,16 @@
#
# queue.checkpoint.interval: 1000
#
# ------------ Dead-Letter Queue Settings --------------
# Flag to turn on dead-letter queue.
#
# dead_letter_queue.enable: false
#
# If using dead_letter_queue.enable: true, the directory path where the data files will be stored.
# Default is path.data/dead_letter_queue
#
# path.dead_letter_queue:
#
# ------------ Metrics Settings --------------
#
# Bind address for the metrics REST endpoint
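Putting the two settings above together, enabling the DLQ in `logstash.yml` would look like this (the path shown is a hypothetical example; when omitted it defaults to `path.data/dead_letter_queue`):

```yaml
# Enable the dead-letter queue and store its data files at an explicit path.
dead_letter_queue.enable: true
path.dead_letter_queue: /var/lib/logstash/dead_letter_queue
```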
8 changes: 8 additions & 0 deletions docs/static/settings-file.asciidoc
@@ -134,6 +134,14 @@ The `logstash.yml` file includes the following settings:
| The interval in milliseconds when a checkpoint is forced on the head page when persistent queues are enabled (`queue.type: persisted`). Specify `queue.checkpoint.interval: 0` for no periodic checkpoint.
| 1000

| `dead_letter_queue.enable`
| Flag to instruct Logstash to enable the DLQ feature supported by plugins.
| `false`

| `path.dead_letter_queue`
| The directory path where the data files will be stored for the dead-letter queue.
| `path.data/dead_letter_queue`

| `http.host`
| The bind address for the metrics REST endpoint.
| `"127.0.0.1"`
15 changes: 12 additions & 3 deletions logstash-core/lib/logstash/environment.rb
@@ -50,6 +50,7 @@ module Environment
Setting::Numeric.new("queue.checkpoint.acks", 1024), # 0 is unlimited
Setting::Numeric.new("queue.checkpoint.writes", 1024), # 0 is unlimited
Setting::Numeric.new("queue.checkpoint.interval", 1000), # 0 is no time-based checkpointing
Setting::Boolean.new("dead_letter_queue.enable", false),
Setting::TimeValue.new("slowlog.threshold.warn", "-1"),
Setting::TimeValue.new("slowlog.threshold.info", "-1"),
Setting::TimeValue.new("slowlog.threshold.debug", "-1"),
@@ -59,13 +60,21 @@ module Environment
# Compute the default queue path based on `path.data`
default_queue_file_path = ::File.join(SETTINGS.get("path.data"), "queue")
SETTINGS.register Setting::WritableDirectory.new("path.queue", default_queue_file_path)

# Compute the default dead_letter_queue path based on `path.data`
default_dlq_file_path = ::File.join(SETTINGS.get("path.data"), "dead_letter_queue")
SETTINGS.register Setting::WritableDirectory.new("path.dead_letter_queue", default_dlq_file_path)

SETTINGS.on_post_process do |settings|
# If the data path is overridden but the queue path isn't recompute the queue path
# We need to do this at this stage because of the weird execution order
# our monkey-patched Clamp follows
if settings.set?("path.data") && !settings.set?("path.queue")
settings.set_value("path.queue", ::File.join(settings.get("path.data"), "queue"))
if settings.set?("path.data")
if !settings.set?("path.queue")
settings.set_value("path.queue", ::File.join(settings.get("path.data"), "queue"))
end
if !settings.set?("path.dead_letter_queue")
settings.set_value("path.dead_letter_queue", ::File.join(settings.get("path.data"), "dead_letter_queue"))
end
end
end

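The post-process hook above keeps derived paths in sync with an overridden `path.data`. A standalone sketch of that rule (a hypothetical helper operating on a plain Hash rather than Logstash's settings object):

```ruby
# Derived paths follow "path.data" unless they were set explicitly;
# an explicit "path.queue" or "path.dead_letter_queue" always wins.
def recompute_derived_paths(settings)
  return settings unless settings.key?("path.data")
  settings["path.queue"] ||= File.join(settings["path.data"], "queue")
  settings["path.dead_letter_queue"] ||= File.join(settings["path.data"], "dead_letter_queue")
  settings
end

recompute_derived_paths({ "path.data" => "/var/lib/logstash" })
# fills in "path.queue" and "path.dead_letter_queue" under /var/lib/logstash
```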
10 changes: 7 additions & 3 deletions logstash-core/lib/logstash/execution_context.rb
@@ -1,13 +1,17 @@
# encoding: utf-8
require "logstash/util/dead_letter_queue_manager"
module LogStash
class ExecutionContext
attr_reader :pipeline, :agent
attr_reader :pipeline, :agent, :dlq_writer

def initialize(pipeline, agent)
def initialize(pipeline, agent, plugin_id, plugin_type, dlq_writer)
@pipeline = pipeline
@agent = agent
@plugin_id = plugin_id
@plugin_type = plugin_type
@dlq_writer = LogStash::Util::PluginDeadLetterQueueWriter.new(dlq_writer, @plugin_id, @plugin_type)
end

def pipeline_id
@pipeline.pipeline_id
end
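The constructor change above wraps the shared pipeline writer once per plugin. With simplified stand-in classes (not Logstash's real ones), the effect looks like:

```ruby
# Each plugin's context wraps the pipeline-wide DLQ writer so that entries
# it writes can be stamped with that plugin's id and type.
PluginWriterSketch = Struct.new(:inner_writer, :plugin_id, :plugin_type)

class ExecutionContextSketch
  attr_reader :pipeline, :agent, :dlq_writer

  def initialize(pipeline, agent, plugin_id, plugin_type, dlq_writer)
    @pipeline = pipeline
    @agent = agent
    @dlq_writer = PluginWriterSketch.new(dlq_writer, plugin_id, plugin_type)
  end
end

shared_writer = Object.new
ctx = ExecutionContextSketch.new(:pipeline, :agent, "grok_1", "filter", shared_writer)
ctx.dlq_writer.plugin_id                           # => "grok_1"
ctx.dlq_writer.inner_writer.equal?(shared_writer)  # => true
```

Every plugin gets its own wrapper, but all wrappers delegate to the single writer owned by the pipeline.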
2 changes: 2 additions & 0 deletions logstash-core/lib/logstash/inputs/base.rb
@@ -105,6 +105,8 @@ def execution_context=(context)
super
# There is no easy way to propagate an instance variable into the codec, because codecs
# are created at the class level
# TODO(talevy): Codecs should have their own execution_context, for now they will inherit their
# parent plugin's
@codec.execution_context = context
context
end
2 changes: 2 additions & 0 deletions logstash-core/lib/logstash/outputs/base.rb
@@ -109,6 +109,8 @@ def execution_context=(context)
super
# There is no easy way to propagate an instance variable into the codec, because codecs
# are created at the class level
# TODO(talevy): Codecs should have their own execution_context, for now they will inherit their
# parent plugin's
@codec.execution_context = context
context
end
24 changes: 18 additions & 6 deletions logstash-core/lib/logstash/pipeline.rb
@@ -18,12 +18,16 @@
require "logstash/instrument/namespaced_null_metric"
require "logstash/instrument/collector"
require "logstash/instrument/wrapped_write_client"
require "logstash/util/dead_letter_queue_manager"
require "logstash/output_delegator"
require "logstash/filter_delegator"
require "logstash/queue_factory"
require "logstash/compiler"
require "logstash/execution_context"

java_import org.logstash.common.DeadLetterQueueFactory
java_import org.logstash.common.io.DeadLetterQueueWriter

module LogStash; class BasePipeline
include LogStash::Util::Loggable

@@ -49,7 +53,13 @@ def initialize(config_str, settings = SETTINGS, namespaced_metric = nil, agent =
@inputs = nil
@filters = nil
@outputs = nil
@execution_context = LogStash::ExecutionContext.new(self, agent)
@agent = agent

if settings.get_value("dead_letter_queue.enable")
@dlq_writer = DeadLetterQueueFactory.getWriter(pipeline_id, settings.get_value("path.dead_letter_queue"))
else
@dlq_writer = LogStash::Util::DummyDeadLetterQueueWriter.new
end

grammar = LogStashConfigParser.new
parsed_config = grammar.parse(config_str)
@@ -100,16 +110,18 @@ def plugin(plugin_type, name, *args)

klass = Plugin.lookup(plugin_type, name)

execution_context = ExecutionContext.new(self, @agent, id, klass.config_name, @dlq_writer)

if plugin_type == "output"
OutputDelegator.new(@logger, klass, type_scoped_metric, @execution_context, OutputDelegatorStrategyRegistry.instance, args)
OutputDelegator.new(@logger, klass, type_scoped_metric, execution_context, OutputDelegatorStrategyRegistry.instance, args)
elsif plugin_type == "filter"
FilterDelegator.new(@logger, klass, type_scoped_metric, @execution_context, args)
FilterDelegator.new(@logger, klass, type_scoped_metric, execution_context, args)
else # input
input_plugin = klass.new(args)
scoped_metric = type_scoped_metric.namespace(id.to_sym)
scoped_metric.gauge(:name, input_plugin.config_name)
input_plugin.metric = scoped_metric
input_plugin.execution_context = @execution_context
input_plugin.execution_context = execution_context
input_plugin
end
end
@@ -180,6 +192,7 @@ def initialize(config_str, settings = SETTINGS, namespaced_metric = nil, agent =
)
@drain_queue = @settings.get_value("queue.drain")


@events_filtered = Concurrent::AtomicFixnum.new(0)
@events_consumed = Concurrent::AtomicFixnum.new(0)

@@ -191,8 +204,6 @@ def initialize(config_str, settings = SETTINGS, namespaced_metric = nil, agent =
@force_shutdown = Concurrent::AtomicBoolean.new(false)
end # def initialize



def ready?
@ready.value
end
@@ -305,6 +316,7 @@ def run
def close
@filter_queue_client.close
@queue.close
@dlq_writer.close
end

def transition_to_running
1 change: 1 addition & 0 deletions logstash-core/lib/logstash/plugin.rb
@@ -3,6 +3,7 @@
require "logstash/logging"
require "logstash/config/mixin"
require "logstash/instrument/null_metric"
require "logstash/util/dead_letter_queue_manager"
require "concurrent"
require "securerandom"

61 changes: 61 additions & 0 deletions logstash-core/lib/logstash/util/dead_letter_queue_manager.rb
@@ -0,0 +1,61 @@
require 'logstash/environment'

module LogStash; module Util
class PluginDeadLetterQueueWriter

attr_reader :plugin_id, :plugin_type, :inner_writer

def initialize(inner_writer, plugin_id, plugin_type)
@plugin_id = plugin_id
@plugin_type = plugin_type
@inner_writer = inner_writer
end

def write(logstash_event, reason)
if @inner_writer && @inner_writer.is_open
@inner_writer.writeEntry(logstash_event.to_java, @plugin_type, @plugin_id, reason)
end
end

def close
if @inner_writer && @inner_writer.is_open
@inner_writer.close
end
end
end

class DummyDeadLetterQueueWriter
# class used to represent a writer when dead_letter_queue is disabled
def initialize
end

def write(logstash_event, reason)
# noop
end

def is_open
false
end

def close
# noop
end
end

class DeadLetterQueueFactory
java_import org.logstash.common.DeadLetterQueueFactory

def self.get(pipeline_id)
if LogStash::SETTINGS.get("dead_letter_queue.enable")
return DeadLetterQueueWriter.new(
DeadLetterQueueFactory.getWriter(pipeline_id, LogStash::SETTINGS.get("path.dead_letter_queue")))
else
return DeadLetterQueueWriter.new(nil)
end
end

def self.close(pipeline_id)
DeadLetterQueueFactory.close(pipeline_id)
end
end
end end
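The two writer classes above follow the null-object pattern: plugin code always receives a writer and never branches on whether the DLQ is enabled. A self-contained, pure-Ruby simplification (the real wrapper forwards to a Java `writeEntry`; the counter here is only for illustration):

```ruby
# When the DLQ is disabled, every call is a silent no-op.
class DummyDlqWriterSketch
  def write(event, reason); end  # noop
  def is_open; false; end
  def close; end                 # noop
end

# Per-plugin wrapper: forwards only while the inner writer is open.
class PluginDlqWriterSketch
  attr_reader :written

  def initialize(inner_writer, plugin_id, plugin_type)
    @inner_writer = inner_writer
    @plugin_id = plugin_id
    @plugin_type = plugin_type
    @written = 0  # count of entries actually forwarded
  end

  def write(event, reason)
    return unless @inner_writer && @inner_writer.is_open
    @inner_writer.write(event, reason)
    @written += 1
  end
end

writer = PluginDlqWriterSketch.new(DummyDlqWriterSketch.new, "my_filter", "filter")
writer.write({ "message" => "bad event" }, "mapping conflict")
writer.written  # => 0, the dummy swallows everything
```

Because the dummy reports `is_open` as `false`, the wrapper short-circuits before ever touching the inner writer, so the disabled path costs almost nothing per event.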
12 changes: 0 additions & 12 deletions logstash-core/spec/logstash/agent/converge_spec.rb
@@ -26,18 +26,6 @@
expect(converge_result).to be_a_successful_converge
end


describe "passing the agent to the pipeline" do
let(:source_loader) { TestSourceLoader.new(pipeline_config) }
let(:pipeline_config) { mock_pipeline_config(:main, "input { generator { count => 10 } } output { null {} }") }

before { subject.execute }

it "execute the pipeline and stop execution" do
expect(subject.get_pipeline(:main).execution_context.agent).to eq(subject)
end
end

context "Agent execute options" do
let(:source_loader) do
TestSourceLoader.new(finite_pipeline_config)
18 changes: 14 additions & 4 deletions logstash-core/spec/logstash/execution_context_spec.rb
@@ -1,28 +1,38 @@
# encoding: utf-8
require "spec_helper"
require "logstash/util/dead_letter_queue_manager"
require "logstash/execution_context"

describe LogStash::ExecutionContext do
let(:pipeline) { double("pipeline") }
let(:pipeline_id) { :main }
let(:agent) { double("agent") }

let(:plugin_id) { "plugin_id" }
let(:plugin_type) { "plugin_type" }
let(:dlq_writer) { LogStash::Util::DummyDeadLetterQueueWriter.new }

before do
allow(pipeline).to receive(:agent).and_return(agent)
allow(pipeline).to receive(:pipeline_id).and_return(pipeline_id)
end

subject { described_class.new(pipeline, agent) }
subject { described_class.new(pipeline, agent, plugin_id, plugin_type, dlq_writer) }

it "returns the `pipeline_id`" do
expect(subject.pipeline_id).to eq(pipeline_id)
end

it "returns the pipeline" do
expect(subject.pipeline).to eq(pipeline)
end

it "returns the agent" do
expect(subject.agent).to eq(agent)
end

it "returns the plugin-specific dlq writer" do
expect(subject.dlq_writer.plugin_type).to eq(plugin_type)
expect(subject.dlq_writer.plugin_id).to eq(plugin_id)
expect(subject.dlq_writer.inner_writer).to eq(dlq_writer)
end
end
2 changes: 1 addition & 1 deletion logstash-core/spec/logstash/inputs/base_spec.rb
@@ -66,7 +66,7 @@ def register; end

context "execution context" do
include_context "execution_context"

let(:klass) { LogStash::Inputs::NOOP }

subject(:instance) { klass.new({}) }
2 changes: 1 addition & 1 deletion logstash-core/spec/logstash/output_delegator_spec.rb
@@ -10,7 +10,7 @@
let(:plugin_args) { {"id" => "foo", "arg1" => "val1"} }
let(:collector) { [] }
let(:metric) { LogStash::Instrument::NamespacedNullMetric.new(collector, :null) }

include_context "execution_context"

subject { described_class.new(logger, out_klass, metric, execution_context, ::LogStash::OutputDelegatorStrategyRegistry.instance, plugin_args) }
2 changes: 1 addition & 1 deletion logstash-core/spec/logstash/outputs/base_spec.rb
@@ -82,7 +82,7 @@ def multi_receive_encoded(events_and_encoded)

context "execution context" do
include_context "execution_context"

let(:klass) { LogStash::Outputs::NOOPSingle }

subject(:instance) { klass.new(params.dup) }