Kouhei Sutou
null+****@clear*****
Mon Nov 25 00:58:49 JST 2013
Kouhei Sutou 2013-11-25 00:58:49 +0900 (Mon, 25 Nov 2013) New Revision: 017b579ae5d342bdf106cb468791ef6b772f48f0 https://github.com/droonga/fluent-plugin-droonga/commit/017b579ae5d342bdf106cb468791ef6b772f48f0 Message: Remove executor All features are moved to engine and dispatcher. Removed files: lib/droonga/executor.rb Modified files: lib/droonga/adapter.rb lib/droonga/adapter_plugin.rb lib/droonga/dispatcher.rb lib/droonga/engine.rb Modified: lib/droonga/adapter.rb (+3 -3) =================================================================== --- lib/droonga/adapter.rb 2013-11-25 00:57:13 +0900 (74819c7) +++ lib/droonga/adapter.rb 2013-11-25 00:58:49 +0900 (a48505e) @@ -22,14 +22,14 @@ module Droonga class Adapter include Pluggable - def initialize(executor, options={}) - @executor = executor + def initialize(dispatcher, options={}) + @dispatcher = dispatcher load_plugins(options[:adapters] || []) end private def instantiate_plugin(name) - AdapterPlugin.repository.instantiate(name, @executor) + AdapterPlugin.repository.instantiate(name, @dispatcher) end def log_tag Modified: lib/droonga/adapter_plugin.rb (+4 -4) =================================================================== --- lib/droonga/adapter_plugin.rb 2013-11-25 00:57:13 +0900 (e08f1d6) +++ lib/droonga/adapter_plugin.rb 2013-11-25 00:58:49 +0900 (4b5e6c0) @@ -21,17 +21,17 @@ module Droonga class AdapterPlugin < Plugin extend PluginRegisterable - def initialize(executor) + def initialize(dispatcher) super() - @executor = executor + @dispatcher = dispatcher end def add_route(route) - @executor.add_route(route) + @dispatcher.add_route(route) end def post(body, destination=nil) - @executor.post(body, destination) + @dispatcher.post(body, destination) end def emit(value, name=nil) Modified: lib/droonga/dispatcher.rb (+70 -15) =================================================================== --- lib/droonga/dispatcher.rb 2013-11-25 00:57:13 +0900 (70040dc) +++ lib/droonga/dispatcher.rb 2013-11-25 00:58:49 +0900 (1dea0f7) @@ -17,36 +17,91 @@ require 'tsort' require "droonga/adapter" +require "droonga/distributor" require "droonga/catalog" require "droonga/collector" require "droonga/farm" module Droonga class Dispatcher - attr_reader :collectors - def initialize(worker, name) + attr_reader :name, :envelope, :collectors + + def initialize(options) + @options = options + @name = @options[:name] @farm = Farm.new(name) - @farm.start - @worker = worker - @name = name @collectors = {} @current_id = 0 @local = Regexp.new("^#{@name}") - @adapter = Adapter.new(@worker, + @adapter = Adapter.new(self, :adapters => Droonga.catalog.option("plugins")) + @forwarder = Forwarder.new + @distributor = Distributor.new(self, @options) + end + + def start + @farm.start end def shutdown + @forwarder.shutdown + @distributor.shutdown @adapter.shutdown @farm.shutdown end - def processable?(command) - @adapter.processable?(command) + def add_route(route) + envelope["via"].push(route) end - def process(command, body, *arguments) - @adapter.process(command, body) + def handle_envelope(envelope) + @envelope = envelope + post(envelope["body"], + "type" => envelope["type"], + "arguments" => envelope["arguments"], + "synchronous" => envelope["synchronous"]) + end + + def post(body, destination=nil) + $log.trace("#{log_tag}: post: start") + route = nil + unless is_route?(destination) + route = envelope["via"].pop + destination = route + end + unless is_route?(destination) + destination = envelope["replyTo"] + end + command = nil + receiver = nil + arguments = nil + synchronous = nil + case destination + when String + command = destination + when Hash + command = destination["type"] + receiver = destination["to"] + arguments = destination["arguments"] + synchronous = destination["synchronous"] + end + if receiver + @forwarder.forward(envelope, body, + "type" => command, + "to" => receiver, + "arguments" => arguments) + else + if command == "dispatcher" + handle(body, arguments) + elsif****@adapt*****?(command) + @adapter.process(command, body, *arguments) + else + @distributor.distribute(envelope.merge("type" => command, + "body" => body)) + end + end + add_route(route) if route + $log.trace("#{log_tag}: post: done") end def handle(message, arguments) @@ -96,15 +151,11 @@ module Droonga if id == route post(message, "type" => type, "synchronous"=> synchronous) else - envelope =****@worke*****("body" => message, "type" => type) + envelope =****@envel*****("body" => message, "type" => type) @farm.process(route, envelope, synchronous) end end - def post(message, destination) - @worker.post(message, destination) - end - def generate_id id = @current_id @current_id = id.succ @@ -124,6 +175,10 @@ module Droonga end private + def is_route?(route) + route.is_a?(String) || route.is_a?(Hash) + end + def log_tag "[#{Process.ppid}][#{Process.pid}] dispatcher" end Modified: lib/droonga/engine.rb (+29 -5) =================================================================== --- lib/droonga/engine.rb 2013-11-25 00:57:13 +0900 (ab296fa) +++ lib/droonga/engine.rb 2013-11-25 00:58:49 +0900 (1433c50) @@ -16,27 +16,51 @@ # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA require "droonga/logger" -require "droonga/executor" +require "droonga/dispatcher" module Droonga class Engine def initialize(options={}) @options = options + @dispatcher = Dispatcher.new(@options) end def start - @executor = Executor.new(@options) + @dispatcher.start end def shutdown $log.trace("engine: shutdown: start") - @executor.shutdown + @dispatcher.shutdown $log.trace("engine: shutdown: done") end - def emit(tag, time, record, synchronous=nil) + def emit(tag, time, record) $log.trace("[#{Process.pid}] tag: <#{tag}> caller: <#{caller.first}>") - @executor.dispatch(tag, time, record, synchronous) + @dispatcher.handle_envelope(parse_record(tag, record)) + end + + private + def parse_record(tag, record) + prefix, type, *arguments = tag.split(/\./) + if type.nil? || type.empty? || type == 'message' + envelope = record + else + envelope = { + "type" => type, + "arguments" => arguments, + "body" => record + } + end + envelope["via"] ||= [] + reply_to = envelope["replyTo"] + if reply_to.is_a? String + envelope["replyTo"] = { + "type" => envelope["type"] + ".result", + "to" => reply_to + } + end + envelope end end end Deleted: lib/droonga/executor.rb (+0 -140) 100644 =================================================================== --- lib/droonga/executor.rb 2013-11-25 00:57:13 +0900 (4df788f) +++ /dev/null @@ -1,140 +0,0 @@ -# -*- coding: utf-8 -*- -# -# Copyright (C) 2013 Droonga Project -# -# This library is free software; you can redistribute it and/or -# modify it under the terms of the GNU Lesser General Public -# License version 2.1 as published by the Free Software Foundation. -# -# This library is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU -# Lesser General Public License for more details. -# -# You should have received a copy of the GNU Lesser General Public -# License along with this library; if not, write to the Free Software -# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA - -require "groonga" - -require "droonga/forwarder" -require "droonga/dispatcher" -require "droonga/distributor" - -module Droonga - class Executor - attr_reader :context, :envelope, :name - - def initialize(options={}) - @options = options - @name = options[:name] - prepare - end - - def shutdown - $log.trace("#{log_tag}: shutdown: start") - @distributor.shutdown - @forwarder.shutdown - $log.trace("#{log_tag}: shutdown: done") - end - - def add_route(route) - envelope["via"].push(route) - end - - def dispatch(tag, time, record, synchronous=nil) - $log.trace("#{log_tag}: dispatch: start") - message = [tag, time, record] - body, type, arguments = parse_message([tag, time, record]) - reply_to = envelope["replyTo"] - if reply_to.is_a? String - envelope["replyTo"] = { - "type" => type + ".result", - "to" => reply_to - } - end - post_or_push(message, body, - "type" => type, - "arguments" => arguments, - "synchronous" => synchronous) - $log.trace("#{log_tag}: dispatch: done") - end - - def post(body, destination=nil) - $log.trace("#{log_tag}: post: start") - post_or_push(nil, body, destination) - $log.trace("#{log_tag}: post: done") - end - - private - def post_or_push(message, body, destination) - route = nil - unless is_route?(destination) - route = envelope["via"].pop - destination = route - end - unless is_route?(destination) - destination = envelope["replyTo"] - end - command = nil - receiver = nil - arguments = nil - synchronous = nil - case destination - when String - command = destination - when Hash - command = destination["type"] - receiver = destination["to"] - arguments = destination["arguments"] - synchronous = destination["synchronous"] - end - if receiver - @forwarder.forward(envelope, body, - "type" => command, - "to" => receiver, - "arguments" => arguments) - else - if command == "dispatcher" - @dispatcher.handle(body, arguments) - elsif****@dispa*****?(command) - @dispatcher.process(command, body, *arguments) - else - @distributor.distribute(envelope.merge("type" => command, - "body" => body)) - end - end - add_route(route) if route - end - - def is_route?(route) - route.is_a?(String) || route.is_a?(Hash) - end - - def parse_message(message) - tag, time, record = message - prefix, type, *arguments = tag.split(/\./) - if type.nil? || type.empty? || type == 'message' - @envelope = record - else - @envelope = { - "type" => type, - "arguments" => arguments, - "body" => record - } - end - envelope["via"] ||= [] - [envelope["body"], envelope["type"], envelope["arguments"]] - end - - def prepare - @dispatcher = Dispatcher.new(self, name) - @distributor = Distributor.new(@dispatcher, @options) - @forwarder = Forwarder.new - end - - def log_tag - "[#{Process.ppid}][#{Process.pid}] executor" - end - end -end -------------- next part -------------- HTML����������������������������... Download