YUKI Hiroshi
null+****@clear*****
Wed Apr 15 14:59:08 JST 2015
YUKI Hiroshi 2015-04-15 14:59:08 +0900 (Wed, 15 Apr 2015) New Revision: 3e07e747eb43753483d45dad03e56554f01bfdf2 https://github.com/droonga/droonga-engine/commit/3e07e747eb43753483d45dad03e56554f01bfdf2 Message: Fix indent Modified files: bin/droonga-engine-absorb-data bin/droonga-engine-join bin/droonga-engine-set-role bin/droonga-engine-unjoin Modified: bin/droonga-engine-absorb-data (+200 -200) =================================================================== --- bin/droonga-engine-absorb-data 2015-04-15 14:56:31 +0900 (fc3f190) +++ bin/droonga-engine-absorb-data 2015-04-15 14:59:08 +0900 (18e4678) @@ -30,242 +30,242 @@ require "droonga/node_metadata" require "droonga/restarter" module Droonga -class AbsorbDataCommand - def run - @loop = Coolio::Loop.default + class AbsorbDataCommand + def run + @loop = Coolio::Loop.default - parse_options - assert_valid_options - trap_signals + parse_options + assert_valid_options + trap_signals - puts "Start to absorb data from #{@options.soruce_dataset} at #{source_node.to_s}" - puts " to #{@options.dataset} at #{destination_host.to_s}" - puts " via #{@options.receiver_host} (this host)" - puts "" - puts "Absorbing..." + puts "Start to absorb data from #{@options.soruce_dataset} at #{source_node.to_s}" + puts " to #{@options.dataset} at #{destination_host.to_s}" + puts " via #{@options.receiver_host} (this host)" + puts "" + puts "Absorbing..." - succeeded = absorb + succeeded = absorb - puts "Done." if succeeded - exit(succeeded) - end + puts "Done." if succeeded + exit(succeeded) + end - private - def parse_options - options = OpenStruct.new + private + def parse_options + options = OpenStruct.new - options.host = Socket.gethostname - options.port = DataAbsorberClient::DEFAULT_PORT - options.tag = DataAbsorberClient::DEFAULT_TAG - options.dataset = DataAbsorberClient::DEFAULT_DATASET + options.host = Socket.gethostname + options.port = DataAbsorberClient::DEFAULT_PORT + options.tag = DataAbsorberClient::DEFAULT_TAG + options.dataset = DataAbsorberClient::DEFAULT_DATASET - options.source_host = DataAbsorberClient::DEFAULT_HOST - options.source_port = DataAbsorberClient::DEFAULT_PORT - options.source_tag = DataAbsorberClient::DEFAULT_TAG - options.source_dataset = DataAbsorberClient::DEFAULT_DATASET + options.source_host = DataAbsorberClient::DEFAULT_HOST + options.source_port = DataAbsorberClient::DEFAULT_PORT + options.source_tag = DataAbsorberClient::DEFAULT_TAG + options.source_dataset = DataAbsorberClient::DEFAULT_DATASET - options.receiver_host = Socket.gethostname + options.receiver_host = Socket.gethostname - options.messages_per_second = DataAbsorberClient::DEFAULT_MESSAGES_PER_SECOND - options.progress_interval_seconds = DataAbsorberClient::DEFAULT_PROGRESS_INTERVAL_SECONDS + options.messages_per_second = DataAbsorberClient::DEFAULT_MESSAGES_PER_SECOND + options.progress_interval_seconds = DataAbsorberClient::DEFAULT_PROGRESS_INTERVAL_SECONDS - options.verbose = false + options.verbose = false - parser = OptionParser.new - parser.version = Engine::VERSION + parser = OptionParser.new + parser.version = Engine::VERSION - parser.separator("") - parser.separator("Destination node:") - parser.on("--host=HOST", - "Host name of the destination node.") do |host| - options.host = host - end - parser.on("--port=PORT", Integer, - "Port number of the destination node.", - "(#{options.port})") do |port| - options.port = port - end - parser.on("--tag=TAG", Integer, - "Tag name of the destination node.", - "(#{options.tag})") do |tag| - options.tag = tag - end - parser.on("--dataset=DATASET", - "Name of the destination dataset.", - "(#{options.dataset})") do |dataset| - options.dataset = dataset - end + parser.separator("") + parser.separator("Destination node:") + parser.on("--host=HOST", + "Host name of the destination node.") do |host| + options.host = host + end + parser.on("--port=PORT", Integer, + "Port number of the destination node.", + "(#{options.port})") do |port| + options.port = port + end + parser.on("--tag=TAG", Integer, + "Tag name of the destination node.", + "(#{options.tag})") do |tag| + options.tag = tag + end + parser.on("--dataset=DATASET", + "Name of the destination dataset.", + "(#{options.dataset})") do |dataset| + options.dataset = dataset + end - parser.separator("") - parser.separator("Source node:") - parser.on("--source-host=HOST", - "Host name of the source node.", - "(#{options.source_host})") do |host| - options.source_host = host - end - parser.on("--source-port=PORT", Integer, - "Port number of the source node.", - "(#{options.source_port})") do |host| - options.source_host = host - end - parser.on("--source-tag=TAG", - "Tag name of the source node.", - "(#{options.source_tag})") do |tag| - options.source_tag = tag - end - parser.on("--dataset=DATASET", - "Name of the source dataset.", - "(#{options.source_dataset})") do |dataset| - options.source_dataset = dataset - end + parser.separator("") + parser.separator("Source node:") + parser.on("--source-host=HOST", + "Host name of the source node.", + "(#{options.source_host})") do |host| + options.source_host = host + end + parser.on("--source-port=PORT", Integer, + "Port number of the source node.", + "(#{options.source_port})") do |host| + options.source_host = host + end + parser.on("--source-tag=TAG", + "Tag name of the source node.", + "(#{options.source_tag})") do |tag| + options.source_tag = tag + end + parser.on("--dataset=DATASET", + "Name of the source dataset.", + "(#{options.source_dataset})") do |dataset| + options.source_dataset = dataset + end - parser.separator("") - parser.separator("Connection:") - parser.on("--receiver-host=HOST", - "Host name of this computer.", - "(#{options.receiver_host})") do |host| - options.receiver_host = host - end + parser.separator("") + parser.separator("Connection:") + parser.on("--receiver-host=HOST", + "Host name of this computer.", + "(#{options.receiver_host})") do |host| + options.receiver_host = host + end - parser.separator("") - parser.separator("Miscellaneous:") - parser.on("--records-per-second=N", Integer, - "Maximum number of records per second to be absorbed.", - "'#{Client::RateLimiter::NO_LIMIT}' means no limit.", - "(#{options.messages_per_second})") do |n| - options.messages_per_second = n - end - parser.on("--progress-interval-seconds=N", Integer, - "Interval seconds to report progress.", - "(#{options.progress_interval_seconds})") do |n| - options.progress_interval_seconds = n - end - parser.on("--[no-]verbose", - "Output details for internal operations.", - "(#{options.verbose})") do |verbose| - options.verbose = verbose - end + parser.separator("") + parser.separator("Miscellaneous:") + parser.on("--records-per-second=N", Integer, + "Maximum number of records per second to be absorbed.", + "'#{Client::RateLimiter::NO_LIMIT}' means no limit.", + "(#{options.messages_per_second})") do |n| + options.messages_per_second = n + end + parser.on("--progress-interval-seconds=N", Integer, + "Interval seconds to report progress.", + "(#{options.progress_interval_seconds})") do |n| + options.progress_interval_seconds = n + end + parser.on("--[no-]verbose", + "Output details for internal operations.", + "(#{options.verbose})") do |verbose| + options.verbose = verbose + end - parser.separator("") - parser.separator("For backward compatibility:") - parser.on("--destination-host=HOST", - "Alias to \"--host\".") do |host| - options.host = host - end + parser.separator("") + parser.separator("For backward compatibility:") + parser.on("--destination-host=HOST", + "Alias to \"--host\".") do |host| + options.host = host + end - parser.parse!(ARGV) - @options = options - end + parser.parse!(ARGV) + @options = options + end - def assert_valid_options - unles****@optio*****_host - raise "You must specify the source host via --source-host option." + def assert_valid_options + unles****@optio*****_host + raise "You must specify the source host via --source-host option." + end + unles****@optio***** + raise "You must specify the destination host via --host option." + end end - unles****@optio***** - raise "You must specify the destination host via --host option." + + def source_node + @source_node ||= NodeName.new(:host => @options.source_host, + :port => @options.source_port, + :tag => @options.source_tag) end - end - def source_node - @source_node ||= NodeName.new(:host => @options.source_host, - :port => @options.source_port, - :tag => @options.source_tag) - end + def destination_node + @destination_node ||= NodeName.new(:host => @options.host, + :port => @options.port, + :tag => @options.tag) + end - def destination_node - @destination_node ||= NodeName.new(:host => @options.host, - :port => @options.port, - :tag => @options.tag) - end + def run_remote_command(target, command, options) + serf = Serf.new(target, :verbose => @options.verbose) + serf.send_query(command, options) + end - def run_remote_command(target, command, options) - serf = Serf.new(target, :verbose => @options.verbose) - serf.send_query(command, options) - end + def absorber + @absorber ||= prepare_absorber + end - def absorber - @absorber ||= prepare_absorber - end + def prepare_absorber + absorber_options = { + :host => @options.host, + :port => @options.port, + :tag => @options.tag, + :dataset => @options.dataset, + + :source_host => @options.source_host, + :source_port => @options.source_port, + :source_tag => @options.source_tag, + :source_dataset => @options.source_dataset, + + :receiver_host => @options.receiver_host, + + :messages_per_second => @options.messages_per_second, + :progress_interval_seconds => @options.progress_interval_seconds, + + :client_options => { + :backend => :coolio, + :loop => @loop, + }, + } + DataAbsorberClient.new(absorber_options) + end - def prepare_absorber - absorber_options = { - :host => @options.host, - :port => @options.port, - :tag => @options.tag, - :dataset => @options.dataset, - - :source_host => @options.source_host, - :source_port => @options.source_port, - :source_tag => @options.source_tag, - :source_dataset => @options.source_dataset, - - :receiver_host => @options.receiver_host, - - :messages_per_second => @options.messages_per_second, - :progress_interval_seconds => @options.progress_interval_seconds, - - :client_options => { - :backend => :coolio, - :loop => @loop, - }, - } - DataAbsorberClient.new(absorber_options) - end + def absorb + last_progress = nil + absorber.run do |progress| + if last_progress + printf("%s", "#{" " * last_progress[:message].size}\r") + end + printf("%s", "#{progress[:message]}\r") + last_progress = progress + end + @loop.run - def absorb - last_progress = nil - absorber.run do |progress| - if last_progress - printf("%s", "#{" " * last_progress[:message].size}\r") + if absorber.error_message + puts(absorber.error_message) + do_cancel + return false end - printf("%s", "#{progress[:message]}\r") - last_progress = progress - end - @loop.run - if absorber.error_message - puts(absorber.error_message) - do_cancel - return false + puts "" + + response = run_remote_command(source_node.to_s, "report_metadata", + "node" => source_node.to_s, + "key" => "last_processed_message_timestamp") + timestamp = response["value"] + if timestamp and not timestamp.empty? + puts "The timestamp of the last processed message in the source node: #{timestamp}" + puts "Setting effective message timestamp for the destination node..." + response = run_remote_command(destination_node.to_s, "accept_messages_newer_than", + "node" => destination_node.to_s, + "timestamp" => timestamp) + end + true end - puts "" - - response = run_remote_command(source_node.to_s, "report_metadata", - "node" => source_node.to_s, - "key" => "last_processed_message_timestamp") - timestamp = response["value"] - if timestamp and not timestamp.empty? - puts "The timestamp of the last processed message in the source node: #{timestamp}" - puts "Setting effective message timestamp for the destination node..." - response = run_remote_command(destination_node.to_s, "accept_messages_newer_than", - "node" => destination_node.to_s, - "timestamp" => timestamp) - end - true - end + def trap_signals + trap(:TERM) do + trap(:TERM, "DEFAULT") + do_cancel + end - def trap_signals - trap(:TERM) do - trap(:TERM, "DEFAULT") - do_cancel - end + trap(:INT) do + trap(:INT, "DEFAULT") + do_cancel + end - trap(:INT) do - trap(:INT, "DEFAULT") - do_cancel + trap(:QUIT) do + trap(:QUIT, "DEFAULT") + do_cancel + end end - trap(:QUIT) do - trap(:QUIT, "DEFAULT") - do_cancel + def do_cancel + #XXX we have to write more codes to cancel remote processes! end end - - def do_cancel - #XXX we have to write more codes to cancel remote processes! - end -end end Droonga::AbsorbDataCommand.new.run Modified: bin/droonga-engine-join (+258 -258) =================================================================== --- bin/droonga-engine-join 2015-04-15 14:56:31 +0900 (74b56b3) +++ bin/droonga-engine-join 2015-04-15 14:59:08 +0900 (7ecb8af) @@ -33,307 +33,307 @@ require "droonga/serf" require "droonga/node_metadata" module Droonga -class JoinCommand - def run - @loop = Coolio::Loop.default - - parse_options - trap_signals - - puts "Start to join a new node #{joining_node.host}" - puts " to the cluster of #{source_node.host}" - puts " via #{@options["receiver-host"]} (this host)" - puts " port = #{joining_node.port}" - puts " tag = #{joining_node.tag}" - puts " dataset = #{dataset}" - puts "" - puts "Source Cluster ID: #{source_cluster_id}" - puts "" - - begin - set_joining_node_role - do_join - register_to_existing_nodes - set_source_node_role - unless @options["no-copy"] - successed = copy_data - unless successed - do_cancel - exit(false) + class JoinCommand + def run + @loop = Coolio::Loop.default + + parse_options + trap_signals + + puts "Start to join a new node #{joining_node.host}" + puts " to the cluster of #{source_node.host}" + puts " via #{@options["receiver-host"]} (this host)" + puts " port = #{joining_node.port}" + puts " tag = #{joining_node.tag}" + puts " dataset = #{dataset}" + puts "" + puts "Source Cluster ID: #{source_cluster_id}" + puts "" + + begin + set_joining_node_role + do_join + register_to_existing_nodes + set_source_node_role + unless @options["no-copy"] + successed = copy_data + unless successed + do_cancel + exit(false) + end end + set_effective_message_timestamp + reset_source_node_role + reset_joining_node_role + puts("Done.") + rescue Exception => exception + puts("Unexpected exception: #{exception.message}") + puts(exception.backtrace.join("\n")) + do_cancel + exit(false) end - set_effective_message_timestamp - reset_source_node_role - reset_joining_node_role - puts("Done.") - rescue Exception => exception - puts("Unexpected exception: #{exception.message}") - puts(exception.backtrace.join("\n")) - do_cancel + exit(true) + end + + private + def parse_options + options = Slop.parse(:help => true) do |option| + option.on("no-copy", "Don't copy data from the source cluster.", + :default => false) + + option.separator("Target:") + option.on(:host=, + "Host name of the new node to be joined.", + :required => true) + option.on("replica-source-host=", + "Host name of the soruce node in the cluster to be connected.", + :required => true) + + option.on(:port=, + "Port number of the source cluster to be connected.", + :as => Integer, + :default => NodeName::DEFAULT_PORT) + option.on(:tag=, + "Tag name of the soruce cluster to be connected.", + :default => NodeName::DEFAULT_TAG) + option.on(:dataset=, + "Dataset name of for the node to be joined.", + :default => Catalog::Dataset::DEFAULT_NAME) + + option.separator("Connections:") + option.on("receiver-host=", + "Host name of this host.", + :default => Socket.gethostname) + + option.separator("Miscellaneous:") + option.on("records-per-second=", + "Maximum number of records per second to be copied. " + + "'#{Client::RateLimiter::NO_LIMIT}' means no limit.", + :as => Integer, + :default => DataAbsorberClient::DEFAULT_MESSAGES_PER_SECOND) + option.on("progress-interval-seconds=", + "Interval seconds to report progress.", + :as => Integer, + :default => DataAbsorberClient::DEFAULT_PROGRESS_INTERVAL_SECONDS) + option.on(:verbose, "Output details for internal operations.", + :default => false) + end + @options = options + rescue Slop::MissingOptionError => error + $stderr.puts(error) exit(false) end - exit(true) - end - private - def parse_options - options = Slop.parse(:help => true) do |option| - option.on("no-copy", "Don't copy data from the source cluster.", - :default => false) - - option.separator("Target:") - option.on(:host=, - "Host name of the new node to be joined.", - :required => true) - option.on("replica-source-host=", - "Host name of the soruce node in the cluster to be connected.", - :required => true) - - option.on(:port=, - "Port number of the source cluster to be connected.", - :as => Integer, - :default => NodeName::DEFAULT_PORT) - option.on(:tag=, - "Tag name of the soruce cluster to be connected.", - :default => NodeName::DEFAULT_TAG) - option.on(:dataset=, - "Dataset name of for the node to be joined.", - :default => Catalog::Dataset::DEFAULT_NAME) - - option.separator("Connections:") - option.on("receiver-host=", - "Host name of this host.", - :default => Socket.gethostname) - - option.separator("Miscellaneous:") - option.on("records-per-second=", - "Maximum number of records per second to be copied. " + - "'#{Client::RateLimiter::NO_LIMIT}' means no limit.", - :as => Integer, - :default => DataAbsorberClient::DEFAULT_MESSAGES_PER_SECOND) - option.on("progress-interval-seconds=", - "Interval seconds to report progress.", - :as => Integer, - :default => DataAbsorberClient::DEFAULT_PROGRESS_INTERVAL_SECONDS) - option.on(:verbose, "Output details for internal operations.", - :default => false) + def dataset + @options[:dataset] end - @options = options - rescue Slop::MissingOptionError => error - $stderr.puts(error) - exit(false) - end - def dataset - @options[:dataset] - end + def joining_node + @joining_node ||= NodeName.new(:host => @options[:host], + :port => @options[:port], + :tag => @options[:tag]) + end - def joining_node - @joining_node ||= NodeName.new(:host => @options[:host], - :port => @options[:port], - :tag => @options[:tag]) - end + def source_node + @source_node ||= NodeName.new(:host => @options["replica-source-host"], + :port => @options[:port], + :tag => @options[:tag]) + end - def source_node - @source_node ||= NodeName.new(:host => @options["replica-source-host"], - :port => @options[:port], - :tag => @options[:tag]) - end + def source_cluster_id + source_catalog.cluster_id + end - def source_cluster_id - source_catalog.cluster_id - end + def source_catalog + @source_catalog ||= parse_source_catalog + end - def source_catalog - @source_catalog ||= parse_source_catalog - end + def parse_source_catalog + loader = Catalog::Loader.new + loader.parse(raw_source_catalog) + end - def parse_source_catalog - loader = Catalog::Loader.new - loader.parse(raw_source_catalog) - end + def raw_source_catalog + @raw_source_catalog ||= fetch_source_catalog + end - def raw_source_catalog - @raw_source_catalog ||= fetch_source_catalog - end + def fetch_source_catalog + fetcher = Catalog::Fetcher.new(:host => source_node.host, + :port => source_node.port, + :tag => source_node.tag, + :receiver_host => @options["receiver-host"]) + fetcher.fetch(:dataset => dataset) + end - def fetch_source_catalog - fetcher = Catalog::Fetcher.new(:host => source_node.host, - :port => source_node.port, - :tag => source_node.tag, - :receiver_host => @options["receiver-host"]) - fetcher.fetch(:dataset => dataset) - end + def run_remote_command(target, command, options) + serf = Serf.new(target, :verbose => @options[:verbose]) + serf.send_query(command, options) + end - def run_remote_command(target, command, options) - serf = Serf.new(target, :verbose => @options[:verbose]) - serf.send_query(command, options) - end + def absorber + @absorber ||= prepare_absorber + end - def absorber - @absorber ||= prepare_absorber - end + def prepare_absorber + absorber_options = { + :host => joining_node.host, + :port => joining_node.port, + :tag => joining_node.tag, + :dataset => dataset, + + :source_host => source_node.host, + :source_port => source_node.port, + :source_tag => source_node.tag, + :source_dataset => dataset, + + :receiver_host => @options["receiver-host"], + + :messages_per_second => @options["records-per-second"], + :progress_interval_seconds => @options["progress-interval-seconds"], + + :client_options => { + :backend => :coolio, + :loop => @loop, + }, + } + DataAbsorberClient.new(absorber_options) + end - def prepare_absorber - absorber_options = { - :host => joining_node.host, - :port => joining_node.port, - :tag => joining_node.tag, - :dataset => dataset, - - :source_host => source_node.host, - :source_port => source_node.port, - :source_tag => source_node.tag, - :source_dataset => dataset, - - :receiver_host => @options["receiver-host"], - - :messages_per_second => @options["records-per-second"], - :progress_interval_seconds => @options["progress-interval-seconds"], - - :client_options => { - :backend => :coolio, - :loop => @loop, - }, - } - DataAbsorberClient.new(absorber_options) - end + def set_source_node_role + if absorber.source_node_suspendable? + puts("Changing role of the source node...") + run_remote_command(source_node.to_s, "change_role", + "node" => source_node.to_s, + "role" => NodeMetadata::Role::ABSORB_SOURCE) + wait_until_restarted(source_node) + end + @source_node_role_changed = true + end - def set_source_node_role - if absorber.source_node_suspendable? - puts("Changing role of the source node...") - run_remote_command(source_node.to_s, "change_role", - "node" => source_node.to_s, - "role" => NodeMetadata::Role::ABSORB_SOURCE) - wait_until_restarted(source_node) + def set_joining_node_role + puts("Changing role of the joining node...") + run_remote_command(joining_node.to_s, "change_role", + "node" => joining_node.to_s, + "role" => NodeMetadata::Role::ABSORB_DESTINATION) + wait_until_restarted(joining_node) + @joining_node_role_changed = true end - @source_node_role_changed = true - end - def set_joining_node_role - puts("Changing role of the joining node...") - run_remote_command(joining_node.to_s, "change_role", - "node" => joining_node.to_s, - "role" => NodeMetadata::Role::ABSORB_DESTINATION) - wait_until_restarted(joining_node) - @joining_node_role_changed = true - end + def reset_source_node_role + if absorber.source_node_suspendable? + puts("Restoring role of the source node...") + run_remote_command(source_node.to_s, "change_role", + "node" => source_node.to_s, + "role" => NodeMetadata::Role::SERVICE_PROVIDER) + wait_until_restarted(source_node.to_s) + end + @source_node_role_changed = false + end - def reset_source_node_role - if absorber.source_node_suspendable? - puts("Restoring role of the source node...") - run_remote_command(source_node.to_s, "change_role", - "node" => source_node.to_s, + def reset_joining_node_role + puts("Restoring role of the joining node...") + run_remote_command(joining_node.to_s, "change_role", + "node" => joining_node.to_s, "role" => NodeMetadata::Role::SERVICE_PROVIDER) - wait_until_restarted(source_node.to_s) + wait_until_restarted(joining_node.to_s) + @joining_node_role_changed = false end - @source_node_role_changed = false - end - def reset_joining_node_role - puts("Restoring role of the joining node...") - run_remote_command(joining_node.to_s, "change_role", - "node" => joining_node.to_s, - "role" => NodeMetadata::Role::SERVICE_PROVIDER) - wait_until_restarted(joining_node.to_s) - @joining_node_role_changed = false - end + def do_join + puts("Joining new replica to the cluster...") + run_remote_command(joining_node.to_s, "join", + "node" => joining_node.to_s, + "type" => "replica", + "source" => source_node.to_s, + "dataset" => dataset) + wait_until_restarted(joining_node) + end - def do_join - puts("Joining new replica to the cluster...") - run_remote_command(joining_node.to_s, "join", - "node" => joining_node.to_s, - "type" => "replica", - "source" => source_node.to_s, - "dataset" => dataset) - wait_until_restarted(joining_node) - end + def copy_data + puts("Copying data from the source node...") - def copy_data - puts("Copying data from the source node...") + last_progress = nil + absorber.run do |progress| + if last_progress + printf("%s", "#{" " * last_progress[:message].size}\r") + end + printf("%s", "#{progress[:message]}\r") + last_progress = progress + end + @loop.run - last_progress = nil - absorber.run do |progress| - if last_progress - printf("%s", "#{" " * last_progress[:message].size}\r") + if absorber.error_message + puts(absorber.error_message) + do_cancel + return false end - printf("%s", "#{progress[:message]}\r") - last_progress = progress + + puts "" end - @loop.run - if absorber.error_message - puts(absorber.error_message) - do_cancel - return false + def set_effective_message_timestamp + response = run_remote_command(source_node.to_s, "report_metadata", + "node" => source_node.to_s, + "key" => "last_processed_message_timestamp") + timestamp = response["value"] + if timestamp and not timestamp.empty? + puts "The timestamp of the last processed message in the source node: #{timestamp}" + puts "Setting effective message timestamp for the destination node..." + response = run_remote_command(joining_node.to_s, "accept_messages_newer_than", + "node" => joining_node.to_s, + "timestamp" => timestamp) + end end - puts "" - end + def register_to_existing_nodes + puts("Register new node to existing hosts in the cluster...") + run_remote_command(source_node.to_s, "add_replicas", + "cluster_id" => source_cluster_id, + "dataset" => dataset, + "hosts" => [joining_node.host]) + wait_until_restarted(source_node) + @node_registered = true + end - def set_effective_message_timestamp - response = run_remote_command(source_node.to_s, "report_metadata", - "node" => source_node.to_s, - "key" => "last_processed_message_timestamp") - timestamp = response["value"] - if timestamp and not timestamp.empty? - puts "The timestamp of the last processed message in the source node: #{timestamp}" - puts "Setting effective message timestamp for the destination node..." - response = run_remote_command(joining_node.to_s, "accept_messages_newer_than", - "node" => joining_node.to_s, - "timestamp" => timestamp) + def unregister_from_existing_nodes + puts("Unregister new node from existing hosts in the cluster...") + run_remote_command(source_node, "remove_replicas", + "cluster_id" => source_cluster_id, + "dataset" => dataset, + "hosts" => [joining_node.host]) + wait_until_restarted(source_node) + @node_registered = false end - end - def register_to_existing_nodes - puts("Register new node to existing hosts in the cluster...") - run_remote_command(source_node.to_s, "add_replicas", - "cluster_id" => source_cluster_id, - "dataset" => dataset, - "hosts" => [joining_node.host]) - wait_until_restarted(source_node) - @node_registered = true - end + def trap_signals + trap(:TERM) do + trap(:TERM, "DEFAULT") + do_cancel + end - def unregister_from_existing_nodes - puts("Unregister new node from existing hosts in the cluster...") - run_remote_command(source_node, "remove_replicas", - "cluster_id" => source_cluster_id, - "dataset" => dataset, - "hosts" => [joining_node.host]) - wait_until_restarted(source_node) - @node_registered = false - end + trap(:INT) do + trap(:INT, "DEFAULT") + do_cancel + end - def trap_signals - trap(:TERM) do - trap(:TERM, "DEFAULT") - do_cancel + trap(:QUIT) do + trap(:QUIT, "DEFAULT") + do_cancel + end end - trap(:INT) do - trap(:INT, "DEFAULT") - do_cancel + def do_cancel + #XXX we have to write more codes to cancel remote processes! + unregister_from_existing_nodes if @node_registered + reset_joining_node_role if @joining_node_role_changed + reset_source_node_role if @source_node_role_changed end - trap(:QUIT) do - trap(:QUIT, "DEFAULT") - do_cancel + def wait_until_restarted(*nodes) + #TODO: wait for restarting of the given nodes. this should be done more safely. + sleep(30) end end - - def do_cancel - #XXX we have to write more codes to cancel remote processes! - unregister_from_existing_nodes if @node_registered - reset_joining_node_role if @joining_node_role_changed - reset_source_node_role if @source_node_role_changed - end - - def wait_until_restarted(*nodes) - #TODO: wait for restarting of the given nodes. this should be done more safely. - sleep(30) - end -end end Droonga::JoinCommand.new.run Modified: bin/droonga-engine-set-role (+51 -51) =================================================================== --- bin/droonga-engine-set-role 2015-04-15 14:56:31 +0900 (dffa7bb) +++ bin/droonga-engine-set-role 2015-04-15 14:59:08 +0900 (3eee682) @@ -23,65 +23,65 @@ require "droonga/node_name" require "droonga/serf" module Droonga -class SetRoleCommand - def run - parse_options - puts "Setting role of #{@options[:host]} to #{@options[:role]}..." - set_node_role - puts("Done.") - exit(true) - end + class SetRoleCommand + def run + parse_options + puts "Setting role of #{@options[:host]} to #{@options[:role]}..." + set_node_role + puts("Done.") + exit(true) + end - private - def parse_options - options = Slop.parse(:help => true) do |option| - option.on(:role=, - "New role for the target node.", - :required => true) + private + def parse_options + options = Slop.parse(:help => true) do |option| + option.on(:role=, + "New role for the target node.", + :required => true) - option.separator("Connections:") - option.on(:host=, - "Host name of the target node.", - :required => true) - option.on("receiver-host=", - "Host name of this host.", - :default => Socket.gethostname) - option.on(:dataset=, - "Dataset name of for the target node.", - :default => NodeName::DEFAULT_DATASET) - option.on(:port=, - "Port number of the source cluster to be connected.", - :as => Integer, - :default => NodeName::DEFAULT_PORT) - option.on(:tag=, - "Tag name of the soruce cluster to be connected.", - :default => NodeName::DEFAULT_TAG) + option.separator("Connections:") + option.on(:host=, + "Host name of the target node.", + :required => true) + option.on("receiver-host=", + "Host name of this host.", + :default => Socket.gethostname) + option.on(:dataset=, + "Dataset name of for the target node.", + :default => NodeName::DEFAULT_DATASET) + option.on(:port=, + "Port number of the source cluster to be connected.", + :as => Integer, + :default => NodeName::DEFAULT_PORT) + option.on(:tag=, + "Tag name of the soruce cluster to be connected.", + :default => NodeName::DEFAULT_TAG) - option.separator("Miscellaneous:") - option.on(:verbose, "Output details for internal operations.", - :default => false) + option.separator("Miscellaneous:") + option.on(:verbose, "Output details for internal operations.", + :default => false) + end + @options = options + rescue Slop::MissingOptionError => error + $stderr.puts(error) + exit(false) end - @options = options - rescue Slop::MissingOptionError => error - $stderr.puts(error) - exit(false) - end - def target_node - "#{@options[:host]}:#{@options[:port]}/#{@options[:tag]}" - end + def target_node + "#{@options[:host]}:#{@options[:port]}/#{@options[:tag]}" + end - def run_remote_command(target, command, options) - serf = Serf.new(target, :verbose => @options[:verbose]) - serf.send_query(command, options) - end + def run_remote_command(target, command, options) + serf = Serf.new(target, :verbose => @options[:verbose]) + serf.send_query(command, options) + end - def set_node_role - run_remote_command(target_node, "change_role", - "node" => target_node, - "role" => @options[:role]) + def set_node_role + run_remote_command(target_node, "change_role", + "node" => target_node, + "role" => @options[:role]) + end end end -end Droonga::SetRoleCommand.new.run Modified: bin/droonga-engine-unjoin (+101 -101) =================================================================== --- bin/droonga-engine-unjoin 2015-04-15 14:56:31 +0900 (95f86f8) +++ bin/droonga-engine-unjoin 2015-04-15 14:59:08 +0900 (dd44573) @@ -28,130 +28,130 @@ require "droonga/catalog/loader" require "droonga/serf" module Droonga -class UnjoinCommand - def run - parse_options + class UnjoinCommand + def run + parse_options - puts "Start to unjoin a node #{@options[:host]}" - puts " by #{@options["receiver-host"]} (this host)" - puts "" + puts "Start to unjoin a node #{@options[:host]}" + puts " by #{@options["receiver-host"]} (this host)" + puts "" - do_unjoin + do_unjoin - puts("Done.") - exit(true) - end + puts("Done.") + exit(true) + end - private - def parse_options - options = Slop.parse(:help => true) do |option| - option.on(:host=, - "Host name of the replica removed from cluster.", - :required => true) - option.on("receiver-host=", - "Host name of this host.", - :default => Socket.gethostname) - option.on(:dataset=, - "Dataset name of for the node to be unjoined.", - :default => NodeName::DEFAULT_DATASET) - option.on(:port=, - "Port number of the source cluster to be connected.", - :as => Integer, - :default => NodeName::DEFAULT_PORT) - option.on(:tag=, - "Tag name of the soruce cluster to be connected.", - :default => NodeName::DEFAULT_TAG) - - option.separator("Miscellaneous:") - option.on(:verbose, "Output details for internal operations.", - :default => false) - end - @options = options - rescue Slop::MissingOptionError => error - $stderr.puts(error) - exit(false) - end + private + def parse_options + options = Slop.parse(:help => true) do |option| + option.on(:host=, + "Host name of the replica removed from cluster.", + :required => true) + option.on("receiver-host=", + "Host name of this host.", + :default => Socket.gethostname) + option.on(:dataset=, + "Dataset name of for the node to be unjoined.", + :default => NodeName::DEFAULT_DATASET) + option.on(:port=, + "Port number of the source cluster to be connected.", + :as => Integer, + :default => NodeName::DEFAULT_PORT) + option.on(:tag=, + "Tag name of the soruce cluster to be connected.", + :default => NodeName::DEFAULT_TAG) + + option.separator("Miscellaneous:") + option.on(:verbose, "Output details for internal operations.", + :default => false) + end + @options = options + rescue Slop::MissingOptionError => error + $stderr.puts(error) + exit(false) + end - def replica_remove_host - @options[:host] - end + def replica_remove_host + @options[:host] + end - def tag - @options[:tag] - end + def tag + @options[:tag] + end - def port - @options[:port] - end + def port + @options[:port] + end - def dataset_name - @options[:dataset] - end + def dataset_name + @options[:dataset] + end - def replica_remove_node - "#{replica_remove_host}:#{port}/#{tag}" - end + def replica_remove_node + "#{replica_remove_host}:#{port}/#{tag}" + end - def cluster_id - catalog.cluster_id - end + def cluster_id + catalog.cluster_id + end - def catalog - @catalog ||= parse_catalog - end + def catalog + @catalog ||= parse_catalog + end - def parse_catalog - loader = Catalog::Loader.new - loader.parse(raw_catalog) - end + def parse_catalog + loader = Catalog::Loader.new + loader.parse(raw_catalog) + end - def raw_catalog - @raw_catalog ||= fetch_catalog - end + def raw_catalog + @raw_catalog ||= fetch_catalog + end - def fetch_catalog - fetcher = Catalog::Fetcher.new(:host => replica_remove_host, - :port => port, - :tag => tag, - :receiver_host => @options["receiver-host"]) - fetcher.fetch(:dataset => dataset_name) - end + def fetch_catalog + fetcher = Catalog::Fetcher.new(:host => replica_remove_host, + :port => port, + :tag => tag, + :receiver_host => @options["receiver-host"]) + fetcher.fetch(:dataset => dataset_name) + end - def remaining_node - @remaining_node ||= detect_remaining_node - end + def remaining_node + @remaining_node ||= detect_remaining_node + end - def detect_remaining_node - generator = Catalog::Generator.new - generator.load(raw_catalog) + def detect_remaining_node + generator = Catalog::Generator.new + generator.load(raw_catalog) - dataset = generator.dataset_for_host(replica_remove_host) - unless dataset - raise "Specified host #{replica_remove_host} is not a member of "+ - "the cluster. You must specify correct host via --replica-remove-host " + - "option." - end + dataset = generator.dataset_for_host(replica_remove_host) + unless dataset + raise "Specified host #{replica_remove_host} is not a member of "+ + "the cluster. You must specify correct host via --replica-remove-host " + + "option." + end - other_hosts = dataset.replicas.hosts + other_hosts = dataset.replicas.hosts - remaining_host = other_hosts.first || replica_remove_host - "#{remaining_host}:#{port}/#{tag}" - end + remaining_host = other_hosts.first || replica_remove_host + "#{remaining_host}:#{port}/#{tag}" + end - def run_remote_command(target, command, options) - serf = Serf.new(target, :verbose => @options[:verbose]) - serf.send_query(command, options) - end + def run_remote_command(target, command, options) + serf = Serf.new(target, :verbose => @options[:verbose]) + serf.send_query(command, options) + end - def do_unjoin - puts "Unjoining replica from the cluster..." + def do_unjoin + puts "Unjoining replica from the cluster..." - run_remote_command(remaining_node, "unjoin", - "cluster_id" => cluster_id, - "dataset" => dataset_name, - "hosts" => [replica_remove_host]) + run_remote_command(remaining_node, "unjoin", + "cluster_id" => cluster_id, + "dataset" => dataset_name, + "hosts" => [replica_remove_host]) + end end end -end Droonga::UnjoinCommand.new.run -------------- next part -------------- HTML����������������������������... Download