Skip to content

Commit

Permalink
Revert "[X-Pack Monitoring] Report pipeline protocol (elastic#9516)" (e…
Browse files Browse the repository at this point in the history
…lastic#9568)

This reverts commit dc168a1.
  • Loading branch information
ycombinator authored May 14, 2018
1 parent b9cd586 commit 6a895a4
Show file tree
Hide file tree
Showing 4 changed files with 4 additions and 24 deletions.
13 changes: 2 additions & 11 deletions logstash-core/lib/logstash/config/pipeline_config.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,23 +5,14 @@ module LogStash module Config
class PipelineConfig
include LogStash::Util::Loggable

attr_reader :source, :pipeline_id, :config_parts, :protocol, :settings, :read_at
attr_reader :source, :pipeline_id, :config_parts, :settings, :read_at

def initialize(source, pipeline_id, config_parts, settings)
config_parts_array = config_parts.is_a?(Array) ? config_parts : [config_parts]
unique_protocols = config_parts_array
.map { |config_part| config_part.protocol.to_s }
.uniq

if unique_protocols.length > 1
raise(ArgumentError, "There should be exactly 1 unique protocol in config_parts. Found #{unique_protocols.length.to_s}.")
end

@source = source
@pipeline_id = pipeline_id
# We can't use Array() since config_parts may be a java object!
config_parts_array = config_parts.is_a?(Array) ? config_parts : [config_parts]
@config_parts = config_parts_array.sort_by { |config_part| [config_part.protocol.to_s, config_part.id] }
@protocol = unique_protocols[0]
@settings = settings
@read_at = Time.now
end
Expand Down
11 changes: 1 addition & 10 deletions logstash-core/spec/logstash/config/pipeline_config_spec.rb
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
# encoding: utf-8
require "logstash/config/pipeline_config"
require "logstash/config/source/local"
require_relative "../../support/helpers"

describe LogStash::Config::PipelineConfig do
let(:source) { LogStash::Config::Source::Local }
Expand All @@ -14,6 +13,7 @@
org.logstash.common.SourceWithMetadata.new("file", "/tmp/4", 0, 0, "input { generator4 }"),
org.logstash.common.SourceWithMetadata.new("file", "/tmp/5", 0, 0, "input { generator5 }"),
org.logstash.common.SourceWithMetadata.new("file", "/tmp/6", 0, 0, "input { generator6 }"),
org.logstash.common.SourceWithMetadata.new("string", "config_string", 0, 0, "input { generator1 }"),
]
end

Expand Down Expand Up @@ -72,13 +72,4 @@
end
end
end

it "returns the pipeline's protocol" do
expect(subject.protocol).to eq((ordered_config_parts.uniq { | config_part | config_part.protocol })[0].protocol)
end

it "raises an ArgumentError when multiple protocols are supplied" do
unordered_config_parts << org.logstash.common.SourceWithMetadata.new("string", "config_string", 0, 0, "input { generator0 }")
expect { described_class.new(source, pipeline_id, unordered_config_parts, settings) }.to raise_error ArgumentError, /.+Found 2\./
end
end
3 changes: 1 addition & 2 deletions x-pack/lib/config_management/elasticsearch_source.rb
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ class RemoteConfigError < LogStash::Error; end
queue.max_bytes
queue.checkpoint.writes
)
CENTRALLY_MANAGED_PIPELINE_PROTOCOL = "x-pack-config-management"

def initialize(settings)
super(settings)
Expand Down Expand Up @@ -99,7 +98,7 @@ def get_pipeline(response)

raise RemoteConfigError, "Empty configuration for pipeline_id: #{pipeline_id}" if config_string.nil? || config_string.empty?

config_part = org.logstash.common.SourceWithMetadata.new(CENTRALLY_MANAGED_PIPELINE_PROTOCOL, pipeline_id.to_s, config_string)
config_part = org.logstash.common.SourceWithMetadata.new("x-pack-config-management", pipeline_id.to_s, config_string)

# We don't support multiple pipelines, so use the global settings from the logstash.yml file
settings = @settings.clone
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ def pipeline_data(pipeline)
"id" => pipeline.pipeline_id,
"hash" => pipeline.lir.unique_hash,
"ephemeral_id" => pipeline.ephemeral_id,
"protocol" => pipeline.pipeline_config.protocol,
"workers" => pipeline.settings.get("pipeline.workers"),
"batch_size" => pipeline.settings.get("pipeline.batch.size"),
"representation" => ::LogStash::Inputs::Metrics::StateEvent::LIRSerializer.serialize(pipeline.lir)
Expand Down

0 comments on commit 6a895a4

Please sign in to comment.