Skip to content
Merged
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 57 additions & 19 deletions core/rawdb/freezer_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"encoding/binary"
"errors"
"fmt"
"io"
"os"
"path/filepath"
"sync"
Expand Down Expand Up @@ -106,6 +107,44 @@ func newTable(path string, name string, readMeter metrics.Meter, writeMeter metr
return newCustomTable(path, name, readMeter, writeMeter, 2*1000*1000*1000, disableSnappy)
}

// openFreezerFile opens a freezer table file and seeks to the end
func openFreezerFileForAppend(filename string) (*os.File, error) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nitpick, please fix the comments so they reference the correct method (on this and the 2 methods below).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And in general, please leave an empty space after the // characters within the method comments.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And please capitalize min-method comments :)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lol, well spotted. thanks, and done :-)

//open the file without the O_APPEND flag
//because it has differing behaviour during Truncate operations
//on different OS's
file, err := os.OpenFile(filename, os.O_RDWR|os.O_CREATE, 0644)
if err != nil {
return nil, err
}
//seek to end for append
if _, err = file.Seek(0, io.SeekEnd); err != nil {
return nil, err
}
return file, nil
}

// openFreezerFile opens a freezer table file for read only access
func openFreezerFileForReadOnly(filename string) (*os.File, error) {
return os.OpenFile(filename, os.O_RDONLY, 0644)
}

// openFreezerFile opens a freezer table making sure it is truncated
func openFreezerFileTruncated(filename string) (*os.File, error) {
return os.OpenFile(filename, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0644)
}

// truncateFreezerFile resizes a freezer table file and seeks to the end
func truncateFreezerFile(file *os.File, size int64) error {
if err := file.Truncate(size); err != nil {
return err
}
//seek to end for append
if _, err := file.Seek(0, io.SeekEnd); err != nil {
return err
}
return nil
}

// newCustomTable opens a freezer table, creating the data and index files if they are
// non existent. Both files are truncated to the shortest common length to ensure
// they don't go out of sync.
Expand All @@ -122,7 +161,7 @@ func newCustomTable(path string, name string, readMeter metrics.Meter, writeMete
// compressed idx
idxName = fmt.Sprintf("%s.cidx", name)
}
offsets, err := os.OpenFile(filepath.Join(path, idxName), os.O_RDWR|os.O_CREATE|os.O_APPEND, 0644)
offsets, err := openFreezerFileForAppend(filepath.Join(path, idxName))
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -163,7 +202,7 @@ func (t *freezerTable) repair() error {
}
// Ensure the index is a multiple of indexEntrySize bytes
if overflow := stat.Size() % indexEntrySize; overflow != 0 {
t.index.Truncate(stat.Size() - overflow) // New file can't trigger this path
truncateFreezerFile(t.index, stat.Size()-overflow) // New file can't trigger this path
}
// Retrieve the file sizes and prepare for truncation
if stat, err = t.index.Stat(); err != nil {
Expand All @@ -188,7 +227,7 @@ func (t *freezerTable) repair() error {

t.index.ReadAt(buffer, offsetsSize-indexEntrySize)
lastIndex.unmarshalBinary(buffer)
t.head, err = t.openFile(lastIndex.filenum, os.O_RDWR|os.O_CREATE|os.O_APPEND)
t.head, err = t.openFile(lastIndex.filenum, openFreezerFileForAppend)
if err != nil {
return err
}
Expand All @@ -204,15 +243,15 @@ func (t *freezerTable) repair() error {
// Truncate the head file to the last offset pointer
if contentExp < contentSize {
t.logger.Warn("Truncating dangling head", "indexed", common.StorageSize(contentExp), "stored", common.StorageSize(contentSize))
if err := t.head.Truncate(contentExp); err != nil {
if err := truncateFreezerFile(t.head, contentExp); err != nil {
return err
}
contentSize = contentExp
}
// Truncate the index to point within the head file
if contentExp > contentSize {
t.logger.Warn("Truncating dangling indexes", "indexed", common.StorageSize(contentExp), "stored", common.StorageSize(contentSize))
if err := t.index.Truncate(offsetsSize - indexEntrySize); err != nil {
if err := truncateFreezerFile(t.index, offsetsSize-indexEntrySize); err != nil {
return err
}
offsetsSize -= indexEntrySize
Expand All @@ -223,7 +262,7 @@ func (t *freezerTable) repair() error {
if newLastIndex.filenum != lastIndex.filenum {
// release earlier opened file
t.releaseFile(lastIndex.filenum)
t.head, err = t.openFile(newLastIndex.filenum, os.O_RDWR|os.O_CREATE|os.O_APPEND)
t.head, err = t.openFile(newLastIndex.filenum, openFreezerFileForAppend)
if stat, err = t.head.Stat(); err != nil {
// TODO, anything more we can do here?
// A data file has gone missing...
Expand Down Expand Up @@ -264,16 +303,16 @@ func (t *freezerTable) preopen() (err error) {
t.releaseFilesAfter(0, false)
// Open all except head in RDONLY
for i := t.tailId; i < t.headId; i++ {
if _, err = t.openFile(i, os.O_RDONLY); err != nil {
if _, err = t.openFile(i, openFreezerFileForReadOnly); err != nil {
return err
}
}
// Open head in read/write
t.head, err = t.openFile(t.headId, os.O_RDWR|os.O_CREATE|os.O_APPEND)
t.head, err = t.openFile(t.headId, openFreezerFileForAppend)
return err
}

// truncate discards any recent data above the provided threashold number.
// truncate discards any recent data above the provided threshold number.
func (t *freezerTable) truncate(items uint64) error {
t.lock.Lock()
defer t.lock.Unlock()
Expand All @@ -284,7 +323,7 @@ func (t *freezerTable) truncate(items uint64) error {
}
// Something's out of sync, truncate the table's offset index
t.logger.Warn("Truncating freezer table", "items", t.items, "limit", items)
if err := t.index.Truncate(int64(items+1) * indexEntrySize); err != nil {
if err := truncateFreezerFile(t.index, int64(items+1)*indexEntrySize); err != nil {
return err
}
// Calculate the new expected size of the data file and truncate it
Expand All @@ -299,7 +338,7 @@ func (t *freezerTable) truncate(items uint64) error {
if expected.filenum != t.headId {
// If already open for reading, force-reopen for writing
t.releaseFile(expected.filenum)
newHead, err := t.openFile(expected.filenum, os.O_RDWR|os.O_CREATE|os.O_APPEND)
newHead, err := t.openFile(expected.filenum, openFreezerFileForAppend)
if err != nil {
return err
}
Expand All @@ -310,7 +349,7 @@ func (t *freezerTable) truncate(items uint64) error {
t.head = newHead
atomic.StoreUint32(&t.headId, expected.filenum)
}
if err := t.head.Truncate(int64(expected.offset)); err != nil {
if err := truncateFreezerFile(t.head, int64(expected.offset)); err != nil {
return err
}
// All data files truncated, set internal counters and return
Expand Down Expand Up @@ -344,7 +383,7 @@ func (t *freezerTable) Close() error {
}

// openFile assumes that the write-lock is held by the caller
func (t *freezerTable) openFile(num uint32, flag int) (f *os.File, err error) {
func (t *freezerTable) openFile(num uint32, opener func(string) (*os.File, error)) (f *os.File, err error) {
var exist bool
if f, exist = t.files[num]; !exist {
var name string
Expand All @@ -353,7 +392,7 @@ func (t *freezerTable) openFile(num uint32, flag int) (f *os.File, err error) {
} else {
name = fmt.Sprintf("%s.%04d.cdat", t.name, num)
}
f, err = os.OpenFile(filepath.Join(t.path, name), flag, 0644)
f, err = opener(filepath.Join(t.path, name))
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -413,28 +452,27 @@ func (t *freezerTable) Append(item uint64, blob []byte) error {
// we need a new file, writing would overflow
t.lock.RUnlock()
t.lock.Lock()
nextId := atomic.LoadUint32(&t.headId) + 1
nextID := atomic.LoadUint32(&t.headId) + 1
// We open the next file in truncated mode -- if this file already
// exists, we need to start over from scratch on it
newHead, err := t.openFile(nextId, os.O_RDWR|os.O_CREATE|os.O_TRUNC)
newHead, err := t.openFile(nextID, openFreezerFileTruncated)
if err != nil {
t.lock.Unlock()
return err
}
// Close old file, and reopen in RDONLY mode
t.releaseFile(t.headId)
t.openFile(t.headId, os.O_RDONLY)
t.openFile(t.headId, openFreezerFileForReadOnly)

// Swap out the current head
t.head = newHead
atomic.StoreUint32(&t.headBytes, 0)
atomic.StoreUint32(&t.headId, nextId)
atomic.StoreUint32(&t.headId, nextID)
t.lock.Unlock()
t.lock.RLock()
}

defer t.lock.RUnlock()

if _, err := t.head.Write(blob); err != nil {
return err
}
Expand Down