Skip to content

Commit

Permalink
This PR changes two things:
Browse files Browse the repository at this point in the history
- Allow plugins author to decide how their plugins are structured
- Allow a new kind of plugin that allow plugins author to add hooks into logstash core.

Fixes elastic#6109
  • Loading branch information
ph committed Nov 11, 2016
1 parent 0bebf5a commit 6626fa8
Show file tree
Hide file tree
Showing 16 changed files with 479 additions and 130 deletions.
2 changes: 1 addition & 1 deletion lib/pluginmanager/list.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ class LogStash::PluginManager::List < LogStash::PluginManager::Command
option "--installed", :flag, "List only explicitly installed plugins using bin/logstash-plugin install ...", :default => false
option "--verbose", :flag, "Also show plugin version number", :default => false
option "--group", "NAME", "Filter plugins per group: input, output, filter or codec" do |arg|
raise(ArgumentError, "should be one of: input, output, filter or codec") unless ['input', 'output', 'filter', 'codec'].include?(arg)
raise(ArgumentError, "should be one of: input, output, filter or codec") unless ['input', 'output', 'filter', 'codec', 'pack'].include?(arg)
arg
end

Expand Down
5 changes: 5 additions & 0 deletions logstash-core/lib/logstash/agent.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
require "logstash/instrument/metric"
require "logstash/pipeline"
require "logstash/webserver"
require "logstash/event_dispatcher"
require "stud/trap"
require "logstash/config/loader"
require "uri"
Expand Down Expand Up @@ -52,6 +53,10 @@ def initialize(settings = LogStash::SETTINGS)
configure_metrics_collectors

@reload_metric = metric.namespace([:stats, :pipelines])

@dispatcher = LogStash::EventDispatcher.new(self)
LogStash::PluginRegistry.hooks.register_emitter(self.class, dispatcher)
dispatcher.fire(:after_initialize)
end

def execute
Expand Down
1 change: 0 additions & 1 deletion logstash-core/lib/logstash/config/file.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
require "logstash/namespace"
require "logstash/config/grammar"
require "logstash/config/config_ast"
require "logstash/config/registry"
require "logstash/errors"
require "logger"

Expand Down
8 changes: 2 additions & 6 deletions logstash-core/lib/logstash/config/mixin.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
# encoding: utf-8
require "logstash/namespace"
require "logstash/config/registry"
require "logstash/plugins/registry"
require "logstash/logging"
require "logstash/util/password"
Expand Down Expand Up @@ -73,6 +72,7 @@ def config_init(params)
"in the future. #{extra} If you have any questions " +
"about this, please visit the #logstash channel " +
"on freenode irc.", :name => name, :plugin => self)

end
if opts && opts[:obsolete]
extra = opts[:obsolete].is_a?(String) ? opts[:obsolete] : ""
Expand Down Expand Up @@ -181,11 +181,7 @@ module DSL
# If no name given (nil), return the current name.
def config_name(name = nil)
@config_name = name if !name.nil?
LogStash::Config::Registry.registry[@config_name] = self
if self.respond_to?("plugin_type")
declare_plugin(self.plugin_type, @config_name)
end
return @config_name
@config_name
end
alias_method :config_plugin, :config_name

Expand Down
13 changes: 0 additions & 13 deletions logstash-core/lib/logstash/config/registry.rb

This file was deleted.

44 changes: 44 additions & 0 deletions logstash-core/lib/logstash/event_dispatcher.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
# encoding: utf-8
module LogStash
class EventDispatcher
java_import "java.util.concurrent.CopyOnWriteArrayList"

attr_reader :emitter

def initialize(emitter)
@emitter = emitter
@listeners = CopyOnWriteArrayList.new
end

# This operation is slow because we use a CopyOnWriteArrayList
# But the majority of the addition will be done at bootstrap time
# So add_listener shouldn't be called often at runtime.
#
# On the other hand the notification could be called really often.
def add_listener(listener)
@listeners.add(listener)
end

# This operation is slow because we use a `CopyOnWriteArrayList` as the backend, instead of a
# ConcurrentHashMap, but since we are mostly adding stuff and iterating the `CopyOnWriteArrayList`
# should provide a better performance.
#
# See note on add_listener, this method shouldn't be called really often.
def remove_listener(listener)
@listeners.remove(listener)
end

def fire(method_name, *arguments)
@listeners.each do |listener|
if listener.respond_to?(method_name)
if arguments.size > 0
listener.send(method_name, emitter, *arguments)
else
listener.send(method_name, emitter)
end
end
end
end
alias_method :execute, :fire
end
end
54 changes: 8 additions & 46 deletions logstash-core/lib/logstash/plugin.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
require "logstash/instrument/null_metric"
require "concurrent"
require "securerandom"
require "logstash/plugins/registry"

class LogStash::Plugin
include LogStash::Util::Loggable
Expand Down Expand Up @@ -39,7 +38,6 @@ def hash
self.class.name.hash
end


def eql?(other)
self.class.name == other.class.name && @params == other.params
end
Expand Down Expand Up @@ -120,50 +118,14 @@ def config_name
self.class.config_name
end


# Look up a plugin by type and name.
# This is keep for backward compatibility, the logic was moved into the registry class
# but some plugins use this method to return a specific instance on lookup
#
#
#
# Should I remove this now and make sure the pipeline invoke the Registry or I should wait for 6.0
# Its not really part of the public api but its used by the tests.
def self.lookup(type, name)
path = "logstash/#{type}s/#{name}"
LogStash::Registry.instance.lookup(type ,name) do |plugin_klass, plugin_name|
is_a_plugin?(plugin_klass, plugin_name)
end

rescue LoadError, NameError => e
logger.debug("Problems loading the plugin with", :type => type, :name => name, :path => path)
raise(LogStash::PluginLoadingError, I18n.t("logstash.pipeline.plugin-loading-error", :type => type, :name => name, :path => path, :error => e.to_s))
end

public
def self.declare_plugin(type, name)
path = "logstash/#{type}s/#{name}"
registry = LogStash::Registry.instance
registry.register(path, self)
end

private
# lookup a plugin by type and name in the existing LogStash module namespace
# ex.: namespace_lookup("filter", "grok") looks for LogStash::Filters::Grok
# @param type [String] plugin type, "input", "ouput", "filter"
# @param name [String] plugin name, ex.: "grok"
# @return [Class] the plugin class or raises NameError
# @raise NameError if plugin class does not exist or is invalid
def self.namespace_lookup(type, name)
type_const = "#{type.capitalize}s"
namespace = LogStash.const_get(type_const)
# the namespace can contain constants which are not for plugins classes (do not respond to :config_name)
# namespace.constants is the shallow collection of all constants symbols in namespace
# note that below namespace.const_get(c) should never result in a NameError since c is from the constants collection
klass_sym = namespace.constants.find { |c| is_a_plugin?(namespace.const_get(c), name) }
klass = klass_sym && namespace.const_get(klass_sym)
raise(NameError) unless klass
klass
end

# check if klass is a valid plugin for name
# @param klass [Class] plugin class
# @param name [String] plugin name
# @return [Boolean] true if klass is a valid plugin for name
def self.is_a_plugin?(klass, name)
klass.ancestors.include?(LogStash::Plugin) && klass.respond_to?(:config_name) && klass.config_name == name
LogStash::PluginRegistry.lookup_pipeline_plugin(type, name)
end
end # class LogStash::Plugin
57 changes: 57 additions & 0 deletions logstash-core/lib/logstash/plugins/hooks_registry.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
# encoding: utf-8
module LogStash module Plugins
# This calls allow logstash to expose the endpoints for listeners
class HooksRegistry
java_import "java.util.concurrent.ConcurrentHashMap"
java_import "java.util.concurrent.CopyOnWriteArrayList"

def initialize
@registered_emmitters = ConcurrentHashMap.new
@registered_hooks = ConcurrentHashMap.new
end

def register_emitter(emitter_scope, dispatcher)
@registered_emmitters.put(emitter_scope, dispatcher)
sync_hooks
end

def remove_emitter(emitter_scope)
@registered_emmitters.remove(emitter_scope)
end

def register_hooks(emitter_scope, callback)
callbacks = @registered_hooks.computeIfAbsent(emitter_scope) do
CopyOnWriteArrayList.new
end

callbacks.add(callback)
sync_hooks
end

def emmitters_count
@registered_emmitters.size
end

def hooks_count(emitter_scope = nil)
if emitter_scope.nil?
@registered_hooks.elements().collect(&:size).reduce(0, :+)
else
callbacks = @registered_hooks.get(emitter_scope)
callbacks.nil? ? 0 : @registered_hooks.get(emitter_scope).size
end
end

private
def sync_hooks
@registered_emmitters.each do |emitter, dispatcher|
listeners = @registered_hooks.get(emitter)

unless listeners.nil?
listeners.each do |listener|
dispatcher.add_listener(listener)
end
end
end
end
end
end end
Loading

0 comments on commit 6626fa8

Please sign in to comment.