YUKI Hiroshi
null+****@clear*****
Fri Apr 10 17:58:53 JST 2015
YUKI Hiroshi 2015-04-10 17:58:53 +0900 (Fri, 10 Apr 2015) New Revision: 4e7c62260f8a8e727693c526c584a6f27d0f014e https://github.com/droonga/droonga-engine/commit/4e7c62260f8a8e727693c526c584a6f27d0f014e Message: Use Droonga::DataAbsorber directly Modified files: bin/droonga-engine-absorb-data bin/droonga-engine-join Modified: bin/droonga-engine-absorb-data (+24 -34) =================================================================== --- bin/droonga-engine-absorb-data 2015-04-10 17:58:03 +0900 (c39dc84) +++ bin/droonga-engine-absorb-data 2015-04-10 17:58:53 +0900 (20e36d3) @@ -17,8 +17,8 @@ require "ostruct" require "optparse" -require "open3" require "socket" +require "coolio" require "droonga/engine/version" require "droonga/catalog_generator" @@ -31,6 +31,8 @@ require "droonga/restarter" class AbsorbDataCommand def run + @loop = Coolio::Loop.default + parse_options assert_valid_options trap_signals @@ -44,10 +46,10 @@ class AbsorbDataCommand puts "" puts "Absorbing..." - absorb + succeeded = absorb - puts "Done." - exit(true) + puts "Done." if succeeded + exit(succeeded) end private @@ -136,13 +138,6 @@ class AbsorbDataCommand serf.send_query(command, options) end - def current_member_state(target) - serf = Droonga::Serf.new(target, :verbose => @options.verbose) - serf.current_members.find do |member| - member["name"] == target - end - end - def absorber @absorber ||= prepare_absorber end @@ -156,37 +151,31 @@ class AbsorbDataCommand :port => @options.port, :tag => @options.tag, :messages_per_second => @options.messages_per_second, + :client_options => { + :backend => :coolio, + :loop => @loop, + }, } Droonga::DataAbsorber.new(absorber_options) end def absorb - start_time_in_seconds = Time.new.to_i - run_remote_command(destination_node, "absorb_data", - "node" => destination_node, - "source" => source_node, - "port" => @options.port, - "tag" => @options.tag, - "dataset" => @options.dataset, - "messages_per_second" => @options.messages_per_second) - last_progress = "" - while true - sleep(3) - state = current_member_state(destination_node) - if state.nil? or state["tags"]["absorbing"] != "true" - break + last_progress = nil + absorber.run do |progress| + if last_progress + printf("%s", "#{" " * last_progress[:message].size}\r") end + printf("%s", "#{progress[:message]}\r") + last_progress = progress + end + @loop.run - progress = absorber.report_progress(start_time_in_seconds) - if progress - printf("%s", "#{" " * last_progress.size}\r") - printf("%s", "#{progress}\r") - last_progress = progress - end + if absorber.error_message + puts(absorber.error_message) + do_cancel + return false end - printf("%s", "#{" " * last_progress.size}\r") - printf("%s", "#{progress}\r") - puts "100% done" + puts "" response = run_remote_command(source_node, "report_metadata", @@ -200,6 +189,7 @@ class AbsorbDataCommand "node" => destination_node, "timestamp" => timestamp) end + true end def trap_signals Modified: bin/droonga-engine-join (+28 -33) =================================================================== --- bin/droonga-engine-join 2015-04-10 17:58:03 +0900 (c64e849) +++ bin/droonga-engine-join 2015-04-10 17:58:53 +0900 (2698453) @@ -19,6 +19,7 @@ require "slop" require "json" require "pathname" require "socket" +require "coolio" require "droonga/engine/version" require "droonga/path" @@ -32,6 +33,8 @@ require "droonga/node_metadata" class JoinCommand def run + @loop = Coolio::Loop.default + parse_options trap_signals @@ -47,7 +50,13 @@ class JoinCommand do_join register_to_existing_nodes set_source_node_role - copy_data unless @options["no-copy"] + unless @options["no-copy"] + successed = copy_data + unless successed + do_cancel + exit(false) + end + end set_effective_message_timestamp reset_source_node_role reset_joining_node_role @@ -141,13 +150,6 @@ class JoinCommand serf.send_query(command, options) end - def current_member_state(target) - serf = Droonga::Serf.new(target, :verbose => @options[:verbose]) - serf.current_members.find do |member| - member["name"] == target - end - end - def absorber @absorber ||= prepare_absorber end @@ -161,6 +163,10 @@ class JoinCommand :port => @options[:port], :tag => @options[:tag], :messages_per_second => @options["records-per-second"], + :client_options => { + :backend => :coolio, + :loop => @loop, + }, } Droonga::DataAbsorber.new(absorber_options) end @@ -218,33 +224,22 @@ class JoinCommand def copy_data puts("Copying data from the source node...") - @start_time_in_seconds = Time.new.to_i - - run_remote_command(joining_node, "absorb_data", - "node" => joining_node, - "source" => source_node, - "port" => absorber.port, - "tag" => absorber.tag, - "dataset" => absorber.dataset, - "messages_per_second" => absorber.messages_per_second) - - last_progress = "" - while true - sleep(3) - state = current_member_state(joining_node) - if state.nil? or state["tags"]["absorbing"] != "true" - break - end - progress = absorber.report_progress(@start_time_in_seconds) - if progress - printf("%s", "#{" " * last_progress.size}\r") - printf("%s", "#{progress}\r") - last_progress = progress + last_progress = nil + absorber.run do |progress| + if last_progress + printf("%s", "#{" " * last_progress[:message].size}\r") end + printf("%s", "#{progress[:message]}\r") + last_progress = progress end - printf("%s", "#{" " * last_progress.size}\r") - printf("%s", "#{progress}\r") - puts "100% done" + @loop.run + + if absorber.error_message + puts(absorber.error_message) + do_cancel + return false + end + puts "" end -------------- next part -------------- HTML����������������������������... Download