Kouhei Sutou
null+****@clear*****
Sun Nov 24 13:23:23 JST 2013
Kouhei Sutou 2013-11-24 13:23:23 +0900 (Sun, 24 Nov 2013) New Revision: f5b9a0f78b35563e240614b2b5396dad83fd7152 https://github.com/droonga/fluent-plugin-droonga/commit/f5b9a0f78b35563e240614b2b5396dad83fd7152 Message: Extract Handler from Executor There are many duplicated codes... Copied files: lib/droonga/handler.rb (from lib/droonga/executor.rb) Modified files: lib/droonga/dispatcher.rb lib/droonga/executor.rb lib/droonga/handler_plugin.rb lib/droonga/plugin/handler/add.rb lib/droonga/plugin/handler/groonga.rb lib/droonga/plugin/handler/search.rb lib/droonga/plugin/handler/watch.rb lib/droonga/plugin_repository.rb lib/droonga/worker.rb Modified: lib/droonga/dispatcher.rb (+1 -1) =================================================================== --- lib/droonga/dispatcher.rb 2013-11-24 12:04:26 +0900 (431c351) +++ lib/droonga/dispatcher.rb 2013-11-24 13:23:23 +0900 (284e45a) @@ -35,7 +35,7 @@ module Droonga @local = Regexp.new("^#{@name}") plugins = ["collector"] + (Droonga.catalog.option("plugins")||[]) + ["adapter"] plugins.each do |plugin| - @worker.add_handler(plugin) + @worker.add_legacy_plugin(plugin) end end Modified: lib/droonga/executor.rb (+28 -32) =================================================================== --- lib/droonga/executor.rb 2013-11-24 12:04:26 +0900 (0e7d53f) +++ lib/droonga/executor.rb 2013-11-24 13:23:23 +0900 (88edeef) @@ -29,24 +29,25 @@ module Droonga attr_reader :context, :envelope, :name def initialize(options={}) - @handlers = [] + @legacy_plugins = [] @outputs = {} @options = options @name = options[:name] @database_name = options[:database] @queue_name = options[:queue_name] || "DroongaQueue" - @handler_names = options[:handlers] || [] @pool_size = options[:n_workers] || 0 # load_handlers Droonga::PluginLoader.load_all + @handler = Handler.new(@options) prepare end def shutdown $log.trace("#{log_tag}: shutdown: start") - @handlers.each do |handler| - handler.shutdown + @legacy_plugins.each do |legacy_plugin| + legacy_plugin.shutdown end + @handler.shutdown @outputs.each do |dest, output| output[:logger].close if output[:logger] end @@ -62,9 +63,9 @@ module Droonga $log.trace("#{log_tag}: shutdown: done") end - def add_handler(name) - handler = LegacyPlugin.repository.instantiate(name, self) - @handlers << handler + def add_legacy_plugin(name) + legacy_plugin = LegacyPlugin.repository.instantiate(name, self) + @legacy_plugins << legacy_plugin end def add_route(route) @@ -97,13 +98,13 @@ module Droonga return end body, command, arguments = parse_message(message) - handler = find_handler(command) - if handler + legacy_plugin = find_legacy_plugin(command) + if legacy_plugin $log.trace("#{log_tag}: execute_one: handle: start", - :hander => handler.class) - handler.handle(command, body, *arguments) + :hander => legacy_plugin.class) + legacy_plugin.handle(command, body, *arguments) $log.trace("#{log_tag}: execute_one: handle: done", - :hander => handler.class) + :hander => legacy_plugin.class) end $log.trace("#{log_tag}: execute_one: done") end @@ -140,15 +141,19 @@ module Droonga if receiver output(receiver, body, command, arguments) else - handler = find_handler(command) - if handler + legacy_plugin = find_legacy_plugin(command) + if legacy_plugin + $log.trace("#{log_tag}: post_or_push: handle: start: <#{command}>", + :plugin => legacy_plugin.class) + legacy_plugin.handle(command, body, *arguments) + $log.trace("#{log_tag}: post_or_push: handle: done: <#{command}>", + :plugin => legacy_plugin.class) + elsif****@handl*****?(command) if synchronous.nil? - synchronous = handler.prefer_synchronous?(command) + synchronous =****@handl*****_synchronous?(command) end if route || @pool_size.zero? || synchronous - $log.trace("#{log_tag}: post_or_push: handle: start") - handler.handle(command, body, *arguments) - $log.trace("#{log_tag}: post_or_push: handle: done") + @handler.handle(@message) else unless message envelope["body"] = body @@ -213,6 +218,7 @@ module Droonga end def parse_message(message) + @message = message # TODO: remove me tag, time, record = message prefix, type, *arguments = tag.split(/\./) if type.nil? || type.empty? || type == 'message' @@ -228,28 +234,18 @@ module Droonga [envelope["body"], envelope["type"], envelope["arguments"]] end - def load_handlers - @handler_names.each do |handler_name| - loader = Droonga::PluginLoader.new("handler", handler_name) - loader.load - end - end - def prepare if @database_name && !@database_name.empty? @context = Groonga::Context.new @database =****@conte*****_database(@database_name) @job_queue = JobQueue.open(@database_name, @queue_name) end - @handler_names.each do |handler_name| - add_handler(handler_name) - end - add_handler("dispatcher_message") unless @options[:standalone] + add_legacy_plugin("dispatcher_message") unless @options[:standalone] end - def find_handler(command) - @handlers.find do |handler| - handler.handlable?(command) + def find_legacy_plugin(command) + @legacy_plugins.find do |legacy_plugin| + legacy_plugin.handlable?(command) end end Copied: lib/droonga/handler.rb (+109 -54) 68% =================================================================== --- lib/droonga/executor.rb 2013-11-24 12:04:26 +0900 (0e7d53f) +++ lib/droonga/handler.rb 2013-11-24 13:23:23 +0900 (35d83f9) @@ -22,30 +22,28 @@ require "groonga" require "droonga/job_queue" require "droonga/handler_plugin" require "droonga/plugin_loader" -require "droonga/dispatcher" module Droonga - class Executor + class Handler attr_reader :context, :envelope, :name def initialize(options={}) - @handlers = [] + @plugins = [] @outputs = {} @options = options @name = options[:name] @database_name = options[:database] @queue_name = options[:queue_name] || "DroongaQueue" - @handler_names = options[:handlers] || [] - @pool_size = options[:n_workers] || 0 -# load_handlers + @plugin_names = options[:handlers] || [] +# load_plugins Droonga::PluginLoader.load_all prepare end def shutdown $log.trace("#{log_tag}: shutdown: start") - @handlers.each do |handler| - handler.shutdown + @plugins.each do |plugin| + plugins.shutdown end @outputs.each do |dest, output| output[:logger].close if output[:logger] @@ -62,31 +60,9 @@ module Droonga $log.trace("#{log_tag}: shutdown: done") end - def add_handler(name) - handler = LegacyPlugin.repository.instantiate(name, self) - @handlers << handler - end - - def add_route(route) - envelope["via"].push(route) - end - - def dispatch(tag, time, record, synchronous=nil) - $log.trace("#{log_tag}: dispatch: start") - message = [tag, time, record] - body, type, arguments = parse_message([tag, time, record]) - reply_to = envelope["replyTo"] - if reply_to.is_a? String - envelope["replyTo"] = { - "type" => type + ".result", - "to" => reply_to - } - end - post_or_push(message, body, - "type" => type, - "arguments" => arguments, - "synchronous" => synchronous) - $log.trace("#{log_tag}: dispatch: done") + def add_plugin(name) + plugin = HandlerPlugin.repository.instantiate(name, self) + @plugins << plugin end def execute_one @@ -96,16 +72,55 @@ module Droonga $log.trace("#{log_tag}: execute_one: abort: no message") return end + handle(message) + $log.trace("#{log_tag}: execute_one: done") + end + + def handlable?(command) + not find_plugin(command).nil? + end + + def prefer_synchronous?(command) + find_plugin(command).prefer_synchronous?(command) + end + + def handle(message) + $log.trace("#{log_tag}: handle: start") body, command, arguments = parse_message(message) - handler = find_handler(command) - if handler - $log.trace("#{log_tag}: execute_one: handle: start", - :hander => handler.class) - handler.handle(command, body, *arguments) - $log.trace("#{log_tag}: execute_one: handle: done", - :hander => handler.class) + plugin = find_plugin(command) + if plugin.nil? + $log.trace("#{log_tag}: handle: done: no plugin: <#{command}>") + return end - $log.trace("#{log_tag}: execute_one: done") + + unless try_handle_as_internal_message(plugin, command, body, arguments) + @task = {} + @output_values = {} + $log.trace("#{log_tag}: handle: plugin: handle: start", + :hander => plugin.class) + plugin.handle(command, body, *arguments) + $log.trace("#{log_tag}: handle: plugin: handle: done", + :hander => plugin.class) + unless @output_values.empty? + $log.trace("#{log_tag}: handle: output: start") + post(@output_values) + $log.trace("#{log_tag}: handle: output: done") + end + end + $log.trace("#{log_tag}: handle: done: <#{command}>", + :plugin => plugin.class) + end + + def emit(value, name = nil) + unless name + if @output_names + name = @output_names.first + else + @output_values = @task["values"] = value + return + end + end + @output_values[name] = value end def post(body, destination=nil) @@ -140,14 +155,14 @@ module Droonga if receiver output(receiver, body, command, arguments) else - handler = find_handler(command) - if handler + plugin = find_plugin(command) + if plugin if synchronous.nil? - synchronous = handler.prefer_synchronous?(command) + synchronous = plugin.prefer_synchronous?(command) end if route || @pool_size.zero? || synchronous $log.trace("#{log_tag}: post_or_push: handle: start") - handler.handle(command, body, *arguments) + plugin.handle(command, body, *arguments) $log.trace("#{log_tag}: post_or_push: handle: done") else unless message @@ -228,9 +243,9 @@ module Droonga [envelope["body"], envelope["type"], envelope["arguments"]] end - def load_handlers - @handler_names.each do |handler_name| - loader = Droonga::PluginLoader.new("handler", handler_name) + def load_plugins + @plugin_names.each do |plugin_name| + loader = Droonga::PluginLoader.new("handler", plugin_name) loader.load end end @@ -241,15 +256,14 @@ module Droonga @database =****@conte*****_database(@database_name) @job_queue = JobQueue.open(@database_name, @queue_name) end - @handler_names.each do |handler_name| - add_handler(handler_name) + @plugin_names.each do |plugin_name| + add_plugin(plugin_name) end - add_handler("dispatcher_message") unless @options[:standalone] end - def find_handler(command) - @handlers.find do |handler| - handler.handlable?(command) + def find_plugin(command) + @plugins.find do |plugin| + plugin.handlable?(command) end end @@ -280,6 +294,47 @@ module Droonga output[:logger] end + # TODO: move to dispatcher + def try_handle_as_internal_message(plugin, command, request, arguments) + return false unless request.is_a? Hash + + @task = request["task"] + return false unles****@task*****_a? Hash + + @component = @task["component"] + return false unles****@compo*****_a? Hash + + @output_values = @task["values"] + @body = @component["body"] + @output_names = @component["outputs"] + @id = request["id"] + @value = request["value"] + @input_name = request["name"] + @descendants = request["descendants"] + + plugin.handle(command, @body, *arguments) + output_xxx if @descendants + true + end + + # TODO: move to dispatcher + def output_xxx + result = @task["values"] + post(result, @component["post"]) if @component["post"] + @descendants.each do |name, dests| + message = { + "id" => @id, + "input" => name, + "value" => result[name] + } + dests.each do |routes| + routes.each do |route| + post(message, "to"=>route, "type"=>"dispatcher") + end + end + end + end + def create_logger(options) Fluent::Logger::FluentLogger.new(nil, options) end Modified: lib/droonga/handler_plugin.rb (+54 -0) =================================================================== --- lib/droonga/handler_plugin.rb 2013-11-24 12:04:26 +0900 (a1571ed) +++ lib/droonga/handler_plugin.rb 2013-11-24 13:23:23 +0900 (b103748) @@ -22,9 +22,63 @@ module Droonga @@repository = PluginRepository.new class << self + def inherited(sub_class) + super + sub_class.instance_variable_set(:@command_mapper, CommandMapper.new) + end + def repository @@repository end + + def command(name_or_map) + @command_mapper.register(name_or_map) + end + + def method_name(command) + @command_mapper[command] + end + + def handlable?(command) + not method_name(command).nil? + end + end + + def initialize(handler) + @handler = handler + @context =****@handl***** + end + + def envelope + @handler.envelope + end + + def shutdown + end + + def handlable?(command) + self.class.handlable?(command) + end + + def handle(command, request, *arguments) + __send__(self.class.method_name(command), request, *arguments) + rescue => exception + Logger.error("error while handling #{command}", + request: request, + arguments: arguments, + exception: exception) + end + + def emit(value, name=nil) + @handler.emit(value, name) + end + + def post(body, destination=nil) + @handler.post(body, destination) + end + + def prefer_synchronous?(command) + false end end end Modified: lib/droonga/plugin/handler/add.rb (+3 -3) =================================================================== --- lib/droonga/plugin/handler/add.rb 2013-11-24 12:04:26 +0900 (672074b) +++ lib/droonga/plugin/handler/add.rb 2013-11-24 13:23:23 +0900 (296f7dc) @@ -17,11 +17,11 @@ require "groonga" -require "droonga/legacy_plugin" +require "droonga/handler_plugin" module Droonga - class AddHandler < Droonga::LegacyPlugin - Droonga::LegacyPlugin.repository.register("add", self) + class AddHandler < Droonga::HandlerPlugin + repository.register("add", self) command :add def add(request) Modified: lib/droonga/plugin/handler/groonga.rb (+3 -3) =================================================================== --- lib/droonga/plugin/handler/groonga.rb 2013-11-24 12:04:26 +0900 (d48efde) +++ lib/droonga/plugin/handler/groonga.rb 2013-11-24 13:23:23 +0900 (a8d821f) @@ -17,11 +17,11 @@ require "groonga" -require "droonga/legacy_plugin" +require "droonga/handler_plugin" module Droonga - class GroongaHandler < Droonga::LegacyPlugin - Droonga::LegacyPlugin.repository.register("groonga", self) + class GroongaHandler < Droonga::HandlerPlugin + repository.register("groonga", self) command :table_create def table_create(request) Modified: lib/droonga/plugin/handler/search.rb (+3 -3) =================================================================== --- lib/droonga/plugin/handler/search.rb 2013-11-24 12:04:26 +0900 (210a58a) +++ lib/droonga/plugin/handler/search.rb 2013-11-24 13:23:23 +0900 (22b465b) @@ -15,12 +15,12 @@ # License along with this library; if not, write to the Free Software # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA -require "droonga/legacy_plugin" +require "droonga/handler_plugin" require "droonga/searcher" module Droonga - class SearchHandler < Droonga::LegacyPlugin - Droonga::LegacyPlugin.repository.register("search", self) + class SearchHandler < Droonga::HandlerPlugin + repository.register("search", self) command :search def search(request) Modified: lib/droonga/plugin/handler/watch.rb (+3 -3) =================================================================== --- lib/droonga/plugin/handler/watch.rb 2013-11-24 12:04:26 +0900 (8afc91a) +++ lib/droonga/plugin/handler/watch.rb 2013-11-24 13:23:23 +0900 (c42b20f) @@ -18,11 +18,11 @@ require "droonga/watcher" require "droonga/sweeper" require "droonga/watch_schema" -require "droonga/legacy_plugin" +require "droonga/handler_plugin" module Droonga - class WatchHandler < Droonga::LegacyPlugin - Droonga::LegacyPlugin.repository.register("watch", self) + class WatchHandler < Droonga::HandlerPlugin + repository.register("watch", self) def initialize(*args) super Modified: lib/droonga/plugin_repository.rb (+11 -1) =================================================================== --- lib/droonga/plugin_repository.rb 2013-11-24 12:04:26 +0900 (816efc6) +++ lib/droonga/plugin_repository.rb 2013-11-24 13:23:23 +0900 (3047e0d) @@ -40,7 +40,17 @@ module Droonga end def instantiate(name, *args, &block) - self[name].new(*args, &block) + plugin_class = self[name] + if plugin_class.nil? + # TODO: use the original error + raise ArgumentError, "unknown plugin: <#{name}>" + end + begin + plugin_class.new(*args, &block) + rescue + p [plugin_class, plugin_class.method(:new), plugin_class.method(:new).arity, args.size] + raise + end end end end Modified: lib/droonga/worker.rb (+4 -4) =================================================================== --- lib/droonga/worker.rb 2013-11-24 12:04:26 +0900 (c2261b1) +++ lib/droonga/worker.rb 2013-11-24 13:23:23 +0900 (888b6e7) @@ -15,14 +15,14 @@ # License along with this library; if not, write to the Free Software # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA -require "droonga/executor" +require "droonga/handler" module Droonga module Worker attr_reader :context, :envelope, :name def initialize - @executor = Executor.new(config.merge(:standalone => true)) + @handler = Handler.new(config) end def run @@ -30,10 +30,10 @@ module Droonga @running = true while @running $log.trace("#{log_tag}: run: pull_message: start") - @executor.execute_one + @handler.execute_one $log.trace("#{log_tag}: run: pull_message: done") end - @executor.shutdown + @handler.shutdown $log.trace("#{log_tag}: run: done") end -------------- next part -------------- HTML����������������������������... Download