@@ -20,6 +20,7 @@ import (
2020 "encoding/binary"
2121 "errors"
2222 "fmt"
23+ "io"
2324 "os"
2425 "path/filepath"
2526 "sync"
@@ -106,6 +107,44 @@ func newTable(path string, name string, readMeter metrics.Meter, writeMeter metr
106107 return newCustomTable (path , name , readMeter , writeMeter , 2 * 1000 * 1000 * 1000 , disableSnappy )
107108}
108109
110+ // openFreezerFileForAppend opens a freezer table file and seeks to the end
111+ func openFreezerFileForAppend (filename string ) (* os.File , error ) {
112+ // Open the file without the O_APPEND flag
113+ // because it has differing behaviour during Truncate operations
114+ // on different OS's
115+ file , err := os .OpenFile (filename , os .O_RDWR | os .O_CREATE , 0644 )
116+ if err != nil {
117+ return nil , err
118+ }
119+ // Seek to end for append
120+ if _ , err = file .Seek (0 , io .SeekEnd ); err != nil {
121+ return nil , err
122+ }
123+ return file , nil
124+ }
125+
126+ // openFreezerFileForReadOnly opens a freezer table file for read only access
127+ func openFreezerFileForReadOnly (filename string ) (* os.File , error ) {
128+ return os .OpenFile (filename , os .O_RDONLY , 0644 )
129+ }
130+
131+ // openFreezerFileTruncated opens a freezer table making sure it is truncated
132+ func openFreezerFileTruncated (filename string ) (* os.File , error ) {
133+ return os .OpenFile (filename , os .O_RDWR | os .O_CREATE | os .O_TRUNC , 0644 )
134+ }
135+
136+ // truncateFreezerFile resizes a freezer table file and seeks to the end
137+ func truncateFreezerFile (file * os.File , size int64 ) error {
138+ if err := file .Truncate (size ); err != nil {
139+ return err
140+ }
141+ // Seek to end for append
142+ if _ , err := file .Seek (0 , io .SeekEnd ); err != nil {
143+ return err
144+ }
145+ return nil
146+ }
147+
109148// newCustomTable opens a freezer table, creating the data and index files if they are
110149// non existent. Both files are truncated to the shortest common length to ensure
111150// they don't go out of sync.
@@ -116,13 +155,13 @@ func newCustomTable(path string, name string, readMeter metrics.Meter, writeMete
116155 }
117156 var idxName string
118157 if noCompression {
119- // raw idx
158+ // Raw idx
120159 idxName = fmt .Sprintf ("%s.ridx" , name )
121160 } else {
122- // compressed idx
161+ // Compressed idx
123162 idxName = fmt .Sprintf ("%s.cidx" , name )
124163 }
125- offsets , err := os . OpenFile (filepath .Join (path , idxName ), os . O_RDWR | os . O_CREATE | os . O_APPEND , 0644 )
164+ offsets , err := openFreezerFileForAppend (filepath .Join (path , idxName ))
126165 if err != nil {
127166 return nil , err
128167 }
@@ -163,7 +202,7 @@ func (t *freezerTable) repair() error {
163202 }
164203 // Ensure the index is a multiple of indexEntrySize bytes
165204 if overflow := stat .Size () % indexEntrySize ; overflow != 0 {
166- t .index . Truncate ( stat .Size () - overflow ) // New file can't trigger this path
205+ truncateFreezerFile ( t .index , stat .Size ()- overflow ) // New file can't trigger this path
167206 }
168207 // Retrieve the file sizes and prepare for truncation
169208 if stat , err = t .index .Stat (); err != nil {
@@ -188,7 +227,7 @@ func (t *freezerTable) repair() error {
188227
189228 t .index .ReadAt (buffer , offsetsSize - indexEntrySize )
190229 lastIndex .unmarshalBinary (buffer )
191- t .head , err = t .openFile (lastIndex .filenum , os . O_RDWR | os . O_CREATE | os . O_APPEND )
230+ t .head , err = t .openFile (lastIndex .filenum , openFreezerFileForAppend )
192231 if err != nil {
193232 return err
194233 }
@@ -204,15 +243,15 @@ func (t *freezerTable) repair() error {
204243 // Truncate the head file to the last offset pointer
205244 if contentExp < contentSize {
206245 t .logger .Warn ("Truncating dangling head" , "indexed" , common .StorageSize (contentExp ), "stored" , common .StorageSize (contentSize ))
207- if err := t .head . Truncate ( contentExp ); err != nil {
246+ if err := truncateFreezerFile ( t .head , contentExp ); err != nil {
208247 return err
209248 }
210249 contentSize = contentExp
211250 }
212251 // Truncate the index to point within the head file
213252 if contentExp > contentSize {
214253 t .logger .Warn ("Truncating dangling indexes" , "indexed" , common .StorageSize (contentExp ), "stored" , common .StorageSize (contentSize ))
215- if err := t .index . Truncate ( offsetsSize - indexEntrySize ); err != nil {
254+ if err := truncateFreezerFile ( t .index , offsetsSize - indexEntrySize ); err != nil {
216255 return err
217256 }
218257 offsetsSize -= indexEntrySize
@@ -221,9 +260,9 @@ func (t *freezerTable) repair() error {
221260 newLastIndex .unmarshalBinary (buffer )
222261 // We might have slipped back into an earlier head-file here
223262 if newLastIndex .filenum != lastIndex .filenum {
224- // release earlier opened file
263+ // Release earlier opened file
225264 t .releaseFile (lastIndex .filenum )
226- t .head , err = t .openFile (newLastIndex .filenum , os . O_RDWR | os . O_CREATE | os . O_APPEND )
265+ t .head , err = t .openFile (newLastIndex .filenum , openFreezerFileForAppend )
227266 if stat , err = t .head .Stat (); err != nil {
228267 // TODO, anything more we can do here?
229268 // A data file has gone missing...
@@ -264,16 +303,16 @@ func (t *freezerTable) preopen() (err error) {
264303 t .releaseFilesAfter (0 , false )
265304 // Open all except head in RDONLY
266305 for i := t .tailId ; i < t .headId ; i ++ {
267- if _ , err = t .openFile (i , os . O_RDONLY ); err != nil {
306+ if _ , err = t .openFile (i , openFreezerFileForReadOnly ); err != nil {
268307 return err
269308 }
270309 }
271310 // Open head in read/write
272- t .head , err = t .openFile (t .headId , os . O_RDWR | os . O_CREATE | os . O_APPEND )
311+ t .head , err = t .openFile (t .headId , openFreezerFileForAppend )
273312 return err
274313}
275314
276- // truncate discards any recent data above the provided threashold number.
315+ // truncate discards any recent data above the provided threshold number.
277316func (t * freezerTable ) truncate (items uint64 ) error {
278317 t .lock .Lock ()
279318 defer t .lock .Unlock ()
@@ -284,7 +323,7 @@ func (t *freezerTable) truncate(items uint64) error {
284323 }
285324 // Something's out of sync, truncate the table's offset index
286325 t .logger .Warn ("Truncating freezer table" , "items" , t .items , "limit" , items )
287- if err := t .index . Truncate ( int64 (items + 1 ) * indexEntrySize ); err != nil {
326+ if err := truncateFreezerFile ( t .index , int64 (items + 1 )* indexEntrySize ); err != nil {
288327 return err
289328 }
290329 // Calculate the new expected size of the data file and truncate it
@@ -299,18 +338,18 @@ func (t *freezerTable) truncate(items uint64) error {
299338 if expected .filenum != t .headId {
300339 // If already open for reading, force-reopen for writing
301340 t .releaseFile (expected .filenum )
302- newHead , err := t .openFile (expected .filenum , os . O_RDWR | os . O_CREATE | os . O_APPEND )
341+ newHead , err := t .openFile (expected .filenum , openFreezerFileForAppend )
303342 if err != nil {
304343 return err
305344 }
306- // release any files _after the current head -- both the previous head
345+ // Release any files _after the current head -- both the previous head
307346 // and any files which may have been opened for reading
308347 t .releaseFilesAfter (expected .filenum , true )
309- // set back the historic head
348+ // Set back the historic head
310349 t .head = newHead
311350 atomic .StoreUint32 (& t .headId , expected .filenum )
312351 }
313- if err := t .head . Truncate ( int64 (expected .offset )); err != nil {
352+ if err := truncateFreezerFile ( t .head , int64 (expected .offset )); err != nil {
314353 return err
315354 }
316355 // All data files truncated, set internal counters and return
@@ -344,7 +383,7 @@ func (t *freezerTable) Close() error {
344383}
345384
346385// openFile assumes that the write-lock is held by the caller
347- func (t * freezerTable ) openFile (num uint32 , flag int ) (f * os.File , err error ) {
386+ func (t * freezerTable ) openFile (num uint32 , opener func ( string ) ( * os. File , error ) ) (f * os.File , err error ) {
348387 var exist bool
349388 if f , exist = t .files [num ]; ! exist {
350389 var name string
@@ -353,7 +392,7 @@ func (t *freezerTable) openFile(num uint32, flag int) (f *os.File, err error) {
353392 } else {
354393 name = fmt .Sprintf ("%s.%04d.cdat" , t .name , num )
355394 }
356- f , err = os . OpenFile (filepath .Join (t .path , name ), flag , 0644 )
395+ f , err = opener (filepath .Join (t .path , name ))
357396 if err != nil {
358397 return nil , err
359398 }
@@ -413,28 +452,27 @@ func (t *freezerTable) Append(item uint64, blob []byte) error {
413452 // we need a new file, writing would overflow
414453 t .lock .RUnlock ()
415454 t .lock .Lock ()
416- nextId := atomic .LoadUint32 (& t .headId ) + 1
455+ nextID := atomic .LoadUint32 (& t .headId ) + 1
417456 // We open the next file in truncated mode -- if this file already
418457 // exists, we need to start over from scratch on it
419- newHead , err := t .openFile (nextId , os . O_RDWR | os . O_CREATE | os . O_TRUNC )
458+ newHead , err := t .openFile (nextID , openFreezerFileTruncated )
420459 if err != nil {
421460 t .lock .Unlock ()
422461 return err
423462 }
424463 // Close old file, and reopen in RDONLY mode
425464 t .releaseFile (t .headId )
426- t .openFile (t .headId , os . O_RDONLY )
465+ t .openFile (t .headId , openFreezerFileForReadOnly )
427466
428467 // Swap out the current head
429468 t .head = newHead
430469 atomic .StoreUint32 (& t .headBytes , 0 )
431- atomic .StoreUint32 (& t .headId , nextId )
470+ atomic .StoreUint32 (& t .headId , nextID )
432471 t .lock .Unlock ()
433472 t .lock .RLock ()
434473 }
435474
436475 defer t .lock .RUnlock ()
437-
438476 if _ , err := t .head .Write (blob ); err != nil {
439477 return err
440478 }
0 commit comments