[Groonga-commit] droonga/fluent-plugin-droonga at 56d8b13 [master] Enable adapters.

Back to archive index

Daijiro MORI null+****@clear*****
Fri Sep 6 16:26:43 JST 2013


Daijiro MORI	2013-09-06 16:26:43 +0900 (Fri, 06 Sep 2013)

  New Revision: 56d8b135d25fc114356e3ff38dcf1048aabe0500
  https://github.com/droonga/fluent-plugin-droonga/commit/56d8b135d25fc114356e3ff38dcf1048aabe0500

  Message:
    Enable adapters.

  Removed files:
    lib/droonga/plugin/handler_merge.rb
  Modified files:
    lib/droonga/adapter.rb
    lib/droonga/engine.rb
    lib/droonga/executor.rb
    lib/droonga/server.rb

  Modified: lib/droonga/adapter.rb (+19 -22)
===================================================================
--- lib/droonga/adapter.rb    2013-09-06 09:58:56 +0900 (840e003)
+++ lib/droonga/adapter.rb    2013-09-06 16:26:43 +0900 (ed76b37)
@@ -15,35 +15,32 @@
 # License along with this library; if not, write to the Free Software
 # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
-require "droonga/command_mapper"
+require "droonga/handler"
 
 module Droonga
-  class Adapter
-    class << self
-      def inherited(sub_class)
-        super
-        sub_class.instance_variable_set(:@command_mapper, CommandMapper.new)
-      end
+  class Adapter < Droonga::Handler
+    Droonga::HandlerPlugin.register("adapter", self)
 
-      def command(name_or_map)
-        @command_mapper.register(name_or_map)
-      end
-
-      def method_name(command)
-        @command_mapper[command]
-      end
-    end
-
-    def initialize(proxy)
-      @proxy = proxy
+    command :table_create
+    def table_create(request)
+      broadcast_all(request)
     end
 
-    def adapt(command, request)
-      __send__(self.class.method_name(command), request)
+    command :column_create
+    def column_create(request)
+      broadcast_all(request)
     end
 
-    def post(request, &block)
-      @proxy.post(request, &block)
+    def broadcast_all(request)
+      message = [{
+        "command"=> envelope["type"],
+        "dataset"=> envelope["dataset"],
+        "body"=> request,
+        "type"=> "broadcast",
+        "replica"=> "all",
+        "post"=> true
+      }]
+      post(message, "proxy")
     end
   end
 end

  Modified: lib/droonga/engine.rb (+5 -1)
===================================================================
--- lib/droonga/engine.rb    2013-09-06 09:58:56 +0900 (129dd80)
+++ lib/droonga/engine.rb    2013-09-06 16:26:43 +0900 (8b3570f)
@@ -22,6 +22,7 @@ require "cool.io"
 require "droonga/server"
 require "droonga/worker"
 require "droonga/executor"
+require "droonga/adapter"
 
 module Droonga
   class Engine
@@ -52,7 +53,10 @@ module Droonga
         start_emitter
       else
         @executor = Executor.new(@options)
-        @executor.add_handler("proxy_message") if @options[:proxy]
+        if @options[:proxy]
+          @executor.add_handler("proxy_message")
+          @executor.add_handler("adapter")
+        end
       end
     end
 

  Modified: lib/droonga/executor.rb (+11 -10)
===================================================================
--- lib/droonga/executor.rb    2013-09-06 09:58:56 +0900 (66935a3)
+++ lib/droonga/executor.rb    2013-09-06 16:26:43 +0900 (1f7f298)
@@ -30,16 +30,17 @@ module Droonga
     attr_reader :context, :envelope, :name
 
     def initialize(options={})
-       @handlers = []
-       @outputs = {}
-       @name = options[:name]
-       @database_name = options[:database] || "droonga/db"
-       @queue_name = options[:queue_name] || "DroongaQueue"
-       Droonga::JobQueue.ensure_schema(@database_name, @queue_name)
-       @handler_names = options[:handlers] || ["proxy"]
-       @pool_size = options[:n_workers]
-       load_handlers
-       prepare
+      @handlers = []
+      @outputs = {}
+      @name = options[:name]
+      @database_name = options[:database] || "droonga/db"
+      @queue_name = options[:queue_name] || "DroongaQueue"
+      Droonga::JobQueue.ensure_schema(@database_name, @queue_name)
+      @handler_names = options[:handlers] || ["proxy"]
+      @pool_size = options[:n_workers]
+#     load_handlers
+      Droonga::Plugin.load_all
+      prepare
     end
 
     def shutdown

  Deleted: lib/droonga/plugin/handler_merge.rb (+0 -140) 100644
===================================================================
--- lib/droonga/plugin/handler_merge.rb    2013-09-06 09:58:56 +0900 (d4f11d0)
+++ /dev/null
@@ -1,140 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Copyright (C) 2013 droonga project
-#
-# This library is free software; you can redistribute it and/or
-# modify it under the terms of the GNU Lesser General Public
-# License version 2.1 as published by the Free Software Foundation.
-#
-# This library is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
-# Lesser General Public License for more details.
-#
-# You should have received a copy of the GNU Lesser General Public
-# License along with this library; if not, write to the Free Software
-# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
-
-require "droonga/handler"
-
-module Droonga
-  class MergeHandler < Droonga::Handler
-    Droonga::HandlerPlugin.register("merge", self)
-
-    CONFIG_FILE_PATH = 'config.json'
-
-    def initialize(*arguments)
-      super
-      open(CONFIG_FILE_PATH) do |file|
-        @config = JSON.parse(file.read)
-      end
-      @mergers = {}
-    end
-
-    command "merge" => :adapt_request
-    command "merge.result" => :adapt_reply
-
-    def adapt_request(request, *arguments)
-      dataset = @config["datasets"][request["dataset"]]
-      return unless dataset
-      @mergers[envelope["id"]] = merger = Merger.new(dataset)
-      add_route(merger.merger_path)
-      merger.routes.each do |route|
-        post(request, route)
-      end
-    end
-
-    def adapt_reply(reply)
-      id = envelope["id"]
-      merger = @mergers[id]
-      return unless merger
-      merger.add(reply)
-      return unless merger.fulfilled?
-      post(merger.result)
-      @mergers.delete(id)
-    end
-
-    class Merger
-      attr_reader :routes
-      attr_reader :result
-      attr_reader :merger_path
-      def initialize(dataset)
-        @dataset = dataset
-        @merge_policy = dataset["merge_policy"]
-        @merger_path = dataset["merger_path"] || "merge.result"
-        @routes = []
-        dataset["shards"].collect do |key, shard|
-          n_replications = shard["instances"].size
-          next if n_replications.zero?
-          index = rand(n_replications)
-          @routes << shard["instances"][index]["route"]
-        end
-        @n_shards =****@route*****
-        @n_replies = 0
-        @result = nil
-      end
-
-      def add(reply)
-        if @result
-          merge!(@result, reply)
-        else
-          @result = reply
-        end
-        @n_replies += 1
-      end
-
-      def fulfilled?()
-        @n_replies == @n_shards
-      end
-
-      private
-      def merge!(a, b)
-        @merge_policy.each do |policy|
-          path = policy["path"]
-          case policy["procedure"]
-          when "sum"
-            last = path[-1]
-            _a, _b = fetch_element(path[0..-2], a, b)
-            _a[last] += _b[last]
-          when "sort"
-            _a, _b = fetch_element(path, a, b)
-            merge_sort!(_a, _b, policy["order"])
-          end
-        end
-      end
-
-      def fetch_element(path, a, b)
-        path.each do |index|
-          a = a[index]||a[index]
-          b = b[index]||b[index]
-        end
-        [a, b]
-      end
-
-      def compare(a, b, operators)
-        for index in 0..a.size-1 do
-          _a = a[index]
-          _b = b[index]
-          operator = operators[index]
-          break unless operator
-          return true if _a.__send__(operator, _b)
-        end
-        return false
-      end
-
-      def merge_sort!(a, b, order)
-        index = 0
-        b.each do |_b|
-          loop do
-            _a = a[index]
-            break unless _a
-            break if compare(_b, _a, order)
-            index += 1
-          end
-          a.insert(index, _b)
-          index += 1
-        end
-      end
-    end
-  end
-end

  Modified: lib/droonga/server.rb (+4 -1)
===================================================================
--- lib/droonga/server.rb    2013-09-06 09:58:56 +0900 (849cd3a)
+++ lib/droonga/server.rb    2013-09-06 16:26:43 +0900 (888894c)
@@ -66,7 +66,10 @@ module Droonga
       super
       @message_input = config[:message_input]
       @executor = Executor.new(config)
-      @executor.add_handler("proxy_message") if config[:name]
+      if config[:proxy]
+        @executor.add_handler("proxy_message")
+        @executor.add_handler("adapter")
+      end
     end
 
     def before_run
-------------- next part --------------
HTML����������������������������...
Download 



More information about the Groonga-commit mailing list
Back to archive index