[Groonga-commit] droonga/droonga-engine at 762fbc8 [master] Run Serf automatically

Back to archive index

Kouhei Sutou null+****@clear*****
Wed May 21 16:30:15 JST 2014


Kouhei Sutou	2014-05-21 16:30:15 +0900 (Wed, 21 May 2014)

  New Revision: 762fbc805201122fbd6ebd6103b5d3e5a9e2d52d
  https://github.com/droonga/droonga-engine/commit/762fbc805201122fbd6ebd6103b5d3e5a9e2d52d

  Message:
    Run Serf automatically

  Added files:
    lib/droonga/serf.rb
  Modified files:
    lib/droonga/command/droonga_engine.rb
    lib/droonga/live_nodes_list_loader.rb
    lib/droonga/serf_event_handler.rb

  Modified: lib/droonga/command/droonga_engine.rb (+15 -3)
===================================================================
--- lib/droonga/command/droonga_engine.rb    2014-05-21 16:29:19 +0900 (1cb33c8)
+++ lib/droonga/command/droonga_engine.rb    2014-05-21 16:30:15 +0900 (2f21010)
@@ -21,6 +21,7 @@ require "pathname"
 
 require "droonga/base_path"
 require "droonga/engine"
+require "droonga/serf"
 require "droonga/event_loop"
 require "droonga/fluent_message_receiver"
 require "droonga/plugin_loader"
@@ -183,19 +184,29 @@ module Droonga
           service_runner
         end
 
+        def run_serf(loop)
+          serf = Serf.new(loop, @configuration.engine_name)
+          serf.start
+          serf
+        end
+
         def run_main_loop
           raw_loop = Coolio::Loop.default
 
+          serf = nil
           service_runner = nil
           trap(:INT) do
-            service_runner.stop_immedieate
+            serf.shutdown if serf
+            service_runner.stop_immedieate if service_runner
             raw_loop.stop
           end
           trap(Signals::GRACEFUL_STOP) do
-            service_runner.stop_graceful
+            serf.shutdown if serf
+            service_runner.stop_graceful if service_runner
           end
           trap(Signals::IMMEDIATE_STOP) do
-            service_runner.stop_immediate
+            serf.shutdown if serf
+            service_runner.stop_immediate if service_runner
             raw_loop.stop
           end
           trap(Signals::GRACEFUL_RESTART) do
@@ -211,6 +222,7 @@ module Droonga
             old_service_runner.stop_immediate
           end
 
+          serf = run_serf(raw_loop)
           service_runner = run_service(raw_loop)
           raw_loop.run
 

  Modified: lib/droonga/live_nodes_list_loader.rb (+3 -1)
===================================================================
--- lib/droonga/live_nodes_list_loader.rb    2014-05-21 16:29:19 +0900 (9cd36be)
+++ lib/droonga/live_nodes_list_loader.rb    2014-05-21 16:30:15 +0900 (bdef5de)
@@ -25,7 +25,9 @@ module Droonga
     def load
       list_file = Pathname(@file_path)
       list = parse_list_file(list_file)
-      list.keys
+      list.collect do |key, value|
+        value["address"]
+      end
     end
 
     private

  Added: lib/droonga/serf.rb (+136 -0) 100644
===================================================================
--- /dev/null
+++ lib/droonga/serf.rb    2014-05-21 16:30:15 +0900 (9f161a8)
@@ -0,0 +1,136 @@
+# Copyright (C) 2014 Droonga Project
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License version 2.1 as published by the Free Software Foundation.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+
+require "droonga/base_path"
+require "droonga/loggable"
+require "droonga/catalog_observer"
+require "droonga/serf_downloader"
+
+module Droonga
+  class Serf
+    class << self
+      def path
+        @path ||= Droonga.base_path + "serf"
+      end
+    end
+
+    include Loggable
+
+    def initialize(loop, name)
+      @loop = loop
+      @name = name
+    end
+
+    def start
+      logger.trace("start: start")
+      ensure_serf
+      ENV["SERF"] = self.class.path.to_s
+      ENV["SERF_RPC_ADDRESS"] = rpc_address
+      @serf_pid = run("agent",
+                      "-node", @name,
+                      "-bind", extract_host(@name),
+                      "-event-handler", "#{$0}-serf-event-handler")
+      start_join
+      logger.trace("start: done")
+    end
+
+    def shutdown
+      logger.trace("shutdown: start")
+      shutdown_join
+      Process.waitpid(run("leave"))
+      Process.waitpid(@serf_pid)
+      logger.trace("shutdown: done")
+    end
+
+    private
+    def ensure_serf
+      serf_path = self.class.path
+      return if serf_path.executable?
+      downloader = SerfDownloader.new(serf_path)
+      downloader.download
+    end
+
+    def run(command, *options)
+      spawn(self.class.path.to_s, command, "-rpc-addr", rpc_address, *options)
+    end
+
+    def extract_host(node_name)
+      node_name.split(":").first
+    end
+
+    def rpc_address
+      "#{extract_host(@name)}:7373"
+    end
+
+    def detect_other_hosts
+      catalog_observer = Droonga::CatalogObserver.new(@loop)
+      catalog = catalog_observer.catalog
+      other_nodes = catalog.all_nodes.reject do |node|
+        node == @name
+      end
+      other_nodes.collect do |node|
+        extract_host(node)
+      end
+    end
+
+    def try_join
+      logger.trace("join: start")
+
+      if @serf_join_pid
+        _, status = Process.waitpid2(@serf_join_pid, Process::WNOHANG)
+        if status
+          @serf_join_pid = nil
+          if status.success?
+            detach_join_timer
+            return
+          end
+        end
+      end
+
+      return if @serf_join_pid
+
+      @serf_join_pid = run("join", @other_hosts[@join_host_index])
+      @join_host_index = (@join_host_index + 1) % @other_hosts.size
+    end
+
+    def detach_join_timer
+      @join_timer.detach
+      @join_timer = nil
+    end
+
+    def start_join
+      @other_hosts = detect_other_hosts
+      @join_host_index = 0
+
+      @join_timer = Coolio::TimerWatcher.new(1, true)
+      @serf_join_pid = nil
+      on_timer = lambda do
+        try_join
+      end
+      @join_timer.on_timer do
+        on_timer.call
+      end
+      @loop.attach(@join_timer)
+    end
+
+    def shutdown_join
+      detach_join_timer if @join_timer
+    end
+
+    def log_tag
+      "serf"
+    end
+  end
+end

  Modified: lib/droonga/serf_event_handler.rb (+2 -1)
===================================================================
--- lib/droonga/serf_event_handler.rb    2014-05-21 16:29:19 +0900 (4c17eb4)
+++ lib/droonga/serf_event_handler.rb    2014-05-21 16:30:15 +0900 (ff9d7ee)
@@ -19,6 +19,7 @@ require "json"
 require "fileutils"
 
 require "droonga/base_path"
+require "droonga/serf"
 require "droonga/live_nodes_list_observer"
 
 module Droonga
@@ -30,7 +31,7 @@ module Droonga
     end
 
     def initialize
-      @serf = ENV["SERF"] || "serf"
+      @serf = ENV["SERF"] || Serf.path
       @serf_rpc_address = ENV["SERF_RPC_ADDRESS"] || "127.0.0.1:7373"
     end
 
-------------- next part --------------
HTML����������������������������...
Download 



More information about the Groonga-commit mailing list
Back to archive index