Kouhei Sutou
null+****@clear*****
Tue May 20 19:17:23 JST 2014
Kouhei Sutou 2014-05-20 19:17:23 +0900 (Tue, 20 May 2014) New Revision: cb268cc1ae361f8943e6fc875aee424a51c9c941 https://github.com/droonga/droonga-engine/commit/cb268cc1ae361f8943e6fc875aee424a51c9c941 Message: Use event loop based main loop in supervisor Modified files: lib/droonga/command/droonga_engine.rb Modified: lib/droonga/command/droonga_engine.rb (+166 -97) =================================================================== --- lib/droonga/command/droonga_engine.rb 2014-05-20 16:36:23 +0900 (dac0fc9) +++ lib/droonga/command/droonga_engine.rb 2014-05-20 19:17:23 +0900 (1cb33c8) @@ -32,7 +32,7 @@ module Droonga include ServerEngine::Daemon::Signals end - class Configuration + class SupervisorConfiguration DEFAULT_HOST = Socket.gethostname DEFAULT_PORT = 10031 @@ -65,10 +65,7 @@ module Droonga def to_command_line command_line_options = [ - "--host", @host, - "--port", @port.to_s, - "--tag", @tag, - "--log-level", log_level, + "--engine-name", engine_name, ] command_line_options end @@ -79,6 +76,14 @@ module Droonga add_process_options(parser) end + def listen_socket + @listen_socket ||= TCPServer.new(@host, @port) + end + + def heartbeat_socket + @heartbeat_socket ||= bind_heartbeat_socket + end + private def add_connection_options(parser) parser.separator("") @@ -129,6 +134,12 @@ module Droonga @pid_file = file end end + + def bind_heartbeat_socket + socket = UDPSocket.new(address_family) + socket.bind(@host, @port) + socket + end end class Supervisor @@ -139,19 +150,13 @@ module Droonga end def initialize - @configuration = Configuration.new + @configuration = SupervisorConfiguration.new @log_output = nil end def run(command_line_arguments) parse_command_line_arguments!(command_line_arguments) - @listen_socket = TCPServer.new(@configuration.host, - @configuration.port) - @heartbeat_socket = UDPSocket.new(@configuration.address_family) - @heartbeat_socket.bind(@configuration.host, - @configuration.port) - ENV[Droonga::BASE_DIR_ENV_NAME] ||= Dir.pwd if****@confi*****? @@ -172,80 +177,44 @@ module Droonga parser.parse!(command_line_arguments) end - def run_service(ready_notify_fd=nil) - listen_fd = @listen_socket.fileno - heartbeat_fd = @heartbeat_socket.fileno - env = {} - command_line = [ - RbConfig.ruby, - "-S", - "#{$0}-service", - "--listen-fd", listen_fd.to_s, - "--heartbeat-fd", heartbeat_fd.to_s, - *@configuration.to_command_line - ] - options = { - listen_fd => listen_fd, - heartbeat_fd => heartbeat_fd, - } - if ready_notify_fd - command_line.push("--ready-notify-fd", ready_notify_fd.to_s) - options[ready_notify_fd] = ready_notify_fd - end - if @log_output - options[:out] = @log_output - options[:err] = @log_output - end - spawn(env, *command_line, options) + def run_service(loop) + service_runner = ServiceRunner.new(loop, @configuration) + service_runner.run + service_runner end def run_main_loop - service_pid = nil - running = true + raw_loop = Coolio::Loop.default + service_runner = nil trap(:INT) do - Process.kill(:INT, service_pid) - running = false + service_runner.stop_immedieate + raw_loop.stop end trap(Signals::GRACEFUL_STOP) do - Process.kill(Signals::GRACEFUL_STOP, service_pid) - running = false + service_runner.stop_graceful end trap(Signals::IMMEDIATE_STOP) do - Process.kill(Signals::IMMEDIATE_STOP, service_pid) - running = false + service_runner.stop_immediate + raw_loop.stop end trap(Signals::GRACEFUL_RESTART) do - old_service_pid = service_pid - IO.pipe do |ready_notify_read_io, ready_notify_write_io| - service_pid = run_service(ready_notify_write_io.fileno) - ready_notify_write_io.close - IO.select([ready_notify_read_io]) - Process.kill(Signals::GRACEFUL_STOP, old_service_pid) + old_service_runner = service_runner + service_runner = run_service(raw_loop) + service_runner.on_ready = lambda do + old_service_runner.stop_graceful end end trap(Signals::IMMEDIATE_RESTART) do - old_service_pid = service_pid - service_pid = run_service - Process.kill(Signals::IMMEDIATE_STOP, old_service_pid) - end - - succeeded = true - while running - service_pid ||= run_service - finished_pid, status = Process.waitpid2(service_pid) - service_pid = nil if service_pid == finished_pid - if status.nil? - succeeded = false - break - end - unless status.success? - succeeded = false - break - end + old_service_runner = service_runner + service_runner = run_service(raw_loop) + old_service_runner.stop_immediate end - succeeded + service_runner = run_service(raw_loop) + raw_loop.run + + service_runner.success? end def open_log_file @@ -275,6 +244,102 @@ module Droonga end end + class ServiceRunner + def initialize(raw_loop, configuration) + @raw_loop = raw_loop + @configuration = configuration + @success = false + end + + def on_ready=(callback) + @on_ready = callback + end + + def on_finish=(callback) + @on_finish = callback + end + + def run + control_read_in, control_read_out = IO.pipe + listen_fd =****@confi*****_socket.fileno + heartbeat_fd =****@confi*****_socket.fileno + env = {} + command_line = [ + RbConfig.ruby, + "-S", + "#{$0}-service", + "--listen-fd", listen_fd.to_s, + "--heartbeat-fd", heartbeat_fd.to_s, + "--control-write-fd", control_read_out.fileno.to_s, + *@configuration.to_command_line, + ] + options = { + listen_fd => listen_fd, + heartbeat_fd => heartbeat_fd, + control_read_out => control_read_out, + } + if @log_output + options[:out] = @log_output + options[:err] = @log_output + end + @pid = spawn(env, *command_line, options) + control_read_out.close + attach_control_read_in(control_read_in) + end + + def stop_graceful + stop(Signals::GRACEFUL_STOP) + end + + def stop_immedieate + stop(Signals::IMMEDIATE_STOP) + end + + def success? + @success + end + + private + def stop(signal) + return if****@pid*****? + + pid = @pid + Process.kill(signal, pid) + @pid = nil + @stop_timer = Coolio::TimerWatcher.new(0.5, true) + on_timer = lambda do + _, status = Process.waitpid2(pid, Process::WNOHANG) + if status + @success = status.success? + @stop_timer.detach + end + end + @stop_timer.on_timer do + on_timer.call + end + @raw_loop.attach(@stop_timer) + + @control_read_in.close + end + + def attach_control_read_in(control_read_in) + @control_read_in = Coolio::IO.new(control_read_in) + on_read = lambda do |data| + # TODO: should buffer data to handle half line received case + data.each_line do |line| + case line + when "ready\n" + @on_ready.call if @on_ready + end + end + end + @control_read_in.on_read do |data| + on_read.call(data) + end + @raw_loop.attach(@control_read_in) + end + end + class Service class << self def run(command_line_arguments) @@ -283,10 +348,10 @@ module Droonga end def initialize - @configuration = Configuration.new + @engine_name = nil @listen_fd = nil @heartbeat_fd = nil - @ready_notiofy_fd = nil + @contrtol_fd = nil end def run(command_line_arguments) @@ -305,7 +370,6 @@ module Droonga private def parse_command_line_arguments!(command_line_arguments) parser = OptionParser.new - @configuration.add_command_line_options(parser) add_internal_options(parser) parser.parse!(command_line_arguments) end @@ -313,6 +377,10 @@ module Droonga def add_internal_options(parser) parser.separator("") parser.separator("Internal:") + parser.on("--engine-name=NAME", + "Use NAME as the name of the engine") do |name| + @engine_name = name + end parser.on("--listen-fd=FD", Integer, "Use FD as the listen file descriptor") do |fd| @listen_fd = fd @@ -321,9 +389,9 @@ module Droonga "Use FD as the heartbeat file descriptor") do |fd| @heartbeat_fd = fd end - parser.on("--ready-notify-fd=FD", Integer, - "Use FD for notifying the service ready") do |fd| - @ready_notify_fd = fd + parser.on("--control-write-fd=FD", Integer, + "Use FD to write control messages from the service") do |fd| + @control_write_fd = fd end end @@ -336,25 +404,26 @@ module Droonga run_engine run_receiver setup_signals - notify_ready + run_control_io @loop.run end def shutdown_services + shutdown_control_io shutdown_receiver shutdown_engine @loop = nil end def run_engine - @engine = Engine.new(@loop, @configuration.engine_name) + @engine = Engine.new(@loop, @engine_name) @engine.start end def shutdown_engine return if****@engin*****? - @engine.shutdown - @engine = nil + @engine, engine = nil, @engine + engine.shutdown end def run_receiver @@ -364,8 +433,22 @@ module Droonga def shutdown_receiver return if****@recei*****? - @receiver.shutdown - @receiver = nil + @receiver, receiver = nil, @receiver + receiver.shutdown + end + + def run_control_io + @control_write = Coolio::IO.new(IO.new(@control_write_fd)) + @control_write_fd = nil + @loop.attach(@control_write) + + @control_write.write("ready\n") + end + + def shutdown_control_io + return if @control_write.nil? + @control_write, control_write = nil, @control_write + control_write.close end def create_receiver @@ -416,26 +499,12 @@ module Droonga end def stop_graceful - @loop.stop if @loop + @loop.stop end def stop_immediate - stop_graceful shutdown_services end - - def notify_ready - return if @ready_notify_fd.nil? - ready_notify_io = IO.new(@ready_notify_fd) - @ready_notify_fd = nil - watcher = Coolio::IOWatcher.new(ready_notify_io, "w") - @loop.attach(watcher) - watcher.on_writable do - ready_notify_io.write("ready\n") - ready_notify_io.close - detach - end - end end end end -------------- next part -------------- HTML����������������������������...Download