YUKI Hiroshi
null+****@clear*****
Thu Aug 28 18:35:39 JST 2014
YUKI Hiroshi 2014-08-28 18:35:39 +0900 (Thu, 28 Aug 2014) New Revision: ac06373f6254fbd7666f121048278bee92c00a5c https://github.com/droonga/droonga-engine/commit/ac06373f6254fbd7666f121048278bee92c00a5c Message: Split implementation for remote commands to separate classes Modified files: bin/droonga-engine-join lib/droonga/command/serf_event_handler.rb Modified: bin/droonga-engine-join (+0 -2) =================================================================== --- bin/droonga-engine-join 2014-08-28 17:16:24 +0900 (38cccb4) +++ bin/droonga-engine-join 2014-08-28 18:35:39 +0900 (6a8162e) @@ -70,9 +70,7 @@ run_remote_command(joining_node, "join", "node" => joining_node, "type" => "replica", "source" => source_node, - "port" => options[:port], "dataset" => options[:dataset], - "tag" => options[:tag], "copy" => !options["no-copy"]) sleep(5) #TODO: wait for restarting of the joining node. this should be done more safely. Modified: lib/droonga/command/serf_event_handler.rb (+282 -197) =================================================================== --- lib/droonga/command/serf_event_handler.rb 2014-08-28 17:16:24 +0900 (abe023b) +++ lib/droonga/command/serf_event_handler.rb 2014-08-28 18:35:39 +0900 (22b49cb) @@ -26,156 +26,162 @@ require "droonga/safe_file_writer" module Droonga module Command - class SerfEventHandler - class << self - def run - new.run + module Remote + class Base + attr_reader :response + + def initialize + @serf_name = ENV["SERF_SELF_NAME"] + @response = { + "log" => [] + } + @payload = JSON.parse($stdin.gets) end - end - def initialize - @serf = ENV["SERF"] || Serf.path - @serf_rpc_address = ENV["SERF_RPC_ADDRESS"] || "127.0.0.1:7373" - @serf_name = ENV["SERF_SELF_NAME"] - @response = { - "log" => [] - } - end + def process + # override me! + end - def run - parse_event - unless should_process? - log(" => ignoring event not for me") - output_response - return true + def should_process? + for_me? or****@paylo*****? or not****@paylo*****?("node") end - process_event - output_live_nodes - output_response - true - end + private + def node + @serf_name + end - private - def parse_event - @event_name = ENV["SERF_EVENT"] - @payload = nil - case @event_name - when "user" - @event_sub_name = ENV["SERF_USER_EVENT"] - @payload = JSON.parse($stdin.gets) - log("event sub name = #{@event_sub_name}") - when "query" - @event_sub_name = ENV["SERF_QUERY_NAME"] - @payload = JSON.parse($stdin.gets) - log("event sub name = #{@event_sub_name}") - when "member-join", "member-leave", "member-update", "member-reap" - output_live_nodes + def host + node.split(":").first end - end - def target_node - @payload && @payload["node"] - end + def target_node + @payload && @payload["node"] + end - def for_me? - target_node == @serf_name - end + def for_me? + target_node == @serf_name + end - def should_process? - for_me? or****@paylo*****? or not****@paylo*****?("node") + def log(message) + @response["log"] << message + end end - def process_event - case @event_sub_name - when "change_role" + class ChangeRole < Base + def process NodeStatus.set(:role, @payload["role"]) - when "report_status" - report_status - when "join" - join - when "set_replicas" - set_replicas - when "add_replicas" - add_replicas - when "remove_replicas" - remove_replicas - when "absorb_data" - absorb_data end end - def output_response - puts JSON.generate(@response) + class ReportStatus < Base + def process + @response["value"] = NodeStatus.get(@payload["key"]) + end end - def host - @serf_name.split(":").first - end + class Join < Base + def process + log("type = #{type}") + case type + when "replica" + join_as_replica + end + end - def given_hosts - hosts = @payload["hosts"] - return nil unless hosts - hosts = [hosts] if hosts.is_a?(String) - hosts - end + private + def type + @payload["type"] + end - def report_status - @response["value"] = NodeStatus.get(@payload["key"]) - end + def source_node + @payload["source"] + end - def join - type = @payload["type"] - log("type = #{type}") - case type - when "replica" - join_as_replica + def joining_node + @payload["node"] end - end - def join_as_replica - source_node = @payload["source"] - source_node_port = @payload["port"] - joining_node = @payload["node"] - tag = @payload["tag"] - dataset_name = @payload["dataset"] - required_params = [ - source_node, - source_node_port, - joining_node, - dataset_name, - ] - return unless required_params.all? - - log("source_node = #{source_node}") - - source_host = source_node.split(":").first - joining_host = joining_node.split(":").first - - fetcher = CatalogFetcher.new(:host => source_host, - :port => source_node_port, - :tag => tag, - :receiver_host => joining_host) - catalog = fetcher.fetch(:dataset => dataset_name) - - generator = CatalogGenerator.new - generator.load(catalog) - dataset = generator.dataset_for_host(source_host) || - generator.dataset_for_host(host) - return unless dataset - - # restart self with the fetched catalog. - SafeFileWriter.write(Path.catalog, JSON.pretty_generate(catalog)) - - tag = dataset.replicas.tag - port = dataset.replicas.port - other_hosts = dataset.replicas.hosts - - log("dataset = #{dataset_name}") - log("port = #{port}") - log("tag = #{tag}") - - if @payload["copy"] + def dataset_name + @payload["dataset"] + end + + def valid_params? + have_required_params? and + valid_node?(source_node) and + valid_node?(joining_node) + end + + def have_required_params? + required_params = [ + source_node, + joining_node, + dataset_name, + ] + required_params.all? do |param| + not param.nil? + end + end + + NODE_PATTERN = /\A([^:]+):(\d+)\/(.+)\z/ + + def valid_node?(node) + node =~ NODE_PATTERN + end + + def source_host + @source_host ||= (source_node =~ NODE_PATTERN && $1) + end + + def joining_host + @source_host ||= (joining_node =~ NODE_PATTERN && $1) + end + + def port + @port ||= (source_node =~ NODE_PATTERN && $2 && $2.to_i) + end + + def tag + @tag ||= (source_node =~ NODE_PATTERN && $3) + end + + def should_absorb_data? + @payload["copy"] + end + + def join_as_replica + return unless valid_params? + + log("source_node = #{source_node}") + + fetcher = CatalogFetcher.new(:host => source_host, + :port => port, + :tag => tag, + :receiver_host => joining_host) + catalog = fetcher.fetch(:dataset => dataset_name) + + generator = CatalogGenerator.new + generator.load(catalog) + dataset = generator.dataset_for_host(source_host) || + generator.dataset_for_host(host) + return unless dataset + + # restart self with the fetched catalog. + SafeFileWriter.write(Path.catalog, JSON.pretty_generate(catalog)) + + other_hosts = dataset.replicas.hosts + + absorb_data if should_absorb_data? + + log("joining to the cluster: update myself") + + CatalogModifier.modify do |modifier| + modifier.datasets[dataset_name].replicas.hosts += other_hosts + modifier.datasets[dataset_name].replicas.hosts.uniq! + end + end + + def absorb_data log("starting to copy data from #{source_host}") CatalogModifier.modify do |modifier| @@ -187,118 +193,197 @@ module Droonga status.set(:absorbing, true) DataAbsorber.absorb(:dataset => dataset_name, :source_host => source_host, - :destination_host => host, + :destination_host => joining_host, :port => port, :tag => tag) status.delete(:absorbing) sleep(1) end + end + + class AbsorbData < Base + attr_writer :dataset_name, :port, :tag - log("joining to the cluster: update myself") + def process + return unless source - CatalogModifier.modify do |modifier| - modifier.datasets[dataset_name].replicas.hosts += other_hosts - modifier.datasets[dataset_name].replicas.hosts.uniq! + log("start to absorb data from #{source}") + + if dataset_name.nil? or port.nil? or tag.nil? + current_catalog = JSON.parse(Path.catalog.read) + generator = CatalogGenerator.new + generator.load(current_catalog) + + dataset = generator.dataset_for_host(source) + return unless dataset + + self.dataset_name = dataset.name + self.port = dataset.replicas.port + self.tag = dataset.replicas.tag + end + + log("dataset = #{dataset_name}") + log("port = #{port}") + log("tag = #{tag}") + + status = NodeStatus.new + status.set(:absorbing, true) + DataAbsorber.absorb(:dataset => dataset_name, + :source_host => source, + :destination_host => host, + :port => port, + :tag => tag, + :client => "droonga-send") + status.delete(:absorbing) end - end - def set_replicas - dataset = @payload["dataset"] - return unless dataset + private + def source + @payload["source"] + end - hosts = given_hosts - return unless hosts + def dataset_name + @dataset_name ||= @payload["dataset"] + end - log("new replicas: #{hosts.join(",")}") + def port + @port ||= @payload["port"] + end - CatalogModifier.modify do |modifier| - modifier.datasets[dataset].replicas.hosts = hosts + def tag + @tag ||= @payload["tag"] end end - def add_replicas - dataset = @payload["dataset"] - return unless dataset + class ModifyReplicasBase < Base + private + def dataset + @payload["dataset"] + end - hosts = given_hosts - return unless hosts + def hosts + @hosts ||= prepare_hosts + end + + def prepare_hosts + hosts = @payload["hosts"] + return nil unless hosts + hosts = [hosts] if hosts.is_a?(String) + hosts + end + end - hosts -= [host] - return if hosts.empty? + class SetReplicas < ModifyReplicasBase + def process + return unless dataset + return unless hosts - log("adding replicas: #{hosts.join(",")}") + log("new replicas: #{hosts.join(",")}") - CatalogModifier.modify do |modifier| - modifier.datasets[dataset].replicas.hosts += hosts - modifier.datasets[dataset].replicas.hosts.uniq! + CatalogModifier.modify do |modifier| + modifier.datasets[dataset].replicas.hosts = hosts + end end end - def remove_replicas - dataset = @payload["dataset"] - return unless dataset + class AddReplicas < ModifyReplicasBase + def process + return unless dataset + return unless hosts - hosts = given_hosts - return unless hosts + hosts -= [host] + return if hosts.empty? - log("removing replicas: #{hosts.join(",")}") + log("adding replicas: #{hosts.join(",")}") - CatalogModifier.modify do |modifier| - modifier.datasets[dataset].replicas.hosts -= hosts + CatalogModifier.modify do |modifier| + modifier.datasets[dataset].replicas.hosts += hosts + modifier.datasets[dataset].replicas.hosts.uniq! + end end end - def absorb_data - source = @payload["source"] - return unless source + class RemoveReplicas < ModifyReplicasBase + def process + return unless dataset + return unless hosts - log("start to absorb data from #{source}") + log("removing replicas: #{hosts.join(",")}") - dataset_name = @payload["dataset"] - port = @payload["port"] - tag = @payload["tag"] + CatalogModifier.modify do |modifier| + modifier.datasets[dataset].replicas.hosts -= hosts + end + end + end - if dataset_name.nil? or port.nil? or tag.nil? - current_catalog = JSON.parse(Path.catalog.read) - generator = CatalogGenerator.new - generator.load(current_catalog) + class UpdateLiveNodes < Base + def process + def live_nodes + Serf.live_nodes(@serf_name) + end - dataset = generator.dataset_for_host(source) - return unless dataset + def output_live_nodes + path = Path.live_nodes + nodes = live_nodes + file_contents = JSON.pretty_generate(nodes) + SafeFileWriter.write(path, file_contents) + end + end + end + end - dataset_name = dataset.name - port = dataset.replicas.port - tag = dataset.replicas.tag + class SerfEventHandler + class << self + def run + new.run end + end - log("dataset = #{dataset_name}") - log("port = #{port}") - log("tag = #{tag}") + def run + command_class = detect_command_class + return true if command_class.nil? - status = NodeStatus.new - status.set(:absorbing, true) - DataAbsorber.absorb(:dataset => dataset_name, - :source_host => source, - :destination_host => host, - :port => port, - :tag => tag, - :client => "droonga-send") - status.delete(:absorbing) + command = command_class.new + command.process if command.should_process? + output_response(command.response) + true end - def live_nodes - Serf.live_nodes(@serf_name) + private + def detect_command_class + case ENV["SERF_EVENT"] + when "user" + detect_command_class_from_custom_event(ENV["SERF_USER_EVENT"]) + when "query" + detect_command_class_from_custom_event(ENV["SERF_QUERY_NAME"]) + when "member-join", "member-leave", "member-update", "member-reap" + Remote::UpdateLiveNodes + end end - def output_live_nodes - path = Path.live_nodes - nodes = live_nodes - file_contents = JSON.pretty_generate(nodes) - SafeFileWriter.write(path, file_contents) + def detect_command_class_from_custom_event(event_name) + case event_name + when "change_role" + Remote::ChangeRole + when "report_status" + Remote::ReportStatus + when "join" + Remote::Join + when "set_replicas" + Remote::SetReplicas + when "add_replicas" + Remote::AddReplicas + when "remove_replicas" + Remote::RemoveReplicas + when "absorb_data" + Remote::AbsorbData + else + nil + end end - def log(message) - @response["log"] << message + def output_response(response) + puts JSON.generate(response) end end end -------------- next part -------------- HTML����������������������������...Download