[Groonga-commit] droonga/fluent-plugin-droonga at 017b579 [master] Remove executor

Back to archive index

Kouhei Sutou null+****@clear*****
Mon Nov 25 00:58:49 JST 2013


Kouhei Sutou	2013-11-25 00:58:49 +0900 (Mon, 25 Nov 2013)

  New Revision: 017b579ae5d342bdf106cb468791ef6b772f48f0
  https://github.com/droonga/fluent-plugin-droonga/commit/017b579ae5d342bdf106cb468791ef6b772f48f0

  Message:
    Remove executor
    
    All features are moved to engine and dispatcher.

  Removed files:
    lib/droonga/executor.rb
  Modified files:
    lib/droonga/adapter.rb
    lib/droonga/adapter_plugin.rb
    lib/droonga/dispatcher.rb
    lib/droonga/engine.rb

  Modified: lib/droonga/adapter.rb (+3 -3)
===================================================================
--- lib/droonga/adapter.rb    2013-11-25 00:57:13 +0900 (74819c7)
+++ lib/droonga/adapter.rb    2013-11-25 00:58:49 +0900 (a48505e)
@@ -22,14 +22,14 @@ module Droonga
   class Adapter
     include Pluggable
 
-    def initialize(executor, options={})
-      @executor = executor
+    def initialize(dispatcher, options={})
+      @dispatcher = dispatcher
       load_plugins(options[:adapters] || [])
     end
 
     private
     def instantiate_plugin(name)
-      AdapterPlugin.repository.instantiate(name, @executor)
+      AdapterPlugin.repository.instantiate(name, @dispatcher)
     end
 
     def log_tag

  Modified: lib/droonga/adapter_plugin.rb (+4 -4)
===================================================================
--- lib/droonga/adapter_plugin.rb    2013-11-25 00:57:13 +0900 (e08f1d6)
+++ lib/droonga/adapter_plugin.rb    2013-11-25 00:58:49 +0900 (4b5e6c0)
@@ -21,17 +21,17 @@ module Droonga
   class AdapterPlugin < Plugin
     extend PluginRegisterable
 
-    def initialize(executor)
+    def initialize(dispatcher)
       super()
-      @executor = executor
+      @dispatcher = dispatcher
     end
 
     def add_route(route)
-      @executor.add_route(route)
+      @dispatcher.add_route(route)
     end
 
     def post(body, destination=nil)
-      @executor.post(body, destination)
+      @dispatcher.post(body, destination)
     end
 
     def emit(value, name=nil)

  Modified: lib/droonga/dispatcher.rb (+70 -15)
===================================================================
--- lib/droonga/dispatcher.rb    2013-11-25 00:57:13 +0900 (70040dc)
+++ lib/droonga/dispatcher.rb    2013-11-25 00:58:49 +0900 (1dea0f7)
@@ -17,36 +17,91 @@
 
 require 'tsort'
 require "droonga/adapter"
+require "droonga/distributor"
 require "droonga/catalog"
 require "droonga/collector"
 require "droonga/farm"
 
 module Droonga
   class Dispatcher
-    attr_reader :collectors
-    def initialize(worker, name)
+    attr_reader :name, :envelope, :collectors
+
+    def initialize(options)
+      @options = options
+      @name = @options[:name]
       @farm = Farm.new(name)
-      @farm.start
-      @worker = worker
-      @name = name
       @collectors = {}
       @current_id = 0
       @local = Regexp.new("^#{@name}")
-      @adapter = Adapter.new(@worker,
+      @adapter = Adapter.new(self,
                              :adapters => Droonga.catalog.option("plugins"))
+      @forwarder = Forwarder.new
+      @distributor = Distributor.new(self, @options)
+    end
+
+    def start
+      @farm.start
     end
 
     def shutdown
+      @forwarder.shutdown
+      @distributor.shutdown
       @adapter.shutdown
       @farm.shutdown
     end
 
-    def processable?(command)
-      @adapter.processable?(command)
+    def add_route(route)
+      envelope["via"].push(route)
     end
 
-    def process(command, body, *arguments)
-      @adapter.process(command, body)
+    def handle_envelope(envelope)
+      @envelope = envelope
+      post(envelope["body"],
+           "type" => envelope["type"],
+           "arguments" => envelope["arguments"],
+           "synchronous" => envelope["synchronous"])
+    end
+
+    def post(body, destination=nil)
+      $log.trace("#{log_tag}: post: start")
+      route = nil
+      unless is_route?(destination)
+        route = envelope["via"].pop
+        destination = route
+      end
+      unless is_route?(destination)
+        destination = envelope["replyTo"]
+      end
+      command = nil
+      receiver = nil
+      arguments = nil
+      synchronous = nil
+      case destination
+      when String
+        command = destination
+      when Hash
+        command = destination["type"]
+        receiver = destination["to"]
+        arguments = destination["arguments"]
+        synchronous = destination["synchronous"]
+      end
+      if receiver
+        @forwarder.forward(envelope, body,
+                           "type" => command,
+                           "to" => receiver,
+                           "arguments" => arguments)
+      else
+        if command == "dispatcher"
+          handle(body, arguments)
+        elsif****@adapt*****?(command)
+          @adapter.process(command, body, *arguments)
+        else
+          @distributor.distribute(envelope.merge("type" => command,
+                                                 "body" => body))
+        end
+      end
+      add_route(route) if route
+      $log.trace("#{log_tag}: post: done")
     end
 
     def handle(message, arguments)
@@ -96,15 +151,11 @@ module Droonga
       if id == route
         post(message, "type" => type, "synchronous"=> synchronous)
       else
-        envelope =****@worke*****("body" => message, "type" => type)
+        envelope =****@envel*****("body" => message, "type" => type)
         @farm.process(route, envelope, synchronous)
       end
     end
 
-    def post(message, destination)
-      @worker.post(message, destination)
-    end
-
     def generate_id
       id = @current_id
       @current_id = id.succ
@@ -124,6 +175,10 @@ module Droonga
     end
 
     private
+    def is_route?(route)
+      route.is_a?(String) || route.is_a?(Hash)
+    end
+
     def log_tag
       "[#{Process.ppid}][#{Process.pid}] dispatcher"
     end

  Modified: lib/droonga/engine.rb (+29 -5)
===================================================================
--- lib/droonga/engine.rb    2013-11-25 00:57:13 +0900 (ab296fa)
+++ lib/droonga/engine.rb    2013-11-25 00:58:49 +0900 (1433c50)
@@ -16,27 +16,51 @@
 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
 require "droonga/logger"
-require "droonga/executor"
+require "droonga/dispatcher"
 
 module Droonga
   class Engine
     def initialize(options={})
       @options = options
+      @dispatcher = Dispatcher.new(@options)
     end
 
     def start
-      @executor = Executor.new(@options)
+      @dispatcher.start
     end
 
     def shutdown
       $log.trace("engine: shutdown: start")
-      @executor.shutdown
+      @dispatcher.shutdown
       $log.trace("engine: shutdown: done")
     end
 
-    def emit(tag, time, record, synchronous=nil)
+    def emit(tag, time, record)
       $log.trace("[#{Process.pid}] tag: <#{tag}> caller: <#{caller.first}>")
-      @executor.dispatch(tag, time, record, synchronous)
+      @dispatcher.handle_envelope(parse_record(tag, record))
+    end
+
+    private
+    def parse_record(tag, record)
+      prefix, type, *arguments = tag.split(/\./)
+      if type.nil? || type.empty? || type == 'message'
+        envelope = record
+      else
+        envelope = {
+          "type" => type,
+          "arguments" => arguments,
+          "body" => record
+        }
+      end
+      envelope["via"] ||= []
+      reply_to = envelope["replyTo"]
+      if reply_to.is_a? String
+        envelope["replyTo"] = {
+          "type" => envelope["type"] + ".result",
+          "to" => reply_to
+        }
+      end
+      envelope
     end
   end
 end

  Deleted: lib/droonga/executor.rb (+0 -140) 100644
===================================================================
--- lib/droonga/executor.rb    2013-11-25 00:57:13 +0900 (4df788f)
+++ /dev/null
@@ -1,140 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Copyright (C) 2013 Droonga Project
-#
-# This library is free software; you can redistribute it and/or
-# modify it under the terms of the GNU Lesser General Public
-# License version 2.1 as published by the Free Software Foundation.
-#
-# This library is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
-# Lesser General Public License for more details.
-#
-# You should have received a copy of the GNU Lesser General Public
-# License along with this library; if not, write to the Free Software
-# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
-
-require "groonga"
-
-require "droonga/forwarder"
-require "droonga/dispatcher"
-require "droonga/distributor"
-
-module Droonga
-  class Executor
-    attr_reader :context, :envelope, :name
-
-    def initialize(options={})
-      @options = options
-      @name = options[:name]
-      prepare
-    end
-
-    def shutdown
-      $log.trace("#{log_tag}: shutdown: start")
-      @distributor.shutdown
-      @forwarder.shutdown
-      $log.trace("#{log_tag}: shutdown: done")
-    end
-
-    def add_route(route)
-      envelope["via"].push(route)
-    end
-
-    def dispatch(tag, time, record, synchronous=nil)
-      $log.trace("#{log_tag}: dispatch: start")
-      message = [tag, time, record]
-      body, type, arguments = parse_message([tag, time, record])
-      reply_to = envelope["replyTo"]
-      if reply_to.is_a? String
-        envelope["replyTo"] = {
-          "type" => type + ".result",
-          "to" => reply_to
-        }
-      end
-      post_or_push(message, body,
-                   "type" => type,
-                   "arguments" => arguments,
-                   "synchronous" => synchronous)
-      $log.trace("#{log_tag}: dispatch: done")
-    end
-
-    def post(body, destination=nil)
-      $log.trace("#{log_tag}: post: start")
-      post_or_push(nil, body, destination)
-      $log.trace("#{log_tag}: post: done")
-    end
-
-    private
-    def post_or_push(message, body, destination)
-      route = nil
-      unless is_route?(destination)
-        route = envelope["via"].pop
-        destination = route
-      end
-      unless is_route?(destination)
-        destination = envelope["replyTo"]
-      end
-      command = nil
-      receiver = nil
-      arguments = nil
-      synchronous = nil
-      case destination
-      when String
-        command = destination
-      when Hash
-        command = destination["type"]
-        receiver = destination["to"]
-        arguments = destination["arguments"]
-        synchronous = destination["synchronous"]
-      end
-      if receiver
-        @forwarder.forward(envelope, body,
-                           "type" => command,
-                           "to" => receiver,
-                           "arguments" => arguments)
-      else
-        if command == "dispatcher"
-          @dispatcher.handle(body, arguments)
-        elsif****@dispa*****?(command)
-          @dispatcher.process(command, body, *arguments)
-        else
-          @distributor.distribute(envelope.merge("type" => command,
-                                                 "body" => body))
-        end
-      end
-      add_route(route) if route
-    end
-
-    def is_route?(route)
-      route.is_a?(String) || route.is_a?(Hash)
-    end
-
-    def parse_message(message)
-      tag, time, record = message
-      prefix, type, *arguments = tag.split(/\./)
-      if type.nil? || type.empty? || type == 'message'
-        @envelope = record
-      else
-        @envelope = {
-          "type" => type,
-          "arguments" => arguments,
-          "body" => record
-        }
-      end
-      envelope["via"] ||= []
-      [envelope["body"], envelope["type"], envelope["arguments"]]
-    end
-
-    def prepare
-      @dispatcher = Dispatcher.new(self, name)
-      @distributor = Distributor.new(@dispatcher, @options)
-      @forwarder = Forwarder.new
-    end
-
-    def log_tag
-      "[#{Process.ppid}][#{Process.pid}] executor"
-    end
-  end
-end
-------------- next part --------------
HTML����������������������������...
Download 



More information about the Groonga-commit mailing list
Back to archive index