diff --git a/config/pipelines.yml b/config/pipelines.yml new file mode 100644 index 00000000000..83286f2fa3a --- /dev/null +++ b/config/pipelines.yml @@ -0,0 +1,69 @@ +# List of pipelines to be loaded by Logstash +# +# This document must be a list of dictionaries/hashes, where the keys/values are pipeline settings. +# Default values for ommitted settings are read from the `logstash.yml` file. +# When declaring multiple pipelines, each MUST have its own `pipeline.id`. +# +# Example of two pipelines: +# +# - pipeline.id: test +# pipeline.workers: 1 +# pipeline.batch.size: 1 +# config.string: "input { generator {} } filter { sleep { time => 1 } } output { stdout { codec => dots } } +# - pipeline.id: another_test +# queue.type: persisted +# path.config: "/tmp/logstash/*.config" +# +# Available options: +# +# # name of the pipeline +# pipeline.id: mylogs +# +# # The configuration string to be used by this pipeline +# config.string: "input { generator {} } filter { sleep { time => 1 } } output { stdout { codec => dots } }" +# +# # The path from where to read the configuration text +# path.config: "/etc/conf.d/logstash/myconfig.cfg" +# +# # How many worker threads execute the Filters+Outputs stage of the pipeline +# pipeline.workers: 1 (actually defaults to number of CPUs) +# +# # How many events to retrieve from inputs before sending to filters+workers +# pipeline.batch.size: 125 +# +# # How long to wait before dispatching an undersized batch to filters+workers +# pipeline.batch.delay: 5 +# +# # How many workers should be used per output plugin instance +# pipeline.output.workers: 1 +# +# # Internal queuing model, "memory" for legacy in-memory based queuing and +# # "persisted" for disk-based acked queueing. Defaults is memory +# queue.type: memory +# +# # If using queue.type: persisted, the page data files size. The queue data consists of +# # append-only data files separated into pages. Default is 250mb +# queue.page_capacity: 250mb +# +# # If using queue.type: persisted, the maximum number of unread events in the queue. +# # Default is 0 (unlimited) +# queue.max_events: 0 +# +# # If using queue.type: persisted, the total capacity of the queue in number of bytes. +# # Default is 1024mb or 1gb +# queue.max_bytes: 1024mb +# +# # If using queue.type: persisted, the maximum number of acked events before forcing a checkpoint +# # Default is 1024, 0 for unlimited +# queue.checkpoint.acks: 1024 +# +# # If using queue.type: persisted, the maximum number of written events before forcing a checkpoint +# # Default is 1024, 0 for unlimited +# queue.checkpoint.writes: 1024 +# +# # If using queue.type: persisted, the interval in milliseconds when a checkpoint is forced on the head page +# # Default is 1000, 0 for no periodic checkpoint. +# queue.checkpoint.interval: 1000 +# +# # Enable Dead Letter Queueing for this pipeline. +# dead_letter_queue.enable: false diff --git a/docs/static/multiple-pipelines.asciidoc b/docs/static/multiple-pipelines.asciidoc new file mode 100644 index 00000000000..804a7fc3e64 --- /dev/null +++ b/docs/static/multiple-pipelines.asciidoc @@ -0,0 +1,28 @@ +[[multiple-pipelines]] +=== Multiple Pipelines + +If you need to run more than one pipeline in the same process, Logstash provides a way to do it through a configuration file called `pipelines.yml`. +This file must be placed in the `path.settings` folder and follows this structure: + +```yaml +- pipeline.id: my-pipeline_1 + path.config: "/etc/path/to/p1.config" + pipeline.workers: 3 +- pipeline.id: my-other-pipeline + path.config: "/etc/different/path/p2.cfg" + queue.type: persisted +``` + +This file is formatted in YAML and contains a list of dictionaries where each dictionary describes a pipeline and each key/value pair a setting for that pipeline. In the example above, we describe two pipelines by stating their ids and their configuration paths. Also, for the first pipeline we set the value of `pipeline.workers` to 3, while in the other we enable Persistent Queue. +The value of a setting that is not explictly set in this file will fall back to the defaults described in the `logstash.yml` file. + +Starting Logstash without arguments will make it read the `pipelines.yml` file and instantiate the multiple pipelines. On the other hand, using -e or -f will make Logstash ignore the `pipelines.yml` file and log a warning about it. + +[[multiple-pipeline-usage]] +==== Usage Considerations + +Using multiple pipelines is specially useful if your current configuration has event flows that don't share the same inputs/filters and outputs and are being separated from each other using tags and conditionals. + +Having multiple pipelines in a single instances also allows these event flows to have different performance and durability parameters (e.g. pipeline.workers and persistent queues). this separation means that a blocked output in one pipeline won't exert backpressure in the other. + +That said, it's important to take into account resource competition between the pipelines, given that the default values are tuned for a single pipeline. So, for example, consider reducing the number of pipeline workers used by each pipeline, as by default each will use 1 worker per CPU core. diff --git a/logstash-core/lib/logstash/api/commands/node.rb b/logstash-core/lib/logstash/api/commands/node.rb index e52e6c94fb5..16cbc2de985 100644 --- a/logstash-core/lib/logstash/api/commands/node.rb +++ b/logstash-core/lib/logstash/api/commands/node.rb @@ -9,7 +9,7 @@ class Node < Commands::Base def all(selected_fields=[]) payload = { - :pipeline => pipeline, + :pipelines => pipelines, :os => os, :jvm => jvm } @@ -17,12 +17,20 @@ def all(selected_fields=[]) payload end - def pipeline(pipeline_id = LogStash::SETTINGS.get("pipeline.id").to_sym) - stats = extract_metrics( - [:stats, :pipelines, pipeline_id, :config], + def pipelines + pipeline_ids = service.get_shallow(:stats, :pipelines).keys + pipeline_ids.each_with_object({}) do |pipeline_id, result| + result[pipeline_id] = pipeline(pipeline_id) + end + end + + def pipeline(pipeline_id) + extract_metrics( + [:stats, :pipelines, pipeline_id.to_sym, :config], :workers, :batch_size, :batch_delay, :config_reload_automatic, :config_reload_interval ) - stats.merge(:id => pipeline_id) + rescue + {} end def os diff --git a/logstash-core/lib/logstash/api/commands/stats.rb b/logstash-core/lib/logstash/api/commands/stats.rb index c9a59f878c2..ba04c2cbe79 100644 --- a/logstash-core/lib/logstash/api/commands/stats.rb +++ b/logstash-core/lib/logstash/api/commands/stats.rb @@ -41,14 +41,21 @@ def process def events extract_metrics( [:stats, :events], - :in, :filtered, :out, :duration_in_millis + :in, :filtered, :out, :duration_in_millis, :queue_push_duration_in_millis ) end - def pipeline(pipeline_id = LogStash::SETTINGS.get("pipeline.id").to_sym) - stats = service.get_shallow(:stats, :pipelines, pipeline_id) - stats = PluginsStats.report(stats) - stats.merge(:id => pipeline_id) + def pipeline(pipeline_id = nil) + if pipeline_id.nil? + pipeline_ids = service.get_shallow(:stats, :pipelines).keys + pipeline_ids.each_with_object({}) do |pipeline_id, result| + result[pipeline_id] = plugins_stats_report(pipeline_id) + end + else + { pipeline_id => plugins_stats_report(pipeline_id) } + end + rescue # failed to find pipeline + {} end def memory @@ -85,6 +92,12 @@ def hot_threads(options={}) HotThreadsReport.new(self, options) end + private + def plugins_stats_report(pipeline_id) + stats = service.get_shallow(:stats, :pipelines, pipeline_id.to_sym) + PluginsStats.report(stats) + end + module PluginsStats module_function diff --git a/logstash-core/lib/logstash/api/modules/node.rb b/logstash-core/lib/logstash/api/modules/node.rb index 32bf09149fa..6693a5ccc28 100644 --- a/logstash-core/lib/logstash/api/modules/node.rb +++ b/logstash-core/lib/logstash/api/modules/node.rb @@ -20,6 +20,13 @@ def node respond_with(node.hot_threads(options), {:as => as}) end + get "/pipelines/:id" do + pipeline_id = params["id"] + payload = node.pipeline(pipeline_id) + halt(404) if payload.empty? + respond_with(:pipelines => { pipeline_id => payload } ) + end + get "/?:filter?" do selected_fields = extract_fields(params["filter"].to_s.strip) values = node.all(selected_fields) diff --git a/logstash-core/lib/logstash/api/modules/node_stats.rb b/logstash-core/lib/logstash/api/modules/node_stats.rb index f56efe81c59..2a58fc7aecf 100644 --- a/logstash-core/lib/logstash/api/modules/node_stats.rb +++ b/logstash-core/lib/logstash/api/modules/node_stats.rb @@ -8,12 +8,19 @@ class NodeStats < ::LogStash::Api::Modules::Base @stats = factory.build(:stats) end + get "/pipelines/:id?" do + payload = pipeline_payload(params["id"]) + halt(404) if payload.empty? + respond_with(:pipelines => payload) + end + get "/?:filter?" do payload = { :jvm => jvm_payload, :process => process_payload, - :pipeline => pipeline_payload, - :reloads => reloads, + :events => events_payload, + :pipelines => pipeline_payload, + :reloads => reloads_payload, :os => os_payload } respond_with(payload, {:filter => params["filter"]}) @@ -32,7 +39,7 @@ def jvm_payload @stats.jvm end - def reloads + def reloads_payload @stats.reloads end @@ -44,8 +51,8 @@ def mem_payload @stats.memory end - def pipeline_payload - @stats.pipeline + def pipeline_payload(val = nil) + @stats.pipeline(val) end end end diff --git a/logstash-core/lib/logstash/bootstrap_check/default_config.rb b/logstash-core/lib/logstash/bootstrap_check/default_config.rb index 8331c861fc7..bf7da0a2b56 100644 --- a/logstash-core/lib/logstash/bootstrap_check/default_config.rb +++ b/logstash-core/lib/logstash/bootstrap_check/default_config.rb @@ -4,17 +4,9 @@ module LogStash module BootstrapCheck class DefaultConfig def self.check(settings) - if settings.get("config.string").nil? && settings.get("path.config").nil? - raise LogStash::BootstrapCheckError, I18n.t("logstash.runner.missing-configuration") - end - - if settings.get("config.string") && settings.get("path.config") - raise LogStash::BootstrapCheckError, I18n.t("logstash.runner.config-string-path-exclusive") - end - - if settings.get("config.reload.automatic") && settings.get("path.config").nil? - # there's nothing to reload - raise LogStash::BootstrapCheckError, I18n.t("logstash.runner.reload-without-config-path") + # currently none of the checks applies if there are multiple pipelines + if settings.get("config.reload.automatic") && settings.get_setting("config.string").set? + raise LogStash::BootstrapCheckError, I18n.t("logstash.runner.reload-with-config-string") end end end diff --git a/logstash-core/lib/logstash/config/pipeline_config.rb b/logstash-core/lib/logstash/config/pipeline_config.rb index afc8c1e1021..d93b444b4e5 100644 --- a/logstash-core/lib/logstash/config/pipeline_config.rb +++ b/logstash-core/lib/logstash/config/pipeline_config.rb @@ -30,7 +30,7 @@ def system? end def ==(other) - config_hash == other.config_hash && pipeline_id == other.pipeline_id + config_hash == other.config_hash && pipeline_id == other.pipeline_id && settings == other.settings end def display_debug_information diff --git a/logstash-core/lib/logstash/config/source/local.rb b/logstash-core/lib/logstash/config/source/local.rb index c3477d9f09a..7c454d8c98c 100644 --- a/logstash-core/lib/logstash/config/source/local.rb +++ b/logstash-core/lib/logstash/config/source/local.rb @@ -96,8 +96,8 @@ def path def get_unmatched_files # transform "/var/lib/*.conf" => /var/lib/* - t = File.split(@path) - all_files = Dir.glob(File.join(t.first, "*")).sort + t = ::File.split(@path) + all_files = Dir.glob(::File.join(t.first, "*")).sort all_files - get_matched_files end @@ -144,8 +144,10 @@ def self.read(uri) def pipeline_configs - unless mutually_exclusive(config_string?, local_config?, remote_config?) + if config_path? && config_string? raise ConfigurationError.new("Settings 'config.string' and 'path.config' can't be used simultaneously.") + elsif !config_path? && !config_string? + raise ConfigurationError.new("Either 'config.string' or 'path.config' must be set.") end config_parts = if config_string? @@ -162,7 +164,7 @@ def pipeline_configs add_missing_default_inputs_or_outputs(config_parts) if config_string? - [PipelineConfig.new(self.class, PIPELINE_ID, config_parts, @settings)] + [PipelineConfig.new(self.class, @settings.get("pipeline.id").to_sym, config_parts, @settings)] end def match? @@ -224,9 +226,5 @@ def remote_config? false end end - - def mutually_exclusive(a, b, c) - (a ^ b ^ c) && !(a && b && c) - end end end end end diff --git a/logstash-core/lib/logstash/config/source/multi_local.rb b/logstash-core/lib/logstash/config/source/multi_local.rb new file mode 100644 index 00000000000..b16e3a19acc --- /dev/null +++ b/logstash-core/lib/logstash/config/source/multi_local.rb @@ -0,0 +1,72 @@ +# encoding: utf-8 +require "logstash/config/source/local" +require "logstash/util/loggable" +require "logstash/pipeline_settings" + +module LogStash module Config module Source + class MultiLocal < Local + include LogStash::Util::Loggable + + def initialize(settings) + @original_settings = settings + super(settings) + end + + def pipeline_configs + pipelines = retrieve_yaml_pipelines() + pipelines_settings = pipelines.map do |pipeline_settings| + ::LogStash::PipelineSettings.from_settings(@original_settings.clone).merge(pipeline_settings) + end + detect_duplicate_pipelines(pipelines_settings) + pipelines_settings.map do |pipeline_settings| + @settings = pipeline_settings + # this relies on instance variable @settings and the parent class' pipeline_configs + # method. The alternative is to refactor most of the Local source methods to accept + # a settings object instead of relying on @settings. + super # create a PipelineConfig object based on @settings + end.flatten + end + + def match? + uses_config_string = @original_settings.get_setting("config.string").set? + uses_path_config = @original_settings.get_setting("path.config").set? + return true if !uses_config_string && !uses_path_config + if uses_path_config + logger.warn("Ignoring the 'pipelines.yml' file because 'path.config' (-f) is being used.") + elsif uses_config_string + logger.warn("Ignoring the 'pipelines.yml' file because 'config.string' (-e) is being used.") + end + false + end + + def retrieve_yaml_pipelines + result = read_pipelines_from_yaml(pipelines_yaml_location) + case result + when Array + result + when false + raise ConfigurationError.new("Pipelines YAML file is empty. Path: #{pipelines_yaml_location}") + else + raise ConfigurationError.new("Pipelines YAML file must contain an array of pipeline configs. Found \"#{result.class}\" in #{pipelines_yaml_location}") + end + end + + def read_pipelines_from_yaml(yaml_location) + logger.debug("Reading pipeline configurations from YAML", :location => pipelines_yaml_location) + ::YAML.load(IO.read(yaml_location)) + rescue => e + raise ConfigurationError.new("Failed to read pipelines yaml file. Location: #{yaml_location}, Exception: #{e.inspect}") + end + + def pipelines_yaml_location + ::File.join(@original_settings.get("path.settings"), "pipelines.yml") + end + + def detect_duplicate_pipelines(pipelines) + duplicate_ids = pipelines.group_by {|pipeline| pipeline.get("pipeline.id") }.select {|k, v| v.size > 1 }.map {|k, v| k} + if duplicate_ids.any? + raise ConfigurationError.new("Pipelines YAML file contains duplicate pipeline ids: #{duplicate_ids.inspect}. Location: #{pipelines_yaml_location}") + end + end + end +end end end diff --git a/logstash-core/lib/logstash/config/source_loader.rb b/logstash-core/lib/logstash/config/source_loader.rb index 84983c2bd52..7e394cb25e6 100644 --- a/logstash-core/lib/logstash/config/source_loader.rb +++ b/logstash-core/lib/logstash/config/source_loader.rb @@ -1,5 +1,6 @@ # encoding: utf-8 require "logstash/config/source/local" +require "logstash/config/source/multi_local" require "logstash/errors" require "thread" require "set" diff --git a/logstash-core/lib/logstash/pipeline.rb b/logstash-core/lib/logstash/pipeline.rb index 94f47c5c554..929b2383652 100644 --- a/logstash-core/lib/logstash/pipeline.rb +++ b/logstash-core/lib/logstash/pipeline.rb @@ -32,12 +32,14 @@ module LogStash; class BasePipeline include LogStash::Util::Loggable attr_reader :settings, :config_str, :config_hash, :inputs, :filters, :outputs, :pipeline_id, :lir, :execution_context + attr_reader :pipeline_config - def initialize(config_str, settings = SETTINGS, namespaced_metric = nil, agent = nil) + def initialize(pipeline_config, namespaced_metric = nil, agent = nil) @logger = self.logger - @config_str = config_str - @settings = settings + @pipeline_config = pipeline_config + @config_str = pipeline_config.config_string + @settings = pipeline_config.settings @config_hash = Digest::SHA1.hexdigest(@config_str) @lir = compile_lir @@ -46,7 +48,7 @@ def initialize(config_str, settings = SETTINGS, namespaced_metric = nil, agent = # a unique id when auto-generating plugin ids @plugin_counter ||= 0 - @pipeline_id = settings.get_value("pipeline.id") || self.object_id + @pipeline_id = @settings.get_value("pipeline.id") || self.object_id # A list of plugins indexed by id @plugins_by_id = {} @@ -160,7 +162,8 @@ module LogStash; class Pipeline < BasePipeline MAX_INFLIGHT_WARN_THRESHOLD = 10_000 - def initialize(config_str, settings = SETTINGS, namespaced_metric = nil, agent = nil) + def initialize(pipeline_config, namespaced_metric = nil, agent = nil) + @settings = pipeline_config.settings # This needs to be configured before we call super which will evaluate the code to make # sure the metric instance is correctly send to the plugins to make the namespace scoping work @metric = if namespaced_metric @@ -242,7 +245,7 @@ def start # this is useful in the context of pipeline reloading collect_stats - logger.debug("Starting pipeline", default_logging_keys) + @logger.debug("Starting pipeline", default_logging_keys) @finished_execution = Concurrent::AtomicBoolean.new(false) @@ -291,7 +294,7 @@ def run start_workers - @logger.info("Pipeline started", default_logging_keys) + @logger.info("Pipeline started", "pipeline.id" => @pipeline_id) # Block until all inputs have stopped # Generally this happens if SIGINT is sent and `shutdown` is called from an external thread @@ -572,6 +575,7 @@ def shutdown(&before_stop) # stopped wait_for_workers clear_pipeline_metrics + @logger.info("Pipeline terminated", "pipeline.id" => @pipeline_id) end # def shutdown def force_shutdown! diff --git a/logstash-core/lib/logstash/pipeline_action/create.rb b/logstash-core/lib/logstash/pipeline_action/create.rb index 1e5628d0c54..fe0937831e5 100644 --- a/logstash-core/lib/logstash/pipeline_action/create.rb +++ b/logstash-core/lib/logstash/pipeline_action/create.rb @@ -32,7 +32,7 @@ def execution_priority # The execute assume that the thread safety access of the pipeline # is managed by the caller. def execute(agent, pipelines) - pipeline = LogStash::Pipeline.new(@pipeline_config.config_string, @pipeline_config.settings, @metric, agent) + pipeline = LogStash::Pipeline.new(@pipeline_config, @metric, agent) status = pipeline.start # block until the pipeline is correctly started or crashed @@ -42,6 +42,5 @@ def execute(agent, pipelines) LogStash::ConvergeResult::ActionResult.create(self, status) end - end end end diff --git a/logstash-core/lib/logstash/pipeline_action/reload.rb b/logstash-core/lib/logstash/pipeline_action/reload.rb index 1e53533d6e9..e0d8f7fcc97 100644 --- a/logstash-core/lib/logstash/pipeline_action/reload.rb +++ b/logstash-core/lib/logstash/pipeline_action/reload.rb @@ -27,7 +27,7 @@ def execute(agent, pipelines) end begin - pipeline_validator = LogStash::BasePipeline.new(@pipeline_config.config_string, @pipeline_config.settings) + pipeline_validator = LogStash::BasePipeline.new(@pipeline_config) rescue => e return LogStash::ConvergeResult::FailedAction.from_exception(e) end @@ -36,6 +36,7 @@ def execute(agent, pipelines) return LogStash::ConvergeResult::FailedAction.new("Cannot reload pipeline, because the new pipeline is not reloadable") end + logger.info("Reloading pipeline", "pipeline.id" => pipeline_id) status = Stop.new(pipeline_id).execute(agent, pipelines) if status diff --git a/logstash-core/lib/logstash/pipeline_settings.rb b/logstash-core/lib/logstash/pipeline_settings.rb new file mode 100644 index 00000000000..86221e0634f --- /dev/null +++ b/logstash-core/lib/logstash/pipeline_settings.rb @@ -0,0 +1,50 @@ +# encoding: utf-8 +require "logstash/settings" + +module LogStash + class PipelineSettings < Settings + + # there are settings that the pipeline uses and can be changed per pipeline instance + SETTINGS_WHITE_LIST = [ + "config.debug", + "config.reload.automatic", + "config.reload.interval", + "config.string", + "dead_letter_queue.enable", + "metric.collect", + "path.config", + "path.queue", + "pipeline.batch.delay", + "pipeline.batch.size", + "pipeline.id", + "pipeline.output.workers", + "pipeline.reloadable", + "pipeline.system", + "pipeline.workers", + "queue.checkpoint.acks", + "queue.checkpoint.interval", + "queue.checkpoint.writes", + "queue.drain", + "queue.max_bytes", + "queue.max_events", + "queue.page_capacity", + "queue.type", + ] + + # register a set of settings that is used as the default set of pipelines settings + def self.from_settings(settings) + pipeline_settings = self.new + SETTINGS_WHITE_LIST.each do |setting| + pipeline_settings.register(settings.get_setting(setting).clone) + end + pipeline_settings + end + + def register(setting) + unless SETTINGS_WHITE_LIST.include?(setting.name) + raise ArgumentError.new("Only pipeline related settings can be registed in a PipelineSettings object. Received \"#{setting.name}\". Allowed settings: #{SETTINGS_WHITE_LIST}") + end + super(setting) + end + end +end diff --git a/logstash-core/lib/logstash/runner.rb b/logstash-core/lib/logstash/runner.rb index 9f073e30696..0404614011c 100644 --- a/logstash-core/lib/logstash/runner.rb +++ b/logstash-core/lib/logstash/runner.rb @@ -175,6 +175,7 @@ def initialize(*args) # Default we check local sources: `-e`, `-f` and the logstash.yml options. @source_loader = LogStash::Config::SourceLoader.new(@settings) @source_loader.add_source(LogStash::Config::Source::Local.new(@settings)) + @source_loader.add_source(LogStash::Config::Source::MultiLocal.new(@settings)) super(*args) end @@ -272,7 +273,7 @@ def execute # TODO(ph): make it better for multiple pipeline if results.success? results.response.each do |pipeline_config| - LogStash::BasePipeline.new(pipeline_config.config_string) + LogStash::BasePipeline.new(pipeline_config) end puts "Configuration OK" logger.info "Using config.test_and_exit mode. Config Validation Result: OK. Exiting Logstash" diff --git a/logstash-core/lib/logstash/settings.rb b/logstash-core/lib/logstash/settings.rb index 1290c3d82c0..24f97ba426f 100644 --- a/logstash-core/lib/logstash/settings.rb +++ b/logstash-core/lib/logstash/settings.rb @@ -109,12 +109,9 @@ def reset @settings.values.each(&:reset) end - def from_yaml(yaml_path) - settings = read_yaml(::File.join(yaml_path, "logstash.yml")) - self.merge( - deep_replace(flatten_hash(settings)), - true - ) + def from_yaml(yaml_path, file_name="logstash.yml") + settings = read_yaml(::File.join(yaml_path, file_name)) + self.merge(deep_replace(flatten_hash(settings)), true) self end @@ -140,6 +137,11 @@ def validate_all end end + def ==(other) + return false unless other.kind_of?(::LogStash::Settings) + self.to_hash == other.to_hash + end + private def read_yaml(path) YAML.safe_load(IO.read(path)) || {} diff --git a/logstash-core/lib/logstash/state_resolver.rb b/logstash-core/lib/logstash/state_resolver.rb index 541f95c28cf..de4c5243620 100644 --- a/logstash-core/lib/logstash/state_resolver.rb +++ b/logstash-core/lib/logstash/state_resolver.rb @@ -19,9 +19,7 @@ def resolve(pipelines, pipeline_configs) if pipeline.nil? actions << LogStash::PipelineAction::Create.new(pipeline_config, @metric) else - # TODO(ph): The pipeline should keep a reference to the original PipelineConfig - # and we could use straight comparison. - if pipeline_config.config_hash != pipeline.config_hash + if pipeline_config != pipeline.pipeline_config actions << LogStash::PipelineAction::Reload.new(pipeline_config, @metric) end end diff --git a/logstash-core/locales/en.yml b/logstash-core/locales/en.yml index e1ae825c358..a82185e4891 100644 --- a/logstash-core/locales/en.yml +++ b/logstash-core/locales/en.yml @@ -102,6 +102,8 @@ en: Settings 'path.config' (-f) and 'config.string' (-e) can't be used simultaneously. reload-without-config-path: >- Configuration reloading also requires passing a configuration path with '-f yourlogstash.conf' + reload-with-config-string: >- + Configuration reloading can't be used with 'config.string' (-e). locked-data-path: >- Logstash could not be started because there is already another instance using the configured data directory. If you wish to run multiple instances, you must change the "path.data" setting. invalid-shell: >- diff --git a/logstash-core/spec/logstash/api/modules/node_spec.rb b/logstash-core/spec/logstash/api/modules/node_spec.rb index 9d2daa2c9ef..058c49257c9 100644 --- a/logstash-core/spec/logstash/api/modules/node_spec.rb +++ b/logstash-core/spec/logstash/api/modules/node_spec.rb @@ -109,12 +109,14 @@ extend ResourceDSLMethods root_structure = { - "pipeline" => { - "workers" => Numeric, - "batch_size" => Numeric, - "batch_delay" => Numeric, - "config_reload_automatic" => Boolean, - "config_reload_interval" => Numeric + "pipelines" => { + "main" => { + "workers" => Numeric, + "batch_size" => Numeric, + "batch_delay" => Numeric, + "config_reload_automatic" => Boolean, + "config_reload_interval" => Numeric + } }, "os" => { "name" => String, diff --git a/logstash-core/spec/logstash/api/modules/node_stats_spec.rb b/logstash-core/spec/logstash/api/modules/node_stats_spec.rb index 2250e885e1a..a6c9b50616f 100644 --- a/logstash-core/spec/logstash/api/modules/node_stats_spec.rb +++ b/logstash-core/spec/logstash/api/modules/node_stats_spec.rb @@ -73,13 +73,15 @@ "load_average" => { "1m" => Numeric } } }, - "pipeline" => { - "events" => { - "duration_in_millis" => Numeric, - "in" => Numeric, - "filtered" => Numeric, - "out" => Numeric, - "queue_push_duration_in_millis" => Numeric + "pipelines" => { + "main" => { + "events" => { + "duration_in_millis" => Numeric, + "in" => Numeric, + "filtered" => Numeric, + "out" => Numeric, + "queue_push_duration_in_millis" => Numeric + } } }, "reloads" => { diff --git a/logstash-core/spec/logstash/config/source/multi_local_spec.rb b/logstash-core/spec/logstash/config/source/multi_local_spec.rb new file mode 100644 index 00000000000..9ef2a49d757 --- /dev/null +++ b/logstash-core/spec/logstash/config/source/multi_local_spec.rb @@ -0,0 +1,113 @@ +# encoding: utf-8 +require "logstash/config/source/multi_local" +require "rspec/expectations" +require "stud/temporary" +require "fileutils" +require "pathname" +require_relative "../../../support/helpers" +require_relative "../../../support/matchers" +require "spec_helper" +require "webmock/rspec" + +describe LogStash::Config::Source::MultiLocal do + subject { described_class.new(settings) } + let(:settings) { mock_settings({}) } + let(:pipelines_yaml_location) { "" } + + before(:each) do + allow(subject).to receive(:pipelines_yaml_location).and_return(pipelines_yaml_location) + end + + describe "#match?" do + context "when `config.string` is set" do + let(:settings) do + mock_settings("config.string" => "") + end + it "returns false" do + expect(subject.match?).to be_falsey + end + end + + context "when `config.path` are set`" do + let(:config_file) { temporary_file("") } + + let(:settings) do + mock_settings("path.config" => config_file) + end + + it "returns false" do + expect(subject.match?).to be_falsey + end + end + + context "when both `config.string` and `path.config` are set" do + let(:settings) do + mock_settings("config.string" => "", "path.config" => temporary_file("")) + end + it "returns false" do + expect(subject.match?).to be_falsey + end + end + + context "when neither `config.path` nor `path.config` are set`" do + it "returns true" do + expect(subject.match?).to be_truthy + end + end + end + describe "#detect_duplicate_pipelines" do + let(:retrieved_pipelines) { [{}] } + let(:retrieved_pipelines_configs) { retrieved_pipelines.map {|h| mock_settings(h) } } + context "when there are duplicate pipeline ids" do + let(:retrieved_pipelines) do + [ + {"pipeline.id" => "main", "config.string" => ""}, + {"pipeline.id" => "main", "config.string" => ""}, + ] + end + it "should raise a ConfigurationError" do + expect { subject.detect_duplicate_pipelines(retrieved_pipelines_configs) }.to raise_error(::LogStash::ConfigurationError) + end + end + context "when there are no duplicate pipeline ids" do + let(:retrieved_pipelines) do + [ + {"pipeline.id" => "main", "config.string" => ""}, + {"pipeline.id" => "backup", "config.string" => ""}, + ] + end + it "should not raise an error" do + expect { subject.detect_duplicate_pipelines(retrieved_pipelines_configs) }.to_not raise_error + end + end + end + + describe "#pipeline_configs" do + let(:retrieved_pipelines) do + [ + { "pipeline.id" => "main", "config.string" => "" }, + { "pipeline.id" => "backup", "config.string" => "" } + ] + end + before(:each) do + allow(subject).to receive(:retrieve_yaml_pipelines).and_return(retrieved_pipelines) + end + + it "should return instances of PipelineConfig" do + configs = subject.pipeline_configs + expect(configs).to be_a(Array) + expect(subject.pipeline_configs.first).to be_a(::LogStash::Config::PipelineConfig) + expect(subject.pipeline_configs.last).to be_a(::LogStash::Config::PipelineConfig) + end + + context "using non pipeline related settings" do + let(:retrieved_pipelines) do [ + { "pipeline.id" => "main", "config.string" => "", "http.port" => 22222 }, + ] + end + it "should raise and error" do + expect { subject.pipeline_configs }.to raise_error(ArgumentError) + end + end + end +end diff --git a/logstash-core/spec/logstash/pipeline_action/reload_spec.rb b/logstash-core/spec/logstash/pipeline_action/reload_spec.rb index 48ca9f665da..fc2db33bb60 100644 --- a/logstash-core/spec/logstash/pipeline_action/reload_spec.rb +++ b/logstash-core/spec/logstash/pipeline_action/reload_spec.rb @@ -10,7 +10,7 @@ let(:pipeline_id) { :main } let(:new_pipeline_config) { mock_pipeline_config(pipeline_id, "input { generator { id => 'new' } } output { null {} }", { "pipeline.reloadable" => true}) } let(:pipeline_config) { "input { generator {} } output { null {} }" } - let(:pipeline) { LogStash::Pipeline.new(pipeline_config, mock_settings("pipeline.reloadable" => true)) } + let(:pipeline) { mock_pipeline_from_string(pipeline_config, mock_settings("pipeline.reloadable" => true)) } let(:pipelines) { { pipeline_id => pipeline } } let(:agent) { double("agent") } diff --git a/logstash-core/spec/logstash/pipeline_action/stop_spec.rb b/logstash-core/spec/logstash/pipeline_action/stop_spec.rb index 18d34a5d5c8..e4971ec3352 100644 --- a/logstash-core/spec/logstash/pipeline_action/stop_spec.rb +++ b/logstash-core/spec/logstash/pipeline_action/stop_spec.rb @@ -8,7 +8,7 @@ describe LogStash::PipelineAction::Stop do let(:pipeline_config) { "input { generator {} } output { null {} }" } let(:pipeline_id) { :main } - let(:pipeline) { LogStash::Pipeline.new(pipeline_config) } + let(:pipeline) { mock_pipeline_from_string(pipeline_config) } let(:pipelines) { { :main => pipeline } } let(:agent) { double("agent") } diff --git a/logstash-core/spec/logstash/pipeline_dlq_commit_spec.rb b/logstash-core/spec/logstash/pipeline_dlq_commit_spec.rb index 530f27f74bb..370318dc059 100644 --- a/logstash-core/spec/logstash/pipeline_dlq_commit_spec.rb +++ b/logstash-core/spec/logstash/pipeline_dlq_commit_spec.rb @@ -56,7 +56,7 @@ def close() end eos } - subject { LogStash::Pipeline.new(test_config, pipeline_settings_obj, metric) } + subject { mock_pipeline_from_string(test_config, pipeline_settings_obj, metric) } before(:each) do pipeline_settings.each {|k, v| pipeline_settings_obj.set(k, v) } diff --git a/logstash-core/spec/logstash/pipeline_pq_file_spec.rb b/logstash-core/spec/logstash/pipeline_pq_file_spec.rb index 65557f824e7..0b0fbe39246 100644 --- a/logstash-core/spec/logstash/pipeline_pq_file_spec.rb +++ b/logstash-core/spec/logstash/pipeline_pq_file_spec.rb @@ -2,6 +2,7 @@ require "spec_helper" require "logstash/inputs/generator" require "logstash/filters/multiline" +require_relative "../support/helpers" class PipelinePqFileOutput < LogStash::Outputs::Base config_name "pipelinepqfileoutput" @@ -67,7 +68,8 @@ def close let(:pipeline_settings) { { "queue.type" => queue_type, "pipeline.workers" => worker_thread_count, "pipeline.id" => pipeline_id} } - subject { described_class.new(config, pipeline_settings_obj, metric) } + let(:pipeline_config) { mock_pipeline_config(pipeline_id, config, pipeline_settings_obj) } + subject { described_class.new(pipeline_config, metric) } let(:counting_output) { PipelinePqFileOutput.new({ "id" => output_id }) } let(:metric_store) { subject.metric.collector.snapshot_metric.metric_store } diff --git a/logstash-core/spec/logstash/pipeline_reporter_spec.rb b/logstash-core/spec/logstash/pipeline_reporter_spec.rb index 68b48181996..636f98c34ce 100644 --- a/logstash-core/spec/logstash/pipeline_reporter_spec.rb +++ b/logstash-core/spec/logstash/pipeline_reporter_spec.rb @@ -2,6 +2,7 @@ require "spec_helper" require "logstash/pipeline" require "logstash/pipeline_reporter" +require_relative "../support/helpers" require_relative "../support/mocks_classes" #TODO: Figure out how to add more tests that actually cover inflight events @@ -11,7 +12,7 @@ let(:config) do "input { generator { count => #{generator_count} } } output { dummyoutput {} } " end - let(:pipeline) { LogStash::Pipeline.new(config)} + let(:pipeline) { mock_pipeline_from_string(config)} let(:reporter) { pipeline.reporter } before do diff --git a/logstash-core/spec/logstash/pipeline_spec.rb b/logstash-core/spec/logstash/pipeline_spec.rb index 6954fbcff0b..7401d9491af 100644 --- a/logstash-core/spec/logstash/pipeline_spec.rb +++ b/logstash-core/spec/logstash/pipeline_spec.rb @@ -154,7 +154,7 @@ class TestPipeline < LogStash::Pipeline abort_on_exception_state = Thread.abort_on_exception Thread.abort_on_exception = true - pipeline = LogStash::Pipeline.new(config, pipeline_settings_obj) + pipeline = mock_pipeline_from_string(config, pipeline_settings_obj) t = Thread.new { pipeline.run } sleep(0.1) until pipeline.ready? wait(3).for do @@ -202,21 +202,21 @@ class TestPipeline < LogStash::Pipeline let(:logger) { double("pipeline logger").as_null_object } before do - expect(TestPipeline).to receive(:logger).and_return(logger) + expect(::LogStash::Pipeline).to receive(:logger).and_return(logger) allow(logger).to receive(:debug?).and_return(true) end it "should not receive a debug message with the compiled code" do pipeline_settings_obj.set("config.debug", false) expect(logger).not_to receive(:debug).with(/Compiled pipeline/, anything) - pipeline = TestPipeline.new(test_config_with_filters) + pipeline = mock_pipeline_from_string(test_config_with_filters) pipeline.close end it "should print the compiled code if config.debug is set to true" do pipeline_settings_obj.set("config.debug", true) expect(logger).to receive(:debug).with(/Compiled pipeline/, anything) - pipeline = TestPipeline.new(test_config_with_filters, pipeline_settings_obj) + pipeline = mock_pipeline_from_string(test_config_with_filters, pipeline_settings_obj) pipeline.close end end @@ -224,7 +224,7 @@ class TestPipeline < LogStash::Pipeline context "when there is no command line -w N set" do it "starts one filter thread" do msg = "Defaulting pipeline worker threads to 1 because there are some filters that might not work with multiple worker threads" - pipeline = TestPipeline.new(test_config_with_filters) + pipeline = mock_pipeline_from_string(test_config_with_filters) expect(pipeline.logger).to receive(:warn).with(msg, hash_including({:count_was=>worker_thread_count, :filters=>["dummyfilter"]})) pipeline.run @@ -238,7 +238,7 @@ class TestPipeline < LogStash::Pipeline it "starts multiple filter thread" do msg = "Warning: Manual override - there are filters that might" + " not work with multiple worker threads" - pipeline = TestPipeline.new(test_config_with_filters, pipeline_settings_obj) + pipeline = mock_pipeline_from_string(test_config_with_filters, pipeline_settings_obj) expect(pipeline.logger).to receive(:warn).with(msg, hash_including({:worker_threads=> override_thread_count, :filters=>["dummyfilter"]})) pipeline.run expect(pipeline.worker_threads.size).to eq(override_thread_count) @@ -266,7 +266,7 @@ class TestPipeline < LogStash::Pipeline it "starts multiple filter threads" do skip("This test has been failing periodically since November 2016. Tracked as https://github.com/elastic/logstash/issues/6245") - pipeline = TestPipeline.new(test_config_with_filters) + pipeline = mock_pipeline_from_string(test_config_with_filters) pipeline.run expect(pipeline.worker_threads.size).to eq(worker_thread_count) pipeline.shutdown @@ -309,7 +309,7 @@ class TestPipeline < LogStash::Pipeline } context "output close" do - let(:pipeline) { TestPipeline.new(test_config_without_output_workers) } + let(:pipeline) { mock_pipeline_from_string(test_config_without_output_workers) } let(:output) { pipeline.outputs.first } before do @@ -339,7 +339,7 @@ class TestPipeline < LogStash::Pipeline let(:config) { "input { dummyinput {} } output { dummyoutput {} }"} it "should start the flusher thread only after the pipeline is running" do - pipeline = TestPipeline.new(config) + pipeline = mock_pipeline_from_string(config) expect(pipeline).to receive(:transition_to_running).ordered.and_call_original expect(pipeline).to receive(:start_flusher).ordered.and_call_original @@ -395,7 +395,7 @@ class TestPipeline < LogStash::Pipeline let(:config) { "input { dummyinput {} } output { dummyoutput {} }" } let(:batch_size) { 1 } let(:pipeline_settings) { { "pipeline.batch.size" => batch_size, "pipeline.workers" => 1 } } - let(:pipeline) { LogStash::Pipeline.new(config, pipeline_settings_obj) } + let(:pipeline) { mock_pipeline_from_string(config, pipeline_settings_obj) } let(:logger) { pipeline.logger } let(:warning_prefix) { Regexp.new("CAUTION: Recommended inflight events max exceeded!") } @@ -462,7 +462,7 @@ class TestPipeline < LogStash::Pipeline config = "input { } filter { } output { }" let(:settings) { LogStash::SETTINGS.clone } - subject { LogStash::Pipeline.new(config, settings, metric) } + subject { mock_pipeline_from_string(config, settings, metric) } after :each do subject.close @@ -563,8 +563,8 @@ class TestPipeline < LogStash::Pipeline pipeline_settings.each {|k, v| pipeline_settings_obj.set(k, v) } end - let(:pipeline1) { LogStash::Pipeline.new("input { dummyinputgenerator {} } filter { dummyfilter {} } output { dummyoutput {}}") } - let(:pipeline2) { LogStash::Pipeline.new("input { dummyinputgenerator {} } filter { dummyfilter {} } output { dummyoutputmore {}}") } + let(:pipeline1) { mock_pipeline_from_string("input { dummyinputgenerator {} } filter { dummyfilter {} } output { dummyoutput {}}") } + let(:pipeline2) { mock_pipeline_from_string("input { dummyinputgenerator {} } filter { dummyfilter {} } output { dummyoutputmore {}}") } after do pipeline1.close @@ -605,8 +605,7 @@ class TestPipeline < LogStash::Pipeline it "flush periodically" do Thread.abort_on_exception = true - - pipeline = LogStash::Pipeline.new(config, pipeline_settings_obj) + pipeline = mock_pipeline_from_string(config, pipeline_settings_obj) t = Thread.new { pipeline.run } sleep(0.1) until pipeline.ready? wait(10).for do @@ -630,8 +629,8 @@ class TestPipeline < LogStash::Pipeline allow(LogStash::Plugin).to receive(:lookup).with("output", "dummyoutput").and_return(::LogStash::Outputs::DummyOutput) end - let(:pipeline1) { LogStash::Pipeline.new("input { generator {} } filter { dummyfilter {} } output { dummyoutput {}}") } - let(:pipeline2) { LogStash::Pipeline.new("input { generator {} } filter { dummyfilter {} } output { dummyoutput {}}") } + let(:pipeline1) { mock_pipeline_from_string("input { generator {} } filter { dummyfilter {} } output { dummyoutput {}}") } + let(:pipeline2) { mock_pipeline_from_string("input { generator {} } filter { dummyfilter {} } output { dummyoutput {}}") } # multiple pipelines cannot be instantiated using the same PQ settings, force memory queue before :each do @@ -668,7 +667,7 @@ class TestPipeline < LogStash::Pipeline EOS end - subject { described_class.new(config) } + subject { mock_pipeline_from_string(config) } context "when the pipeline is not started" do after :each do @@ -695,7 +694,7 @@ class TestPipeline < LogStash::Pipeline } EOS end - subject { described_class.new(config) } + subject { mock_pipeline_from_string(config) } context "when the pipeline is not started" do after :each do @@ -725,7 +724,7 @@ class TestPipeline < LogStash::Pipeline context "when collecting metrics in the pipeline" do let(:metric) { LogStash::Instrument::Metric.new(LogStash::Instrument::Collector.new) } - subject { described_class.new(config, pipeline_settings_obj, metric) } + subject { mock_pipeline_from_string(config, pipeline_settings_obj, metric) } let(:pipeline_settings) { { "pipeline.id" => pipeline_id } } let(:pipeline_id) { "main" } @@ -852,8 +851,8 @@ class TestPipeline < LogStash::Pipeline allow(LogStash::Plugin).to receive(:lookup).with("output", "dummyoutput").and_return(::LogStash::Outputs::DummyOutput) end - let(:pipeline1) { LogStash::Pipeline.new("input { generator {} } filter { dummyfilter {} } output { dummyoutput {}}") } - let(:pipeline2) { LogStash::Pipeline.new("input { generator {} } filter { dummyfilter {} } output { dummyoutput {}}") } + let(:pipeline1) { mock_pipeline_from_string("input { generator {} } filter { dummyfilter {} } output { dummyoutput {}}") } + let(:pipeline2) { mock_pipeline_from_string("input { generator {} } filter { dummyfilter {} } output { dummyoutput {}}") } # multiple pipelines cannot be instantiated using the same PQ settings, force memory queue before :each do @@ -873,14 +872,14 @@ class TestPipeline < LogStash::Pipeline end context "when the pipeline is a system pipeline" do - let(:pipeline) { LogStash::Pipeline.new("input { generator {} } output { null {} }", mock_settings("pipeline.system" => true)) } + let(:pipeline) { mock_pipeline_from_string("input { generator {} } output { null {} }", mock_settings("pipeline.system" => true)) } it "returns true" do expect(pipeline.system?).to be_truthy end end context "when the pipeline is not a system pipeline" do - let(:pipeline) { LogStash::Pipeline.new("input { generator {} } output { null {} }", mock_settings("pipeline.system" => false)) } + let(:pipeline) { mock_pipeline_from_string("input { generator {} } output { null {} }", mock_settings("pipeline.system" => false)) } it "returns true" do expect(pipeline.system?).to be_falsey end @@ -893,7 +892,7 @@ class TestPipeline < LogStash::Pipeline end context "when all plugins are reloadable and pipeline is configured as reloadable" do - let(:pipeline) { LogStash::Pipeline.new("input { generator {} } output { null {} }", mock_settings("pipeline.reloadable" => true)) } + let(:pipeline) { mock_pipeline_from_string("input { generator {} } output { null {} }", mock_settings("pipeline.reloadable" => true)) } it "returns true" do expect(pipeline.reloadable?).to be_truthy @@ -901,7 +900,7 @@ class TestPipeline < LogStash::Pipeline end context "when the plugins are not reloadable and pipeline is configured as reloadable" do - let(:pipeline) { LogStash::Pipeline.new("input { stdin {} } output { null {} }", mock_settings("pipeline.reloadable" => true)) } + let(:pipeline) { mock_pipeline_from_string("input { stdin {} } output { null {} }", mock_settings("pipeline.reloadable" => true)) } it "returns true" do expect(pipeline.reloadable?).to be_falsey @@ -909,7 +908,7 @@ class TestPipeline < LogStash::Pipeline end context "when all plugins are reloadable and pipeline is configured as non-reloadable" do - let(:pipeline) { LogStash::Pipeline.new("input { generator {} } output { null {} }", mock_settings("pipeline.reloadable" => false)) } + let(:pipeline) { mock_pipeline_from_string("input { generator {} } output { null {} }", mock_settings("pipeline.reloadable" => false)) } it "returns true" do expect(pipeline.reloadable?).to be_falsey diff --git a/logstash-core/spec/logstash/runner_spec.rb b/logstash-core/spec/logstash/runner_spec.rb index b1e84c19e6a..e929aaafce8 100644 --- a/logstash-core/spec/logstash/runner_spec.rb +++ b/logstash-core/spec/logstash/runner_spec.rb @@ -79,21 +79,6 @@ def run(args); end subject.run(args) end end - - context "with no arguments" do - let(:args) { [] } - - before(:each) do - allow(LogStash::Util::JavaVersion).to receive(:warn_on_bad_java_version) - end - - it "should show help" do - expect($stderr).to receive(:puts).once - expect(subject).to receive(:signal_usage_error).once.and_call_original - expect(subject).to receive(:show_short_help).once - subject.run(args) - end - end end context "--pluginpath" do @@ -122,7 +107,7 @@ def run(args); end context "--auto-reload" do subject { LogStash::Runner.new("") } - context "when -f is not given" do + context "when -e is given" do let(:args) { ["-r", "-e", "input {} output {}"] } diff --git a/logstash-core/spec/logstash/settings/array_coercible_spec.rb b/logstash-core/spec/logstash/settings/array_coercible_spec.rb index 7146ff0950a..8b52bb85009 100644 --- a/logstash-core/spec/logstash/settings/array_coercible_spec.rb +++ b/logstash-core/spec/logstash/settings/array_coercible_spec.rb @@ -43,4 +43,69 @@ end end end + + describe "#==" do + context "when comparing two settings" do + let(:setting_1) { described_class.new("option_1", element_class_1, value_1) } + let(:element_class_1) { String } + let(:setting_2) { described_class.new("option_1", element_class_2, value_2) } + let(:element_class_2) { String } + + context "where one was given a non array value" do + let(:value_1) { "a string" } + context "and the other also the same non array value" do + let(:value_2) { "a string" } + it "should be equal" do + expect(setting_1).to be == setting_2 + end + end + context "and the other also the same value in an array" do + let(:value_2) { [ "a string" ] } + it "should be equal" do + expect(setting_1).to be == setting_2 + end + end + context "and the other a different non array value" do + let(:value_2) { "a different string" } + it "should be equal" do + expect(setting_1).to_not be == setting_2 + end + end + context "and the other a different value in an array" do + let(:value_2) { [ "a different string" ] } + it "should be equal" do + expect(setting_1).to_not be == setting_2 + end + end + end + + context "where one was given a value in an array" do + let(:value_1) { [ "a string"] } + context "and the other the same value in an array" do + let(:value_2) { [ "a string" ] } + it "should be equal" do + expect(setting_1).to be == setting_2 + end + end + context "and the other the same value not in an array" do + let(:value_2) { "a string" } + it "should be equal" do + expect(setting_1).to be == setting_2 + end + end + context "and the other a different value in an array" do + let(:value_2) { [ "a different string" ] } + it "should be equal" do + expect(setting_1).to_not be == setting_2 + end + end + context "and the other a different value in an array" do + let(:value_2) { "a different string" } + it "should be equal" do + expect(setting_1).to_not be == setting_2 + end + end + end + end + end end diff --git a/logstash-core/spec/logstash/state_resolver_spec.rb b/logstash-core/spec/logstash/state_resolver_spec.rb index 37196f4d56e..5eb3aeb8a93 100644 --- a/logstash-core/spec/logstash/state_resolver_spec.rb +++ b/logstash-core/spec/logstash/state_resolver_spec.rb @@ -46,10 +46,12 @@ context "when some pipeline are running" do context "when a pipeline is running" do - let(:running_pipelines) { { :main => mock_pipeline(:main) } } + let(:main_pipeline) { mock_pipeline(:main) } + let(:main_pipeline_config) { main_pipeline.pipeline_config } + let(:running_pipelines) { { :main => main_pipeline } } context "when the pipeline config contains a new one and the existing" do - let(:pipeline_configs) { [mock_pipeline_config(:hello_world), mock_pipeline_config(:main)] } + let(:pipeline_configs) { [mock_pipeline_config(:hello_world), main_pipeline_config ] } it "creates the new one and keep the other one" do expect(subject.resolve(running_pipelines, pipeline_configs)).to have_actions( @@ -105,7 +107,7 @@ context "without system pipeline" do let(:pipeline_configs) do [ - mock_pipeline_config(:main1), + running_pipelines[:main1].pipeline_config, mock_pipeline_config(:main9), mock_pipeline_config(:main5, "input { generator {}}"), mock_pipeline_config(:main3, "input { generator {}}"), @@ -129,7 +131,7 @@ context "with system pipeline" do let(:pipeline_configs) do [ - mock_pipeline_config(:main1), + running_pipelines[:main1].pipeline_config, mock_pipeline_config(:main9), mock_pipeline_config(:main5, "input { generator {}}"), mock_pipeline_config(:main3, "input { generator {}}"), diff --git a/logstash-core/spec/support/helpers.rb b/logstash-core/spec/support/helpers.rb index 589a1299c1b..68cf6430edf 100644 --- a/logstash-core/spec/support/helpers.rb +++ b/logstash-core/spec/support/helpers.rb @@ -45,8 +45,13 @@ def mock_pipeline(pipeline_id, reloadable = true, config_hash = nil) settings = mock_settings("pipeline.id" => pipeline_id.to_s, "config.string" => config_string, "config.reload.automatic" => reloadable) - pipeline = LogStash::Pipeline.new(config_string, settings) - pipeline + pipeline_config = mock_pipeline_config(pipeline_id, config_string, settings) + LogStash::Pipeline.new(pipeline_config) +end + +def mock_pipeline_from_string(config_string, settings = LogStash::SETTINGS, metric = nil) + pipeline_config = mock_pipeline_config(settings.get("pipeline.id"), config_string, settings) + LogStash::Pipeline.new(pipeline_config, metric) end def mock_pipeline_config(pipeline_id, config_string = nil, settings = {}) @@ -98,4 +103,4 @@ def temporary_file(content, file_name = Time.now.to_i.to_s, directory = Stud::Te end end -SUPPORT_DIR = Pathname.new(::File.join(::File.dirname(__FILE__), "support")) \ No newline at end of file +SUPPORT_DIR = Pathname.new(::File.join(::File.dirname(__FILE__), "support")) diff --git a/qa/integration/services/monitoring_api.rb b/qa/integration/services/monitoring_api.rb index e14f56e3db9..fcb9aeb6caf 100644 --- a/qa/integration/services/monitoring_api.rb +++ b/qa/integration/services/monitoring_api.rb @@ -4,15 +4,16 @@ # Convenience class to interact with the HTTP monitoring APIs class MonitoringAPI - def pipeline_stats - resp = Manticore.get("http://localhost:9600/_node/stats/pipeline").body + def pipeline_stats(pipeline_id) + resp = Manticore.get("http://localhost:9600/_node/stats/pipelines/#{pipeline_id}").body stats_response = JSON.parse(resp) - stats_response["pipeline"] + stats_response.fetch("pipelines").fetch(pipeline_id) end def event_stats - stats = pipeline_stats - stats["events"] + resp = Manticore.get("http://localhost:9600/_node/stats").body + stats_response = JSON.parse(resp) + stats_response["events"] end def version diff --git a/qa/integration/specs/monitoring_api_spec.rb b/qa/integration/specs/monitoring_api_spec.rb index 765b01b2b26..9ed908a6e54 100644 --- a/qa/integration/specs/monitoring_api_spec.rb +++ b/qa/integration/specs/monitoring_api_spec.rb @@ -59,7 +59,7 @@ expect(result).not_to be_nil # we use fetch here since we want failed fetches to raise an exception # and trigger the retry block - queue_stats = result.fetch("pipeline").fetch("queue") + queue_stats = result.fetch("pipelines").fetch("main").fetch("queue") expect(queue_stats).not_to be_nil if logstash_service.settings.feature_flag == "persistent_queues" expect(queue_stats["type"]).to eq "persisted" diff --git a/qa/integration/specs/reload_config_spec.rb b/qa/integration/specs/reload_config_spec.rb index 24f37aba942..e45813a031d 100644 --- a/qa/integration/specs/reload_config_spec.rb +++ b/qa/integration/specs/reload_config_spec.rb @@ -57,18 +57,23 @@ expect(IO.read(output_file2).blank?).to be false end - # check metrics. It should be reset - result = logstash_service.monitoring_api.event_stats - expect(result["in"]).to eq(1) - expect(result["out"]).to eq(1) + # check instance metrics. It should not be reset + instance_event_stats = logstash_service.monitoring_api.event_stats + expect(instance_event_stats["in"]).to eq(2) + expect(instance_event_stats["out"]).to eq(2) + + # check pipeline metrics. It should be reset + pipeline_event_stats = logstash_service.monitoring_api.pipeline_stats("main")["events"] + expect(pipeline_event_stats["in"]).to eq(1) + expect(pipeline_event_stats["out"]).to eq(1) # check reload stats - reload_stats = logstash_service.monitoring_api.pipeline_stats["reloads"] + pipeline_reload_stats = logstash_service.monitoring_api.pipeline_stats("main")["reloads"] instance_reload_stats = logstash_service.monitoring_api.node_stats["reloads"] - expect(reload_stats["successes"]).to eq(1) - expect(reload_stats["failures"]).to eq(0) - expect(reload_stats["last_success_timestamp"].blank?).to be false - expect(reload_stats["last_error"]).to eq(nil) + expect(pipeline_reload_stats["successes"]).to eq(1) + expect(pipeline_reload_stats["failures"]).to eq(0) + expect(pipeline_reload_stats["last_success_timestamp"].blank?).to be false + expect(pipeline_reload_stats["last_error"]).to eq(nil) expect(instance_reload_stats["successes"]).to eq(1) expect(instance_reload_stats["failures"]).to eq(0) diff --git a/qa/integration/specs/settings_spec.rb b/qa/integration/specs/settings_spec.rb index 8e0972f1939..959f968ad13 100644 --- a/qa/integration/specs/settings_spec.rb +++ b/qa/integration/specs/settings_spec.rb @@ -78,14 +78,14 @@ def overwrite_settings(settings) end it "should exit when config test_and_exit is set" do + test_config_path = File.join(temp_dir, "test.config") + IO.write(test_config_path, "#{tcp_config}") + expect(File.exists?(test_config_path)).to be true s = {} - s["path.config"] = temp_dir + s["path.config"] = test_config_path s["config.test_and_exit"] = true s["path.logs"] = temp_dir overwrite_settings(s) - test_config_path = File.join(temp_dir, "test.config") - IO.write(test_config_path, "#{tcp_config}") - expect(File.exists?(test_config_path)).to be true @logstash_service.spawn_logstash try(num_retries) do expect(@logstash_service.exited?).to be true @@ -118,8 +118,8 @@ def overwrite_settings(settings) # now check monitoring API to validate node_info = @logstash_service.monitoring_api.node_info - expect(node_info["pipeline"]["workers"]).to eq(workers) - expect(node_info["pipeline"]["batch_size"]).to eq(batch_size) + expect(node_info["pipelines"]["main"]["workers"]).to eq(workers) + expect(node_info["pipelines"]["main"]["batch_size"]).to eq(batch_size) end it "start on a different HTTP port" do