Skip to content

Commit

Permalink
Maintenance: Make ES connection establishment more stable by:
Browse files Browse the repository at this point in the history
- performing a retry when Net::OpenTimeout exceptions occur
- cache the result of ES version check
  • Loading branch information
thorsteneckel committed Sep 16, 2019
1 parent 256e751 commit 429b95e
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 56 deletions.
86 changes: 47 additions & 39 deletions lib/search_index_backend.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,12 @@ def self.info
url,
{},
{
json: true,
open_timeout: 8,
read_timeout: 14,
user: Setting.get('es_user'),
password: Setting.get('es_password'),
json: true,
open_timeout: 8,
read_timeout: 14,
open_socket_tries: 3,
user: Setting.get('es_user'),
password: Setting.get('es_password'),
}
)
Rails.logger.info "# #{response.code}"
Expand Down Expand Up @@ -82,11 +83,12 @@ def self.processors(data)
response = UserAgent.delete(
url,
{
json: true,
open_timeout: 8,
read_timeout: 60,
user: Setting.get('es_user'),
password: Setting.get('es_password'),
json: true,
open_timeout: 8,
read_timeout: 60,
open_socket_tries: 3,
user: Setting.get('es_user'),
password: Setting.get('es_password'),
}
)
Rails.logger.info "# #{response.code}"
Expand All @@ -106,11 +108,12 @@ def self.processors(data)
url,
item,
{
json: true,
open_timeout: 8,
read_timeout: 60,
user: Setting.get('es_user'),
password: Setting.get('es_password'),
json: true,
open_timeout: 8,
read_timeout: 60,
open_socket_tries: 3,
user: Setting.get('es_user'),
password: Setting.get('es_password'),
}
)
Rails.logger.info "# #{response.code}"
Expand Down Expand Up @@ -179,11 +182,12 @@ def self.index(data)
url,
data[:data],
{
json: true,
open_timeout: 8,
read_timeout: 60,
user: Setting.get('es_user'),
password: Setting.get('es_password'),
json: true,
open_timeout: 8,
read_timeout: 60,
open_socket_tries: 3,
user: Setting.get('es_user'),
password: Setting.get('es_password'),
}
)
Rails.logger.info "# #{response.code}"
Expand Down Expand Up @@ -217,11 +221,12 @@ def self.add(type, data)
url,
data,
{
json: true,
open_timeout: 8,
read_timeout: 60,
user: Setting.get('es_user'),
password: Setting.get('es_password'),
json: true,
open_timeout: 8,
read_timeout: 60,
open_socket_tries: 3,
user: Setting.get('es_user'),
password: Setting.get('es_password'),
}
)
Rails.logger.info "# #{response.code}"
Expand Down Expand Up @@ -254,10 +259,11 @@ def self.remove(type, o_id = nil)
response = UserAgent.delete(
url,
{
open_timeout: 8,
read_timeout: 60,
user: Setting.get('es_user'),
password: Setting.get('es_password'),
open_timeout: 8,
read_timeout: 60,
open_socket_tries: 3,
user: Setting.get('es_user'),
password: Setting.get('es_password'),
}
)
Rails.logger.info "# #{response.code}"
Expand Down Expand Up @@ -365,11 +371,12 @@ def self.search_by_index(query, index, options = {})
url,
query_data,
{
json: true,
open_timeout: 5,
read_timeout: 14,
user: Setting.get('es_user'),
password: Setting.get('es_password'),
json: true,
open_timeout: 5,
read_timeout: 14,
open_socket_tries: 3,
user: Setting.get('es_user'),
password: Setting.get('es_password'),
}
)

Expand Down Expand Up @@ -512,11 +519,12 @@ def self.selectors(index, selectors = nil, options = {}, aggs_interval = nil)
url,
data,
{
json: true,
open_timeout: 5,
read_timeout: 14,
user: Setting.get('es_user'),
password: Setting.get('es_password'),
json: true,
open_timeout: 5,
read_timeout: 14,
open_socket_tries: 3,
user: Setting.get('es_user'),
password: Setting.get('es_password'),
}
)

Expand Down
12 changes: 7 additions & 5 deletions lib/tasks/search_index_es.rake
Original file line number Diff line number Diff line change
Expand Up @@ -275,12 +275,14 @@ end

# get es version
def es_version
info = SearchIndexBackend.info
number = nil
if info.present?
number = info['version']['number'].to_s
@es_version ||= begin
info = SearchIndexBackend.info
number = nil
if info.present?
number = info['version']['number'].to_s
end
number
end
number
end

# no es_pipeline for elasticsearch 5.5 and lower
Expand Down
45 changes: 33 additions & 12 deletions lib/user_agent.rb
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,12 @@ def self.get(url, params = {}, options = {}, count = 10)
# start http call
begin
total_timeout = options[:total_timeout] || 60
Timeout.timeout(total_timeout) do
response = http.request(request)
return process(request, response, uri, count, params, options)

handled_open_timeout(options[:open_socket_tries]) do
Timeout.timeout(total_timeout) do
response = http.request(request)
return process(request, response, uri, count, params, options)
end
end
rescue => e
log(url, request, nil, options)
Expand Down Expand Up @@ -112,9 +115,12 @@ def self.post(url, params = {}, options = {}, count = 10)
# start http call
begin
total_timeout = options[:total_timeout] || 60
Timeout.timeout(total_timeout) do
response = http.request(request)
return process(request, response, uri, count, params, options)

handled_open_timeout(options[:open_socket_tries]) do
Timeout.timeout(total_timeout) do
response = http.request(request)
return process(request, response, uri, count, params, options)
end
end
rescue => e
log(url, request, nil, options)
Expand Down Expand Up @@ -164,9 +170,12 @@ def self.put(url, params = {}, options = {}, count = 10)
# start http call
begin
total_timeout = options[:total_timeout] || 60
Timeout.timeout(total_timeout) do
response = http.request(request)
return process(request, response, uri, count, params, options)

handled_open_timeout(options[:open_socket_tries]) do
Timeout.timeout(total_timeout) do
response = http.request(request)
return process(request, response, uri, count, params, options)
end
end
rescue => e
log(url, request, nil, options)
Expand Down Expand Up @@ -209,9 +218,11 @@ def self.delete(url, options = {}, count = 10)
# start http call
begin
total_timeout = options[:total_timeout] || 60
Timeout.timeout(total_timeout) do
response = http.request(request)
return process(request, response, uri, count, {}, options)
handled_open_timeout(options[:open_socket_tries]) do
Timeout.timeout(total_timeout) do
response = http.request(request)
return process(request, response, uri, count, {}, options)
end
end
rescue => e
log(url, request, nil, options)
Expand Down Expand Up @@ -488,6 +499,16 @@ def self.ftp(uri, options)
)
end

def self.handled_open_timeout(tries)
tries ||= 1

tries.times do |index|
yield
rescue Net::OpenTimeout
raise if (index + 1) == tries
end
end

class Result

attr_reader :error
Expand Down

0 comments on commit 429b95e

Please sign in to comment.