Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion lib/fluent/buffer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ def write_to(io)

def msgpack_each(&block)
open {|io|
u = MessagePack::Unpacker.new(io)
u = Fluent::Engine.msgpack_factory.unpacker(io)
begin
u.each(&block)
rescue EOFError
Expand Down
2 changes: 1 addition & 1 deletion lib/fluent/command/cat.rb
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ def abort_message(time, record)

when 'msgpack'
begin
u = MessagePack::Unpacker.new($stdin)
u = Fluent::Engine.msgpack_factory.unpacker($stdin)
u.each {|record|
w.write(record)
}
Expand Down
13 changes: 13 additions & 0 deletions lib/fluent/engine.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,16 @@ module Fluent
require 'fluent/root_agent'

class EngineClass
class DummyMessagePackFactory
def packer(io = nil)
MessagePack::Packer.new(io)
end

def unpacker(io = nil)
MessagePack::Unpacker.new(io)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this (and packer) method should be:

def unpacker(*args)
  MessagePack::Unpacker.new(*args)
end

because Unpacker accepts some options such as allow_unknown_ext: true in addition to io.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, I see. I will update the patch. Thanks

end
end

def initialize
@root_agent = nil
@event_router = nil
Expand All @@ -30,13 +40,16 @@ def initialize
@log_event_queue = []

@suppress_config_dump = false

@msgpack_factory = DummyMessagePackFactory.new
end

MATCH_CACHE_SIZE = 1024
LOG_EMIT_INTERVAL = 0.1

attr_reader :root_agent
attr_reader :matches, :sources
attr_reader :msgpack_factory

def init(opts = {})
BasicSocket.do_not_reverse_lookup = true
Expand Down
4 changes: 2 additions & 2 deletions lib/fluent/event.rb
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ def each(&block)
end

def to_msgpack_stream
out = MessagePack::Packer.new # MessagePack::Packer is fastest way to serialize events
out = Fluent::Engine.msgpack_factory.packer
each {|time,record|
out.write([time,record])
}
Expand Down Expand Up @@ -143,7 +143,7 @@ def repeatable?

def each(&block)
# TODO format check
unpacker = MessagePack::Unpacker.new
unpacker = Fluent::Engine.msgpack_factory.unpacker
unpacker.feed_each(@data, &block)
nil
end
Expand Down
2 changes: 1 addition & 1 deletion lib/fluent/plugin/buf_memory.rb
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ def write_to(io)

# optimize
def msgpack_each(&block)
u = MessagePack::Unpacker.new
u = Fluent::Engine.msgpack_factory.unpacker
u.feed_each(@data, &block)
end
end
Expand Down
2 changes: 1 addition & 1 deletion lib/fluent/plugin/exec_util.rb
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ def call(io)

class MessagePackParser < Parser
def call(io)
@u = MessagePack::Unpacker.new(io)
@u = Fluent::Engine.msgpack_factory.unpacker(io)
begin
@u.each(&@on_message)
rescue EOFError
Expand Down
2 changes: 1 addition & 1 deletion lib/fluent/plugin/in_forward.rb
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ def on_read(data)
else
m = method(:on_read_msgpack)
@serializer = :to_msgpack.to_proc
@u = MessagePack::Unpacker.new
@u = Fluent::Engine.msgpack_factory.unpacker
end

(class << self; self; end).module_eval do
Expand Down
2 changes: 1 addition & 1 deletion lib/fluent/plugin/in_http.rb
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ def on_request(path_info, params)

def parse_params_default(params)
record = if msgpack = params['msgpack']
MessagePack.unpack(msgpack)
Engine.msgpack_factory.unpacker.feed(msgpack).read
elsif js = params['json']
JSON.parse(js)
else
Expand Down
2 changes: 1 addition & 1 deletion lib/fluent/plugin/in_stream.rb
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ def on_read(data)
@y.on_parse_complete = @on_message
else
m = method(:on_read_msgpack)
@u = MessagePack::Unpacker.new
@u = Fluent::Engine.msgpack_factory.unpacker
end

(class << self; self; end).module_eval do
Expand Down
2 changes: 1 addition & 1 deletion lib/fluent/plugin/out_stream.rb
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ def write(chunk)
chain = NullOutputChain.instance
chunk.open {|io|
# TODO use MessagePackIoEventStream
u = MessagePack::Unpacker.new(io)
u = Fluent::Engine.msgpack_factory.unpacker(io)
begin
u.each {|(tag,entries)|
es = MultiEventStream.new
Expand Down
2 changes: 1 addition & 1 deletion lib/fluent/process.rb
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ def input_forward_main(ipr, pid)
end

def read_event_stream(r, &block)
u = MessagePack::Unpacker.new(r)
u = Fluent::Engine.msgpack_factory.unpacker(r)
begin
#buf = ''
#map = {}
Expand Down
14 changes: 7 additions & 7 deletions test/plugin/test_in_forward.rb
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ def test_message

d.run do
d.expected_emits.each {|tag,time,record|
send_data [tag, time, record].to_msgpack
send_data Fluent::Engine.msgpack_factory.packer.write([tag, time, record]).to_s
}
end
end
Expand All @@ -77,7 +77,7 @@ def test_forward
d.expected_emits.each {|tag,time,record|
entries << [time, record]
}
send_data ["tag1", entries].to_msgpack
send_data Fluent::Engine.msgpack_factory.packer.write(["tag1", entries]).to_s
end
end

Expand All @@ -92,9 +92,9 @@ def test_packed_forward
d.run do
entries = ''
d.expected_emits.each {|tag,time,record|
[time, record].to_msgpack(entries)
Fluent::Engine.msgpack_factory.packer(entries).write([time, record]).flush
}
send_data ["tag1", entries].to_msgpack
send_data Fluent::Engine.msgpack_factory.packer.write(["tag1", entries]).to_s
end
end

Expand Down Expand Up @@ -128,7 +128,7 @@ def test_send_large_chunk_warning
assert chunk.size < (32 * 1024 * 1024)

d.run do
MessagePack::Unpacker.new.feed_each(chunk) do |obj|
Fluent::Engine.msgpack_factory.unpacker.feed_each(chunk) do |obj|
d.instance.send(:on_message, obj, chunk.size, "host: 127.0.0.1, addr: 127.0.0.1, port: 0000")
end
end
Expand Down Expand Up @@ -157,7 +157,7 @@ def test_send_large_chunk_only_warning
chunk = [ "test.tag", (0...16).map{|i| [time + i, {"data" => str}] } ].to_msgpack

d.run do
MessagePack::Unpacker.new.feed_each(chunk) do |obj|
Fluent::Engine.msgpack_factory.unpacker.feed_each(chunk) do |obj|
d.instance.send(:on_message, obj, chunk.size, "host: 127.0.0.1, addr: 127.0.0.1, port: 0000")
end
end
Expand All @@ -184,7 +184,7 @@ def test_send_large_chunk_limit

# d.run => send_data
d.run do
MessagePack::Unpacker.new.feed_each(chunk) do |obj|
Fluent::Engine.msgpack_factory.unpacker.feed_each(chunk) do |obj|
d.instance.send(:on_message, obj, chunk.size, "host: 127.0.0.1, addr: 127.0.0.1, port: 0000")
end
end
Expand Down
10 changes: 5 additions & 5 deletions test/plugin/test_in_stream.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ def test_time

d.run do
d.expected_emits.each {|tag,time,record|
send_data [tag, 0, record].to_msgpack
send_data Fluent::Engine.msgpack_factory.packer.write([tag, 0, record]).to_s
}
end
end
Expand All @@ -32,7 +32,7 @@ def test_message

d.run do
d.expected_emits.each {|tag,time,record|
send_data [tag, time, record].to_msgpack
send_data Fluent::Engine.msgpack_factory.packer.write([tag, time, record]).to_s
}
end
end
Expand All @@ -50,7 +50,7 @@ def test_forward
d.expected_emits.each {|tag,time,record|
entries << [time, record]
}
send_data ["tag1", entries].to_msgpack
send_data Fluent::Engine.msgpack_factory.packer.write(["tag1", entries]).to_s
end
end

Expand All @@ -65,9 +65,9 @@ def test_packed_forward
d.run do
entries = ''
d.expected_emits.each {|tag,time,record|
[time, record].to_msgpack(entries)
Fluent::Engine.msgpack_factory.packer(entries).write([time, record]).flush
}
send_data ["tag1", entries].to_msgpack
send_data Fluent::Engine.msgpack_factory.packer.write(["tag1", entries]).to_s
end
end

Expand Down
2 changes: 1 addition & 1 deletion test/plugin/test_out_copy.rb
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ def test_msgpack_es_emit_bug

es = if defined?(MessagePack::Packer)
time = Time.parse("2013-05-26 06:37:22 UTC").to_i
packer = MessagePack::Packer.new
packer = Fluent::Engine.msgpack_factory.packer
packer.pack([time, {"a" => 1}])
packer.pack([time, {"a" => 2}])
Fluent::MessagePackEventStream.new(packer.to_s)
Expand Down