[Groonga-commit] groonga/fluent-plugin-droonga at 8ba8513 [master] Add proxy.rb

Back to archive index

Daijiro MORI null+****@clear*****
Wed Aug 14 10:26:04 JST 2013


Daijiro MORI	2013-08-14 10:26:04 +0900 (Wed, 14 Aug 2013)

  New Revision: 8ba8513404291bb2f1d3bf767d4fb31412fe9176
  https://github.com/groonga/fluent-plugin-droonga/commit/8ba8513404291bb2f1d3bf767d4fb31412fe9176

  Message:
    Add proxy.rb

  Added files:
    lib/droonga/proxy.rb

  Added: lib/droonga/proxy.rb (+223 -0) 100644
===================================================================
--- /dev/null
+++ lib/droonga/proxy.rb    2013-08-14 10:26:04 +0900 (0688388)
@@ -0,0 +1,223 @@
+# -*- coding: utf-8 -*-
+#
+# Copyright (C) 2013 droonga project
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License version 2.1 as published by the Free Software Foundation.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+
+require 'tsort'
+
+module Droonga
+  class Proxy
+    def initialize(worker, name)
+      @worker = worker
+      @name = name
+      @collectors = {}
+      @current_id = 0
+      @local = Regexp.new("^#{@name}")
+    end
+
+    def handle(message)
+      case message
+      when Array
+        handle_incoming_message(message)
+      when Hash
+        handle_internal_message(message)
+      end
+    end
+
+    def handle_incoming_message(message)
+      id = generate_id
+      @planner = Planner.new(message)
+      destinations =****@plann*****(id)
+      message = {
+        "id" => id,
+        "components" => @planner.components
+      }
+      destinations.each do |destination, frequency|
+        dispatch(destination, message)
+      end
+    end
+
+    def dispatch(destination, message)
+      if destination =~ @local
+        @collectors[message["id"]] =****@plann*****_collector(@local)
+      else
+        post(destination, message)
+      end
+    end
+
+    def handle_internal_message(message)
+      components = message["components"]
+      @planner = Planner.new(components) unless @planner
+      if message["input"]
+        # received a piece of result
+      else
+        # received a query
+        # what if @collectors[message["id"]] ?
+        @collectors[message["id"]] =****@plann*****_collector(@local)
+      end
+    end
+
+    def post(route, message)
+      @worker.post(message, "to"=> route, "type"=>"proxy")
+    end
+
+    def generate_id
+      id = @current_id
+      @current_id = id.succ
+      return [@name, id].join('.#')
+    end
+
+    class Planner
+      attr_reader :components
+      class UndefinedInputError < StandardError
+        attr_reader :input
+        def initialize(input)
+          @input = input
+          super("undefined input assigned: <#{input}>")
+        end
+      end
+
+      class CyclicComponentsError < StandardError
+        attr_reader :components
+        def initialize(components)
+          @components = components
+          super("cyclic components found: <#{components}>")
+        end
+      end
+
+      include TSort
+      def initialize(components)
+        @components = components
+      end
+
+      def resolve(id)
+        @dependency = {}
+        @components.each do |component|
+          @dependency[component] = component["inputs"]
+          next unless component["outputs"]
+          component["outputs"].each do |output|
+            @dependency[output] = [component]
+          end
+        end
+        @components = []
+        each_strongly_connected_component do |cs|
+          raise CyclicComponentsError.new(cs) if cs.size > 1
+          @components.concat(cs) unless cs.first.is_a? String
+        end
+        resolve_routes(id)
+      end
+
+      def resolve_routes(id)
+        local = [id]
+        destinations = Hash.new(0)
+        @components.each do |component|
+          dataset = component["dataset"]
+          routes =
+            if dataset
+              Droonga::catalog.get_routes(dataset, component)
+            else
+              local
+            end
+          routes.each do |route|
+            route =~ /\A.*:\d+\/[^\.]+/
+            destinations[$&] += 1
+          end
+          component["routes"] = routes
+        end
+        return destinations
+      end
+
+      def get_collector(local)
+        resolve_descendants
+        tasks = []
+        inputs = {}
+        @components.each do |component|
+          component["routes"].each do |route|
+            next unless route =~ local
+            task = {
+              "route" => route,
+              "component" => component,
+              "n_of_inputs" => 0
+            }
+            (component["inputs"] || [nil]).each do |input|
+              inputs[input] ||= []
+              inputs[input] << task
+            end
+          end
+        end
+        Collector.new(@components, tasks, inputs)
+      end
+
+      def resolve_descendants
+        return if @descendants
+        @descendants = {}
+        @components.size.times do |index|
+          component = @components[index]
+          (component["inputs"] || []).each do |input|
+            @descendants[input] ||= []
+            @descendants[input] << index
+          end
+          component["n_of_expects"] = 0
+        end
+        @components.each do |component|
+          descendants = get_descendants(component)
+          component["descendants"] = descendants
+          descendants.each do |key, indices|
+            indices.each do |index|
+              @components[index]["n_of_expects"] += component["routes"].size
+            end
+          end
+        end
+      end
+
+      def get_descendants(component)
+        return {} unless component["outputs"]
+        descendants = {}
+        component["outputs"].each do |output|
+          descendants[output] = @descendants[output]
+        end
+        descendants
+      end
+
+      def tsort_each_node(&block)
+        @dependency.each_key(&block)
+      end
+
+      def tsort_each_child(node, &block)
+        if node.is_a? String and @dependency[node].nil?
+          raise UndefinedInputError.new(node)
+        end
+        if @dependency[node]
+          @dependency[node].each(&block)
+        end
+      end
+    end
+
+    class Collector
+      def initialize(components, tasks, inputs)
+        @components = components
+        @tasks = tasks
+        @inputs = inputs
+      end
+
+      def handle(name, value)
+        tasks = @inputs[name]
+        tasks.each do |task|
+          # todo
+        end
+      end
+    end
+  end
+end
-------------- next part --------------
HTML����������������������������...
Download 



More information about the Groonga-commit mailing list
Back to archive index