Skip to content

Commit

Permalink
Update failing policy in Central Management fetcher and license check…
Browse files Browse the repository at this point in the history
…er if hit ES down node (elastic#13689)

Wraps the calls to the central management Elasticsearch cluster with the utility class Stud::Try to handle the remote host error when the client used to connect hit a not available node.

Co-authored-by: Ry Biesemeyer <[email protected]>
  • Loading branch information
andsel and yaauie authored Mar 23, 2022
1 parent 01fd474 commit c544ecb
Show file tree
Hide file tree
Showing 5 changed files with 53 additions and 10 deletions.
6 changes: 5 additions & 1 deletion x-pack/lib/config_management/elasticsearch_source.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
require "logstash/outputs/elasticsearch"
require "logstash/json"
require 'helpers/elasticsearch_options'
require 'helpers/loggable_try'
require "license_checker/licensed"


Expand Down Expand Up @@ -53,7 +54,10 @@ def config_conflict?

# decide using system indices api (7.10+) or legacy api (< 7.10) base on elasticsearch server version
def get_pipeline_fetcher
response = client.get("/")
retry_handler = ::LogStash::Helpers::LoggableTry.new(logger, 'fetch pipelines from Central Management')
response = retry_handler.try(10.times, ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::HostUnreachableError) {
client.get("/")
}

if response["error"]
raise RemoteConfigError, "Cannot find elasticsearch version, server returned status: `#{response["status"]}`, message: `#{response["error"]}`"
Expand Down
18 changes: 18 additions & 0 deletions x-pack/lib/helpers/loggable_try.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the Elastic License;
# you may not use this file except in compliance with the Elastic License.

require 'stud/try'

module LogStash module Helpers
class LoggableTry < Stud::Try
def initialize(logger, name)
@logger = logger
@name = name
end

def log_failure(exception, fail_count, message)
@logger.warn("Attempt to #{@name} failed. #{message}", fail_count: fail_count, exception: exception.message)
end
end
end end
5 changes: 4 additions & 1 deletion x-pack/lib/license_checker/license_reader.rb
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,11 @@ def initialize(settings, feature, options)
# fetches an XPackInfo, or log and return nil if unavailable.
# @return [XPathInfo, nil]
def fetch_xpack_info
retry_handler = ::LogStash::Helpers::LoggableTry.new(logger, 'validate Elasticsearch license')
begin
response = client.get('_xpack')
response = retry_handler.try(10.times, ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::HostUnreachableError) {
client.get("_xpack")
}

# TODO: do we need both this AND the exception-based control flow??
return XPackInfo.xpack_not_installed if xpack_missing_response?(response)
Expand Down
5 changes: 4 additions & 1 deletion x-pack/qa/integration/management/read_configuration_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ def logstash_options(pipeline_id, wait_condition)
"xpack.management.enabled" => true,
"xpack.management.pipeline.id" => pipeline_id,
"xpack.management.logstash.poll_interval" => "1s",
"xpack.management.elasticsearch.hosts" => ["http://localhost:9200"],
"xpack.management.elasticsearch.hosts" => ["http://localhost:9200", "http://localhost:19200"],
"xpack.management.elasticsearch.username" => "elastic",
"xpack.management.elasticsearch.password" => elastic_password,
"xpack.monitoring.elasticsearch.username" => "elastic",
Expand All @@ -37,6 +37,9 @@ def start_services(elasticsearch_options, logstash_options)
push_elasticsearch_config(PIPELINE_ID, config)

@logstash_service = logstash("bin/logstash -w 1", logstash_options)

full_log = @logstash_service.stdout_lines.join("\n")
expect(full_log).not_to match(/Could not fetch all the sources/)
end

def stop_services
Expand Down
29 changes: 22 additions & 7 deletions x-pack/spec/license_checker/license_reader_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -51,22 +51,37 @@
describe '#fetch_xpack_info' do
let(:xpack_info_class) { LogStash::LicenseChecker::XPackInfo }
let(:mock_client) { double('Client') }
before(:each) { expect(subject).to receive(:client).and_return(mock_client) }
before(:each) { expect(subject).to receive(:client).and_return(mock_client).at_most(:twice) }
let(:xpack_info) do
{
"license" => {},
"features" => {},
}
end

context 'when client fetches xpack info' do
let(:xpack_info) do
{
"license" => {},
"features" => {},
}
end
before(:each) do
expect(mock_client).to receive(:get).with('_xpack').and_return(xpack_info)
end
it 'returns an XPackInfo' do
expect(subject.fetch_xpack_info).to eq(xpack_info_class.from_es_response(xpack_info))
end
end

context 'and receives HostUnreachableError' do
let(:host_not_reachable) { LogStash::Outputs::ElasticSearch::HttpClient::Pool::HostUnreachableError.new(StandardError.new("original error"), "http://localhost:19200") }
before(:each) do
# set up expectation of single failure
expect(subject.logger).to receive(:warn).with(a_string_starting_with("Attempt to validate Elasticsearch license failed."), any_args)
expect(mock_client).to receive(:get).with('_xpack').and_raise(host_not_reachable).once

# ensure subsequent success
expect(mock_client).to receive(:get).with('_xpack').and_return(xpack_info)
end
it 'continues to fetch and return an XPackInfo' do
expect(subject.fetch_xpack_info.failed?).to be false
end
end
context 'when client raises a ConnectionError' do
before(:each) do
expect(mock_client).to receive(:get).with('_xpack').and_raise(Puma::ConnectionError)
Expand Down

0 comments on commit c544ecb

Please sign in to comment.