1414# limitations under the License.
1515#
1616
17+ require 'monitor'
18+
1719module Fluent
1820 module Plugin
1921 class BufferError < StandardError ; end
20- class BufferChunkLimitError < BufferError ; end
21- class BufferQueueLimitError < BufferError ; end
22+ class BufferOverflowError < BufferError ; end
23+ class BufferChunkOverflowError < BufferError ; end # A record size is larger than chunk size limit
2224
2325 # Buffer is to define an interface for all buffer plugins.
24- # Use BasicBuffer as a superclass for 3rd party buffer plugins.
2526
26- DEFAULT_CHUNK_SIZE = 8 * 1024 * 1024 # 8MB for memory
27- DEFAULT_QUEUE_LENGTH = 256 # (8MB * 256 ==) 2GB for memory
27+ DEFAULT_CHUNK_BYTES_LIMIT = 8 * 1024 * 1024 # 8MB for memory
28+ DEFAULT_QUEUE_LENGTH_LIMIT = 256 # (8MB * 256 ==) 2GB for memory
29+
30+ MINIMUM_APPEND_ATTEMPT_SIZE = 10
2831
# Buffers are built on 2 elements:
# * stage: chunks currently being written, looked up by metadata
# * queue: FIFO list of chunks that are already filled and waiting to be flushed
# The queue of a Buffer instance is shared across all metadata variations
3336 class Buffer
3437 include Configurable
38+ include MonitorMixin
3539
3640 config_section :buffer , param_name : :buffer_config , required : false , multi : false do
37- config_param :chunk_size , :size , default : DEFAULT_CHUNK_SIZE
38- config_param :total_size , :size , default : DEFAULT_CHUNK_SIZE * DEFAULT_QUEUE_LENGTH
41+ config_param :chunk_bytes_limit , :size , default : DEFAULT_CHUNK_BYTES_LIMIT
42+ config_param :total_bytes_limit , :size , default : DEFAULT_CHUNK_BYTES_LIMIT * DEFAULT_QUEUE_LENGTH_LIMIT
3943
4044 config_param :flush_interval , :time , default : nil
4145
# If the user specifies queue_length_limit and (chunk_bytes_limit * queue_length_limit) is smaller
# than total_bytes_limit, then total_bytes_limit is automatically lowered to that product
44- config_param :queue_length , :integer , default : nil
48+ config_param :queue_length_limit , :integer , default : nil
4549
4650 # optional new limitations
47- config_param :chunk_records , :integer , default : nil
51+ config_param :chunk_records_limit , :integer , default : nil
4852
4953 # TODO: pipeline mode? to flush ASAP after emit
5054 end
5155
# Key identifying one variation of buffered data; instances are built and interned by Buffer#metadata.
Metadata = Struct.new(:timekey, :tag, :variables)
57+
5258 def initialize ( logger )
5359 super ( )
5460 @log = logger
5561
56- @chunk_size = nil
62+ @chunk_size_limit = nil
5763 @chunk_records = nil
5864
59- @total_size = nil
65+ @total_size_limit = nil
6066 @queue_length = nil
6167
6268 @flush_interval = nil
@@ -66,34 +72,92 @@ def configure(conf)
6672 super
6773
6874 if @buffer_config
69- @chunk_size = @buffer_config . chunk_size
70- @chunk_records = @buffer_config . chunk_records
71- @total_size = @buffer_config . total_size
72- @queue_length = @buffer_config . queue_length
73- if @queue_length && @total_size > @chunk_size * @queue_length
74- @total_size = @chunk_size * @queue_length
75+ @chunk_bytes_limit = @buffer_config . chunk_bytes_limit
76+ @total_bytes_limit = @buffer_config . total_bytes_limit
77+
78+ @chunk_records_limit = @buffer_config . chunk_records_limit
79+
80+ @queue_length_limit = @buffer_config . queue_length_limit
81+ if @queue_length_limit && @total_bytes_limit > @chunk_bytes_limit * @queue_length_limit
82+ @total_bytes_limit = @chunk_bytes_limit * @queue_length_limit
7583 end
7684 @flush_interval = @buffer_config . flush_interval
7785 else
78- @chunk_size = DEFAULT_CHUNK_SIZE
79- @total_size = DEFAULT_CHUNK_SIZE * DEFAULT_QUEUE_LENGTH
80- @queue_length = DEFAULT_QUEUE_LENGTH
86+ @chunk_bytes_limit = DEFAULT_CHUNK_BYTES_LIMIT
87+ @total_bytes_limit = DEFAULT_CHUNK_BYTES_LIMIT * DEFAULT_QUEUE_LENGTH_LIMIT
88+ @queue_length_limit = DEFAULT_QUEUE_LENGTH_LIMIT
8189 end
8290 end
8391
84- def allow_concurrent_pop?
# Prepares the buffer for use: loads stage and queue from #resume, makes the
# queue lockable via MonitorMixin, and resets the size counters and the list
# of known metadata.
def start
  super

  resumed_stage, resumed_queue = resume
  @stage = resumed_stage
  @queue = resumed_queue
  @queue.extend(MonitorMixin)

  @queue_size = 0
  @stage_size = 0
  @metadata_list = [] # keys of @stage
end
100+
# Tells whether the buffer still has room for more data.
#
# Compares against @total_bytes_limit, the value assigned by #configure.
#
# @return [Boolean] true while staged plus queued size is below the total bytes limit
def storable?
  @total_bytes_limit > @stage_size + @queue_size
end
104+
# Compares current usage against a fraction of the total bytes limit.
#
# Compares against @total_bytes_limit, the value assigned by #configure.
#
# NOTE(review): this returns true while usage is *below* `ratio` of the limit,
# which reads inverted relative to the method name -- confirm with callers
# before changing the comparison.
#
# @param ratio [Numeric] fraction of the total bytes limit to compare against
# @return [Boolean] true when (total bytes limit * ratio) exceeds staged plus queued size
def used?(ratio)
  @total_bytes_limit * ratio > @stage_size + @queue_size
end
108+
# Abstract hook: #start expects it to return the pair [stage, queue].
#
# @raise [NotImplementedError] always, in this base class
def resume
  raise(NotImplementedError, "Implement this method in child class")
end
87112
88- def start
89- super
# Builds the Metadata for the given key/value pairs and interns it, so that
# equal metadata always resolve to the very same object.
#
# The caller's hash is left untouched: the reserved keys are removed from a
# private copy, which also makes frozen hashes acceptable input.
#
# @param key_value_pairs [Hash] may contain :timekey and :tag; the values of all
#   remaining keys become `variables`, ordered by sorted key
# @return [Metadata] the interned instance equal to the requested metadata
def metadata(key_value_pairs = {})
  pairs = key_value_pairs.dup
  timekey = pairs.delete(:timekey)
  tag = pairs.delete(:tag)
  variables = pairs.keys.sort.map { |k| pairs[k] }

  meta = Metadata.new(timekey, tag, variables)
  synchronize do
    known = @metadata_list.find { |m| m == meta }
    unless known
      @metadata_list << meta
      known = meta
    end
    known
  end
end
91128
92- def emit ( data , metadata )
# Stores serialized events into the staged chunk for the given metadata.
#
# metadata MUST have consistent object_id for each variation
# data MUST be Array of serialized events
#
# The whole batch is first appended to the staged chunk in one go. If that
# pushes the chunk over its limits, the append is rolled back and the batch is
# handed to #emit_step_by_step, which splits it.
#
# @param metadata [Metadata] key of the staged chunk to write into
# @param data [Array] serialized events
# @return [nil]
# @raise [BufferOverflowError] when the buffer is not storable
def emit(metadata, data)
  return if data.empty?
  raise BufferOverflowError unless storable?

  stored = false

  # the case whole data can be stored in staged chunk: almost all emits will succeed
  chunk = synchronize { @stage[metadata] ||= generate_chunk(metadata) }
  chunk.synchronize do
    begin
      chunk.append(data)
      unless size_over?(chunk)
        chunk.commit
        stored = true
      end
    ensure
      # always discards whatever was appended but not committed
      chunk.rollback
    end
  end
  return if stored

  emit_step_by_step(metadata, data)
end
155+
# Abstract hook: the buffer calls it to obtain a fresh chunk for `metadata`
# when the stage has none.
#
# @param metadata [Metadata] key the new chunk will be staged under
# @raise [NotImplementedError] always, in this base class
def generate_chunk(metadata)
  raise(NotImplementedError, "Implement this method in child class")
end
95159
96- def enqueue_chunk ( key )
# Abstract hook: #emit_step_by_step calls it when the staged chunk for
# `metadata` cannot take more records and should be queued for flushing.
#
# @param metadata [Metadata] key of the staged chunk to enqueue
# @raise [NotImplementedError] always, in this base class
def enqueue_chunk(metadata)
  raise(NotImplementedError, "Implement this method in child class")
end
99163
@@ -113,15 +177,84 @@ def stop
113177 end
114178
# Lifecycle hook; a no-op in this base class.
#
# @param out unused here -- presumably the owning output plugin; verify against the caller
def before_shutdown(out)
  # at this point, the buffer may be flushed w/ flush_at_shutdown
end
117182
# Lifecycle hook; a no-op in this base class.
def shutdown
end
120185
# Closes every chunk held by the buffer: the queue is drained (each chunk is
# removed, then closed) under the queue lock, and afterwards every staged
# chunk is closed. Staged chunks stay registered in the stage.
def close
  synchronize do
    @queue.synchronize do
      @queue.shift.close until @queue.empty?
    end
    @stage.each_value(&:close)
  end
end
123198
# Drops the references to the stage and the queue.
#
# @return [nil]
def terminate
  @queue = nil
  @stage = nil
end
202+
# Checks whether a chunk has grown past the configured limits.
#
# The byte limit is always checked; the record limit only when one is set.
#
# @param chunk an object responding to #size and #records
# @return truthy when the chunk exceeds the bytes limit or the records limit
def size_over?(chunk)
  bytes_exceeded = chunk.size > @chunk_bytes_limit
  return bytes_exceeded if bytes_exceeded

  @chunk_records_limit && chunk.records > @chunk_records_limit
end
206+
# Stores `data` piece by piece when it does not fit into the staged chunk as a whole.
#
# Starting with a third of the batch, a slice of records is appended to the
# staged chunk. When the chunk goes over its limits the append is rolled back
# and the slice is shrunk; once the slice is at the minimum size and still does
# not fit, the chunk is enqueued and a fresh chunk is tried with all remaining
# records. Stored records are removed from `data`, so the caller's array is
# consumed (it is empty on normal return).
#
# @param metadata [Metadata] key of the staged chunk to write into
# @param data [Array] serialized events; mutated in place
# @return [nil]
# @raise [BufferChunkOverflowError] when a minimum-size slice does not fit into an empty chunk
def emit_step_by_step(metadata, data)
  attempt_size = data.size / 3

  synchronize do # critical section for buffer (stage/queue)
    until data.empty?
      attempt_size = MINIMUM_APPEND_ATTEMPT_SIZE if attempt_size < MINIMUM_APPEND_ATTEMPT_SIZE

      chunk = (@stage[metadata] ||= generate_chunk(metadata))

      chunk.synchronize do # critical section for chunk (chunk append/commit/rollback)
        begin
          empty_chunk = chunk.empty?

          chunk.append(data.slice(0, attempt_size))

          if size_over?(chunk)
            # drop the appended slice before the chunk is enqueued or retried
            chunk.rollback

            if attempt_size <= MINIMUM_APPEND_ATTEMPT_SIZE
              if empty_chunk # record is too large even for empty chunk
                raise BufferChunkOverflowError, "minimum append batch exceeds chunk bytes limit"
              end
              # no more records for this chunk -> enqueue -> to be flushed
              enqueue_chunk(metadata) # `chunk` will be removed from stage
              attempt_size = data.size # fresh chunk may have enough space
            else
              # whole data can be processed in two operations
              # (with attempt_size /= 2, 3 operations would be required for odd numbers of data)
              attempt_size = (attempt_size / 2) + 1
            end
          else
            chunk.commit
            data.slice!(0, attempt_size)
            # same attempt size for the next round
          end
        ensure
          # always discards whatever was appended but not committed
          chunk.rollback
        end
      end
    end
  end
  nil
end
126259 end
127260 end
0 commit comments