Kouhei Sutou
null+****@clear*****
Wed Dec 4 12:50:21 JST 2013
Kouhei Sutou 2013-12-04 12:50:21 +0900 (Wed, 04 Dec 2013) New Revision: 846b203038a6a83653b8aee1b003ad294d03a498 https://github.com/droonga/droonga-client-ruby/commit/846b203038a6a83653b8aee1b003ad294d03a498 Message: Support multiple responses But it is disabled for now. Because drntest doesn't expects multiple responses yet. Modified files: lib/droonga/client/connection/droonga_protocol.rb Modified: lib/droonga/client/connection/droonga_protocol.rb (+40 -17) =================================================================== --- lib/droonga/client/connection/droonga_protocol.rb 2013-12-04 11:58:40 +0900 (3c9cc79) +++ lib/droonga/client/connection/droonga_protocol.rb 2013-12-04 12:50:21 +0900 (40345f1) @@ -25,15 +25,17 @@ module Droonga class DroongaProtocol def initialize(options={}) default_options = { - :tag => "droonga", - :host => "127.0.0.1", - :port => 24224, - :timeout => 5 + :tag => "droonga", + :host => "127.0.0.1", + :port => 24224, + :connect_timeout => 1, + :read_timeout => 0.1, } options = default_options.merge(options) @logger = Fluent::Logger::FluentLogger.new(options.delete(:tag), options) - @timeout = options[:timeout] + @connect_timeout = options[:connect_timeout] + @read_timeout = options[:read_timeout] end def search(body) @@ -66,7 +68,8 @@ module Droonga envelope = envelope.dup envelope["replyTo"] = "#{receiver.host}:#{receiver.port}/droonga" @logger.post("message", envelope) - receiver.receive(:timeout => @timeout) + receiver.receive(:connect_timeout => @connect_timeout, + :read_timeout => @read_timeout) ensure receiver.close end @@ -85,8 +88,10 @@ module Droonga class Receiver def initialize(options={}) default_options = { - :host => "0.0.0.0", - :port => 0, + :host => "0.0.0.0", + :port => 0, + :connect_timeout => 1, + :read_timeout => 0.5, } options = default_options.merge(options) @socket = TCPServer.new(options[:host], options[:port]) @@ -104,19 +109,37 @@ module Droonga @socket.addr[1] end + BUFFER_SIZE = 8192 def receive(options={}) - if IO.select([@socket], nil, nil, options[:timeout]) + responses = [] + select(@socket, options[:connect_timeout]) do client =****@socke***** - response = nil - unpacker = MessagePack::Unpacker.new(client) - unpacker.each do |object| - response = object - break + unpacker = MessagePack::Unpacker.new + select(client, options[:read_timeout]) do + data = client.read_nonblock(BUFFER_SIZE) + unpacker.feed_each(data) do |object| + responses << object + end end client.close - response - else - nil + end + # TODO: ENABLE ME + # if responses.size >= 2 + # responses + # else + responses.first + # end + end + + private + def select(input, timeout) + loop do + start = Time.now + readables, = IO.select([input], nil, nil, timeout) + timeout -= (Time.now - start) + timeout = 0 if timeout < 0 + break if readables.nil? + yield(timeout) end end end -------------- next part -------------- HTML����������������������������...Download