[Groonga-commit] droonga/droonga-engine at 4e7c622 [master] Use Droonga::DataAbsorber directly

Back to archive index

YUKI Hiroshi null+****@clear*****
Fri Apr 10 17:58:53 JST 2015


YUKI Hiroshi	2015-04-10 17:58:53 +0900 (Fri, 10 Apr 2015)

  New Revision: 4e7c62260f8a8e727693c526c584a6f27d0f014e
  https://github.com/droonga/droonga-engine/commit/4e7c62260f8a8e727693c526c584a6f27d0f014e

  Message:
    Use Droonga::DataAbsorber directly

  Modified files:
    bin/droonga-engine-absorb-data
    bin/droonga-engine-join

  Modified: bin/droonga-engine-absorb-data (+24 -34)
===================================================================
--- bin/droonga-engine-absorb-data    2015-04-10 17:58:03 +0900 (c39dc84)
+++ bin/droonga-engine-absorb-data    2015-04-10 17:58:53 +0900 (20e36d3)
@@ -17,8 +17,8 @@
 
 require "ostruct"
 require "optparse"
-require "open3"
 require "socket"
+require "coolio"
 
 require "droonga/engine/version"
 require "droonga/catalog_generator"
@@ -31,6 +31,8 @@ require "droonga/restarter"
 
 class AbsorbDataCommand
   def run
+    @loop = Coolio::Loop.default
+
     parse_options
     assert_valid_options
     trap_signals
@@ -44,10 +46,10 @@ class AbsorbDataCommand
     puts ""
     puts "Absorbing..."
 
-    absorb
+    succeeded = absorb
 
-    puts "Done."
-    exit(true)
+    puts "Done." if succeeded
+    exit(succeeded)
   end
 
   private
@@ -136,13 +138,6 @@ class AbsorbDataCommand
     serf.send_query(command, options)
   end
 
-  def current_member_state(target)
-    serf = Droonga::Serf.new(target, :verbose => @options.verbose)
-    serf.current_members.find do |member|
-      member["name"] == target
-    end
-  end
-
   def absorber
     @absorber ||= prepare_absorber
   end
@@ -156,37 +151,31 @@ class AbsorbDataCommand
       :port             => @options.port,
       :tag              => @options.tag,
       :messages_per_second => @options.messages_per_second,
+      :client_options   => {
+        :backend => :coolio,
+        :loop    => @loop,
+      },
     }
     Droonga::DataAbsorber.new(absorber_options)
   end
 
   def absorb
-    start_time_in_seconds = Time.new.to_i
-    run_remote_command(destination_node, "absorb_data",
-                       "node"    => destination_node,
-                       "source"  => source_node,
-                       "port"    => @options.port,
-                       "tag"     => @options.tag,
-                       "dataset" => @options.dataset,
-                       "messages_per_second" => @options.messages_per_second)
-    last_progress = ""
-    while true
-      sleep(3)
-      state = current_member_state(destination_node)
-      if state.nil? or state["tags"]["absorbing"] != "true"
-        break
+    last_progress = nil
+    absorber.run do |progress|
+      if last_progress
+        printf("%s", "#{" " * last_progress[:message].size}\r")
       end
+      printf("%s", "#{progress[:message]}\r")
+      last_progress = progress
+    end
+    @loop.run
 
-      progress = absorber.report_progress(start_time_in_seconds)
-      if progress
-        printf("%s", "#{" " * last_progress.size}\r")
-        printf("%s", "#{progress}\r")
-        last_progress = progress
-      end
+    if absorber.error_message
+      puts(absorber.error_message)
+      do_cancel
+      return false
     end
-    printf("%s", "#{" " * last_progress.size}\r")
-    printf("%s", "#{progress}\r")
-    puts "100% done"
+
     puts ""
 
     response = run_remote_command(source_node, "report_metadata",
@@ -200,6 +189,7 @@ class AbsorbDataCommand
                                     "node" => destination_node,
                                     "timestamp" => timestamp)
     end
+    true
   end
 
   def trap_signals

  Modified: bin/droonga-engine-join (+28 -33)
===================================================================
--- bin/droonga-engine-join    2015-04-10 17:58:03 +0900 (c64e849)
+++ bin/droonga-engine-join    2015-04-10 17:58:53 +0900 (2698453)
@@ -19,6 +19,7 @@ require "slop"
 require "json"
 require "pathname"
 require "socket"
+require "coolio"
 
 require "droonga/engine/version"
 require "droonga/path"
@@ -32,6 +33,8 @@ require "droonga/node_metadata"
 
 class JoinCommand
   def run
+    @loop = Coolio::Loop.default
+
     parse_options
     trap_signals
 
@@ -47,7 +50,13 @@ class JoinCommand
       do_join
       register_to_existing_nodes
       set_source_node_role
-      copy_data unless @options["no-copy"]
+      unless @options["no-copy"]
+        successed = copy_data
+        unless successed
+          do_cancel
+          exit(false)
+        end
+      end
       set_effective_message_timestamp
       reset_source_node_role
       reset_joining_node_role
@@ -141,13 +150,6 @@ class JoinCommand
     serf.send_query(command, options)
   end
 
-  def current_member_state(target)
-    serf = Droonga::Serf.new(target, :verbose => @options[:verbose])
-    serf.current_members.find do |member|
-      member["name"] == target
-    end
-  end
-
   def absorber
     @absorber ||= prepare_absorber
   end
@@ -161,6 +163,10 @@ class JoinCommand
       :port             => @options[:port],
       :tag              => @options[:tag],
       :messages_per_second => @options["records-per-second"],
+      :client_options   => {
+        :backend => :coolio,
+        :loop    => @loop,
+      },
     }
     Droonga::DataAbsorber.new(absorber_options)
   end
@@ -218,33 +224,22 @@ class JoinCommand
   def copy_data
     puts("Copying data from the source node...")
 
-    @start_time_in_seconds = Time.new.to_i
-
-    run_remote_command(joining_node, "absorb_data",
-                       "node"    => joining_node,
-                       "source"  => source_node,
-                       "port"    => absorber.port,
-                       "tag"     => absorber.tag,
-                       "dataset" => absorber.dataset,
-                       "messages_per_second" => absorber.messages_per_second)
-
-    last_progress = ""
-    while true
-      sleep(3)
-      state = current_member_state(joining_node)
-      if state.nil? or state["tags"]["absorbing"] != "true"
-        break
-      end
-      progress = absorber.report_progress(@start_time_in_seconds)
-      if progress
-        printf("%s", "#{" " * last_progress.size}\r")
-        printf("%s", "#{progress}\r")
-        last_progress = progress
+    last_progress = nil
+    absorber.run do |progress|
+      if last_progress
+        printf("%s", "#{" " * last_progress[:message].size}\r")
       end
+      printf("%s", "#{progress[:message]}\r")
+      last_progress = progress
     end
-    printf("%s", "#{" " * last_progress.size}\r")
-    printf("%s", "#{progress}\r")
-    puts "100% done"
+    @loop.run
+
+    if absorber.error_message
+      puts(absorber.error_message)
+      do_cancel
+      return false
+    end
+
     puts ""
   end
 
-------------- next part --------------
HTML����������������������������...
Download 



More information about the Groonga-commit mailing list
Back to archive index