Kouhei Sutou
null+****@clear*****
Mon Nov 25 13:00:07 JST 2013
Kouhei Sutou 2013-11-25 13:00:07 +0900 (Mon, 25 Nov 2013) New Revision: 744ff9b3d9fd39027db8317fdfeb795ee3528487 https://github.com/droonga/fluent-plugin-droonga/commit/744ff9b3d9fd39027db8317fdfeb795ee3528487 Message: Move fluentd related code to DroongaOutput fluentd plugin Modified files: lib/droonga/engine.rb lib/fluent/plugin/out_droonga.rb Modified: lib/droonga/engine.rb (+2 -26) =================================================================== --- lib/droonga/engine.rb 2013-11-25 11:16:06 +0900 (1433c50) +++ lib/droonga/engine.rb 2013-11-25 13:00:07 +0900 (bdd41ff) @@ -35,32 +35,8 @@ module Droonga $log.trace("engine: shutdown: done") end - def emit(tag, time, record) - $log.trace("[#{Process.pid}] tag: <#{tag}> caller: <#{caller.first}>") - @dispatcher.handle_envelope(parse_record(tag, record)) - end - - private - def parse_record(tag, record) - prefix, type, *arguments = tag.split(/\./) - if type.nil? || type.empty? || type == 'message' - envelope = record - else - envelope = { - "type" => type, - "arguments" => arguments, - "body" => record - } - end - envelope["via"] ||= [] - reply_to = envelope["replyTo"] - if reply_to.is_a? String - envelope["replyTo"] = { - "type" => envelope["type"] + ".result", - "to" => reply_to - } - end - envelope + def process(envelope) + @dispatcher.handle_envelope(envelope) end end end Modified: lib/fluent/plugin/out_droonga.rb (+29 -1) =================================================================== --- lib/fluent/plugin/out_droonga.rb 2013-11-25 11:16:06 +0900 (82854a2) +++ lib/fluent/plugin/out_droonga.rb 2013-11-25 13:00:07 +0900 (84b74cc) @@ -38,9 +38,37 @@ module Fluent def emit(tag, es, chain) es.each do |time, record| - @engine.emit(tag, time, record) + process_event(tag, record) end chain.next end + + private + def process_event(tag, record) + $log.trace("out_droonga: tag: <#{tag}> caller: <#{caller.first}>") + @engine.process(parse_record(tag, record)) + end + + def parse_record(tag, record) + prefix, type, *arguments = tag.split(/\./) + if type.nil? || type.empty? || type == 'message' + envelope = record + else + envelope = { + "type" => type, + "arguments" => arguments, + "body" => record + } + end + envelope["via"] ||= [] + reply_to = envelope["replyTo"] + if reply_to.is_a? String + envelope["replyTo"] = { + "type" => envelope["type"] + ".result", + "to" => reply_to + } + end + envelope + end end end -------------- next part -------------- HTML����������������������������... Download