Daijiro MORI
null+****@clear*****
Fri Sep 6 16:26:43 JST 2013
Daijiro MORI 2013-09-06 16:26:43 +0900 (Fri, 06 Sep 2013) New Revision: 56d8b135d25fc114356e3ff38dcf1048aabe0500 https://github.com/droonga/fluent-plugin-droonga/commit/56d8b135d25fc114356e3ff38dcf1048aabe0500 Message: Enable adapters. Removed files: lib/droonga/plugin/handler_merge.rb Modified files: lib/droonga/adapter.rb lib/droonga/engine.rb lib/droonga/executor.rb lib/droonga/server.rb Modified: lib/droonga/adapter.rb (+19 -22) =================================================================== --- lib/droonga/adapter.rb 2013-09-06 09:58:56 +0900 (840e003) +++ lib/droonga/adapter.rb 2013-09-06 16:26:43 +0900 (ed76b37) @@ -15,35 +15,32 @@ # License along with this library; if not, write to the Free Software # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA -require "droonga/command_mapper" +require "droonga/handler" module Droonga - class Adapter - class << self - def inherited(sub_class) - super - sub_class.instance_variable_set(:@command_mapper, CommandMapper.new) - end + class Adapter < Droonga::Handler + Droonga::HandlerPlugin.register("adapter", self) - def command(name_or_map) - @command_mapper.register(name_or_map) - end - - def method_name(command) - @command_mapper[command] - end - end - - def initialize(proxy) - @proxy = proxy + command :table_create + def table_create(request) + broadcast_all(request) end - def adapt(command, request) - __send__(self.class.method_name(command), request) + command :column_create + def column_create(request) + broadcast_all(request) end - def post(request, &block) - @proxy.post(request, &block) + def broadcast_all(request) + message = [{ + "command"=> envelope["type"], + "dataset"=> envelope["dataset"], + "body"=> request, + "type"=> "broadcast", + "replica"=> "all", + "post"=> true + }] + post(message, "proxy") end end end Modified: lib/droonga/engine.rb (+5 -1) =================================================================== --- lib/droonga/engine.rb 2013-09-06 09:58:56 +0900 (129dd80) +++ lib/droonga/engine.rb 2013-09-06 16:26:43 +0900 (8b3570f) @@ -22,6 +22,7 @@ require "cool.io" require "droonga/server" require "droonga/worker" require "droonga/executor" +require "droonga/adapter" module Droonga class Engine @@ -52,7 +53,10 @@ module Droonga start_emitter else @executor = Executor.new(@options) - @executor.add_handler("proxy_message") if @options[:proxy] + if @options[:proxy] + @executor.add_handler("proxy_message") + @executor.add_handler("adapter") + end end end Modified: lib/droonga/executor.rb (+11 -10) =================================================================== --- lib/droonga/executor.rb 2013-09-06 09:58:56 +0900 (66935a3) +++ lib/droonga/executor.rb 2013-09-06 16:26:43 +0900 (1f7f298) @@ -30,16 +30,17 @@ module Droonga attr_reader :context, :envelope, :name def initialize(options={}) - @handlers = [] - @outputs = {} - @name = options[:name] - @database_name = options[:database] || "droonga/db" - @queue_name = options[:queue_name] || "DroongaQueue" - Droonga::JobQueue.ensure_schema(@database_name, @queue_name) - @handler_names = options[:handlers] || ["proxy"] - @pool_size = options[:n_workers] - load_handlers - prepare + @handlers = [] + @outputs = {} + @name = options[:name] + @database_name = options[:database] || "droonga/db" + @queue_name = options[:queue_name] || "DroongaQueue" + Droonga::JobQueue.ensure_schema(@database_name, @queue_name) + @handler_names = options[:handlers] || ["proxy"] + @pool_size = options[:n_workers] +# load_handlers + Droonga::Plugin.load_all + prepare end def shutdown Deleted: lib/droonga/plugin/handler_merge.rb (+0 -140) 100644 =================================================================== --- lib/droonga/plugin/handler_merge.rb 2013-09-06 09:58:56 +0900 (d4f11d0) +++ /dev/null @@ -1,140 +0,0 @@ -# -*- coding: utf-8 -*- -# -# Copyright (C) 2013 droonga project -# -# This library is free software; you can redistribute it and/or -# modify it under the terms of the GNU Lesser General Public -# License version 2.1 as published by the Free Software Foundation. -# -# This library is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU -# Lesser General Public License for more details. -# -# You should have received a copy of the GNU Lesser General Public -# License along with this library; if not, write to the Free Software -# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA - -require "droonga/handler" - -module Droonga - class MergeHandler < Droonga::Handler - Droonga::HandlerPlugin.register("merge", self) - - CONFIG_FILE_PATH = 'config.json' - - def initialize(*arguments) - super - open(CONFIG_FILE_PATH) do |file| - @config = JSON.parse(file.read) - end - @mergers = {} - end - - command "merge" => :adapt_request - command "merge.result" => :adapt_reply - - def adapt_request(request, *arguments) - dataset = @config["datasets"][request["dataset"]] - return unless dataset - @mergers[envelope["id"]] = merger = Merger.new(dataset) - add_route(merger.merger_path) - merger.routes.each do |route| - post(request, route) - end - end - - def adapt_reply(reply) - id = envelope["id"] - merger = @mergers[id] - return unless merger - merger.add(reply) - return unless merger.fulfilled? - post(merger.result) - @mergers.delete(id) - end - - class Merger - attr_reader :routes - attr_reader :result - attr_reader :merger_path - def initialize(dataset) - @dataset = dataset - @merge_policy = dataset["merge_policy"] - @merger_path = dataset["merger_path"] || "merge.result" - @routes = [] - dataset["shards"].collect do |key, shard| - n_replications = shard["instances"].size - next if n_replications.zero? - index = rand(n_replications) - @routes << shard["instances"][index]["route"] - end - @n_shards =****@route***** - @n_replies = 0 - @result = nil - end - - def add(reply) - if @result - merge!(@result, reply) - else - @result = reply - end - @n_replies += 1 - end - - def fulfilled?() - @n_replies == @n_shards - end - - private - def merge!(a, b) - @merge_policy.each do |policy| - path = policy["path"] - case policy["procedure"] - when "sum" - last = path[-1] - _a, _b = fetch_element(path[0..-2], a, b) - _a[last] += _b[last] - when "sort" - _a, _b = fetch_element(path, a, b) - merge_sort!(_a, _b, policy["order"]) - end - end - end - - def fetch_element(path, a, b) - path.each do |index| - a = a[index]||a[index] - b = b[index]||b[index] - end - [a, b] - end - - def compare(a, b, operators) - for index in 0..a.size-1 do - _a = a[index] - _b = b[index] - operator = operators[index] - break unless operator - return true if _a.__send__(operator, _b) - end - return false - end - - def merge_sort!(a, b, order) - index = 0 - b.each do |_b| - loop do - _a = a[index] - break unless _a - break if compare(_b, _a, order) - index += 1 - end - a.insert(index, _b) - index += 1 - end - end - end - end -end Modified: lib/droonga/server.rb (+4 -1) =================================================================== --- lib/droonga/server.rb 2013-09-06 09:58:56 +0900 (849cd3a) +++ lib/droonga/server.rb 2013-09-06 16:26:43 +0900 (888894c) @@ -66,7 +66,10 @@ module Droonga super @message_input = config[:message_input] @executor = Executor.new(config) - @executor.add_handler("proxy_message") if config[:name] + if config[:proxy] + @executor.add_handler("proxy_message") + @executor.add_handler("adapter") + end end def before_run -------------- next part -------------- HTML����������������������������... Download