From 3049d138b75be5906d11b87361361cc53dadcbf7 Mon Sep 17 00:00:00 2001 From: Gary Rong Date: Wed, 15 May 2024 16:48:15 +0800 Subject: [PATCH 1/5] core/rawdb: introduce flush offset in freezer --- core/blockchain.go | 10 +- core/rawdb/accessors_chain_test.go | 3 +- core/rawdb/ancient_scheme.go | 1 + core/rawdb/freezer_batch.go | 10 +- core/rawdb/freezer_meta.go | 181 ++++++++++----- core/rawdb/freezer_meta_test.go | 88 +++++++- core/rawdb/freezer_table.go | 260 +++++++++++++++------- core/rawdb/freezer_table_test.go | 342 ++++++++++++++++++++--------- 8 files changed, 644 insertions(+), 251 deletions(-) diff --git a/core/blockchain.go b/core/blockchain.go index ab88f4b68ee2..f097dc978108 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -113,23 +113,29 @@ const ( // * the `BlockNumber`, `TxHash`, `TxIndex`, `BlockHash` and `Index` fields of log are deleted // * the `Bloom` field of receipt is deleted // * the `BlockIndex` and `TxIndex` fields of txlookup are deleted + // // - Version 5 // The following incompatible database changes were added: // * the `TxHash`, `GasCost`, and `ContractAddress` fields are no longer stored for a receipt // * the `TxHash`, `GasCost`, and `ContractAddress` fields are computed by looking up the // receipts' corresponding block + // // - Version 6 // The following incompatible database changes were added: // * Transaction lookup information stores the corresponding block number instead of block hash + // // - Version 7 // The following incompatible database changes were added: // * Use freezer as the ancient database to maintain all ancient data + // // - Version 8 // The following incompatible database changes were added: // * New scheme for contract code in order to separate the codes and trie nodes + // // - Version 9 - // Total difficulty has been removed from both the key-value store and the - // ancient store, the td freezer table has been deprecated since that. 
+ // The following incompatible database changes were added: + // * Total difficulty has been removed from both the key-value store and the ancient store. + // * The metadata structure of freezer is changed by adding 'flushOffset' BlockChainVersion uint64 = 9 ) diff --git a/core/rawdb/accessors_chain_test.go b/core/rawdb/accessors_chain_test.go index 5533c60ab9fd..b9684f8e17d9 100644 --- a/core/rawdb/accessors_chain_test.go +++ b/core/rawdb/accessors_chain_test.go @@ -849,6 +849,7 @@ func TestHeadersRLPStorage(t *testing.T) { t.Fatalf("failed to create database with ancient backend") } defer db.Close() + // Create blocks var chain []*types.Block var pHash common.Hash @@ -864,7 +865,7 @@ func TestHeadersRLPStorage(t *testing.T) { chain = append(chain, block) pHash = block.Hash() } - var receipts []types.Receipts = make([]types.Receipts, 100) + receipts := make([]types.Receipts, 100) // Write first half to ancients WriteAncientBlocks(db, chain[:50], receipts[:50]) // Write second half to db diff --git a/core/rawdb/ancient_scheme.go b/core/rawdb/ancient_scheme.go index 54a3be391f1f..67bfa37ecc37 100644 --- a/core/rawdb/ancient_scheme.go +++ b/core/rawdb/ancient_scheme.go @@ -58,6 +58,7 @@ const ( stateHistoryStorageData = "storage.data" ) +// stateFreezerNoSnappy configures whether compression is disabled for the state freezer. 
var stateFreezerNoSnappy = map[string]bool{ stateHistoryMeta: true, stateHistoryAccountIndex: false, diff --git a/core/rawdb/freezer_batch.go b/core/rawdb/freezer_batch.go index 013d0b9d138c..0b354cb6be2a 100644 --- a/core/rawdb/freezer_batch.go +++ b/core/rawdb/freezer_batch.go @@ -19,6 +19,7 @@ package rawdb import ( "fmt" "math" + "time" "github.com/ethereum/go-ethereum/rlp" "github.com/golang/snappy" @@ -188,9 +189,6 @@ func (batch *freezerTableBatch) commit() error { if err != nil { return err } - if err := batch.t.head.Sync(); err != nil { - return err - } dataSize := int64(len(batch.dataBuffer)) batch.dataBuffer = batch.dataBuffer[:0] @@ -208,6 +206,12 @@ func (batch *freezerTableBatch) commit() error { // Update metrics. batch.t.sizeGauge.Inc(dataSize + indexSize) batch.t.writeMeter.Mark(dataSize + indexSize) + + // Periodically sync the table, todo (rjl493456442) make it configurable? + if time.Since(batch.t.lastSync) > 30*time.Second { + batch.t.lastSync = time.Now() + return batch.t.syncWithNoLock() + } return nil } diff --git a/core/rawdb/freezer_meta.go b/core/rawdb/freezer_meta.go index 9eef9df351df..f60fce79ac62 100644 --- a/core/rawdb/freezer_meta.go +++ b/core/rawdb/freezer_meta.go @@ -17,93 +17,166 @@ package rawdb import ( + "errors" "io" "os" - "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/rlp" ) -const freezerVersion = 1 // The initial version tag of freezer table metadata +const ( + freezerTableV1 = 1 // Initial version of metadata struct + freezerTableV2 = 2 // Add field: 'flushOffset' +) -// freezerTableMeta wraps all the metadata of the freezer table. +// freezerTableMeta is a collection of additional properties that describe the +// freezer table. These properties are designed with error resilience, allowing +// them to be automatically corrected after an error occurs without significantly +// impacting overall correctness. 
type freezerTableMeta struct { - // Version is the versioning descriptor of the freezer table. - Version uint16 + file *os.File // file handler of metadata + version uint16 // version descriptor of the freezer table - // VirtualTail indicates how many items have been marked as deleted. - // Its value is equal to the number of items removed from the table - // plus the number of items hidden in the table, so it should never - // be lower than the "actual tail". - VirtualTail uint64 -} + // virtualTail represents the number of items marked as deleted. It is + // calculated as the sum of items removed from the table and the items + // hidden within the table, and should never be less than the "actual + // tail". + // + // If lost due to a crash or other reasons, it will be reset to the number + // of items deleted from the table, causing the previously hidden items + // to become visible, which is an acceptable consequence. + virtualTail uint64 -// newMetadata initializes the metadata object with the given virtual tail. -func newMetadata(tail uint64) *freezerTableMeta { - return &freezerTableMeta{ - Version: freezerVersion, - VirtualTail: tail, - } + // flushOffset represents the offset in the index file up to which the index + // items along with the corresponding data items in data files has been flushed + // (fsync’d) to disk. Beyond this offset, data integrity is not guaranteed, + // the extra index items along with the associated data items should be removed + // during the startup. + // + // The principle is that all data items above the flush offset are considered + // volatile and should be recoverable if they are discarded after the unclean + // shutdown. If data integrity is required, manually force a sync of the + // freezer before proceeding with further operations (e.g. do freezer.Sync() + // first and then write data to key value store in some circumstances). 
+ // + // The offset could be moved forward by applying sync operation, or be moved + // backward in cases of head/tail truncation, etc. + flushOffset uint64 } -// readMetadata reads the metadata of the freezer table from the -// given metadata file. -func readMetadata(file *os.File) (*freezerTableMeta, error) { +// decodeV1 attempts to decode the metadata structure in v1 format. If fails or +// the result is incompatible, nil is returned. +func decodeV1(file *os.File) *freezerTableMeta { _, err := file.Seek(0, io.SeekStart) if err != nil { - return nil, err + return nil } - var meta freezerTableMeta - if err := rlp.Decode(file, &meta); err != nil { - return nil, err + type obj struct { + Version uint16 + Tail uint64 + } + var o obj + if err := rlp.Decode(file, &o); err != nil { + return nil + } + if o.Version != freezerTableV1 { + return nil + } + return &freezerTableMeta{ + file: file, + version: o.Version, + virtualTail: o.Tail, } - return &meta, nil } -// writeMetadata writes the metadata of the freezer table into the -// given metadata file. -func writeMetadata(file *os.File, meta *freezerTableMeta) error { +// decodeV2 attempts to decode the metadata structure in v2 format. If fails or +// the result is incompatible, nil is returned. +func decodeV2(file *os.File) *freezerTableMeta { _, err := file.Seek(0, io.SeekStart) if err != nil { - return err + return nil + } + type obj struct { + Version uint16 + Tail uint64 + Offset uint64 + } + var o obj + if err := rlp.Decode(file, &o); err != nil { + return nil + } + if o.Version != freezerTableV2 { + return nil + } + return &freezerTableMeta{ + file: file, + version: freezerTableV2, + virtualTail: o.Tail, + flushOffset: o.Offset, } - return rlp.Encode(file, meta) } -// loadMetadata loads the metadata from the given metadata file. -// Initializes the metadata file with the given "actual tail" if -// it's empty. 
-func loadMetadata(file *os.File, tail uint64) (*freezerTableMeta, error) { +// newMetadata initializes the metadata object, either by loading it from the file +// or by constructing a new one from scratch. +func newMetadata(file *os.File) (*freezerTableMeta, error) { stat, err := file.Stat() if err != nil { return nil, err } - // Write the metadata with the given actual tail into metadata file - // if it's non-existent. There are two possible scenarios here: - // - the freezer table is empty - // - the freezer table is legacy - // In both cases, write the meta into the file with the actual tail - // as the virtual tail. if stat.Size() == 0 { - m := newMetadata(tail) - if err := writeMetadata(file, m); err != nil { + m := &freezerTableMeta{ + file: file, + version: freezerTableV2, + virtualTail: 0, + flushOffset: 0, + } + if err := m.write(true); err != nil { return nil, err } return m, nil } - m, err := readMetadata(file) + if m := decodeV2(file); m != nil { + return m, nil + } + if m := decodeV1(file); m != nil { + return m, nil // legacy metadata + } + return nil, errors.New("failed to decode metadata") +} + +// setVirtualTail sets the virtual tail and flushes the metadata if sync is true. +func (m *freezerTableMeta) setVirtualTail(tail uint64, sync bool) error { + m.virtualTail = tail + return m.write(sync) +} + +// setFlushOffset sets the flush offset and flushes the metadata if sync is true. +func (m *freezerTableMeta) setFlushOffset(offset uint64, sync bool) error { + m.flushOffset = offset + return m.write(sync) +} + +// write flushes the content of metadata into file and performs a fsync if required. 
+func (m *freezerTableMeta) write(sync bool) error { + type obj struct { + Version uint16 + Tail uint64 + Offset uint64 + } + var o obj + o.Version = freezerTableV2 // forcibly set it to v2 + o.Tail = m.virtualTail + o.Offset = m.flushOffset + + _, err := m.file.Seek(0, io.SeekStart) if err != nil { - return nil, err + return err } - // Update the virtual tail with the given actual tail if it's even - // lower than it. Theoretically it shouldn't happen at all, print - // a warning here. - if m.VirtualTail < tail { - log.Warn("Updated virtual tail", "have", m.VirtualTail, "now", tail) - m.VirtualTail = tail - if err := writeMetadata(file, m); err != nil { - return nil, err - } + if err := rlp.Encode(m.file, &o); err != nil { + return err + } + if !sync { + return nil } - return m, nil + return m.file.Sync() } diff --git a/core/rawdb/freezer_meta_test.go b/core/rawdb/freezer_meta_test.go index 409e8110262a..b02c23090190 100644 --- a/core/rawdb/freezer_meta_test.go +++ b/core/rawdb/freezer_meta_test.go @@ -19,6 +19,8 @@ package rawdb import ( "os" "testing" + + "github.com/ethereum/go-ethereum/rlp" ) func TestReadWriteFreezerTableMeta(t *testing.T) { @@ -27,36 +29,98 @@ func TestReadWriteFreezerTableMeta(t *testing.T) { t.Fatalf("Failed to create file %v", err) } defer f.Close() - err = writeMetadata(f, newMetadata(100)) + + meta, err := newMetadata(f) if err != nil { - t.Fatalf("Failed to write metadata %v", err) + t.Fatalf("Failed to new metadata %v", err) } - meta, err := readMetadata(f) + meta.setVirtualTail(100, false) + + meta, err = newMetadata(f) if err != nil { - t.Fatalf("Failed to read metadata %v", err) + t.Fatalf("Failed to reload metadata %v", err) } - if meta.Version != freezerVersion { + if meta.version != freezerTableV2 { t.Fatalf("Unexpected version field") } - if meta.VirtualTail != uint64(100) { + if meta.virtualTail != uint64(100) { t.Fatalf("Unexpected virtual tail field") } } -func TestInitializeFreezerTableMeta(t *testing.T) { 
+func TestUpgradeMetadata(t *testing.T) { f, err := os.CreateTemp(t.TempDir(), "*") if err != nil { t.Fatalf("Failed to create file %v", err) } defer f.Close() - meta, err := loadMetadata(f, uint64(100)) + + // Write legacy metadata into file + type obj struct { + Version uint16 + Tail uint64 + } + var o obj + o.Version = freezerTableV1 + o.Tail = 100 + + if err := rlp.Encode(f, &o); err != nil { + t.Fatalf("Failed to encode %v", err) + } + + // Reload the metadata, a silent upgrade is expected + meta, err := newMetadata(f) if err != nil { t.Fatalf("Failed to read metadata %v", err) } - if meta.Version != freezerVersion { - t.Fatalf("Unexpected version field") + if meta.version != freezerTableV1 { + t.Fatal("Unexpected version field") } - if meta.VirtualTail != uint64(100) { - t.Fatalf("Unexpected virtual tail field") + if meta.virtualTail != uint64(100) { + t.Fatal("Unexpected virtual tail field") + } + if meta.flushOffset != uint64(0) { + t.Fatal("Unexpected flush offset field") + } + + meta.setFlushOffset(100, true) + + meta, err = newMetadata(f) + if err != nil { + t.Fatalf("Failed to read metadata %v", err) + } + if meta.version != freezerTableV2 { + t.Fatal("Unexpected version field") + } + if meta.virtualTail != uint64(100) { + t.Fatal("Unexpected virtual tail field") + } + if meta.flushOffset != uint64(100) { + t.Fatal("Unexpected flush offset field") + } +} + +func TestInvalidMetadata(t *testing.T) { + f, err := os.CreateTemp(t.TempDir(), "*") + if err != nil { + t.Fatalf("Failed to create file %v", err) + } + defer f.Close() + + // Write invalid legacy metadata into file + type obj struct { + Version uint16 + Tail uint64 + } + var o obj + o.Version = freezerTableV2 // -> invalid version tag + o.Tail = 100 + + if err := rlp.Encode(f, &o); err != nil { + t.Fatalf("Failed to encode %v", err) + } + _, err = newMetadata(f) + if err == nil { + t.Fatal("Unexpected success") } } diff --git a/core/rawdb/freezer_table.go b/core/rawdb/freezer_table.go index 
38c47dc223d3..f55c117074fc 100644 --- a/core/rawdb/freezer_table.go +++ b/core/rawdb/freezer_table.go @@ -108,11 +108,13 @@ type freezerTable struct { head *os.File // File descriptor for the data head of the table index *os.File // File descriptor for the indexEntry file of the table - meta *os.File // File descriptor for metadata of the table files map[uint32]*os.File // open files headId uint32 // number of the currently active head file tailId uint32 // number of the earliest file + metadata *freezerTableMeta // metadata of the table + lastSync time.Time // Timestamp when the last sync was performed + headBytes int64 // Number of bytes written to the head file readMeter *metrics.Meter // Meter for measuring the effective amount of data read writeMeter *metrics.Meter // Meter for measuring the effective amount of data written @@ -166,10 +168,17 @@ func newTable(path string, name string, readMeter, writeMeter *metrics.Meter, si return nil, err } } + // Load metadata from the file. The tag will be true if legacy metadata + // is detected. + metadata, err := newMetadata(meta) + if err != nil { + return nil, err + } // Create the table and repair any past inconsistency tab := &freezerTable{ index: index, - meta: meta, + metadata: metadata, + lastSync: time.Now(), files: make(map[uint32]*os.File), readMeter: readMeter, writeMeter: writeMeter, @@ -221,13 +230,11 @@ func (t *freezerTable) repair() error { return err } // New file can't trigger this path } - // Validate the index file as it might contain some garbage data after the - // power failures. if err := t.repairIndex(); err != nil { return err } // Retrieve the file sizes and prepare for truncation. Note the file size - // might be changed after index validation. + // might be changed after index repair. 
if stat, err = t.index.Stat(); err != nil { return err } @@ -253,12 +260,14 @@ func (t *freezerTable) repair() error { t.tailId = firstIndex.filenum t.itemOffset.Store(uint64(firstIndex.offset)) - // Load metadata from the file - meta, err := loadMetadata(t.meta, t.itemOffset.Load()) - if err != nil { - return err + // Adjust the number of hidden items if it is less than the number of items + // being removed. + if t.itemOffset.Load() > t.metadata.virtualTail { + if err := t.metadata.setVirtualTail(t.itemOffset.Load(), true); err != nil { + return err + } } - t.itemHidden.Store(meta.VirtualTail) + t.itemHidden.Store(t.metadata.virtualTail) // Read the last index, use the default value in case the freezer is empty if offsetsSize == indexEntrySize { @@ -267,12 +276,6 @@ func (t *freezerTable) repair() error { t.index.ReadAt(buffer, offsetsSize-indexEntrySize) lastIndex.unmarshalBinary(buffer) } - // Print an error log if the index is corrupted due to an incorrect - // last index item. While it is theoretically possible to have a zero offset - // by storing all zero-size items, it is highly unlikely to occur in practice. 
- if lastIndex.offset == 0 && offsetsSize/indexEntrySize > 1 { - log.Error("Corrupted index file detected", "lastOffset", lastIndex.offset, "indexes", offsetsSize/indexEntrySize) - } if t.readonly { t.head, err = t.openFile(lastIndex.filenum, openFreezerFileForReadOnly) } else { @@ -293,6 +296,7 @@ func (t *freezerTable) repair() error { return fmt.Errorf("freezer table(path: %s, name: %s, num: %d) is corrupted", t.path, t.name, lastIndex.filenum) } verbose = true + // Truncate the head file to the last offset pointer if contentExp < contentSize { t.logger.Warn("Truncating dangling head", "indexed", contentExp, "stored", contentSize) @@ -304,11 +308,23 @@ func (t *freezerTable) repair() error { // Truncate the index to point within the head file if contentExp > contentSize { t.logger.Warn("Truncating dangling indexes", "indexes", offsetsSize/indexEntrySize, "indexed", contentExp, "stored", contentSize) - if err := truncateFreezerFile(t.index, offsetsSize-indexEntrySize); err != nil { + + newOffset := offsetsSize - indexEntrySize + if err := truncateFreezerFile(t.index, newOffset); err != nil { return err } offsetsSize -= indexEntrySize + // If the index file is truncated beyond the flush offset, move the flush + // offset back to the new end of the file. A crash may occur before the + // offset is updated, leaving a dangling reference that points to a position + // outside the file. If so, the offset will be reset to the new end of the + // file during the next run. + if t.metadata.flushOffset < uint64(newOffset) { + if err := t.metadata.setFlushOffset(uint64(newOffset), true); err != nil { + return err + } + } // Read the new head index, use the default value in case // the freezer is already empty. 
var newLastIndex indexEntry @@ -345,7 +361,7 @@ func (t *freezerTable) repair() error { if err := t.head.Sync(); err != nil { return err } - if err := t.meta.Sync(); err != nil { + if err := t.metadata.file.Sync(); err != nil { return err } } @@ -372,7 +388,60 @@ func (t *freezerTable) repair() error { return nil } -// repairIndex validates the integrity of the index file. According to the design, +func (t *freezerTable) repairIndex() error { + stat, err := t.index.Stat() + if err != nil { + return err + } + offset := stat.Size() + + // Validate the items in the index file to ensure the data integrity. + // It's possible some garbage data is retained in the index file after + // the power failures and should be truncated first. + offset, err = t.checkIndex(offset) + if err != nil { + return err + } + // If legacy metadata is detected, attempt to recover the offset from the + // index file to avoid clearing the entire table. + if t.metadata.version == freezerTableV1 { + t.logger.Info("Recover the flush offset for legacy table", "offset", offset) + return t.metadata.setFlushOffset(uint64(offset), true) + } + // Move the flush offset to the end of the file for fresh new freezer table + if offset == indexEntrySize && t.metadata.flushOffset == 0 { + return t.metadata.setFlushOffset(uint64(offset), true) + } + // Short circuit if the offset is aligned with the index file + if offset == int64(t.metadata.flushOffset) { + return nil + } + // Extra index items have been detected beyond the flush offset. Since these + // entries correspond to data that has not been fully flushed to disk in the + // last run (because of unclean shutdown), their integrity cannot be guaranteed. + // To ensure consistency, these index items will be truncated, as there is no + // reliable way to validate or recover their associated data. 
+ if offset > int64(t.metadata.flushOffset) { + size := offset - int64(t.metadata.flushOffset) + if t.readonly { + return fmt.Errorf("index file(path: %s, name: %s) contains garbage data %d", t.path, t.name, size) + } + t.logger.Info("Truncate excessive items", "size", size) + return truncateFreezerFile(t.index, int64(t.metadata.flushOffset)) + } + // Flush offset refers to the position which exceeds the index file. The only + // possible scenario for this is: power failure or system crash occurs after + // truncating the segment in index file from head or tail, but without updating + // the flush offset. In this case, automatically reset the flush offset with + // the file size which implies the entire index file is complete. + if t.readonly { + return nil // do nothing in read only mode + } + t.logger.Info("Rewind the flush offset", "old", t.metadata.flushOffset, "new", offset) + return t.metadata.setFlushOffset(uint64(offset), true) +} + +// checkIndex validates the integrity of the index file. According to the design, // the initial entry in the file denotes the earliest data file along with the // count of deleted items. Following this, all subsequent entries in the file must // be in order. This function identifies any corrupted entries and truncates items @@ -392,18 +461,11 @@ func (t *freezerTable) repair() error { // leftover garbage or if all items in the table have zero size is impossible. // In such instances, the file will remain unchanged to prevent potential data // loss or misinterpretation. 
-func (t *freezerTable) repairIndex() error { - // Retrieve the file sizes and prepare for validation - stat, err := t.index.Stat() - if err != nil { - return err - } - size := stat.Size() - +func (t *freezerTable) checkIndex(size int64) (int64, error) { // Move the read cursor to the beginning of the file - _, err = t.index.Seek(0, io.SeekStart) + _, err := t.index.Seek(0, io.SeekStart) if err != nil { - return err + return 0, err } fr := bufio.NewReader(t.index) @@ -425,21 +487,21 @@ func (t *freezerTable) repairIndex() error { entry.unmarshalBinary(buff) return entry, nil } - truncate = func(offset int64) error { + truncate = func(offset int64) (int64, error) { if t.readonly { - return fmt.Errorf("index file is corrupted at %d, size: %d", offset, size) + return 0, fmt.Errorf("index file is corrupted at %d, size: %d", offset, size) } if err := truncateFreezerFile(t.index, offset); err != nil { - return err + return 0, err } log.Warn("Truncated index file", "offset", offset, "truncated", size-offset) - return nil + return offset, nil } ) for offset := int64(0); offset < size; offset += indexEntrySize { entry, err := read() if err != nil { - return err + return 0, err } if offset == 0 { head = entry @@ -468,10 +530,10 @@ func (t *freezerTable) repairIndex() error { // the seek operation anyway as a precaution. _, err = t.index.Seek(0, io.SeekEnd) if err != nil { - return err + return 0, err } log.Debug("Verified index file", "items", size/indexEntrySize, "elapsed", common.PrettyDuration(time.Since(start))) - return nil + return size, nil } // checkIndexItems validates the correctness of two consecutive index items based @@ -550,12 +612,23 @@ func (t *freezerTable) truncateHead(items uint64) error { // Truncate the index file first, the tail position is also considered // when calculating the new freezer table length. 
length := items - t.itemOffset.Load() - if err := truncateFreezerFile(t.index, int64(length+1)*indexEntrySize); err != nil { + newOffset := (length + 1) * indexEntrySize + if err := truncateFreezerFile(t.index, int64(newOffset)); err != nil { return err } if err := t.index.Sync(); err != nil { return err } + // If the index file is truncated beyond the flush offset, move the flush + // offset back to the new end of the file. A crash may occur before the + // offset is updated, leaving a dangling reference that points to a position + // outside the file. If so, the offset will be reset to the new end of the + // file during the next run. + if t.metadata.flushOffset > newOffset { + if err := t.metadata.setFlushOffset(newOffset, true); err != nil { + return err + } + } // Calculate the new expected size of the data file and truncate it var expected indexEntry if length == 0 { @@ -652,7 +725,10 @@ func (t *freezerTable) truncateTail(items uint64) error { } // Update the virtual tail marker and hidden these entries in table. t.itemHidden.Store(items) - if err := writeMetadata(t.meta, newMetadata(items)); err != nil { + + // Update the virtual tail without fsync, otherwise it will significantly + // impact the overall performance. + if err := t.metadata.setVirtualTail(items, false); err != nil { return err } // Hidden items still fall in the current tail file, no data file @@ -664,6 +740,18 @@ func (t *freezerTable) truncateTail(items uint64) error { if t.tailId > newTailId { return fmt.Errorf("invalid index, tail-file %d, item-file %d", t.tailId, newTailId) } + // Sync the table before performing the index tail truncation. A crash may + // occur after truncating the index file without updating the flush offset, + // leaving a dangling offset that points to a position outside the file. + // The offset will be rewound to the end of file during the next run + // automatically and implicitly assumes all the items within the file are + // complete. 
+ // + // Therefore, forcibly flush everything above the offset to ensure this + // assumption is satisfied! + if err := t.syncWithNoLock(); err != nil { + return err + } // Count how many items can be deleted from the file. var ( newDeleted = items @@ -681,11 +769,6 @@ func (t *freezerTable) truncateTail(items uint64) error { } newDeleted = current } - // Commit the changes of metadata file first before manipulating - // the indexes file. - if err := t.meta.Sync(); err != nil { - return err - } // Close the index file before shorten it. if err := t.index.Close(); err != nil { return err @@ -716,6 +799,21 @@ func (t *freezerTable) truncateTail(items uint64) error { t.itemOffset.Store(newDeleted) t.releaseFilesBefore(t.tailId, true) + // Move the index flush offset backward due to the deletion of an index segment. + // A crash may occur before the offset is updated, leaving a dangling reference + // that points to a position outside the file. If so, the offset will be reset + // to the new end of the file during the next run. + // + // Note, both the index and head data file has been persisted before performing + // tail truncation and all the items in these files are regarded as complete. + shorten := indexEntrySize * (newDeleted - deleted) + if t.metadata.flushOffset <= shorten { + return fmt.Errorf("invalid index flush offset: %d, shorten: %d", t.metadata.flushOffset, shorten) + } else { + if err := t.metadata.setFlushOffset(t.metadata.flushOffset-shorten, true); err != nil { + return err + } + } // Retrieve the new size and update the total size counter newSize, err := t.sizeNolock() if err != nil { @@ -725,40 +823,30 @@ func (t *freezerTable) truncateTail(items uint64) error { return nil } -// Close closes all opened files. +// Close closes all opened files and finalizes the freezer table for use. +// This operation must be completed before shutdown to prevent the loss of +// recent writes. 
func (t *freezerTable) Close() error { t.lock.Lock() defer t.lock.Unlock() + if err := t.syncWithNoLock(); err != nil { + return err + } var errs []error - doClose := func(f *os.File, sync bool, close bool) { - if sync && !t.readonly { - if err := f.Sync(); err != nil { - errs = append(errs, err) - } - } - if close { - if err := f.Close(); err != nil { - errs = append(errs, err) - } + doClose := func(f *os.File) { + if err := f.Close(); err != nil { + errs = append(errs, err) } } - // Trying to fsync a file opened in rdonly causes "Access denied" - // error on Windows. - doClose(t.index, true, true) - doClose(t.meta, true, true) - - // The preopened non-head data-files are all opened in readonly. - // The head is opened in rw-mode, so we sync it here - but since it's also - // part of t.files, it will be closed in the loop below. - doClose(t.head, true, false) // sync but do not close - + doClose(t.index) + doClose(t.metadata.file) for _, f := range t.files { - doClose(f, false, true) // close but do not sync + doClose(f) } t.index = nil - t.meta = nil t.head = nil + t.metadata.file = nil if errs != nil { return fmt.Errorf("%v", errs) @@ -917,7 +1005,7 @@ func (t *freezerTable) retrieveItems(start, count, maxBytes uint64) ([]byte, []i defer t.lock.RUnlock() // Ensure the table and the item are accessible - if t.index == nil || t.head == nil || t.meta == nil { + if t.index == nil || t.head == nil || t.metadata.file == nil { return nil, nil, errClosed } var ( @@ -1042,6 +1130,9 @@ func (t *freezerTable) advanceHead() error { t.lock.Lock() defer t.lock.Unlock() + if err := t.syncWithNoLock(); err != nil { + return err + } // We open the next file in truncated mode -- if this file already // exists, we need to start over from scratch on it. 
nextID := t.headId + 1 @@ -1069,7 +1160,19 @@ func (t *freezerTable) advanceHead() error { func (t *freezerTable) Sync() error { t.lock.Lock() defer t.lock.Unlock() - if t.index == nil || t.head == nil || t.meta == nil { + + return t.syncWithNoLock() +} + +// syncWithNoLock is the internal version of Sync which assumes the lock is +// already held. +func (t *freezerTable) syncWithNoLock() error { + // Trying to fsync a file opened in rdonly causes "Access denied" + // error on Windows. + if t.readonly { + return nil + } + if t.index == nil || t.head == nil || t.metadata.file == nil { return errClosed } var err error @@ -1078,10 +1181,18 @@ func (t *freezerTable) Sync() error { err = e } } - trackError(t.index.Sync()) - trackError(t.meta.Sync()) trackError(t.head.Sync()) + + // A crash may occur before the offset is updated, leaving the offset + // points to a old position. If so, the extra items above the offset + // will be truncated during the next run. + stat, err := t.index.Stat() + if err != nil { + return err + } + offset := stat.Size() + trackError(t.metadata.setFlushOffset(uint64(offset), true)) return err } @@ -1097,13 +1208,8 @@ func (t *freezerTable) dumpIndexString(start, stop int64) string { } func (t *freezerTable) dumpIndex(w io.Writer, start, stop int64) { - meta, err := readMetadata(t.meta) - if err != nil { - fmt.Fprintf(w, "Failed to decode freezer table %v\n", err) - return - } - fmt.Fprintf(w, "Version %d count %d, deleted %d, hidden %d\n", meta.Version, - t.items.Load(), t.itemOffset.Load(), t.itemHidden.Load()) + fmt.Fprintf(w, "Version %d count %d, deleted %d, hidden %d\n", + t.metadata.version, t.items.Load(), t.itemOffset.Load(), t.itemHidden.Load()) buf := make([]byte, indexEntrySize) diff --git a/core/rawdb/freezer_table_test.go b/core/rawdb/freezer_table_test.go index fd6e3cf1992c..4bbb3aaf705e 100644 --- a/core/rawdb/freezer_table_test.go +++ b/core/rawdb/freezer_table_test.go @@ -262,28 +262,28 @@ func TestSnappyDetection(t 
*testing.T) { f.Close() } - // Open without snappy + // Open with snappy { - f, err := newTable(os.TempDir(), fname, rm, wm, sg, 50, false, false) + f, err := newTable(os.TempDir(), fname, rm, wm, sg, 50, true, false) if err != nil { t.Fatal(err) } - if _, err = f.Retrieve(0); err == nil { + // There should be 255 items + if _, err = f.Retrieve(0xfe); err != nil { f.Close() - t.Fatalf("expected empty table") + t.Fatalf("expected no error, got %v", err) } } - // Open with snappy + // Open without snappy { - f, err := newTable(os.TempDir(), fname, rm, wm, sg, 50, true, false) + f, err := newTable(os.TempDir(), fname, rm, wm, sg, 50, false, false) if err != nil { t.Fatal(err) } - // There should be 255 items - if _, err = f.Retrieve(0xfe); err != nil { + if _, err = f.Retrieve(0); err == nil { f.Close() - t.Fatalf("expected no error, got %v", err) + t.Fatalf("expected empty table") } } } @@ -521,93 +521,53 @@ func TestFreezerOffset(t *testing.T) { fname := fmt.Sprintf("offset-%d", rand.Uint64()) // Fill table - { - f, err := newTable(os.TempDir(), fname, rm, wm, sg, 40, true, false) - if err != nil { - t.Fatal(err) - } - - // Write 6 x 20 bytes, splitting out into three files - batch := f.newBatch() - require.NoError(t, batch.AppendRaw(0, getChunk(20, 0xFF))) - require.NoError(t, batch.AppendRaw(1, getChunk(20, 0xEE))) - - require.NoError(t, batch.AppendRaw(2, getChunk(20, 0xdd))) - require.NoError(t, batch.AppendRaw(3, getChunk(20, 0xcc))) - - require.NoError(t, batch.AppendRaw(4, getChunk(20, 0xbb))) - require.NoError(t, batch.AppendRaw(5, getChunk(20, 0xaa))) - require.NoError(t, batch.commit()) - - t.Log(f.dumpIndexString(0, 100)) - f.Close() + f, err := newTable(os.TempDir(), fname, rm, wm, sg, 40, true, false) + if err != nil { + t.Fatal(err) } - // Now crop it. 
- { - // delete files 0 and 1 - for i := 0; i < 2; i++ { - p := filepath.Join(os.TempDir(), fmt.Sprintf("%v.%04d.rdat", fname, i)) - if err := os.Remove(p); err != nil { - t.Fatal(err) - } - } - // Read the index file - p := filepath.Join(os.TempDir(), fmt.Sprintf("%v.ridx", fname)) - indexFile, err := os.OpenFile(p, os.O_RDWR, 0644) - if err != nil { - t.Fatal(err) - } - indexBuf := make([]byte, 7*indexEntrySize) - indexFile.Read(indexBuf) - - // Update the index file, so that we store - // [ file = 2, offset = 4 ] at index zero + // Write 6 x 20 bytes, splitting out into three files + batch := f.newBatch() + require.NoError(t, batch.AppendRaw(0, getChunk(20, 0xFF))) + require.NoError(t, batch.AppendRaw(1, getChunk(20, 0xEE))) - zeroIndex := indexEntry{ - filenum: uint32(2), // First file is 2 - offset: uint32(4), // We have removed four items - } - buf := zeroIndex.append(nil) + require.NoError(t, batch.AppendRaw(2, getChunk(20, 0xdd))) + require.NoError(t, batch.AppendRaw(3, getChunk(20, 0xcc))) - // Overwrite index zero - copy(indexBuf, buf) + require.NoError(t, batch.AppendRaw(4, getChunk(20, 0xbb))) + require.NoError(t, batch.AppendRaw(5, getChunk(20, 0xaa))) + require.NoError(t, batch.commit()) - // Remove the four next indices by overwriting - copy(indexBuf[indexEntrySize:], indexBuf[indexEntrySize*5:]) - indexFile.WriteAt(indexBuf, 0) + t.Log(f.dumpIndexString(0, 100)) - // Need to truncate the moved index items - indexFile.Truncate(indexEntrySize * (1 + 2)) - indexFile.Close() - } + // Now crop it. + f.truncateTail(4) + f.Close() // Now open again - { - f, err := newTable(os.TempDir(), fname, rm, wm, sg, 40, true, false) - if err != nil { - t.Fatal(err) - } - defer f.Close() - t.Log(f.dumpIndexString(0, 100)) + f, err = newTable(os.TempDir(), fname, rm, wm, sg, 40, true, false) + if err != nil { + t.Fatal(err) + } + t.Log(f.dumpIndexString(0, 100)) - // It should allow writing item 6. 
- batch := f.newBatch() - require.NoError(t, batch.AppendRaw(6, getChunk(20, 0x99))) - require.NoError(t, batch.commit()) + // It should allow writing item 6. + batch = f.newBatch() + require.NoError(t, batch.AppendRaw(6, getChunk(20, 0x99))) + require.NoError(t, batch.commit()) - checkRetrieveError(t, f, map[uint64]error{ - 0: errOutOfBounds, - 1: errOutOfBounds, - 2: errOutOfBounds, - 3: errOutOfBounds, - }) - checkRetrieve(t, f, map[uint64][]byte{ - 4: getChunk(20, 0xbb), - 5: getChunk(20, 0xaa), - 6: getChunk(20, 0x99), - }) - } + checkRetrieveError(t, f, map[uint64]error{ + 0: errOutOfBounds, + 1: errOutOfBounds, + 2: errOutOfBounds, + 3: errOutOfBounds, + }) + checkRetrieve(t, f, map[uint64][]byte{ + 4: getChunk(20, 0xbb), + 5: getChunk(20, 0xaa), + 6: getChunk(20, 0x99), + }) + f.Close() // Edit the index again, with a much larger initial offset of 1M. { @@ -1369,45 +1329,63 @@ func TestRandom(t *testing.T) { } func TestIndexValidation(t *testing.T) { - const ( - items = 30 - dataSize = 10 - ) + const dataSize = 10 + garbage := indexEntry{ filenum: 100, offset: 200, } var cases = []struct { - offset int64 - data []byte - expItems int + write int + offset int64 + data []byte + expItems int + hasCorruption bool }{ // extend index file with zero bytes at the end { - offset: (items + 1) * indexEntrySize, + write: 5, + offset: (5 + 1) * indexEntrySize, data: make([]byte, indexEntrySize), - expItems: 30, + expItems: 5, + }, + // extend index file with unaligned zero bytes at the end + { + write: 5, + offset: (5 + 1) * indexEntrySize, + data: make([]byte, indexEntrySize*1.5), + expItems: 5, }, // write garbage in the first non-head item { + write: 5, offset: indexEntrySize, data: garbage.append(nil), expItems: 0, }, - // write garbage in the first non-head item + // write garbage in the middle + { + write: 5, + offset: 3 * indexEntrySize, + data: garbage.append(nil), + expItems: 2, + }, + // fulfill the first data file (but not yet advanced), the zero bytes + // at 
tail should be truncated. { - offset: (items/2 + 1) * indexEntrySize, + write: 10, + offset: 11 * indexEntrySize, data: garbage.append(nil), - expItems: items / 2, + expItems: 10, }, } for _, c := range cases { fn := fmt.Sprintf("t-%d", rand.Uint64()) - f, err := newTable(os.TempDir(), fn, metrics.NewMeter(), metrics.NewMeter(), metrics.NewGauge(), 100, true, false) + f, err := newTable(os.TempDir(), fn, metrics.NewMeter(), metrics.NewMeter(), metrics.NewGauge(), 10*dataSize, true, false) if err != nil { t.Fatal(err) } - writeChunks(t, f, items, dataSize) + writeChunks(t, f, c.write, dataSize) // write corrupted data f.index.WriteAt(c.data, c.offset) @@ -1421,10 +1399,10 @@ func TestIndexValidation(t *testing.T) { for i := 0; i < c.expItems; i++ { exp := getChunk(10, i) got, err := f.Retrieve(uint64(i)) - if err != nil { + if err != nil && !c.hasCorruption { t.Fatalf("Failed to read from table, %v", err) } - if !bytes.Equal(exp, got) { + if !bytes.Equal(exp, got) && !c.hasCorruption { t.Fatalf("Unexpected item data, want: %v, got: %v", exp, got) } } @@ -1433,3 +1411,163 @@ func TestIndexValidation(t *testing.T) { } } } + +// TestFlushOffsetTracking tests the flush offset tracking. The offset moving +// in the test is mostly triggered by the advanceHead (new data file) and +// head/tail truncation. 
+func TestFlushOffsetTracking(t *testing.T) { + const ( + items = 35 + dataSize = 10 + fileSize = 100 + ) + fn := fmt.Sprintf("t-%d", rand.Uint64()) + f, err := newTable(os.TempDir(), fn, metrics.NewMeter(), metrics.NewMeter(), metrics.NewGauge(), fileSize, true, false) + if err != nil { + t.Fatal(err) + } + // Data files: + // F1(10 items) -> F2(10 items) -> F3(10 items) -> F4(5 items, non-full) + writeChunks(t, f, items, dataSize) + + var cases = []struct { + op func(*freezerTable) + offset uint64 + }{ + { + // Data files: + // F1(10 items) -> F2(10 items) -> F3(10 items) -> F4(5 items, non-full) + func(f *freezerTable) {}, // no-op + 31 * indexEntrySize, + }, + { + // Write more items to fulfill the newest data file, but the file advance + // is not triggered. + + // Data files: + // F1(10 items) -> F2(10 items) -> F3(10 items) -> F4(10 items, full) + func(f *freezerTable) { + batch := f.newBatch() + for i := 0; i < 5; i++ { + batch.AppendRaw(items+uint64(i), make([]byte, dataSize)) + } + batch.commit() + }, + 31 * indexEntrySize, + }, + { + // Write more items to trigger the data file advance + + // Data files: + // F1(10 items) -> F2(10 items) -> F3(10 items) -> F4(10 items) -> F5(1 item) + func(f *freezerTable) { + batch := f.newBatch() + batch.AppendRaw(items+5, make([]byte, dataSize)) + batch.commit() + }, + 41 * indexEntrySize, + }, + { + // Head truncate + + // Data files: + // F1(10 items) -> F2(10 items) -> F3(10 items) -> F4(10 items) -> F5(0 item) + func(f *freezerTable) { + f.truncateHead(items + 5) + }, + 41 * indexEntrySize, + }, + { + // Tail truncate + + // Data files: + // F1(1 hidden, 9 visible) -> F2(10 items) -> F3(10 items) -> F4(10 items) -> F5(0 item) + func(f *freezerTable) { + f.truncateTail(1) + }, + 41 * indexEntrySize, + }, + { + // Tail truncate + + // Data files: + // F2(10 items) -> F3(10 items) -> F4(10 items) -> F5(0 item) + func(f *freezerTable) { + f.truncateTail(10) + }, + 31 * indexEntrySize, + }, + { + // Tail truncate + + 
// Data files: + // F4(10 items) -> F5(0 item) + func(f *freezerTable) { + f.truncateTail(30) + }, + 11 * indexEntrySize, + }, + { + // Head truncate + + // Data files: + // F4(9 items) + func(f *freezerTable) { + f.truncateHead(items + 4) + }, + 10 * indexEntrySize, + }, + } + for _, c := range cases { + c.op(f) + if f.metadata.flushOffset != c.offset { + t.Fatalf("Unexpected index flush offset, want: %d, got: %d", c.offset, f.metadata.flushOffset) + } + } +} + +func TestTailTruncationCrash(t *testing.T) { + const ( + items = 35 + dataSize = 10 + fileSize = 100 + ) + fn := fmt.Sprintf("t-%d", rand.Uint64()) + f, err := newTable(os.TempDir(), fn, metrics.NewMeter(), metrics.NewMeter(), metrics.NewGauge(), fileSize, true, false) + if err != nil { + t.Fatal(err) + } + // Data files: + // F1(10 items) -> F2(10 items) -> F3(10 items) -> F4(5 items, non-full) + writeChunks(t, f, items, dataSize) + + // The latest 5 items are not persisted yet + if f.metadata.flushOffset != 31*indexEntrySize { + t.Fatalf("Unexpected index flush offset, want: %d, got: %d", 31*indexEntrySize, f.metadata.flushOffset) + } + + f.truncateTail(5) + if f.metadata.flushOffset != 31*indexEntrySize { + t.Fatalf("Unexpected index flush offset, want: %d, got: %d", 31*indexEntrySize, f.metadata.flushOffset) + } + + // Truncate the first 10 items which results in the first data file + // being removed. The offset should be moved to 26*indexEntrySize. 
+ f.truncateTail(10) + if f.metadata.flushOffset != 26*indexEntrySize { + t.Fatalf("Unexpected index flush offset, want: %d, got: %d", 26*indexEntrySize, f.metadata.flushOffset) + } + + // Write the offset back to 31*indexEntrySize to simulate a crash + // which occurs after truncating the index file without updating + // the offset + f.metadata.setFlushOffset(31*indexEntrySize, true) + + f, err = newTable(os.TempDir(), fn, metrics.NewMeter(), metrics.NewMeter(), metrics.NewGauge(), fileSize, true, false) + if err != nil { + t.Fatal(err) + } + if f.metadata.flushOffset != 26*indexEntrySize { + t.Fatalf("Unexpected index flush offset, want: %d, got: %d", 26*indexEntrySize, f.metadata.flushOffset) + } +} From a33bba477349f8f54abd42c3045a3ea1ab8579e7 Mon Sep 17 00:00:00 2001 From: Gary Rong Date: Thu, 23 Jan 2025 11:21:42 +0800 Subject: [PATCH 2/5] core/rawdb: several fixes --- core/rawdb/freezer_batch.go | 2 +- core/rawdb/freezer_meta.go | 7 ++++--- core/rawdb/freezer_table.go | 2 +- 3 files changed, 6 insertions(+), 5 deletions(-) diff --git a/core/rawdb/freezer_batch.go b/core/rawdb/freezer_batch.go index 0b354cb6be2a..801d30f73f70 100644 --- a/core/rawdb/freezer_batch.go +++ b/core/rawdb/freezer_batch.go @@ -210,7 +210,7 @@ func (batch *freezerTableBatch) commit() error { // Periodically sync the table, todo (rjl493456442) make it configurable? 
if time.Since(batch.t.lastSync) > 30*time.Second { batch.t.lastSync = time.Now() - return batch.t.syncWithNoLock() + return batch.t.Sync() } return nil } diff --git a/core/rawdb/freezer_meta.go b/core/rawdb/freezer_meta.go index f60fce79ac62..3b60847d3cdf 100644 --- a/core/rawdb/freezer_meta.go +++ b/core/rawdb/freezer_meta.go @@ -25,8 +25,9 @@ import ( ) const ( - freezerTableV1 = 1 // Initial version of metadata struct - freezerTableV2 = 2 // Add field: 'flushOffset' + freezerTableV1 = 1 // Initial version of metadata struct + freezerTableV2 = 2 // Add field: 'flushOffset' + freezerVersion = freezerTableV2 // The current used version ) // freezerTableMeta is a collection of additional properties that describe the @@ -164,7 +165,7 @@ func (m *freezerTableMeta) write(sync bool) error { Offset uint64 } var o obj - o.Version = freezerTableV2 // forcibly set it to v2 + o.Version = freezerVersion // forcibly use the current version o.Tail = m.virtualTail o.Offset = m.flushOffset diff --git a/core/rawdb/freezer_table.go b/core/rawdb/freezer_table.go index f55c117074fc..6bf4eb6c63ba 100644 --- a/core/rawdb/freezer_table.go +++ b/core/rawdb/freezer_table.go @@ -320,7 +320,7 @@ func (t *freezerTable) repair() error { // offset is updated, leaving a dangling reference that points to a position // outside the file. If so, the offset will be reset to the new end of the // file during the next run. - if t.metadata.flushOffset < uint64(newOffset) { + if t.metadata.flushOffset > uint64(newOffset) { if err := t.metadata.setFlushOffset(uint64(newOffset), true); err != nil { return err } From 791988fd322500f2ea70bd949f870e8effe18d7e Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Mon, 3 Feb 2025 23:54:51 +0100 Subject: [PATCH 3/5] core/rawdb: clean up freezer index truncation logic Couple modifications here: - Use switch instead of a condition chain. 
It seems better to me since the idea is comparing the file size with flushOffset and performing different actions when it's larger, smaller, equal. - Rephrase some messages to add context. Users should be able to infer it's about the freezer from the message. - Rename 'offset' to 'size'. --- core/rawdb/freezer_table.go | 65 ++++++++++++++++++++----------------- 1 file changed, 35 insertions(+), 30 deletions(-) diff --git a/core/rawdb/freezer_table.go b/core/rawdb/freezer_table.go index 6bf4eb6c63ba..114848f7b627 100644 --- a/core/rawdb/freezer_table.go +++ b/core/rawdb/freezer_table.go @@ -393,52 +393,57 @@ func (t *freezerTable) repairIndex() error { if err != nil { return err } - offset := stat.Size() + size := stat.Size() // Validate the items in the index file to ensure the data integrity. // It's possible some garbage data is retained in the index file after // the power failures and should be truncated first. - offset, err = t.checkIndex(offset) + size, err = t.checkIndex(size) if err != nil { return err } // If legacy metadata is detected, attempt to recover the offset from the // index file to avoid clearing the entire table. if t.metadata.version == freezerTableV1 { - t.logger.Info("Recover the flush offset for legacy table", "offset", offset) - return t.metadata.setFlushOffset(uint64(offset), true) - } - // Move the flush offset to the end of the file for fresh new freezer table - if offset == indexEntrySize && t.metadata.flushOffset == 0 { - return t.metadata.setFlushOffset(uint64(offset), true) + t.logger.Info("Recovering freezer flushOffset for legacy table", "offset", size) + return t.metadata.setFlushOffset(uint64(size), true) } - // Short circuit if the offset is aligned with the index file - if offset == int64(t.metadata.flushOffset) { + + switch { + case size == indexEntrySize && t.metadata.flushOffset == 0: + // It's a new freezer table with no content. + // Move the flush offset to the end of the file. 
+ return t.metadata.setFlushOffset(uint64(size), true) + + case size == int64(t.metadata.flushOffset): + // flushOffset is aligned with the index file, all is well. return nil - } - // Extra index items have been detected beyond the flush offset. Since these - // entries correspond to data that has not been fully flushed to disk in the - // last run (because of unclean shutdown), their integrity cannot be guaranteed. - // To ensure consistency, these index items will be truncated, as there is no - // reliable way to validate or recover their associated data. - if offset > int64(t.metadata.flushOffset) { - size := offset - int64(t.metadata.flushOffset) + + case size > int64(t.metadata.flushOffset): + // Extra index items have been detected beyond the flush offset. Since these + // entries correspond to data that has not been fully flushed to disk in the + // last run (because of unclean shutdown), their integrity cannot be guaranteed. + // To ensure consistency, these index items will be truncated, as there is no + // reliable way to validate or recover their associated data. + extraSize := size - int64(t.metadata.flushOffset) if t.readonly { - return fmt.Errorf("index file(path: %s, name: %s) contains garbage data %d", t.path, t.name, size) + return fmt.Errorf("index file(path: %s, name: %s) contains %d garbage data bytes", t.path, t.name, extraSize) } - t.logger.Info("Truncate excessive items", "size", size) + t.logger.Warn("Truncating freezer items after flushOffset", "size", extraSize) return truncateFreezerFile(t.index, int64(t.metadata.flushOffset)) + + default: // size < flushOffset + // Flush offset refers to a position larger than index file. The only + // possible scenario for this is: a power failure or system crash has occurred after + // truncating the segment in index file from head or tail, but without updating + // the flush offset. 
In this case, automatically reset the flush offset with + // the file size which implies the entire index file is complete. + if t.readonly { + return nil // do nothing in read only mode + } + t.logger.Warn("Rewinding freezer flushOffset", "old", t.metadata.flushOffset, "new", size) + return t.metadata.setFlushOffset(uint64(size), true) } - // Flush offset refers to the position which exceeds the index file. The only - // possible scenario for this is: power failure or system crash occurs after - // truncating the segment in index file from head or tail, but without updating - // the flush offset. In this case, automatically reset the flush offset with - // the file size which implies the entire index file is complete. - if t.readonly { - return nil // do nothing in read only mode - } - t.logger.Info("Rewind the flush offset", "old", t.metadata.flushOffset, "new", offset) - return t.metadata.setFlushOffset(uint64(offset), true) } // checkIndex validates the integrity of the index file. According to the design, From eb5a4493165c339b3c1fc7d6a94ed3e9193230ad Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Tue, 4 Feb 2025 00:08:03 +0100 Subject: [PATCH 4/5] core/rawdb: change freezerTableMeta.flushOffset to int64 The flushOffset is a file position, which is usually tracked as int64 in Go. It's better to keep it as the same type to avoid conversion. Overall, this change removes some conversions, and introduces some. The new conversions are in cases where flushOffset is compared with numEntries*indexEntrySize computations. 
--- core/rawdb/freezer_meta.go | 14 ++++++++++---- core/rawdb/freezer_meta_test.go | 4 ++-- core/rawdb/freezer_table.go | 26 +++++++++++++------------- core/rawdb/freezer_table_test.go | 2 +- 4 files changed, 26 insertions(+), 20 deletions(-) diff --git a/core/rawdb/freezer_meta.go b/core/rawdb/freezer_meta.go index 3b60847d3cdf..3cda9ae45c7e 100644 --- a/core/rawdb/freezer_meta.go +++ b/core/rawdb/freezer_meta.go @@ -19,8 +19,10 @@ package rawdb import ( "errors" "io" + "math" "os" + "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/rlp" ) @@ -62,7 +64,7 @@ type freezerTableMeta struct { // // The offset could be moved forward by applying sync operation, or be moved // backward in cases of head/tail truncation, etc. - flushOffset uint64 + flushOffset int64 } // decodeV1 attempts to decode the metadata structure in v1 format. If fails or @@ -109,11 +111,15 @@ func decodeV2(file *os.File) *freezerTableMeta { if o.Version != freezerTableV2 { return nil } + if o.Offset > math.MaxInt64 { + log.Error("Invalid flushOffset in freezer metadata", "offset", o.Offset, "file", file.Name()) + return nil + } return &freezerTableMeta{ file: file, version: freezerTableV2, virtualTail: o.Tail, - flushOffset: o.Offset, + flushOffset: int64(o.Offset), } } @@ -152,7 +158,7 @@ func (m *freezerTableMeta) setVirtualTail(tail uint64, sync bool) error { } // setFlushOffset sets the flush offset and flushes the metadata if sync is true. 
-func (m *freezerTableMeta) setFlushOffset(offset uint64, sync bool) error { +func (m *freezerTableMeta) setFlushOffset(offset int64, sync bool) error { m.flushOffset = offset return m.write(sync) } @@ -167,7 +173,7 @@ func (m *freezerTableMeta) write(sync bool) error { var o obj o.Version = freezerVersion // forcibly use the current version o.Tail = m.virtualTail - o.Offset = m.flushOffset + o.Offset = uint64(m.flushOffset) _, err := m.file.Seek(0, io.SeekStart) if err != nil { diff --git a/core/rawdb/freezer_meta_test.go b/core/rawdb/freezer_meta_test.go index b02c23090190..31f8c519c806 100644 --- a/core/rawdb/freezer_meta_test.go +++ b/core/rawdb/freezer_meta_test.go @@ -79,7 +79,7 @@ func TestUpgradeMetadata(t *testing.T) { if meta.virtualTail != uint64(100) { t.Fatal("Unexpected virtual tail field") } - if meta.flushOffset != uint64(0) { + if meta.flushOffset != 0 { t.Fatal("Unexpected flush offset field") } @@ -95,7 +95,7 @@ func TestUpgradeMetadata(t *testing.T) { if meta.virtualTail != uint64(100) { t.Fatal("Unexpected virtual tail field") } - if meta.flushOffset != uint64(100) { + if meta.flushOffset != 100 { t.Fatal("Unexpected flush offset field") } } diff --git a/core/rawdb/freezer_table.go b/core/rawdb/freezer_table.go index 114848f7b627..88351a78191c 100644 --- a/core/rawdb/freezer_table.go +++ b/core/rawdb/freezer_table.go @@ -320,8 +320,8 @@ func (t *freezerTable) repair() error { // offset is updated, leaving a dangling reference that points to a position // outside the file. If so, the offset will be reset to the new end of the // file during the next run. - if t.metadata.flushOffset > uint64(newOffset) { - if err := t.metadata.setFlushOffset(uint64(newOffset), true); err != nil { + if t.metadata.flushOffset > newOffset { + if err := t.metadata.setFlushOffset(newOffset, true); err != nil { return err } } @@ -406,31 +406,31 @@ func (t *freezerTable) repairIndex() error { // index file to avoid clearing the entire table. 
if t.metadata.version == freezerTableV1 { t.logger.Info("Recovering freezer flushOffset for legacy table", "offset", size) - return t.metadata.setFlushOffset(uint64(size), true) + return t.metadata.setFlushOffset(size, true) } switch { case size == indexEntrySize && t.metadata.flushOffset == 0: // It's a new freezer table with no content. // Move the flush offset to the end of the file. - return t.metadata.setFlushOffset(uint64(size), true) + return t.metadata.setFlushOffset(size, true) - case size == int64(t.metadata.flushOffset): + case size == t.metadata.flushOffset: // flushOffset is aligned with the index file, all is well. return nil - case size > int64(t.metadata.flushOffset): + case size > t.metadata.flushOffset: // Extra index items have been detected beyond the flush offset. Since these // entries correspond to data that has not been fully flushed to disk in the // last run (because of unclean shutdown), their integrity cannot be guaranteed. // To ensure consistency, these index items will be truncated, as there is no // reliable way to validate or recover their associated data. - extraSize := size - int64(t.metadata.flushOffset) + extraSize := size - t.metadata.flushOffset if t.readonly { return fmt.Errorf("index file(path: %s, name: %s) contains %d garbage data bytes", t.path, t.name, extraSize) } t.logger.Warn("Truncating freezer items after flushOffset", "size", extraSize) - return truncateFreezerFile(t.index, int64(t.metadata.flushOffset)) + return truncateFreezerFile(t.index, t.metadata.flushOffset) default: // size < flushOffset // Flush offset refers to a position larger than index file. 
The only @@ -442,7 +442,7 @@ func (t *freezerTable) repairIndex() error { return nil // do nothing in read only mode } t.logger.Warn("Rewinding freezer flushOffset", "old", t.metadata.flushOffset, "new", size) - return t.metadata.setFlushOffset(uint64(size), true) + return t.metadata.setFlushOffset(size, true) } } @@ -629,8 +629,8 @@ func (t *freezerTable) truncateHead(items uint64) error { // offset is updated, leaving a dangling reference that points to a position // outside the file. If so, the offset will be reset to the new end of the // file during the next run. - if t.metadata.flushOffset > newOffset { - if err := t.metadata.setFlushOffset(newOffset, true); err != nil { + if t.metadata.flushOffset > int64(newOffset) { + if err := t.metadata.setFlushOffset(int64(newOffset), true); err != nil { return err } } @@ -811,7 +811,7 @@ func (t *freezerTable) truncateTail(items uint64) error { // // Note, both the index and head data file has been persisted before performing // tail truncation and all the items in these files are regarded as complete. 
- shorten := indexEntrySize * (newDeleted - deleted) + shorten := indexEntrySize * int64(newDeleted-deleted) if t.metadata.flushOffset <= shorten { return fmt.Errorf("invalid index flush offset: %d, shorten: %d", t.metadata.flushOffset, shorten) } else { @@ -1197,7 +1197,7 @@ func (t *freezerTable) syncWithNoLock() error { return err } offset := stat.Size() - trackError(t.metadata.setFlushOffset(uint64(offset), true)) + trackError(t.metadata.setFlushOffset(offset, true)) return err } diff --git a/core/rawdb/freezer_table_test.go b/core/rawdb/freezer_table_test.go index 4bbb3aaf705e..9a72af6ccc81 100644 --- a/core/rawdb/freezer_table_test.go +++ b/core/rawdb/freezer_table_test.go @@ -1432,7 +1432,7 @@ func TestFlushOffsetTracking(t *testing.T) { var cases = []struct { op func(*freezerTable) - offset uint64 + offset int64 }{ { // Data files: From 6b917b393d0b5c7f74a66b24c134f3b520b4851d Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Tue, 4 Feb 2025 00:21:48 +0100 Subject: [PATCH 5/5] core/rawdb: rename syncWithNoLock to doSync --- core/rawdb/freezer_table.go | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/core/rawdb/freezer_table.go b/core/rawdb/freezer_table.go index 88351a78191c..1ba8cf639fe6 100644 --- a/core/rawdb/freezer_table.go +++ b/core/rawdb/freezer_table.go @@ -754,7 +754,7 @@ func (t *freezerTable) truncateTail(items uint64) error { // // Therefore, forcibly flush everything above the offset to ensure this // assumption is satisfied! - if err := t.syncWithNoLock(); err != nil { + if err := t.doSync(); err != nil { return err } // Count how many items can be deleted from the file. 
@@ -835,7 +835,7 @@ func (t *freezerTable) Close() error { t.lock.Lock() defer t.lock.Unlock() - if err := t.syncWithNoLock(); err != nil { + if err := t.doSync(); err != nil { return err } var errs []error @@ -1135,7 +1135,7 @@ func (t *freezerTable) advanceHead() error { t.lock.Lock() defer t.lock.Unlock() - if err := t.syncWithNoLock(); err != nil { + if err := t.doSync(); err != nil { return err } // We open the next file in truncated mode -- if this file already @@ -1166,12 +1166,11 @@ func (t *freezerTable) Sync() error { t.lock.Lock() defer t.lock.Unlock() - return t.syncWithNoLock() + return t.doSync() } -// syncWithNoLock is the internal version of Sync which assumes the lock is -// already held. -func (t *freezerTable) syncWithNoLock() error { +// doSync is the internal version of Sync which assumes the lock is already held. +func (t *freezerTable) doSync() error { // Trying to fsync a file opened in rdonly causes "Access denied" // error on Windows. if t.readonly {