[Groonga-commit] droonga/droonga-engine at cb268cc [master] Use event loop based main loop in supervisor

Back to archive index

Kouhei Sutou null+****@clear*****
Tue May 20 19:17:23 JST 2014


Kouhei Sutou	2014-05-20 19:17:23 +0900 (Tue, 20 May 2014)

  New Revision: cb268cc1ae361f8943e6fc875aee424a51c9c941
  https://github.com/droonga/droonga-engine/commit/cb268cc1ae361f8943e6fc875aee424a51c9c941

  Message:
    Use event loop based main loop in supervisor

  Modified files:
    lib/droonga/command/droonga_engine.rb

  Modified: lib/droonga/command/droonga_engine.rb (+166 -97)
===================================================================
--- lib/droonga/command/droonga_engine.rb    2014-05-20 16:36:23 +0900 (dac0fc9)
+++ lib/droonga/command/droonga_engine.rb    2014-05-20 19:17:23 +0900 (1cb33c8)
@@ -32,7 +32,7 @@ module Droonga
         include ServerEngine::Daemon::Signals
       end
 
-      class Configuration
+      class SupervisorConfiguration
         DEFAULT_HOST = Socket.gethostname
         DEFAULT_PORT = 10031
 
@@ -65,10 +65,7 @@ module Droonga
 
         def to_command_line
           command_line_options = [
-            "--host", @host,
-            "--port", @port.to_s,
-            "--tag", @tag,
-            "--log-level", log_level,
+            "--engine-name", engine_name,
           ]
           command_line_options
         end
@@ -79,6 +76,14 @@ module Droonga
           add_process_options(parser)
         end
 
+        def listen_socket
+          @listen_socket ||= TCPServer.new(@host, @port)
+        end
+
+        def heartbeat_socket
+          @heartbeat_socket ||= bind_heartbeat_socket
+        end
+
         private
         def add_connection_options(parser)
           parser.separator("")
@@ -129,6 +134,12 @@ module Droonga
             @pid_file = file
           end
         end
+
+        def bind_heartbeat_socket
+          socket = UDPSocket.new(address_family)
+          socket.bind(@host, @port)
+          socket
+        end
       end
 
       class Supervisor
@@ -139,19 +150,13 @@ module Droonga
         end
 
         def initialize
-          @configuration = Configuration.new
+          @configuration = SupervisorConfiguration.new
           @log_output = nil
         end
 
         def run(command_line_arguments)
           parse_command_line_arguments!(command_line_arguments)
 
-          @listen_socket = TCPServer.new(@configuration.host,
-                                         @configuration.port)
-          @heartbeat_socket = UDPSocket.new(@configuration.address_family)
-          @heartbeat_socket.bind(@configuration.host,
-                                 @configuration.port)
-
           ENV[Droonga::BASE_DIR_ENV_NAME] ||= Dir.pwd
 
           if****@confi*****?
@@ -172,80 +177,44 @@ module Droonga
           parser.parse!(command_line_arguments)
         end
 
-        def run_service(ready_notify_fd=nil)
-          listen_fd = @listen_socket.fileno
-          heartbeat_fd = @heartbeat_socket.fileno
-          env = {}
-          command_line = [
-            RbConfig.ruby,
-            "-S",
-            "#{$0}-service",
-            "--listen-fd", listen_fd.to_s,
-            "--heartbeat-fd", heartbeat_fd.to_s,
-            *@configuration.to_command_line
-          ]
-          options = {
-            listen_fd => listen_fd,
-            heartbeat_fd => heartbeat_fd,
-          }
-          if ready_notify_fd
-            command_line.push("--ready-notify-fd", ready_notify_fd.to_s)
-            options[ready_notify_fd] = ready_notify_fd
-          end
-          if @log_output
-            options[:out] = @log_output
-            options[:err] = @log_output
-          end
-          spawn(env, *command_line, options)
+        def run_service(loop)
+          service_runner = ServiceRunner.new(loop, @configuration)
+          service_runner.run
+          service_runner
         end
 
         def run_main_loop
-          service_pid = nil
-          running = true
+          raw_loop = Coolio::Loop.default
 
+          service_runner = nil
           trap(:INT) do
-            Process.kill(:INT, service_pid)
-            running = false
+            service_runner.stop_immedieate
+            raw_loop.stop
           end
           trap(Signals::GRACEFUL_STOP) do
-            Process.kill(Signals::GRACEFUL_STOP, service_pid)
-            running = false
+            service_runner.stop_graceful
           end
           trap(Signals::IMMEDIATE_STOP) do
-            Process.kill(Signals::IMMEDIATE_STOP, service_pid)
-            running = false
+            service_runner.stop_immediate
+            raw_loop.stop
           end
           trap(Signals::GRACEFUL_RESTART) do
-            old_service_pid = service_pid
-            IO.pipe do |ready_notify_read_io, ready_notify_write_io|
-              service_pid = run_service(ready_notify_write_io.fileno)
-              ready_notify_write_io.close
-              IO.select([ready_notify_read_io])
-              Process.kill(Signals::GRACEFUL_STOP, old_service_pid)
+            old_service_runner = service_runner
+            service_runner = run_service(raw_loop)
+            service_runner.on_ready = lambda do
+              old_service_runner.stop_graceful
             end
           end
           trap(Signals::IMMEDIATE_RESTART) do
-            old_service_pid = service_pid
-            service_pid = run_service
-            Process.kill(Signals::IMMEDIATE_STOP, old_service_pid)
-          end
-
-          succeeded = true
-          while running
-            service_pid ||= run_service
-            finished_pid, status = Process.waitpid2(service_pid)
-            service_pid = nil if service_pid == finished_pid
-            if status.nil?
-              succeeded = false
-              break
-            end
-            unless status.success?
-              succeeded = false
-              break
-            end
+            old_service_runner = service_runner
+            service_runner = run_service(raw_loop)
+            old_service_runner.stop_immediate
           end
 
-          succeeded
+          service_runner = run_service(raw_loop)
+          raw_loop.run
+
+          service_runner.success?
         end
 
         def open_log_file
@@ -275,6 +244,102 @@ module Droonga
         end
       end
 
+      class ServiceRunner
+        def initialize(raw_loop, configuration)
+          @raw_loop = raw_loop
+          @configuration = configuration
+          @success = false
+        end
+
+        def on_ready=(callback)
+          @on_ready = callback
+        end
+
+        def on_finish=(callback)
+          @on_finish = callback
+        end
+
+        def run
+          control_read_in, control_read_out = IO.pipe
+          listen_fd =****@confi*****_socket.fileno
+          heartbeat_fd =****@confi*****_socket.fileno
+          env = {}
+          command_line = [
+            RbConfig.ruby,
+            "-S",
+            "#{$0}-service",
+            "--listen-fd", listen_fd.to_s,
+            "--heartbeat-fd", heartbeat_fd.to_s,
+            "--control-write-fd", control_read_out.fileno.to_s,
+            *@configuration.to_command_line,
+          ]
+          options = {
+            listen_fd => listen_fd,
+            heartbeat_fd => heartbeat_fd,
+            control_read_out => control_read_out,
+          }
+          if @log_output
+            options[:out] = @log_output
+            options[:err] = @log_output
+          end
+          @pid = spawn(env, *command_line, options)
+          control_read_out.close
+          attach_control_read_in(control_read_in)
+        end
+
+        def stop_graceful
+          stop(Signals::GRACEFUL_STOP)
+        end
+
+        def stop_immedieate
+          stop(Signals::IMMEDIATE_STOP)
+        end
+
+        def success?
+          @success
+        end
+
+        private
+        def stop(signal)
+          return if****@pid*****?
+
+          pid = @pid
+          Process.kill(signal, pid)
+          @pid = nil
+          @stop_timer = Coolio::TimerWatcher.new(0.5, true)
+          on_timer = lambda do
+            _, status = Process.waitpid2(pid, Process::WNOHANG)
+            if status
+              @success = status.success?
+              @stop_timer.detach
+            end
+          end
+          @stop_timer.on_timer do
+            on_timer.call
+          end
+          @raw_loop.attach(@stop_timer)
+
+          @control_read_in.close
+        end
+
+        def attach_control_read_in(control_read_in)
+          @control_read_in = Coolio::IO.new(control_read_in)
+          on_read = lambda do |data|
+            # TODO: should buffer data to handle half line received case
+            data.each_line do |line|
+              case line
+              when "ready\n"
+                @on_ready.call if @on_ready
+              end
+            end
+          end
+          @control_read_in.on_read do |data|
+            on_read.call(data)
+          end
+          @raw_loop.attach(@control_read_in)
+        end
+      end
+
       class Service
         class << self
           def run(command_line_arguments)
@@ -283,10 +348,10 @@ module Droonga
         end
 
         def initialize
-          @configuration = Configuration.new
+          @engine_name = nil
           @listen_fd = nil
           @heartbeat_fd = nil
-          @ready_notiofy_fd = nil
+          @contrtol_fd = nil
         end
 
         def run(command_line_arguments)
@@ -305,7 +370,6 @@ module Droonga
         private
         def parse_command_line_arguments!(command_line_arguments)
           parser = OptionParser.new
-          @configuration.add_command_line_options(parser)
           add_internal_options(parser)
           parser.parse!(command_line_arguments)
         end
@@ -313,6 +377,10 @@ module Droonga
         def add_internal_options(parser)
           parser.separator("")
           parser.separator("Internal:")
+          parser.on("--engine-name=NAME",
+                    "Use NAME as the name of the engine") do |name|
+            @engine_name = name
+          end
           parser.on("--listen-fd=FD", Integer,
                     "Use FD as the listen file descriptor") do |fd|
             @listen_fd = fd
@@ -321,9 +389,9 @@ module Droonga
                     "Use FD as the heartbeat file descriptor") do |fd|
             @heartbeat_fd = fd
           end
-          parser.on("--ready-notify-fd=FD", Integer,
-                    "Use FD for notifying the service ready") do |fd|
-            @ready_notify_fd = fd
+          parser.on("--control-write-fd=FD", Integer,
+                    "Use FD to write control messages from the service") do |fd|
+            @control_write_fd = fd
           end
         end
 
@@ -336,25 +404,26 @@ module Droonga
           run_engine
           run_receiver
           setup_signals
-          notify_ready
+          run_control_io
           @loop.run
         end
 
         def shutdown_services
+          shutdown_control_io
           shutdown_receiver
           shutdown_engine
           @loop = nil
         end
 
         def run_engine
-          @engine = Engine.new(@loop, @configuration.engine_name)
+          @engine = Engine.new(@loop, @engine_name)
           @engine.start
         end
 
         def shutdown_engine
           return if****@engin*****?
-          @engine.shutdown
-          @engine = nil
+          @engine, engine = nil, @engine
+          engine.shutdown
         end
 
         def run_receiver
@@ -364,8 +433,22 @@ module Droonga
 
         def shutdown_receiver
           return if****@recei*****?
-          @receiver.shutdown
-          @receiver = nil
+          @receiver, receiver = nil, @receiver
+          receiver.shutdown
+        end
+
+        def run_control_io
+          @control_write = Coolio::IO.new(IO.new(@control_write_fd))
+          @control_write_fd = nil
+          @loop.attach(@control_write)
+
+          @control_write.write("ready\n")
+        end
+
+        def shutdown_control_io
+          return if @control_write.nil?
+          @control_write, control_write = nil, @control_write
+          control_write.close
         end
 
         def create_receiver
@@ -416,26 +499,12 @@ module Droonga
         end
 
         def stop_graceful
-          @loop.stop if @loop
+          @loop.stop
         end
 
         def stop_immediate
-          stop_graceful
           shutdown_services
         end
-
-        def notify_ready
-          return if @ready_notify_fd.nil?
-          ready_notify_io = IO.new(@ready_notify_fd)
-          @ready_notify_fd = nil
-          watcher = Coolio::IOWatcher.new(ready_notify_io, "w")
-          @loop.attach(watcher)
-          watcher.on_writable do
-            ready_notify_io.write("ready\n")
-            ready_notify_io.close
-            detach
-          end
-        end
       end
     end
   end
-------------- next part --------------
HTML����������������������������...
Download 



More information about the Groonga-commit mailing list
Back to archive index