Daijiro MORI
null+****@clear*****
Wed Aug 14 10:26:04 JST 2013
Daijiro MORI 2013-08-14 10:26:04 +0900 (Wed, 14 Aug 2013) New Revision: 8ba8513404291bb2f1d3bf767d4fb31412fe9176 https://github.com/groonga/fluent-plugin-droonga/commit/8ba8513404291bb2f1d3bf767d4fb31412fe9176 Message: Add proxy.rb Added files: lib/droonga/proxy.rb Added: lib/droonga/proxy.rb (+223 -0) 100644 =================================================================== --- /dev/null +++ lib/droonga/proxy.rb 2013-08-14 10:26:04 +0900 (0688388) @@ -0,0 +1,223 @@ +# -*- coding: utf-8 -*- +# +# Copyright (C) 2013 droonga project +# +# This library is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License version 2.1 as published by the Free Software Foundation. +# +# This library is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this library; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + +require 'tsort' + +module Droonga + class Proxy + def initialize(worker, name) + @worker = worker + @name = name + @collectors = {} + @current_id = 0 + @local = Regexp.new("^#{@name}") + end + + def handle(message) + case message + when Array + handle_incoming_message(message) + when Hash + handle_internal_message(message) + end + end + + def handle_incoming_message(message) + id = generate_id + @planner = Planner.new(message) + destinations =****@plann*****(id) + message = { + "id" => id, + "components" => @planner.components + } + destinations.each do |destination, frequency| + dispatch(destination, message) + end + end + + def dispatch(destination, message) + if destination =~ @local + @collectors[message["id"]] =****@plann*****_collector(@local) + else + post(destination, message) + end + end + + def handle_internal_message(message) + components = message["components"] + @planner = Planner.new(components) unless @planner + if message["input"] + # received a piece of result + else + # received a query + # what if @collectors[message["id"]] ? + @collectors[message["id"]] =****@plann*****_collector(@local) + end + end + + def post(route, message) + @worker.post(message, "to"=> route, "type"=>"proxy") + end + + def generate_id + id = @current_id + @current_id = id.succ + return [@name, id].join('.#') + end + + class Planner + attr_reader :components + class UndefinedInputError < StandardError + attr_reader :input + def initialize(input) + @input = input + super("undefined input assigned: <#{input}>") + end + end + + class CyclicComponentsError < StandardError + attr_reader :components + def initialize(components) + @components = components + super("cyclic components found: <#{components}>") + end + end + + include TSort + def initialize(components) + @components = components + end + + def resolve(id) + @dependency = {} + @components.each do |component| + @dependency[component] = component["inputs"] + next unless component["outputs"] + component["outputs"].each do |output| + @dependency[output] = [component] + end + end + @components = [] + each_strongly_connected_component do |cs| + raise CyclicComponentsError.new(cs) if cs.size > 1 + @components.concat(cs) unless cs.first.is_a? String + end + resolve_routes(id) + end + + def resolve_routes(id) + local = [id] + destinations = Hash.new(0) + @components.each do |component| + dataset = component["dataset"] + routes = + if dataset + Droonga::catalog.get_routes(dataset, component) + else + local + end + routes.each do |route| + route =~ /\A.*:\d+\/[^\.]+/ + destinations[$&] += 1 + end + component["routes"] = routes + end + return destinations + end + + def get_collector(local) + resolve_descendants + tasks = [] + inputs = {} + @components.each do |component| + component["routes"].each do |route| + next unless route =~ local + task = { + "route" => route, + "component" => component, + "n_of_inputs" => 0 + } + (component["inputs"] || [nil]).each do |input| + inputs[input] ||= [] + inputs[input] << task + end + end + end + Collector.new(@components, tasks, inputs) + end + + def resolve_descendants + return if @descendants + @descendants = {} + @components.size.times do |index| + component = @components[index] + (component["inputs"] || []).each do |input| + @descendants[input] ||= [] + @descendants[input] << index + end + component["n_of_expects"] = 0 + end + @components.each do |component| + descendants = get_descendants(component) + component["descendants"] = descendants + descendants.each do |key, indices| + indices.each do |index| + @components[index]["n_of_expects"] += component["routes"].size + end + end + end + end + + def get_descendants(component) + return {} unless component["outputs"] + descendants = {} + component["outputs"].each do |output| + descendants[output] = @descendants[output] + end + descendants + end + + def tsort_each_node(&block) + @dependency.each_key(&block) + end + + def tsort_each_child(node, &block) + if node.is_a? String and @dependency[node].nil? + raise UndefinedInputError.new(node) + end + if @dependency[node] + @dependency[node].each(&block) + end + end + end + + class Collector + def initialize(components, tasks, inputs) + @components = components + @tasks = tasks + @inputs = inputs + end + + def handle(name, value) + tasks = @inputs[name] + tasks.each do |task| + # todo + end + end + end + end +end -------------- next part -------------- HTML����������������������������... Download