YUKI Hiroshi
null+****@clear*****
Thu Dec 18 15:51:46 JST 2014
YUKI Hiroshi 2014-12-18 15:51:46 +0900 (Thu, 18 Dec 2014) New Revision: 110f14cb65d16f76e4140e9453d146bbf732998b https://github.com/droonga/droonga-engine/commit/110f14cb65d16f76e4140e9453d146bbf732998b Merged 69350ee: Merge branch 'master' into buffered-forward Message: Detect delivering routes based on nodes' role Modified files: bin/droonga-engine-join lib/droonga/command/remote.rb lib/droonga/dispatcher.rb lib/droonga/engine_state.rb lib/droonga/live_nodes_list.rb lib/droonga/node_status.rb lib/droonga/plugins/system.rb lib/droonga/serf.rb test/unit/plugins/system/test_status.rb Modified: bin/droonga-engine-join (+5 -4) =================================================================== --- bin/droonga-engine-join 2014-12-16 19:27:30 +0900 (4f3fbb0) +++ bin/droonga-engine-join 2014-12-18 15:51:46 +0900 (52b3d27) @@ -28,6 +28,7 @@ require "droonga/catalog_loader" require "droonga/safe_file_writer" require "droonga/data_absorber" require "droonga/serf" +require "droonga/node_status" class JoinCommand def run @@ -152,11 +153,11 @@ class JoinCommand if absorber.source_node_suspendable? run_remote_command(source_node, "change_role", "node" => source_node, - "role" => "source") + "role" => Droonga::NodeStatus::Role::ABSORB_SOURCE) end run_remote_command(joining_node, "change_role", "node" => joining_node, - "role" => "destination") + "role" => Droonga::NodeStatus::Role::ABSORB_DESTINATION) @node_role_changed = true end @@ -164,11 +165,11 @@ class JoinCommand if absorber.source_node_suspendable? run_remote_command(source_node, "change_role", "node" => source_node, - "role" => "") + "role" => nil) end run_remote_command(joining_node, "change_role", "node" => joining_node, - "role" => "") + "role" => nil) @node_role_changed = false end Modified: lib/droonga/command/remote.rb (+1 -2) =================================================================== --- lib/droonga/command/remote.rb 2014-12-16 19:27:30 +0900 (b079ca3) +++ lib/droonga/command/remote.rb 2014-12-18 15:51:46 +0900 (5726cdc) @@ -90,8 +90,7 @@ module Droonga class ChangeRole < Base def process - status = NodeStatus.new - status.set(:role, @params["role"]) + @serf.role = @params["role"] end end Modified: lib/droonga/dispatcher.rb (+3 -3) =================================================================== --- lib/droonga/dispatcher.rb 2014-12-16 19:27:30 +0900 (7008e71) +++ lib/droonga/dispatcher.rb 2014-12-18 15:51:46 +0900 (9990e80) @@ -195,7 +195,7 @@ module Droonga if write_step?(step) target_nodes = @engine_state.writable_nodes else - target_nodes = @engine_state.readable_nodes + target_nodes = @engine_state.responsive_nodes end routes = dataset.compute_routes(step, target_nodes) step["routes"] = routes @@ -350,8 +350,8 @@ module Droonga (step["outputs"] || []).each do |output| descendants[output] = [] @descendants[output].each do |index| - active_routes = @engine_state.remove_inactive_routes(step["routes"]) - @steps[index]["n_of_expects"] += active_routes.size + responsive_routes = @engine_state.select_responsive_routes(step["routes"]) + @steps[index]["n_of_expects"] += responsive_routes.size descendants[output].concat(@steps[index]["routes"]) end end Modified: lib/droonga/engine_state.rb (+18 -13) =================================================================== --- lib/droonga/engine_state.rb 2014-12-16 19:27:30 +0900 (9d9c38e) +++ lib/droonga/engine_state.rb 2014-12-18 15:51:46 +0900 (e3d2583) @@ -116,8 +116,20 @@ module Droonga end end - def readable_nodes - all_nodes - unreadable_nodes + def service_provider_nodes + if @live_nodes_list + @live_nodes_list.service_provider_nodes + else + all_nodes + end + end + + def responsive_service_provider_nodes + (all_nodes & service_provider_nodes) - dead_nodes + end + + def responsive_nodes + responsive_service_provider_nodes end def writable_nodes @@ -133,9 +145,10 @@ module Droonga @live_nodes_list end - def remove_inactive_routes(routes) - routes.reject do |route| - unreadable_nodes.include?(farm_path(route)) + def select_responsive_routes(routes) + selected_nodes = responsive_service_provider_nodes + routes.select do |route| + selected_nodes.include?(farm_path(route)) end end @@ -144,14 +157,6 @@ module Droonga end private - def unreadable_nodes - if @live_nodes_list - @live_nodes_list.unreadable_nodes - else - [] - end - end - def log_tag "engine_state" end Modified: lib/droonga/live_nodes_list.rb (+22 -7) =================================================================== --- lib/droonga/live_nodes_list.rb 2014-12-16 19:27:30 +0900 (2cd1fdf) +++ lib/droonga/live_nodes_list.rb 2014-12-18 15:51:46 +0900 (f1d3da7) @@ -13,6 +13,8 @@ # License along with this library; if not, write to the Free Software # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA +require "droonga/node_status" + module Droonga class LiveNodesList def initialize(nodes) @@ -27,19 +29,24 @@ module Droonga @dead_nodes ||= collect_dead_nodes end - def suspended_nodes - @suspended_nodes ||= collect_suspended_nodes + def absorb_source_nodes + @absorb_source_nodes ||= collect_absorb_source_nodes + end + + def absorb_destination_nodes + @absorb_destination_nodes ||= collect_destination_source_nodes end - def unreadable_nodes - @unreadable_nodes ||= dead_nodes + suspended_nodes + def service_provider_nodes + @service_provider_nodes ||= all_nodes - absorb_source_nodes - absorb_destination_nodes end def ==(nodes_list) nodes_list.is_a?(self.class) and nodes_list.all_nodes == all_nodes and nodes_list.dead_nodes == dead_nodes and - nodes_list.suspended_nodes == suspended_nodes + nodes_list.absorb_source_nodes == absorb_source_nodes and + nodes_list.absorb_destination_nodes == absorb_destination_nodes end private @@ -53,15 +60,23 @@ module Droonga nodes.sort end - def collect_suspended_nodes + def collect_nodes_by_role(role) nodes = [] @nodes.each do |name, state| if not state["foreign"] and - state["tags"]["suspended"] == "true" + state["tags"]["role"] == role nodes << name end end nodes.sort end + + def collect_absorb_source_nodes + collect_nodes_by_role(NodeStatus::Role::ABSORB_SOURCE) + end + + def collect_absorb_destination_nodes + collect_nodes_by_role(NodeStatus::Role::ABSORB_DESTINATION) + end end end Modified: lib/droonga/node_status.rb (+6 -0) =================================================================== --- lib/droonga/node_status.rb 2014-12-16 19:27:30 +0900 (ff5a04f) +++ lib/droonga/node_status.rb 2014-12-18 15:51:46 +0900 (9ed1a5b) @@ -19,6 +19,12 @@ require "droonga/safe_file_writer" module Droonga class NodeStatus + module Role + SERVICE_PROVIDER = "service-provider" + ABSORB_SOURCE = "absorb-source" + ABSORB_DESTINATION = "absorb-destination" + end + def initialize reload end Modified: lib/droonga/plugins/system.rb (+2 -2) =================================================================== --- lib/droonga/plugins/system.rb 2014-12-16 19:27:30 +0900 (8d2af51) +++ lib/droonga/plugins/system.rb 2014-12-18 15:51:46 +0900 (af1ee22) @@ -26,11 +26,11 @@ module Droonga def handle(message) engine_state =****@messe*****_state - readable_nodes = engine_state.readable_nodes + active_nodes = engine_state.responsive_service_provider_nodes dead_nodes = engine_state.dead_nodes nodes = {} engine_state.all_nodes.collect do |identifier| - if readable_nodes.include?(identifier) + if active_nodes.include?(identifier) status = "active" elsif dead_nodes.include?(identifier) status = "dead" Modified: lib/droonga/serf.rb (+13 -26) =================================================================== --- lib/droonga/serf.rb 2014-12-16 19:27:30 +0900 (6220c38) +++ lib/droonga/serf.rb 2014-12-18 15:51:46 +0900 (f1dca7f) @@ -28,17 +28,8 @@ require "droonga/line_buffer" module Droonga class Serf - ROLE = { - :default => { - :port => 7946, - }, - :source => { - :port => 7947, - }, - :destination => { - :port => 7948, - }, - } + # the port must be different from droonga-http-server's agent! + AGENT_PORT = 7946 class << self def path @@ -158,11 +149,17 @@ module Droonga set_tag("cluster_id", cluster_id) end - def suspended=(suspended) - if suspended - set_tag("suspended", "true") + def role + node_status.get(:role) || NodeStatus::Role::SERVICE_PROVIDER + end + + def role=(new_role) + if new_role + set_tag("role", new_role) + node_status.set(:role, new_role) else - delete_tag("suspended") + delete_tag("role") + node_status.delete(:role) end end @@ -240,18 +237,8 @@ module Droonga @node_status ||= NodeStatus.new end - def role - if node_status.have?(:role) - role = node_status.get(:role).to_sym - if self.class::ROLE.key?(role) - return role - end - end - :default - end - def port - self.class::ROLE[role][:port] + AGENT_PORT end def detect_other_hosts Modified: test/unit/plugins/system/test_status.rb (+1 -1) =================================================================== --- test/unit/plugins/system/test_status.rb 2014-12-16 19:27:30 +0900 (9a4d7a4) +++ test/unit/plugins/system/test_status.rb 2014-12-18 15:51:46 +0900 (a4bd1ea) @@ -54,7 +54,7 @@ class SystemStatusHandlerTest < Test::Unit::TestCase ] end - def readable_nodes + def responsive_service_provider_nodes [ "127.0.0.1:10031/droonga", ] -------------- next part -------------- HTML����������������������������... Download