[Groonga-commit] droonga/fluent-plugin-droonga at f5b9a0f [master] Extract Handler from Executor

Back to archive index

Kouhei Sutou null+****@clear*****
Sun Nov 24 13:23:23 JST 2013


Kouhei Sutou	2013-11-24 13:23:23 +0900 (Sun, 24 Nov 2013)

  New Revision: f5b9a0f78b35563e240614b2b5396dad83fd7152
  https://github.com/droonga/fluent-plugin-droonga/commit/f5b9a0f78b35563e240614b2b5396dad83fd7152

  Message:
    Extract Handler from Executor
    
    There are many duplicated codes...

  Copied files:
    lib/droonga/handler.rb
      (from lib/droonga/executor.rb)
  Modified files:
    lib/droonga/dispatcher.rb
    lib/droonga/executor.rb
    lib/droonga/handler_plugin.rb
    lib/droonga/plugin/handler/add.rb
    lib/droonga/plugin/handler/groonga.rb
    lib/droonga/plugin/handler/search.rb
    lib/droonga/plugin/handler/watch.rb
    lib/droonga/plugin_repository.rb
    lib/droonga/worker.rb

  Modified: lib/droonga/dispatcher.rb (+1 -1)
===================================================================
--- lib/droonga/dispatcher.rb    2013-11-24 12:04:26 +0900 (431c351)
+++ lib/droonga/dispatcher.rb    2013-11-24 13:23:23 +0900 (284e45a)
@@ -35,7 +35,7 @@ module Droonga
       @local = Regexp.new("^#{@name}")
       plugins = ["collector"] + (Droonga.catalog.option("plugins")||[]) + ["adapter"]
       plugins.each do |plugin|
-        @worker.add_handler(plugin)
+        @worker.add_legacy_plugin(plugin)
       end
     end
 

  Modified: lib/droonga/executor.rb (+28 -32)
===================================================================
--- lib/droonga/executor.rb    2013-11-24 12:04:26 +0900 (0e7d53f)
+++ lib/droonga/executor.rb    2013-11-24 13:23:23 +0900 (88edeef)
@@ -29,24 +29,25 @@ module Droonga
     attr_reader :context, :envelope, :name
 
     def initialize(options={})
-      @handlers = []
+      @legacy_plugins = []
       @outputs = {}
       @options = options
       @name = options[:name]
       @database_name = options[:database]
       @queue_name = options[:queue_name] || "DroongaQueue"
-      @handler_names = options[:handlers] || []
       @pool_size = options[:n_workers] || 0
 #     load_handlers
       Droonga::PluginLoader.load_all
+      @handler = Handler.new(@options)
       prepare
     end
 
     def shutdown
       $log.trace("#{log_tag}: shutdown: start")
-      @handlers.each do |handler|
-        handler.shutdown
+      @legacy_plugins.each do |legacy_plugin|
+        legacy_plugin.shutdown
       end
+      @handler.shutdown
       @outputs.each do |dest, output|
         output[:logger].close if output[:logger]
       end
@@ -62,9 +63,9 @@ module Droonga
       $log.trace("#{log_tag}: shutdown: done")
     end
 
-    def add_handler(name)
-      handler = LegacyPlugin.repository.instantiate(name, self)
-      @handlers << handler
+    def add_legacy_plugin(name)
+      legacy_plugin = LegacyPlugin.repository.instantiate(name, self)
+      @legacy_plugins << legacy_plugin
     end
 
     def add_route(route)
@@ -97,13 +98,13 @@ module Droonga
         return
       end
       body, command, arguments = parse_message(message)
-      handler = find_handler(command)
-      if handler
+      legacy_plugin = find_legacy_plugin(command)
+      if legacy_plugin
         $log.trace("#{log_tag}: execute_one: handle: start",
-                   :hander => handler.class)
-        handler.handle(command, body, *arguments)
+                   :hander => legacy_plugin.class)
+        legacy_plugin.handle(command, body, *arguments)
         $log.trace("#{log_tag}: execute_one: handle: done",
-                   :hander => handler.class)
+                   :hander => legacy_plugin.class)
       end
       $log.trace("#{log_tag}: execute_one: done")
     end
@@ -140,15 +141,19 @@ module Droonga
       if receiver
         output(receiver, body, command, arguments)
       else
-        handler = find_handler(command)
-        if handler
+        legacy_plugin = find_legacy_plugin(command)
+        if legacy_plugin
+          $log.trace("#{log_tag}: post_or_push: handle: start: <#{command}>",
+                     :plugin => legacy_plugin.class)
+          legacy_plugin.handle(command, body, *arguments)
+          $log.trace("#{log_tag}: post_or_push: handle: done: <#{command}>",
+                     :plugin => legacy_plugin.class)
+        elsif****@handl*****?(command)
           if synchronous.nil?
-            synchronous = handler.prefer_synchronous?(command)
+            synchronous =****@handl*****_synchronous?(command)
           end
           if route || @pool_size.zero? || synchronous
-            $log.trace("#{log_tag}: post_or_push: handle: start")
-            handler.handle(command, body, *arguments)
-            $log.trace("#{log_tag}: post_or_push: handle: done")
+            @handler.handle(@message)
           else
             unless message
               envelope["body"] = body
@@ -213,6 +218,7 @@ module Droonga
     end
 
     def parse_message(message)
+      @message = message # TODO: remove me
       tag, time, record = message
       prefix, type, *arguments = tag.split(/\./)
       if type.nil? || type.empty? || type == 'message'
@@ -228,28 +234,18 @@ module Droonga
       [envelope["body"], envelope["type"], envelope["arguments"]]
     end
 
-    def load_handlers
-      @handler_names.each do |handler_name|
-        loader = Droonga::PluginLoader.new("handler", handler_name)
-        loader.load
-      end
-    end
-
     def prepare
       if @database_name && !@database_name.empty?
         @context = Groonga::Context.new
         @database =****@conte*****_database(@database_name)
         @job_queue = JobQueue.open(@database_name, @queue_name)
       end
-      @handler_names.each do |handler_name|
-        add_handler(handler_name)
-      end
-      add_handler("dispatcher_message") unless @options[:standalone]
+      add_legacy_plugin("dispatcher_message") unless @options[:standalone]
     end
 
-    def find_handler(command)
-      @handlers.find do |handler|
-        handler.handlable?(command)
+    def find_legacy_plugin(command)
+      @legacy_plugins.find do |legacy_plugin|
+        legacy_plugin.handlable?(command)
       end
     end
 

  Copied: lib/droonga/handler.rb (+109 -54) 68%
===================================================================
--- lib/droonga/executor.rb    2013-11-24 12:04:26 +0900 (0e7d53f)
+++ lib/droonga/handler.rb    2013-11-24 13:23:23 +0900 (35d83f9)
@@ -22,30 +22,28 @@ require "groonga"
 require "droonga/job_queue"
 require "droonga/handler_plugin"
 require "droonga/plugin_loader"
-require "droonga/dispatcher"
 
 module Droonga
-  class Executor
+  class Handler
     attr_reader :context, :envelope, :name
 
     def initialize(options={})
-      @handlers = []
+      @plugins = []
       @outputs = {}
       @options = options
       @name = options[:name]
       @database_name = options[:database]
       @queue_name = options[:queue_name] || "DroongaQueue"
-      @handler_names = options[:handlers] || []
-      @pool_size = options[:n_workers] || 0
-#     load_handlers
+      @plugin_names = options[:handlers] || []
+#     load_plugins
       Droonga::PluginLoader.load_all
       prepare
     end
 
     def shutdown
       $log.trace("#{log_tag}: shutdown: start")
-      @handlers.each do |handler|
-        handler.shutdown
+      @plugins.each do |plugin|
+        plugins.shutdown
       end
       @outputs.each do |dest, output|
         output[:logger].close if output[:logger]
@@ -62,31 +60,9 @@ module Droonga
       $log.trace("#{log_tag}: shutdown: done")
     end
 
-    def add_handler(name)
-      handler = LegacyPlugin.repository.instantiate(name, self)
-      @handlers << handler
-    end
-
-    def add_route(route)
-      envelope["via"].push(route)
-    end
-
-    def dispatch(tag, time, record, synchronous=nil)
-      $log.trace("#{log_tag}: dispatch: start")
-      message = [tag, time, record]
-      body, type, arguments = parse_message([tag, time, record])
-      reply_to = envelope["replyTo"]
-      if reply_to.is_a? String
-        envelope["replyTo"] = {
-          "type" => type + ".result",
-          "to" => reply_to
-        }
-      end
-      post_or_push(message, body,
-                   "type" => type,
-                   "arguments" => arguments,
-                   "synchronous" => synchronous)
-      $log.trace("#{log_tag}: dispatch: done")
+    def add_plugin(name)
+      plugin = HandlerPlugin.repository.instantiate(name, self)
+      @plugins << plugin
     end
 
     def execute_one
@@ -96,16 +72,55 @@ module Droonga
         $log.trace("#{log_tag}: execute_one: abort: no message")
         return
       end
+      handle(message)
+      $log.trace("#{log_tag}: execute_one: done")
+    end
+
+    def handlable?(command)
+      not find_plugin(command).nil?
+    end
+
+    def prefer_synchronous?(command)
+      find_plugin(command).prefer_synchronous?(command)
+    end
+
+    def handle(message)
+      $log.trace("#{log_tag}: handle: start")
       body, command, arguments = parse_message(message)
-      handler = find_handler(command)
-      if handler
-        $log.trace("#{log_tag}: execute_one: handle: start",
-                   :hander => handler.class)
-        handler.handle(command, body, *arguments)
-        $log.trace("#{log_tag}: execute_one: handle: done",
-                   :hander => handler.class)
+      plugin = find_plugin(command)
+      if plugin.nil?
+        $log.trace("#{log_tag}: handle: done: no plugin: <#{command}>")
+        return
       end
-      $log.trace("#{log_tag}: execute_one: done")
+
+      unless try_handle_as_internal_message(plugin, command, body, arguments)
+        @task = {}
+        @output_values = {}
+        $log.trace("#{log_tag}: handle: plugin: handle: start",
+                   :hander => plugin.class)
+        plugin.handle(command, body, *arguments)
+        $log.trace("#{log_tag}: handle: plugin: handle: done",
+                   :hander => plugin.class)
+        unless @output_values.empty?
+          $log.trace("#{log_tag}: handle: output: start")
+          post(@output_values)
+          $log.trace("#{log_tag}: handle: output: done")
+        end
+      end
+      $log.trace("#{log_tag}: handle: done: <#{command}>",
+                 :plugin => plugin.class)
+    end
+
+    def emit(value, name = nil)
+      unless name
+        if @output_names
+          name = @output_names.first
+        else
+          @output_values = @task["values"] = value
+          return
+        end
+      end
+      @output_values[name] = value
     end
 
     def post(body, destination=nil)
@@ -140,14 +155,14 @@ module Droonga
       if receiver
         output(receiver, body, command, arguments)
       else
-        handler = find_handler(command)
-        if handler
+        plugin = find_plugin(command)
+        if plugin
           if synchronous.nil?
-            synchronous = handler.prefer_synchronous?(command)
+            synchronous = plugin.prefer_synchronous?(command)
           end
           if route || @pool_size.zero? || synchronous
             $log.trace("#{log_tag}: post_or_push: handle: start")
-            handler.handle(command, body, *arguments)
+            plugin.handle(command, body, *arguments)
             $log.trace("#{log_tag}: post_or_push: handle: done")
           else
             unless message
@@ -228,9 +243,9 @@ module Droonga
       [envelope["body"], envelope["type"], envelope["arguments"]]
     end
 
-    def load_handlers
-      @handler_names.each do |handler_name|
-        loader = Droonga::PluginLoader.new("handler", handler_name)
+    def load_plugins
+      @plugin_names.each do |plugin_name|
+        loader = Droonga::PluginLoader.new("handler", plugin_name)
         loader.load
       end
     end
@@ -241,15 +256,14 @@ module Droonga
         @database =****@conte*****_database(@database_name)
         @job_queue = JobQueue.open(@database_name, @queue_name)
       end
-      @handler_names.each do |handler_name|
-        add_handler(handler_name)
+      @plugin_names.each do |plugin_name|
+        add_plugin(plugin_name)
       end
-      add_handler("dispatcher_message") unless @options[:standalone]
     end
 
-    def find_handler(command)
-      @handlers.find do |handler|
-        handler.handlable?(command)
+    def find_plugin(command)
+      @plugins.find do |plugin|
+        plugin.handlable?(command)
       end
     end
 
@@ -280,6 +294,47 @@ module Droonga
       output[:logger]
     end
 
+    # TODO: move to dispatcher
+    def try_handle_as_internal_message(plugin, command, request, arguments)
+      return false unless request.is_a? Hash
+
+      @task = request["task"]
+      return false unles****@task*****_a? Hash
+
+      @component = @task["component"]
+      return false unles****@compo*****_a? Hash
+
+      @output_values = @task["values"]
+      @body = @component["body"]
+      @output_names = @component["outputs"]
+      @id = request["id"]
+      @value = request["value"]
+      @input_name = request["name"]
+      @descendants = request["descendants"]
+
+      plugin.handle(command, @body, *arguments)
+      output_xxx if @descendants
+      true
+    end
+
+    # TODO: move to dispatcher
+    def output_xxx
+      result = @task["values"]
+      post(result, @component["post"]) if @component["post"]
+      @descendants.each do |name, dests|
+        message = {
+          "id" => @id,
+          "input" => name,
+          "value" => result[name]
+        }
+        dests.each do |routes|
+          routes.each do |route|
+            post(message, "to"=>route, "type"=>"dispatcher")
+          end
+        end
+      end
+    end
+
     def create_logger(options)
       Fluent::Logger::FluentLogger.new(nil, options)
     end

  Modified: lib/droonga/handler_plugin.rb (+54 -0)
===================================================================
--- lib/droonga/handler_plugin.rb    2013-11-24 12:04:26 +0900 (a1571ed)
+++ lib/droonga/handler_plugin.rb    2013-11-24 13:23:23 +0900 (b103748)
@@ -22,9 +22,63 @@ module Droonga
     @@repository = PluginRepository.new
 
     class << self
+      def inherited(sub_class)
+        super
+        sub_class.instance_variable_set(:@command_mapper, CommandMapper.new)
+      end
+
       def repository
         @@repository
       end
+
+      def command(name_or_map)
+        @command_mapper.register(name_or_map)
+      end
+
+      def method_name(command)
+        @command_mapper[command]
+      end
+
+      def handlable?(command)
+        not method_name(command).nil?
+      end
+    end
+
+    def initialize(handler)
+      @handler = handler
+      @context =****@handl*****
+    end
+
+    def envelope
+      @handler.envelope
+    end
+
+    def shutdown
+    end
+
+    def handlable?(command)
+      self.class.handlable?(command)
+    end
+
+    def handle(command, request, *arguments)
+      __send__(self.class.method_name(command), request, *arguments)
+    rescue => exception
+      Logger.error("error while handling #{command}",
+                   request: request,
+                   arguments: arguments,
+                   exception: exception)
+    end
+
+    def emit(value, name=nil)
+      @handler.emit(value, name)
+    end
+
+    def post(body, destination=nil)
+      @handler.post(body, destination)
+    end
+
+    def prefer_synchronous?(command)
+      false
     end
   end
 end

  Modified: lib/droonga/plugin/handler/add.rb (+3 -3)
===================================================================
--- lib/droonga/plugin/handler/add.rb    2013-11-24 12:04:26 +0900 (672074b)
+++ lib/droonga/plugin/handler/add.rb    2013-11-24 13:23:23 +0900 (296f7dc)
@@ -17,11 +17,11 @@
 
 require "groonga"
 
-require "droonga/legacy_plugin"
+require "droonga/handler_plugin"
 
 module Droonga
-  class AddHandler < Droonga::LegacyPlugin
-    Droonga::LegacyPlugin.repository.register("add", self)
+  class AddHandler < Droonga::HandlerPlugin
+    repository.register("add", self)
 
     command :add
     def add(request)

  Modified: lib/droonga/plugin/handler/groonga.rb (+3 -3)
===================================================================
--- lib/droonga/plugin/handler/groonga.rb    2013-11-24 12:04:26 +0900 (d48efde)
+++ lib/droonga/plugin/handler/groonga.rb    2013-11-24 13:23:23 +0900 (a8d821f)
@@ -17,11 +17,11 @@
 
 require "groonga"
 
-require "droonga/legacy_plugin"
+require "droonga/handler_plugin"
 
 module Droonga
-  class GroongaHandler < Droonga::LegacyPlugin
-    Droonga::LegacyPlugin.repository.register("groonga", self)
+  class GroongaHandler < Droonga::HandlerPlugin
+    repository.register("groonga", self)
 
     command :table_create
     def table_create(request)

  Modified: lib/droonga/plugin/handler/search.rb (+3 -3)
===================================================================
--- lib/droonga/plugin/handler/search.rb    2013-11-24 12:04:26 +0900 (210a58a)
+++ lib/droonga/plugin/handler/search.rb    2013-11-24 13:23:23 +0900 (22b465b)
@@ -15,12 +15,12 @@
 # License along with this library; if not, write to the Free Software
 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
-require "droonga/legacy_plugin"
+require "droonga/handler_plugin"
 require "droonga/searcher"
 
 module Droonga
-  class SearchHandler < Droonga::LegacyPlugin
-    Droonga::LegacyPlugin.repository.register("search", self)
+  class SearchHandler < Droonga::HandlerPlugin
+    repository.register("search", self)
 
     command :search
     def search(request)

  Modified: lib/droonga/plugin/handler/watch.rb (+3 -3)
===================================================================
--- lib/droonga/plugin/handler/watch.rb    2013-11-24 12:04:26 +0900 (8afc91a)
+++ lib/droonga/plugin/handler/watch.rb    2013-11-24 13:23:23 +0900 (c42b20f)
@@ -18,11 +18,11 @@
 require "droonga/watcher"
 require "droonga/sweeper"
 require "droonga/watch_schema"
-require "droonga/legacy_plugin"
+require "droonga/handler_plugin"
 
 module Droonga
-  class WatchHandler < Droonga::LegacyPlugin
-    Droonga::LegacyPlugin.repository.register("watch", self)
+  class WatchHandler < Droonga::HandlerPlugin
+    repository.register("watch", self)
 
     def initialize(*args)
       super

  Modified: lib/droonga/plugin_repository.rb (+11 -1)
===================================================================
--- lib/droonga/plugin_repository.rb    2013-11-24 12:04:26 +0900 (816efc6)
+++ lib/droonga/plugin_repository.rb    2013-11-24 13:23:23 +0900 (3047e0d)
@@ -40,7 +40,17 @@ module Droonga
     end
 
     def instantiate(name, *args, &block)
-      self[name].new(*args, &block)
+      plugin_class = self[name]
+      if plugin_class.nil?
+        # TODO: use the original error
+        raise ArgumentError, "unknown plugin: <#{name}>"
+      end
+      begin
+        plugin_class.new(*args, &block)
+      rescue
+        p [plugin_class, plugin_class.method(:new), plugin_class.method(:new).arity, args.size]
+        raise
+      end
     end
   end
 end

  Modified: lib/droonga/worker.rb (+4 -4)
===================================================================
--- lib/droonga/worker.rb    2013-11-24 12:04:26 +0900 (c2261b1)
+++ lib/droonga/worker.rb    2013-11-24 13:23:23 +0900 (888b6e7)
@@ -15,14 +15,14 @@
 # License along with this library; if not, write to the Free Software
 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
-require "droonga/executor"
+require "droonga/handler"
 
 module Droonga
   module Worker
     attr_reader :context, :envelope, :name
 
     def initialize
-      @executor = Executor.new(config.merge(:standalone => true))
+      @handler = Handler.new(config)
     end
 
     def run
@@ -30,10 +30,10 @@ module Droonga
       @running = true
       while @running
         $log.trace("#{log_tag}: run: pull_message: start")
-        @executor.execute_one
+        @handler.execute_one
         $log.trace("#{log_tag}: run: pull_message: done")
       end
-      @executor.shutdown
+      @handler.shutdown
       $log.trace("#{log_tag}: run: done")
     end
 
-------------- next part --------------
HTML����������������������������...
Download 



More information about the Groonga-commit mailing list
Back to archive index