From e7feb2f869d3662de8468ce6e1b2dfe99bd71a14 Mon Sep 17 00:00:00 2001 From: Frank Szendzielarz Date: Thu, 6 Jun 2019 12:11:46 +0200 Subject: [PATCH 1/3] Fix file system access for Windows --- core/rawdb/freezer_table.go | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/core/rawdb/freezer_table.go b/core/rawdb/freezer_table.go index d1ece414bbfe..0239f7d2eb14 100644 --- a/core/rawdb/freezer_table.go +++ b/core/rawdb/freezer_table.go @@ -122,7 +122,7 @@ func newCustomTable(path string, name string, readMeter metrics.Meter, writeMete // compressed idx idxName = fmt.Sprintf("%s.cidx", name) } - offsets, err := os.OpenFile(filepath.Join(path, idxName), os.O_RDWR|os.O_CREATE|os.O_APPEND, 0644) + offsets, err := os.OpenFile(filepath.Join(path, idxName), os.O_RDWR|os.O_CREATE, 0644) if err != nil { return nil, err } @@ -188,7 +188,7 @@ func (t *freezerTable) repair() error { t.index.ReadAt(buffer, offsetsSize-indexEntrySize) lastIndex.unmarshalBinary(buffer) - t.head, err = t.openFile(lastIndex.filenum, os.O_RDWR|os.O_CREATE|os.O_APPEND) + t.head, err = t.openFile(lastIndex.filenum, os.O_RDWR|os.O_CREATE) if err != nil { return err } @@ -223,7 +223,7 @@ func (t *freezerTable) repair() error { if newLastIndex.filenum != lastIndex.filenum { // release earlier opened file t.releaseFile(lastIndex.filenum) - t.head, err = t.openFile(newLastIndex.filenum, os.O_RDWR|os.O_CREATE|os.O_APPEND) + t.head, err = t.openFile(newLastIndex.filenum, os.O_RDWR|os.O_CREATE) if stat, err = t.head.Stat(); err != nil { // TODO, anything more we can do here? // A data file has gone missing... @@ -269,11 +269,11 @@ func (t *freezerTable) preopen() (err error) { } } // Open head in read/write - t.head, err = t.openFile(t.headId, os.O_RDWR|os.O_CREATE|os.O_APPEND) + t.head, err = t.openFile(t.headId, os.O_RDWR|os.O_CREATE) return err } -// truncate discards any recent data above the provided threashold number. +// truncate discards any recent data above the provided threshold number. func (t *freezerTable) truncate(items uint64) error { t.lock.Lock() defer t.lock.Unlock() @@ -299,7 +299,7 @@ func (t *freezerTable) truncate(items uint64) error { if expected.filenum != t.headId { // If already open for reading, force-reopen for writing t.releaseFile(expected.filenum) - newHead, err := t.openFile(expected.filenum, os.O_RDWR|os.O_CREATE|os.O_APPEND) + newHead, err := t.openFile(expected.filenum, os.O_RDWR|os.O_CREATE) if err != nil { return err } @@ -435,6 +435,10 @@ func (t *freezerTable) Append(item uint64, blob []byte) error { defer t.lock.RUnlock() + if _, err := t.head.Seek(0, 2); err != nil { + return err + } + if _, err := t.head.Write(blob); err != nil { return err } @@ -444,6 +448,7 @@ func (t *freezerTable) Append(item uint64, blob []byte) error { offset: newOffset, } // Write indexEntry + t.index.Seek(0, 2) t.index.Write(idx.marshallBinary()) t.writeMeter.Mark(int64(bLen + indexEntrySize)) atomic.AddUint64(&t.items, 1) From fb6606843d90ddf464f5d1153364386b9bd7f0c5 Mon Sep 17 00:00:00 2001 From: Frank Szendzielarz Date: Fri, 7 Jun 2019 09:51:49 +0200 Subject: [PATCH 2/3] Encapsulate file accesses --- core/rawdb/freezer_table.go | 79 ++++++++++++++++++++++++++----------- 1 file changed, 56 insertions(+), 23 deletions(-) diff --git a/core/rawdb/freezer_table.go b/core/rawdb/freezer_table.go index 0239f7d2eb14..5c7b21dd3c9c 100644 --- a/core/rawdb/freezer_table.go +++ b/core/rawdb/freezer_table.go @@ -20,6 +20,7 @@ import ( "encoding/binary" "errors" "fmt" + "io" "os" "path/filepath" "sync" @@ -106,6 +107,44 @@ func newTable(path string, name string, readMeter metrics.Meter, writeMeter metr return newCustomTable(path, name, readMeter, writeMeter, 2*1000*1000*1000, disableSnappy) } +// openFreezerFile opens a freezer table file and seeks to the end +func openFreezerFileForAppend(filename string) (*os.File, error) { + //open the file without the O_APPEND flag + //because it has differing behaviour during Truncate operations + //on different OS's + file, err := os.OpenFile(filename, os.O_RDWR|os.O_CREATE, 0644) + if err != nil { + return nil, err + } + //seek to end for append + if _, err = file.Seek(0, io.SeekEnd); err != nil { + return nil, err + } + return file, nil +} + +// openFreezerFile opens a freezer table file for read only access +func openFreezerFileForReadOnly(filename string) (*os.File, error) { + return os.OpenFile(filename, os.O_RDONLY, 0644) +} + +// openFreezerFile opens a freezer table making sure it is truncated +func openFreezerFileTruncated(filename string) (*os.File, error) { + return os.OpenFile(filename, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0644) +} + +// truncateFreezerFile resizes a freezer table file and seeks to the end +func truncateFreezerFile(file *os.File, size int64) error { + if err := file.Truncate(size); err != nil { + return err + } + //seek to end for append + if _, err := file.Seek(0, io.SeekEnd); err != nil { + return err + } + return nil +} + // newCustomTable opens a freezer table, creating the data and index files if they are // non existent. Both files are truncated to the shortest common length to ensure // they don't go out of sync. @@ -122,7 +161,7 @@ func newCustomTable(path string, name string, readMeter metrics.Meter, writeMete // compressed idx idxName = fmt.Sprintf("%s.cidx", name) } - offsets, err := os.OpenFile(filepath.Join(path, idxName), os.O_RDWR|os.O_CREATE, 0644) + offsets, err := openFreezerFileForAppend(filepath.Join(path, idxName)) if err != nil { return nil, err } @@ -163,7 +202,7 @@ func (t *freezerTable) repair() error { } // Ensure the index is a multiple of indexEntrySize bytes if overflow := stat.Size() % indexEntrySize; overflow != 0 { - t.index.Truncate(stat.Size() - overflow) // New file can't trigger this path + truncateFreezerFile(t.index, stat.Size()-overflow) // New file can't trigger this path } // Retrieve the file sizes and prepare for truncation if stat, err = t.index.Stat(); err != nil { @@ -188,7 +227,7 @@ func (t *freezerTable) repair() error { t.index.ReadAt(buffer, offsetsSize-indexEntrySize) lastIndex.unmarshalBinary(buffer) - t.head, err = t.openFile(lastIndex.filenum, os.O_RDWR|os.O_CREATE) + t.head, err = t.openFile(lastIndex.filenum, openFreezerFileForAppend) if err != nil { return err } @@ -204,7 +243,7 @@ func (t *freezerTable) repair() error { // Truncate the head file to the last offset pointer if contentExp < contentSize { t.logger.Warn("Truncating dangling head", "indexed", common.StorageSize(contentExp), "stored", common.StorageSize(contentSize)) - if err := t.head.Truncate(contentExp); err != nil { + if err := truncateFreezerFile(t.head, contentExp); err != nil { return err } contentSize = contentExp @@ -212,7 +251,7 @@ func (t *freezerTable) repair() error { // Truncate the index to point within the head file if contentExp > contentSize { t.logger.Warn("Truncating dangling indexes", "indexed", common.StorageSize(contentExp), "stored", common.StorageSize(contentSize)) - if err := t.index.Truncate(offsetsSize - indexEntrySize); err != nil { + if err := truncateFreezerFile(t.index, offsetsSize-indexEntrySize); err != nil { return err } offsetsSize -= indexEntrySize @@ -223,7 +262,7 @@ func (t *freezerTable) repair() error { if newLastIndex.filenum != lastIndex.filenum { // release earlier opened file t.releaseFile(lastIndex.filenum) - t.head, err = t.openFile(newLastIndex.filenum, os.O_RDWR|os.O_CREATE) + t.head, err = t.openFile(newLastIndex.filenum, openFreezerFileForAppend) if stat, err = t.head.Stat(); err != nil { // TODO, anything more we can do here? // A data file has gone missing... @@ -264,12 +303,12 @@ func (t *freezerTable) preopen() (err error) { t.releaseFilesAfter(0, false) // Open all except head in RDONLY for i := t.tailId; i < t.headId; i++ { - if _, err = t.openFile(i, os.O_RDONLY); err != nil { + if _, err = t.openFile(i, openFreezerFileForReadOnly); err != nil { return err } } // Open head in read/write - t.head, err = t.openFile(t.headId, os.O_RDWR|os.O_CREATE) + t.head, err = t.openFile(t.headId, openFreezerFileForAppend) return err } @@ -284,7 +323,7 @@ func (t *freezerTable) truncate(items uint64) error { } // Something's out of sync, truncate the table's offset index t.logger.Warn("Truncating freezer table", "items", t.items, "limit", items) - if err := t.index.Truncate(int64(items+1) * indexEntrySize); err != nil { + if err := truncateFreezerFile(t.index, int64(items+1)*indexEntrySize); err != nil { return err } // Calculate the new expected size of the data file and truncate it @@ -299,7 +338,7 @@ func (t *freezerTable) truncate(items uint64) error { if expected.filenum != t.headId { // If already open for reading, force-reopen for writing t.releaseFile(expected.filenum) - newHead, err := t.openFile(expected.filenum, os.O_RDWR|os.O_CREATE) + newHead, err := t.openFile(expected.filenum, openFreezerFileForAppend) if err != nil { return err } @@ -310,7 +349,7 @@ func (t *freezerTable) truncate(items uint64) error { t.head = newHead atomic.StoreUint32(&t.headId, expected.filenum) } - if err := t.head.Truncate(int64(expected.offset)); err != nil { + if err := truncateFreezerFile(t.head, int64(expected.offset)); err != nil { return err } // All data files truncated, set internal counters and return @@ -344,7 +383,7 @@ func (t *freezerTable) Close() error { } // openFile assumes that the write-lock is held by the caller -func (t *freezerTable) openFile(num uint32, flag int) (f *os.File, err error) { +func (t *freezerTable) openFile(num uint32, opener func(string) (*os.File, error)) (f *os.File, err error) { var exist bool if f, exist = t.files[num]; !exist { var name string @@ -353,7 +392,7 @@ func (t *freezerTable) openFile(num uint32, flag int) (f *os.File, err error) { } else { name = fmt.Sprintf("%s.%04d.cdat", t.name, num) } - f, err = os.OpenFile(filepath.Join(t.path, name), flag, 0644) + f, err = opener(filepath.Join(t.path, name)) if err != nil { return nil, err } @@ -413,32 +452,27 @@ func (t *freezerTable) Append(item uint64, blob []byte) error { // we need a new file, writing would overflow t.lock.RUnlock() t.lock.Lock() - nextId := atomic.LoadUint32(&t.headId) + 1 + nextID := atomic.LoadUint32(&t.headId) + 1 // We open the next file in truncated mode -- if this file already // exists, we need to start over from scratch on it - newHead, err := t.openFile(nextId, os.O_RDWR|os.O_CREATE|os.O_TRUNC) + newHead, err := t.openFile(nextID, openFreezerFileTruncated) if err != nil { t.lock.Unlock() return err } // Close old file, and reopen in RDONLY mode t.releaseFile(t.headId) - t.openFile(t.headId, os.O_RDONLY) + t.openFile(t.headId, openFreezerFileForReadOnly) // Swap out the current head t.head = newHead atomic.StoreUint32(&t.headBytes, 0) - atomic.StoreUint32(&t.headId, nextId) + atomic.StoreUint32(&t.headId, nextID) t.lock.Unlock() t.lock.RLock() } defer t.lock.RUnlock() - - if _, err := t.head.Seek(0, 2); err != nil { - return err - } - if _, err := t.head.Write(blob); err != nil { return err } @@ -448,7 +482,6 @@ func (t *freezerTable) Append(item uint64, blob []byte) error { offset: newOffset, } // Write indexEntry - t.index.Seek(0, 2) t.index.Write(idx.marshallBinary()) t.writeMeter.Mark(int64(bLen + indexEntrySize)) atomic.AddUint64(&t.items, 1) From 5d7e17f484323b7c3717141276f1f5cf6e6cf70a Mon Sep 17 00:00:00 2001 From: Frank Szendzielarz Date: Sun, 9 Jun 2019 12:20:25 +0200 Subject: [PATCH 3/3] Style fixes --- core/rawdb/freezer_table.go | 26 +++++++++++++------------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/core/rawdb/freezer_table.go b/core/rawdb/freezer_table.go index 5c7b21dd3c9c..1e5c7cd0b05a 100644 --- a/core/rawdb/freezer_table.go +++ b/core/rawdb/freezer_table.go @@ -107,28 +107,28 @@ func newTable(path string, name string, readMeter metrics.Meter, writeMeter metr return newCustomTable(path, name, readMeter, writeMeter, 2*1000*1000*1000, disableSnappy) } -// openFreezerFile opens a freezer table file and seeks to the end +// openFreezerFileForAppend opens a freezer table file and seeks to the end func openFreezerFileForAppend(filename string) (*os.File, error) { - //open the file without the O_APPEND flag - //because it has differing behaviour during Truncate operations - //on different OS's + // Open the file without the O_APPEND flag + // because it has differing behaviour during Truncate operations + // on different OS's file, err := os.OpenFile(filename, os.O_RDWR|os.O_CREATE, 0644) if err != nil { return nil, err } - //seek to end for append + // Seek to end for append if _, err = file.Seek(0, io.SeekEnd); err != nil { return nil, err } return file, nil } -// openFreezerFile opens a freezer table file for read only access +// openFreezerFileForReadOnly opens a freezer table file for read only access func openFreezerFileForReadOnly(filename string) (*os.File, error) { return os.OpenFile(filename, os.O_RDONLY, 0644) } -// openFreezerFile opens a freezer table making sure it is truncated +// openFreezerFileTruncated opens a freezer table making sure it is truncated func openFreezerFileTruncated(filename string) (*os.File, error) { return os.OpenFile(filename, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0644) } @@ -138,7 +138,7 @@ func truncateFreezerFile(file *os.File, size int64) error { if err := file.Truncate(size); err != nil { return err } - //seek to end for append + // Seek to end for append if _, err := file.Seek(0, io.SeekEnd); err != nil { return err } @@ -155,10 +155,10 @@ func newCustomTable(path string, name string, readMeter metrics.Meter, writeMete } var idxName string if noCompression { - // raw idx + // Raw idx idxName = fmt.Sprintf("%s.ridx", name) } else { - // compressed idx + // Compressed idx idxName = fmt.Sprintf("%s.cidx", name) } offsets, err := openFreezerFileForAppend(filepath.Join(path, idxName)) @@ -260,7 +260,7 @@ func (t *freezerTable) repair() error { newLastIndex.unmarshalBinary(buffer) // We might have slipped back into an earlier head-file here if newLastIndex.filenum != lastIndex.filenum { - // release earlier opened file + // Release earlier opened file t.releaseFile(lastIndex.filenum) t.head, err = t.openFile(newLastIndex.filenum, openFreezerFileForAppend) if stat, err = t.head.Stat(); err != nil { @@ -342,10 +342,10 @@ func (t *freezerTable) truncate(items uint64) error { if err != nil { return err } - // release any files _after the current head -- both the previous head + // Release any files _after the current head -- both the previous head // and any files which may have been opened for reading t.releaseFilesAfter(expected.filenum, true) - // set back the historic head + // Set back the historic head t.head = newHead atomic.StoreUint32(&t.headId, expected.filenum) }