Skip to content

Commit

Permalink
ElasticSearch output plugin to support multiple hosts and enhance sta…
Browse files Browse the repository at this point in the history
…bility. (by Hao Chen)

Closes elastic#1791
  • Loading branch information
haoch authored and Suyog Rao committed Oct 10, 2014
1 parent 29930cc commit 4a3194b
Show file tree
Hide file tree
Showing 4 changed files with 133 additions and 25 deletions.
1 change: 1 addition & 0 deletions CONTRIBUTORS
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ Contributors:
* Bernd Ahlers (bernd)
* Andrea Forni (andreaforni)
* Leandro Moreira (leandromoreira)
* Hao Chen (haoch)

Note: If you've sent me patches, bug reports, or otherwise contributed to
logstash, and you aren't on the list above and want to be, please let me know
Expand Down
77 changes: 60 additions & 17 deletions lib/logstash/outputs/elasticsearch.rb
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,10 @@ class LogStash::Outputs::ElasticSearch < LogStash::Outputs::Base
# The hostname or IP address of the host to use for Elasticsearch unicast discovery
# This is only required if the normal multicast/cluster discovery stuff won't
# work in your environment.
config :host, :validate => :string
#
# "127.0.0.1"
# ["127.0.0.1:9300","127.0.0.2:9300"]
config :host, :validate => :array

# The port for Elasticsearch transport to use.
#
Expand Down Expand Up @@ -211,16 +214,9 @@ def register

if @host.nil? && @protocol == "http"
@logger.info("No 'host' set in elasticsearch output. Defaulting to localhost")
@host = "localhost"
@host = ["localhost"]
end

options = {
:host => @host,
:port => @port,
:client_settings => client_settings
}


client_class = case @protocol
when "transport"
LogStash::Outputs::Elasticsearch::Protocols::TransportClient
Expand All @@ -237,23 +233,52 @@ def register
# Default @host with embedded to localhost. This should help avoid
# newbies tripping on ubuntu and other distros that have a default
# firewall that blocks multicast.
@host ||= "localhost"
@host ||= ["localhost"]

# Start Elasticsearch local.
start_local_elasticsearch
end

@client = client_class.new(options)
@client = Array.new

if protocol == "node" or @host.nil? # if @protocol is "node" or @host is not set
options = {
:host => @host,
:port => @port,
:client_settings => client_settings
}
@client << client_class.new(options)
else # if @protocol in ["transport","http"]
@host.each do |host|
(_host,_port) = host.split ":"
options = {
:host => _host,
:port => _port || @port,
:client_settings => client_settings
}
@logger.info "Create client to elasticsearch server on #{_host}:#{_port}"
@client << client_class.new(options)
end # @host.each
end

if @manage_template
for client in @client
begin
@logger.info("Automatic template management enabled", :manage_template => @manage_template.to_s)
client.template_install(@template_name, get_template, @template_overwrite)
break
rescue => e
@logger.error("Failed to install template: #{e.message}")
end
end # for @client loop
end # if @manage_templates

@logger.info("New Elasticsearch output", :cluster => @cluster,
:host => @host, :port => @port, :embedded => @embedded,
:protocol => @protocol)


if @manage_template
@logger.info("Automatic template management enabled", :manage_template => @manage_template.to_s)
@client.template_install(@template_name, get_template, @template_overwrite)
end # if @manage_templates
@client_idx = 0
@current_client = @client[@client_idx]

buffer_initialize(
:max_items => @flush_size,
Expand All @@ -262,6 +287,13 @@ def register
)
end # def register

protected
def shift_client
@client_idx = (@client_idx+1) % @client.length
@current_client = @client[@client_idx]
@logger.debug? and @logger.debug("Switched current elasticsearch client to ##{@client_idx} at #{@host[@client_idx]}")
end

public
def get_template
if @template.nil?
Expand Down Expand Up @@ -308,7 +340,18 @@ def receive(event)
end # def receive

def flush(actions, teardown=false)
@client.bulk(actions)
begin
@logger.debug? and @logger.debug "Sending bulk of actions to client[#{@client_idx}]: #{@host[@client_idx]}"
@current_client.bulk(actions)
rescue => e
@logger.error "Got error to send bulk of actions to elasticsearch server at #{@host[@client_idx]} : #{e.message}"
raise e
ensure
unless @protocol == "node"
@logger.debug? and @logger.debug "Shifting current elasticsearch client"
shift_client
end
end
# TODO(sissel): Handle errors. Since bulk requests could mostly succeed
# (aka partially fail), we need to figure out what documents need to be
# retried.
Expand Down
40 changes: 32 additions & 8 deletions lib/logstash/outputs/elasticsearch/protocol.rb
Original file line number Diff line number Diff line change
Expand Up @@ -181,16 +181,40 @@ def setup(options={})
end

def hosts(options)
if options[:port].to_s =~ /^\d+-\d+$/
# port ranges are 'host[port1-port2]' according to
# http://www.elasticsearch.org/guide/reference/modules/discovery/zen/
# However, it seems to only query the first port.
# So generate our own list of unicast hosts to scan.
range = Range.new(*options[:port].split("-"))
return range.collect { |p| "#{options[:host]}:#{p}" }.join(",")
# http://www.elasticsearch.org/guide/reference/modules/discovery/zen/
result = Array.new
if options[:host].class == Array
options[:host].each do |host|
if host.to_s =~ /^.+:.+$/
# For host in format: host:port, ignore options[:port]
result << host
else
if options[:port].to_s =~ /^\d+-\d+$/
# port ranges are 'host[port1-port2]'
result << Range.new(*options[:port].split("-")).collect { |p| "#{host}:#{p}" }
else
result << "#{host}:#{options[:port]}"
end
end
end
else
return "#{options[:host]}:#{options[:port]}"
if options[:host].to_s =~ /^.+:.+$/
# For host in format: host:port, ignore options[:port]
result << options[:host]
else
if options[:port].to_s =~ /^\d+-\d+$/
# port ranges are 'host[port1-port2]' according to
# http://www.elasticsearch.org/guide/reference/modules/discovery/zen/
# However, it seems to only query the first port.
# So generate our own list of unicast hosts to scan.
range = Range.new(*options[:port].split("-"))
result << range.collect { |p| "#{options[:host]}:#{p}" }
else
result << "#{options[:host]}:#{options[:port]}"
end
end
end
result.flatten.join(",")
end # def hosts

def build_client(options)
Expand Down
40 changes: 40 additions & 0 deletions spec/outputs/elasticsearch_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -346,4 +346,44 @@
end
end
end

describe "elasticsearch protocol" do
# ElasticSearch related jars
LogStash::Environment.load_elasticsearch_jars!
# Load elasticsearch protocol
require "logstash/outputs/elasticsearch/protocol"

describe "elasticsearch node client" do
# Test ElasticSearch Node Client
# Reference: http://www.elasticsearch.org/guide/reference/modules/discovery/zen/

it "should support hosts in both string and array" do
# Because we defined *hosts* method in NodeClient as private,
# we use *obj.send :method,[args...]* to call method *hosts*
client = LogStash::Outputs::Elasticsearch::Protocols::NodeClient.new

# Node client should support host in string
# Case 1: default :host in string
insist { client.send :hosts, :host => "host",:port => 9300 } == "host:9300"
# Case 2: :port =~ /^\d+_\d+$/
insist { client.send :hosts, :host => "host",:port => "9300-9302"} == "host:9300,host:9301,host:9302"
# Case 3: :host =~ /^.+:.+$/
insist { client.send :hosts, :host => "host:9303",:port => 9300 } == "host:9303"
# Case 4: :host =~ /^.+:.+$/ and :port =~ /^\d+_\d+$/
insist { client.send :hosts, :host => "host:9303",:port => "9300-9302"} == "host:9303"

# Node client should support host in array
# Case 5: :host in array with single item
insist { client.send :hosts, :host => ["host"],:port => 9300 } == ("host:9300")
# Case 6: :host in array with more than one items
insist { client.send :hosts, :host => ["host1","host2"],:port => 9300 } == "host1:9300,host2:9300"
# Case 7: :host in array with more than one items and :port =~ /^\d+_\d+$/
insist { client.send :hosts, :host => ["host1","host2"],:port => "9300-9302" } == "host1:9300,host1:9301,host1:9302,host2:9300,host2:9301,host2:9302"
# Case 8: :host in array with more than one items and some :host =~ /^.+:.+$/
insist { client.send :hosts, :host => ["host1","host2:9303"],:port => 9300 } == "host1:9300,host2:9303"
# Case 9: :host in array with more than one items, :port =~ /^\d+_\d+$/ and some :host =~ /^.+:.+$/
insist { client.send :hosts, :host => ["host1","host2:9303"],:port => "9300-9302" } == "host1:9300,host1:9301,host1:9302,host2:9303"
end
end
end
end

0 comments on commit 4a3194b

Please sign in to comment.