support multiple pipelines in one logstash instance
* add multi_local source for multi pipelines
* introduce pipelines.yml
* introduce PipelineSettings class
* support reloading of pipeline parameters
* fix pipeline api call for _node/pipelines
* inform user pipelines.yml is ignored if -e or -f is enabled
jsvd committed May 30, 2017
1 parent b1d8a4b commit bed8b8a
Showing 36 changed files with 578 additions and 144 deletions.
69 changes: 69 additions & 0 deletions config/pipelines.yml
@@ -0,0 +1,69 @@
# List of pipelines to be loaded by Logstash
#
# This document must be a list of dictionaries/hashes, where the keys/values are pipeline settings.
# Default values for omitted settings are read from the `logstash.yml` file.
# When declaring multiple pipelines, each MUST have its own `pipeline.id`.
#
# Example of two pipelines:
#
# - pipeline.id: test
# pipeline.workers: 1
# pipeline.batch.size: 1
# config.string: "input { generator {} } filter { sleep { time => 1 } } output { stdout { codec => dots } }"
# - pipeline.id: another_test
# queue.type: persisted
# path.config: "/tmp/logstash/*.config"
#
# Available options:
#
# # name of the pipeline
# pipeline.id: mylogs
#
# # The configuration string to be used by this pipeline
# config.string: "input { generator {} } filter { sleep { time => 1 } } output { stdout { codec => dots } }"
#
# # The path from where to read the configuration text
# path.config: "/etc/conf.d/logstash/myconfig.cfg"
#
# # How many worker threads execute the Filters+Outputs stage of the pipeline
# pipeline.workers: 1 (actually defaults to number of CPUs)
#
# # How many events to retrieve from inputs before sending to filters+workers
# pipeline.batch.size: 125
#
# # How long to wait before dispatching an undersized batch to filters+workers
# pipeline.batch.delay: 5
#
# # How many workers should be used per output plugin instance
# pipeline.output.workers: 1
#
# # Internal queuing model, "memory" for legacy in-memory based queuing and
# # "persisted" for disk-based acked queueing. Defaults is memory
# queue.type: memory
#
# # If using queue.type: persisted, the page data files size. The queue data consists of
# # append-only data files separated into pages. Default is 250mb
# queue.page_capacity: 250mb
#
# # If using queue.type: persisted, the maximum number of unread events in the queue.
# # Default is 0 (unlimited)
# queue.max_events: 0
#
# # If using queue.type: persisted, the total capacity of the queue in number of bytes.
# # Default is 1024mb or 1gb
# queue.max_bytes: 1024mb
#
# # If using queue.type: persisted, the maximum number of acked events before forcing a checkpoint
# # Default is 1024, 0 for unlimited
# queue.checkpoint.acks: 1024
#
# # If using queue.type: persisted, the maximum number of written events before forcing a checkpoint
# # Default is 1024, 0 for unlimited
# queue.checkpoint.writes: 1024
#
# # If using queue.type: persisted, the interval in milliseconds when a checkpoint is forced on the head page
# # Default is 1000, 0 for no periodic checkpoint.
# queue.checkpoint.interval: 1000
#
# # Enable Dead Letter Queueing for this pipeline.
# dead_letter_queue.enable: false
28 changes: 28 additions & 0 deletions docs/static/multiple-pipelines.asciidoc
@@ -0,0 +1,28 @@
[[multiple-pipelines]]
=== Multiple Pipelines

If you need to run more than one pipeline in the same process, Logstash provides a way to do it through a configuration file called `pipelines.yml`.
This file must be placed in the `path.settings` folder and follows this structure:

```yaml
- pipeline.id: my-pipeline_1
path.config: "/etc/path/to/p1.config"
pipeline.workers: 3
- pipeline.id: my-other-pipeline
path.config: "/etc/different/path/p2.cfg"
queue.type: persisted
```

This file is formatted in YAML and contains a list of dictionaries, where each dictionary describes a pipeline and each key/value pair is a setting for that pipeline. In the example above, we describe two pipelines by stating their ids and configuration paths. For the first pipeline we set the value of `pipeline.workers` to 3, while in the other we enable the persistent queue.
The value of a setting that is not explicitly set in this file will fall back to the default described in the `logstash.yml` file.

Starting Logstash without arguments makes it read the `pipelines.yml` file and instantiate all pipelines listed there. On the other hand, using `-e` or `-f` makes Logstash ignore the `pipelines.yml` file and log a warning about it.
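For instance (a sketch; run from the Logstash home directory with `pipelines.yml` in `path.settings`; the config path below is hypothetical):

```sh
bin/logstash                           # reads pipelines.yml, starts every pipeline listed
bin/logstash -f /etc/some/single.conf  # ignores pipelines.yml and logs a warning
```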

[[multiple-pipeline-usage]]
==== Usage Considerations

Using multiple pipelines is especially useful if your current configuration has event flows that don't share the same inputs, filters, or outputs, and are currently separated from each other using tags and conditionals.

Having multiple pipelines in a single instance also allows these event flows to have different performance and durability parameters (for example, `pipeline.workers` and persistent queues). This separation means that a blocked output in one pipeline won't exert backpressure on the other.

That said, it's important to take into account resource competition between the pipelines, given that the default values are tuned for a single pipeline. So, for example, consider reducing the number of pipeline workers used by each pipeline, because by default each pipeline uses one worker per CPU core.
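For example, on a hypothetical 8-core machine the two pipelines could be capped explicitly so they don't each claim all eight cores (the ids, paths, and worker counts below are illustrative):

```yaml
- pipeline.id: ingest-heavy
  path.config: "/etc/path/to/heavy.config"
  pipeline.workers: 6
- pipeline.id: ingest-light
  path.config: "/etc/path/to/light.config"
  pipeline.workers: 2
```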
18 changes: 13 additions & 5 deletions logstash-core/lib/logstash/api/commands/node.rb
@@ -9,20 +9,28 @@ class Node < Commands::Base

def all(selected_fields=[])
payload = {
:pipeline => pipeline,
:pipelines => pipelines,
:os => os,
:jvm => jvm
}
payload.select! { |k,v| selected_fields.include?(k) } unless selected_fields.empty?
payload
end

def pipeline(pipeline_id = LogStash::SETTINGS.get("pipeline.id").to_sym)
stats = extract_metrics(
[:stats, :pipelines, pipeline_id, :config],
def pipelines
pipeline_ids = service.get_shallow(:stats, :pipelines).keys
pipeline_ids.each_with_object({}) do |pipeline_id, result|
result[pipeline_id] = pipeline(pipeline_id)
end
end

def pipeline(pipeline_id)
extract_metrics(
[:stats, :pipelines, pipeline_id.to_sym, :config],
:workers, :batch_size, :batch_delay, :config_reload_automatic, :config_reload_interval
)
stats.merge(:id => pipeline_id)
rescue
{}
end

def os
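With this change, `Node#pipelines` walks every pipeline id found in the metrics store and builds one entry per pipeline. A sketch of the resulting shape, with hypothetical pipeline ids and values (the keys mirror the metrics extracted in `pipeline` above):

```ruby
# Illustrative only: a possible return value of Node#pipelines for two
# running pipelines, :main and :test (all numbers are hypothetical).
{
  :main => { :workers => 4, :batch_size => 125, :batch_delay => 5,
             :config_reload_automatic => false, :config_reload_interval => 3 },
  :test => { :workers => 1, :batch_size => 1, :batch_delay => 5,
             :config_reload_automatic => false, :config_reload_interval => 3 }
}
```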
23 changes: 18 additions & 5 deletions logstash-core/lib/logstash/api/commands/stats.rb
@@ -41,14 +41,21 @@ def process
def events
extract_metrics(
[:stats, :events],
:in, :filtered, :out, :duration_in_millis
:in, :filtered, :out, :duration_in_millis, :queue_push_duration_in_millis
)
end

def pipeline(pipeline_id = LogStash::SETTINGS.get("pipeline.id").to_sym)
stats = service.get_shallow(:stats, :pipelines, pipeline_id)
stats = PluginsStats.report(stats)
stats.merge(:id => pipeline_id)
def pipeline(pipeline_id = nil)
if pipeline_id.nil?
pipeline_ids = service.get_shallow(:stats, :pipelines).keys
pipeline_ids.each_with_object({}) do |pipeline_id, result|
result[pipeline_id] = plugins_stats_report(pipeline_id)
end
else
{ pipeline_id => plugins_stats_report(pipeline_id) }
end
rescue # failed to find pipeline
{}
end

def memory
@@ -85,6 +92,12 @@ def hot_threads(options={})
HotThreadsReport.new(self, options)
end

private
def plugins_stats_report(pipeline_id)
stats = service.get_shallow(:stats, :pipelines, pipeline_id.to_sym)
PluginsStats.report(stats)
end

module PluginsStats
module_function

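The stats command follows the same pattern: calling `pipeline` with no argument now aggregates every pipeline, while passing an id narrows the report to that one. A usage sketch (assumes the API module context where `factory` is available, as in `NodeStats` below; the ids are hypothetical):

```ruby
stats = factory.build(:stats)
stats.pipeline           # all pipelines: { :main => {...}, :test => {...} }
stats.pipeline("test")   # one pipeline:  { "test" => {...} }
stats.pipeline("nope")   # unknown id: the rescue returns {}
```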
7 changes: 7 additions & 0 deletions logstash-core/lib/logstash/api/modules/node.rb
@@ -20,6 +20,13 @@ def node
respond_with(node.hot_threads(options), {:as => as})
end

get "/pipelines/:id" do
pipeline_id = params["id"]
payload = node.pipeline(pipeline_id)
halt(404) if payload.empty?
respond_with(:pipelines => { pipeline_id => payload } )
end

get "/?:filter?" do
selected_fields = extract_fields(params["filter"].to_s.strip)
values = node.all(selected_fields)
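This adds a per-pipeline variant of the node info endpoint. A usage sketch against the default API port (the pipeline id `test` is hypothetical; an unknown id yields a 404 through the `halt(404)` above):

```sh
curl -XGET 'localhost:9600/_node/pipelines/test?pretty'
```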
17 changes: 12 additions & 5 deletions logstash-core/lib/logstash/api/modules/node_stats.rb
@@ -8,12 +8,19 @@ class NodeStats < ::LogStash::Api::Modules::Base
@stats = factory.build(:stats)
end

get "/pipelines/:id?" do
payload = pipeline_payload(params["id"])
halt(404) if payload.empty?
respond_with(:pipelines => payload)
end

get "/?:filter?" do
payload = {
:jvm => jvm_payload,
:process => process_payload,
:pipeline => pipeline_payload,
:reloads => reloads,
:events => events_payload,
:pipelines => pipeline_payload,
:reloads => reloads_payload,
:os => os_payload
}
respond_with(payload, {:filter => params["filter"]})
@@ -32,7 +39,7 @@ def jvm_payload
@stats.jvm
end

def reloads
def reloads_payload
@stats.reloads
end

@@ -44,8 +51,8 @@ def mem_payload
@stats.memory
end

def pipeline_payload
@stats.pipeline
def pipeline_payload(val = nil)
@stats.pipeline(val)
end
end
end
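And the stats module gains the matching per-pipeline route. Because the `:id` segment is optional (`/pipelines/:id?`), omitting it returns stats for every pipeline (again a sketch; the id is hypothetical):

```sh
curl -XGET 'localhost:9600/_node/stats/pipelines/test?pretty'
curl -XGET 'localhost:9600/_node/stats/pipelines?pretty'
```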
14 changes: 3 additions & 11 deletions logstash-core/lib/logstash/bootstrap_check/default_config.rb
@@ -4,17 +4,9 @@
module LogStash module BootstrapCheck
class DefaultConfig
def self.check(settings)
if settings.get("config.string").nil? && settings.get("path.config").nil?
raise LogStash::BootstrapCheckError, I18n.t("logstash.runner.missing-configuration")
end

if settings.get("config.string") && settings.get("path.config")
raise LogStash::BootstrapCheckError, I18n.t("logstash.runner.config-string-path-exclusive")
end

if settings.get("config.reload.automatic") && settings.get("path.config").nil?
# there's nothing to reload
raise LogStash::BootstrapCheckError, I18n.t("logstash.runner.reload-without-config-path")
# currently none of the checks applies if there are multiple pipelines
if settings.get("config.reload.automatic") && settings.get_setting("config.string").set?
raise LogStash::BootstrapCheckError, I18n.t("logstash.runner.reload-with-config-string")
end
end
end
2 changes: 1 addition & 1 deletion logstash-core/lib/logstash/config/pipeline_config.rb
@@ -30,7 +30,7 @@ def system?
end

def ==(other)
config_hash == other.config_hash && pipeline_id == other.pipeline_id
config_hash == other.config_hash && pipeline_id == other.pipeline_id && settings == other.settings
end

def display_debug_information
14 changes: 6 additions & 8 deletions logstash-core/lib/logstash/config/source/local.rb
@@ -96,8 +96,8 @@ def path

def get_unmatched_files
# transform "/var/lib/*.conf" => /var/lib/*
t = File.split(@path)
all_files = Dir.glob(File.join(t.first, "*")).sort
t = ::File.split(@path)
all_files = Dir.glob(::File.join(t.first, "*")).sort
all_files - get_matched_files
end

@@ -144,8 +144,10 @@ def self.read(uri)

def pipeline_configs

unless mutually_exclusive(config_string?, local_config?, remote_config?)
if config_path? && config_string?
raise ConfigurationError.new("Settings 'config.string' and 'path.config' can't be used simultaneously.")
elsif !config_path? && !config_string?
raise ConfigurationError.new("Either 'config.string' or 'path.config' must be set.")
end

config_parts = if config_string?
@@ -162,7 +164,7 @@ def pipeline_configs

add_missing_default_inputs_or_outputs(config_parts) if config_string?

[PipelineConfig.new(self.class, PIPELINE_ID, config_parts, @settings)]
[PipelineConfig.new(self.class, @settings.get("pipeline.id").to_sym, config_parts, @settings)]
end

def match?
@@ -224,9 +226,5 @@ def remote_config?
false
end
end

def mutually_exclusive(a, b, c)
(a ^ b ^ c) && !(a && b && c)
end
end
end end end
72 changes: 72 additions & 0 deletions logstash-core/lib/logstash/config/source/multi_local.rb
@@ -0,0 +1,72 @@
# encoding: utf-8
require "logstash/config/source/local"
require "logstash/util/loggable"
require "logstash/pipeline_settings"

module LogStash module Config module Source
class MultiLocal < Local
include LogStash::Util::Loggable

def initialize(settings)
@original_settings = settings
super(settings)
end

def pipeline_configs
pipelines = retrieve_yaml_pipelines()
pipelines_settings = pipelines.map do |pipeline_settings|
::LogStash::PipelineSettings.from_settings(@original_settings.clone).merge(pipeline_settings)
end
detect_duplicate_pipelines(pipelines_settings)
pipelines_settings.map do |pipeline_settings|
@settings = pipeline_settings
# this relies on instance variable @settings and the parent class' pipeline_configs
# method. The alternative is to refactor most of the Local source methods to accept
# a settings object instead of relying on @settings.
super # create a PipelineConfig object based on @settings
end.flatten
end

def match?
uses_config_string = @original_settings.get_setting("config.string").set?
uses_path_config = @original_settings.get_setting("path.config").set?
return true if !uses_config_string && !uses_path_config
if uses_path_config
logger.warn("Ignoring the 'pipelines.yml' file because 'path.config' (-f) is being used.")
elsif uses_config_string
logger.warn("Ignoring the 'pipelines.yml' file because 'config.string' (-e) is being used.")
end
false
end

def retrieve_yaml_pipelines
result = read_pipelines_from_yaml(pipelines_yaml_location)
case result
when Array
result
when false
raise ConfigurationError.new("Pipelines YAML file is empty. Path: #{pipelines_yaml_location}")
else
raise ConfigurationError.new("Pipelines YAML file must contain an array of pipeline configs. Found \"#{result.class}\" in #{pipelines_yaml_location}")
end
end

def read_pipelines_from_yaml(yaml_location)
logger.debug("Reading pipeline configurations from YAML", :location => pipelines_yaml_location)
::YAML.load(IO.read(yaml_location))
rescue => e
raise ConfigurationError.new("Failed to read pipelines yaml file. Location: #{yaml_location}, Exception: #{e.inspect}")
end

def pipelines_yaml_location
::File.join(@original_settings.get("path.settings"), "pipelines.yml")
end

def detect_duplicate_pipelines(pipelines)
duplicate_ids = pipelines.group_by {|pipeline| pipeline.get("pipeline.id") }.select {|k, v| v.size > 1 }.map {|k, v| k}
if duplicate_ids.any?
raise ConfigurationError.new("Pipelines YAML file contains duplicate pipeline ids: #{duplicate_ids.inspect}. Location: #{pipelines_yaml_location}")
end
end
end
end end end
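A minimal sketch of how this source is exercised end to end (assuming `LogStash::SETTINGS` is loaded and `path.settings` points at a directory containing a valid `pipelines.yml`):

```ruby
require "logstash/config/source/multi_local"

source = LogStash::Config::Source::MultiLocal.new(LogStash::SETTINGS)

# match? returns false (and logs a warning) when -e/-f supplied a config,
# in which case pipelines.yml is ignored.
if source.match?
  source.pipeline_configs.each do |pipeline_config|
    puts pipeline_config.pipeline_id
  end
end
```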
1 change: 1 addition & 0 deletions logstash-core/lib/logstash/config/source_loader.rb
@@ -1,5 +1,6 @@
# encoding: utf-8
require "logstash/config/source/local"
require "logstash/config/source/multi_local"
require "logstash/errors"
require "thread"
require "set"