Kouhei Sutou
null+****@clear*****
Thu Dec 26 19:04:05 JST 2013
Kouhei Sutou 2013-12-26 19:04:05 +0900 (Thu, 26 Dec 2013) New Revision: f240daaa90e6dadc5b3a924a4fb116d548569ab8 https://github.com/droonga/fluent-plugin-droonga/commit/f240daaa90e6dadc5b3a924a4fb116d548569ab8 Message: Indent Modified files: lib/droonga/catalog/base.rb Modified: lib/droonga/catalog/base.rb (+95 -95) =================================================================== --- lib/droonga/catalog/base.rb 2013-12-26 19:03:43 +0900 (4f81cd4) +++ lib/droonga/catalog/base.rb 2013-12-26 19:04:05 +0900 (b98a691) @@ -20,128 +20,128 @@ require "droonga/message_processing_error" module Droonga module Catalog class Base - class UnknownDataset < NotFound - def initialize(dataset) - super("The dataset #{dataset.inspect} does not exist.") + class UnknownDataset < NotFound + def initialize(dataset) + super("The dataset #{dataset.inspect} does not exist.") + end end - end - attr_reader :path + attr_reader :path - def initialize(path=nil) - @path = path || default_path + def initialize(path=nil) + @path = path || default_path - open(@path) do |file| - @catalog = JSON.parse(file.read) - end - @catalog["datasets"].each do |name, dataset| - number_of_partitions = dataset["number_of_partitions"] - next if number_of_partitions < 2 - total_weight = dataset["ring"].reduce do |a, b| - a[1]["weight"] + b[1]["weight"] + open(@path) do |file| + @catalog = JSON.parse(file.read) end - continuum = [] - dataset["ring"].each do |key, value| - points = number_of_partitions * 160 * value["weight"] / total_weight - points.times do |point| - hash = Digest::SHA1.hexdigest("#{key}:#{point}") - continuum << [hash[0..7].to_i(16), key] + @catalog["datasets"].each do |name, dataset| + number_of_partitions = dataset["number_of_partitions"] + next if number_of_partitions < 2 + total_weight = dataset["ring"].reduce do |a, b| + a[1]["weight"] + b[1]["weight"] + end + continuum = [] + dataset["ring"].each do |key, value| + points = number_of_partitions * 160 * value["weight"] / total_weight + points.times do |point| + hash = Digest::SHA1.hexdigest("#{key}:#{point}") + continuum << [hash[0..7].to_i(16), key] + end end + dataset["continuum"] = continuum.sort do |a, b| a[0] - b[0]; end end - dataset["continuum"] = continuum.sort do |a, b| a[0] - b[0]; end + @options = @catalog["options"] || {} end - @options = @catalog["options"] || {} - end - def base_path - @base_path ||= File.dirname(@path) - end + def base_path + @base_path ||= File.dirname(@path) + end - def option(name) - @options[name] - end + def option(name) + @options[name] + end - def get_partitions(name) - device = @catalog["farms"][name]["device"] - pattern = Regexp.new("^#{name}\.") - results = {} - @catalog["datasets"].each do |key, dataset| - workers = dataset["workers"] - plugins = dataset["plugins"] - dataset["ring"].each do |key, part| - part["partitions"].each do |range, partitions| - partitions.each do |partition| - if partition =~ pattern - path = File.join([device, $POSTMATCH, "db"]) - path = File.expand_path(path, base_path) - options = { - :database => path, - :n_workers => workers, - :handlers => plugins - } - results[partition] = options + def get_partitions(name) + device = @catalog["farms"][name]["device"] + pattern = Regexp.new("^#{name}\.") + results = {} + @catalog["datasets"].each do |key, dataset| + workers = dataset["workers"] + plugins = dataset["plugins"] + dataset["ring"].each do |key, part| + part["partitions"].each do |range, partitions| + partitions.each do |partition| + if partition =~ pattern + path = File.join([device, $POSTMATCH, "db"]) + path = File.expand_path(path, base_path) + options = { + :database => path, + :n_workers => workers, + :handlers => plugins + } + results[partition] = options + end end end end end + return results end - return results - end - def get_routes(name, args) - routes = [] - dataset = dataset(name) - case args["type"] - when "broadcast" - dataset["ring"].each do |key, partition| + def get_routes(name, args) + routes = [] + dataset = dataset(name) + case args["type"] + when "broadcast" + dataset["ring"].each do |key, partition| + select_range_and_replicas(partition, args, routes) + end + when "scatter" + name = get_partition(dataset, args["key"]) + partition = dataset["ring"][name] select_range_and_replicas(partition, args, routes) end - when "scatter" - name = get_partition(dataset, args["key"]) - partition = dataset["ring"][name] - select_range_and_replicas(partition, args, routes) + return routes end - return routes - end - def get_partition(dataset, key) - continuum = dataset["continuum"] - return dataset["ring"].keys[0] unless continuum - hash = Zlib.crc32(key) - min = 0 - max = continuum.size - 1 - while (min < max) do - index = (min + max) / 2 - value, key = continuum[index] - return key if value == hash - if value > hash - max = index - else - min = index + 1 + def get_partition(dataset, key) + continuum = dataset["continuum"] + return dataset["ring"].keys[0] unless continuum + hash = Zlib.crc32(key) + min = 0 + max = continuum.size - 1 + while (min < max) do + index = (min + max) / 2 + value, key = continuum[index] + return key if value == hash + if value > hash + max = index + else + min = index + 1 + end end + return continuum[max][1] end - return continuum[max][1] - end - def dataset(name) - dataset = @catalog["datasets"][name] - raise UnknownDataset.new(name) unless dataset - dataset - end + def dataset(name) + dataset = @catalog["datasets"][name] + raise UnknownDataset.new(name) unless dataset + dataset + end - def select_range_and_replicas(partition, args, routes) - date_range = args["date_range"] || 0..-1 - partition["partitions"].sort[date_range].each do |time, replicas| - case args["replica"] - when "top" - routes << replicas[0] - when "random" - routes << replicas[rand(replicas.size)] - when "all" - routes.concat(replicas) + def select_range_and_replicas(partition, args, routes) + date_range = args["date_range"] || 0..-1 + partition["partitions"].sort[date_range].each do |time, replicas| + case args["replica"] + when "top" + routes << replicas[0] + when "random" + routes << replicas[rand(replicas.size)] + when "all" + routes.concat(replicas) + end end end end - end end end -------------- next part -------------- HTML����������������������������...Download