Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 11 additions & 6 deletions core/rawdb/freezer_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func newCustomTable(path string, name string, readMeter metrics.Meter, writeMete
// compressed idx
idxName = fmt.Sprintf("%s.cidx", name)
}
offsets, err := os.OpenFile(filepath.Join(path, idxName), os.O_RDWR|os.O_CREATE|os.O_APPEND, 0644)
offsets, err := os.OpenFile(filepath.Join(path, idxName), os.O_RDWR|os.O_CREATE, 0644)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -188,7 +188,7 @@ func (t *freezerTable) repair() error {

t.index.ReadAt(buffer, offsetsSize-indexEntrySize)
lastIndex.unmarshalBinary(buffer)
t.head, err = t.openFile(lastIndex.filenum, os.O_RDWR|os.O_CREATE|os.O_APPEND)
t.head, err = t.openFile(lastIndex.filenum, os.O_RDWR|os.O_CREATE)
if err != nil {
return err
}
Expand Down Expand Up @@ -223,7 +223,7 @@ func (t *freezerTable) repair() error {
if newLastIndex.filenum != lastIndex.filenum {
// release earlier opened file
t.releaseFile(lastIndex.filenum)
t.head, err = t.openFile(newLastIndex.filenum, os.O_RDWR|os.O_CREATE|os.O_APPEND)
t.head, err = t.openFile(newLastIndex.filenum, os.O_RDWR|os.O_CREATE)
if stat, err = t.head.Stat(); err != nil {
// TODO, anything more we can do here?
// A data file has gone missing...
Expand Down Expand Up @@ -269,11 +269,11 @@ func (t *freezerTable) preopen() (err error) {
}
}
// Open head in read/write
t.head, err = t.openFile(t.headId, os.O_RDWR|os.O_CREATE|os.O_APPEND)
t.head, err = t.openFile(t.headId, os.O_RDWR|os.O_CREATE)
return err
}

// truncate discards any recent data above the provided threashold number.
// truncate discards any recent data above the provided threshold number.
func (t *freezerTable) truncate(items uint64) error {
t.lock.Lock()
defer t.lock.Unlock()
Expand All @@ -299,7 +299,7 @@ func (t *freezerTable) truncate(items uint64) error {
if expected.filenum != t.headId {
// If already open for reading, force-reopen for writing
t.releaseFile(expected.filenum)
newHead, err := t.openFile(expected.filenum, os.O_RDWR|os.O_CREATE|os.O_APPEND)
newHead, err := t.openFile(expected.filenum, os.O_RDWR|os.O_CREATE)
if err != nil {
return err
}
Expand Down Expand Up @@ -435,6 +435,10 @@ func (t *freezerTable) Append(item uint64, blob []byte) error {

defer t.lock.RUnlock()

if _, err := t.head.Seek(0, 2); err != nil {
return err
}

if _, err := t.head.Write(blob); err != nil {
return err
}
Expand All @@ -444,6 +448,7 @@ func (t *freezerTable) Append(item uint64, blob []byte) error {
offset: newOffset,
}
// Write indexEntry
t.index.Seek(0, 2)
t.index.Write(idx.marshallBinary())
t.writeMeter.Mark(int64(bLen + indexEntrySize))
atomic.AddUint64(&t.items, 1)
Expand Down