-
Notifications
You must be signed in to change notification settings - Fork 21.6k
core: write chain data in atomic way #20287
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -406,6 +406,11 @@ func (bc *BlockChain) SetHead(head uint64) error { | |
| } | ||
| } | ||
| rawdb.WriteHeadBlockHash(db, newHeadBlock.Hash()) | ||
|
|
||
| // Degrade the chain markers if they are explicitly reverted. | ||
| // In theory we should update all in-memory markers in the | ||
| // last step, however the direction of SetHead is from high | ||
| // to low, so it's safe the update in-memory markers directly. | ||
| bc.currentBlock.Store(newHeadBlock) | ||
| headBlockGauge.Update(int64(newHeadBlock.NumberU64())) | ||
| } | ||
|
|
@@ -418,6 +423,11 @@ func (bc *BlockChain) SetHead(head uint64) error { | |
| newHeadFastBlock = bc.genesisBlock | ||
| } | ||
| rawdb.WriteHeadFastBlockHash(db, newHeadFastBlock.Hash()) | ||
|
|
||
| // Degrade the chain markers if they are explicitly reverted. | ||
| // In theory we should update all in-memory markers in the | ||
| // last step, however the direction of SetHead is from high | ||
| // to low, so it's safe the update in-memory markers directly. | ||
| bc.currentFastBlock.Store(newHeadFastBlock) | ||
| headFastBlockGauge.Update(int64(newHeadFastBlock.NumberU64())) | ||
| } | ||
|
|
@@ -537,21 +547,22 @@ func (bc *BlockChain) ResetWithGenesisBlock(genesis *types.Block) error { | |
| defer bc.chainmu.Unlock() | ||
|
|
||
| // Prepare the genesis block and reinitialise the chain | ||
| if err := bc.hc.WriteTd(genesis.Hash(), genesis.NumberU64(), genesis.Difficulty()); err != nil { | ||
| log.Crit("Failed to write genesis block TD", "err", err) | ||
| batch := bc.db.NewBatch() | ||
| rawdb.WriteTd(batch, genesis.Hash(), genesis.NumberU64(), genesis.Difficulty()) | ||
| rawdb.WriteBlock(batch, genesis) | ||
| if err := batch.Write(); err != nil { | ||
| log.Crit("Failed to write genesis block", "err", err) | ||
| } | ||
| rawdb.WriteBlock(bc.db, genesis) | ||
| bc.writeHeadBlock(genesis) | ||
|
|
||
| // Last update all in-memory chain markers | ||
| bc.genesisBlock = genesis | ||
| bc.insert(bc.genesisBlock) | ||
| bc.currentBlock.Store(bc.genesisBlock) | ||
| headBlockGauge.Update(int64(bc.genesisBlock.NumberU64())) | ||
|
|
||
| bc.hc.SetGenesis(bc.genesisBlock.Header()) | ||
| bc.hc.SetCurrentHeader(bc.genesisBlock.Header()) | ||
| bc.currentFastBlock.Store(bc.genesisBlock) | ||
| headFastBlockGauge.Update(int64(bc.genesisBlock.NumberU64())) | ||
|
|
||
| return nil | ||
| } | ||
|
|
||
|
|
@@ -609,31 +620,39 @@ func (bc *BlockChain) ExportN(w io.Writer, first uint64, last uint64) error { | |
| return nil | ||
| } | ||
|
|
||
| // insert injects a new head block into the current block chain. This method | ||
| // writeHeadBlock injects a new head block into the current block chain. This method | ||
| // assumes that the block is indeed a true head. It will also reset the head | ||
| // header and the head fast sync block to this very same block if they are older | ||
| // or if they are on a different side chain. | ||
| // | ||
| // Note, this function assumes that the `mu` mutex is held! | ||
| func (bc *BlockChain) insert(block *types.Block) { | ||
| func (bc *BlockChain) writeHeadBlock(block *types.Block) { | ||
| // If the block is on a side chain or an unknown one, force other heads onto it too | ||
| updateHeads := rawdb.ReadCanonicalHash(bc.db, block.NumberU64()) != block.Hash() | ||
|
|
||
| // Add the block to the canonical chain number scheme and mark as the head | ||
| rawdb.WriteCanonicalHash(bc.db, block.Hash(), block.NumberU64()) | ||
| rawdb.WriteHeadBlockHash(bc.db, block.Hash()) | ||
|
|
||
| bc.currentBlock.Store(block) | ||
| headBlockGauge.Update(int64(block.NumberU64())) | ||
| batch := bc.db.NewBatch() | ||
| rawdb.WriteCanonicalHash(batch, block.Hash(), block.NumberU64()) | ||
| rawdb.WriteTxLookupEntries(batch, block) | ||
| rawdb.WriteHeadBlockHash(batch, block.Hash()) | ||
|
|
||
| // If the block is better than our head or is on a different chain, force update heads | ||
| if updateHeads { | ||
| rawdb.WriteHeadHeaderHash(batch, block.Hash()) | ||
| rawdb.WriteHeadFastBlockHash(batch, block.Hash()) | ||
|
||
| } | ||
| // Flush the whole batch into the disk, exit the node if failed | ||
| if err := batch.Write(); err != nil { | ||
| log.Crit("Failed to update chain indexes and markers", "err", err) | ||
| } | ||
|
||
| // Update all in-memory chain markers in the last step | ||
| if updateHeads { | ||
| bc.hc.SetCurrentHeader(block.Header()) | ||
| rawdb.WriteHeadFastBlockHash(bc.db, block.Hash()) | ||
|
|
||
| bc.currentFastBlock.Store(block) | ||
| headFastBlockGauge.Update(int64(block.NumberU64())) | ||
| } | ||
| bc.currentBlock.Store(block) | ||
| headBlockGauge.Update(int64(block.NumberU64())) | ||
| } | ||
|
|
||
| // Genesis retrieves the chain's genesis block. | ||
|
|
@@ -879,26 +898,36 @@ func (bc *BlockChain) Rollback(chain []common.Hash) { | |
| bc.chainmu.Lock() | ||
| defer bc.chainmu.Unlock() | ||
|
|
||
| batch := bc.db.NewBatch() | ||
| for i := len(chain) - 1; i >= 0; i-- { | ||
| hash := chain[i] | ||
|
|
||
| // Degrade the chain markers if they are explicitly reverted. | ||
| // In theory we should update all in-memory markers in the | ||
| // last step, however the direction of rollback is from high | ||
| // to low, so it's safe the update in-memory markers directly. | ||
| currentHeader := bc.hc.CurrentHeader() | ||
| if currentHeader.Hash() == hash { | ||
| bc.hc.SetCurrentHeader(bc.GetHeader(currentHeader.ParentHash, currentHeader.Number.Uint64()-1)) | ||
| newHeadHeader := bc.GetHeader(currentHeader.ParentHash, currentHeader.Number.Uint64()-1) | ||
| rawdb.WriteHeadHeaderHash(batch, currentHeader.ParentHash) | ||
| bc.hc.SetCurrentHeader(newHeadHeader) | ||
| } | ||
| if currentFastBlock := bc.CurrentFastBlock(); currentFastBlock.Hash() == hash { | ||
| newFastBlock := bc.GetBlock(currentFastBlock.ParentHash(), currentFastBlock.NumberU64()-1) | ||
| rawdb.WriteHeadFastBlockHash(bc.db, newFastBlock.Hash()) | ||
| rawdb.WriteHeadFastBlockHash(batch, currentFastBlock.ParentHash()) | ||
| bc.currentFastBlock.Store(newFastBlock) | ||
| headFastBlockGauge.Update(int64(newFastBlock.NumberU64())) | ||
| } | ||
| if currentBlock := bc.CurrentBlock(); currentBlock.Hash() == hash { | ||
| newBlock := bc.GetBlock(currentBlock.ParentHash(), currentBlock.NumberU64()-1) | ||
| rawdb.WriteHeadBlockHash(bc.db, newBlock.Hash()) | ||
| rawdb.WriteHeadBlockHash(batch, currentBlock.ParentHash()) | ||
| bc.currentBlock.Store(newBlock) | ||
| headBlockGauge.Update(int64(newBlock.NumberU64())) | ||
| } | ||
| } | ||
| if err := batch.Write(); err != nil { | ||
| log.Crit("Failed to rollback chain markers", "err", err) | ||
| } | ||
| // Truncate ancient data which exceeds the current header. | ||
| // | ||
| // Notably, it can happen that system crashes without truncating the ancient data | ||
|
|
@@ -1061,7 +1090,6 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [ | |
| } | ||
| // Don't collect too much in-memory, write it out every 100K blocks | ||
| if len(deleted) > 100000 { | ||
|
|
||
| // Sync the ancient store explicitly to ensure all data has been flushed to disk. | ||
| if err := bc.db.Sync(); err != nil { | ||
| return 0, err | ||
|
|
@@ -1170,15 +1198,21 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [ | |
| rawdb.WriteReceipts(batch, block.Hash(), block.NumberU64(), receiptChain[i]) | ||
| rawdb.WriteTxLookupEntries(batch, block) | ||
|
|
||
| stats.processed++ | ||
| // Write everything belongs to the blocks into the database. So that | ||
| // we can ensure all components of body is completed(body, receipts, | ||
| // tx indexes) | ||
| if batch.ValueSize() >= ethdb.IdealBatchSize { | ||
| if err := batch.Write(); err != nil { | ||
| return 0, err | ||
| } | ||
| size += batch.ValueSize() | ||
| batch.Reset() | ||
| } | ||
| stats.processed++ | ||
| } | ||
| // Write everything belongs to the blocks into the database. So that | ||
| // we can ensure all components of body is completed(body, receipts, | ||
| // tx indexes) | ||
| if batch.ValueSize() > 0 { | ||
| size += batch.ValueSize() | ||
| if err := batch.Write(); err != nil { | ||
|
|
@@ -1229,11 +1263,12 @@ func (bc *BlockChain) writeBlockWithoutState(block *types.Block, td *big.Int) (e | |
| bc.wg.Add(1) | ||
| defer bc.wg.Done() | ||
|
|
||
| if err := bc.hc.WriteTd(block.Hash(), block.NumberU64(), td); err != nil { | ||
| return err | ||
| batch := bc.db.NewBatch() | ||
| rawdb.WriteTd(batch, block.Hash(), block.NumberU64(), td) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The new
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, it's quite annoying. In theory, td is a part of header chain data, but some times we write the whole block with the given td. So we need to explicitly set the td into the cache. Btw we never update the other cache( |
||
| rawdb.WriteBlock(batch, block) | ||
| if err := batch.Write(); err != nil { | ||
| log.Crit("Failed to write block into disk", "err", err) | ||
| } | ||
| rawdb.WriteBlock(bc.db, block) | ||
|
|
||
| return nil | ||
| } | ||
|
|
||
|
|
@@ -1249,11 +1284,7 @@ func (bc *BlockChain) writeKnownBlock(block *types.Block) error { | |
| return err | ||
| } | ||
| } | ||
| // Write the positional metadata for transaction/receipt lookups. | ||
| // Preimages here is empty, ignore it. | ||
| rawdb.WriteTxLookupEntries(bc.db, block) | ||
|
|
||
| bc.insert(block) | ||
| bc.writeHeadBlock(block) | ||
| return nil | ||
| } | ||
|
|
||
|
|
@@ -1281,12 +1312,19 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types. | |
| localTd := bc.GetTd(currentBlock.Hash(), currentBlock.NumberU64()) | ||
| externTd := new(big.Int).Add(block.Difficulty(), ptd) | ||
|
|
||
| // Irrelevant of the canonical status, write the block itself to the database | ||
| if err := bc.hc.WriteTd(block.Hash(), block.NumberU64(), externTd); err != nil { | ||
| return NonStatTy, err | ||
| } | ||
| rawdb.WriteBlock(bc.db, block) | ||
|
|
||
| // Irrelevant of the canonical status, write the block itself to the database. | ||
| // | ||
| // Note all the components of block(td, hash->number map, header, body, receipts) | ||
| // should be written atomically. BlockBatch is used for containing all components. | ||
| blockBatch := bc.db.NewBatch() | ||
| rawdb.WriteTd(blockBatch, block.Hash(), block.NumberU64(), externTd) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same question about tdcache as above |
||
| rawdb.WriteBlock(blockBatch, block) | ||
| rawdb.WriteReceipts(blockBatch, block.Hash(), block.NumberU64(), receipts) | ||
| rawdb.WritePreimages(blockBatch, state.Preimages()) | ||
| if err := blockBatch.Write(); err != nil { | ||
| log.Crit("Failed to write block into disk", "err", err) | ||
| } | ||
| // Commit all cached state changes into underlying memory database. | ||
| root, err := state.Commit(bc.chainConfig.IsEIP158(block.Number())) | ||
| if err != nil { | ||
| return NonStatTy, err | ||
|
|
@@ -1345,11 +1383,6 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types. | |
| } | ||
| } | ||
| } | ||
|
|
||
| // Write other block data using a batch. | ||
| batch := bc.db.NewBatch() | ||
| rawdb.WriteReceipts(batch, block.Hash(), block.NumberU64(), receipts) | ||
|
|
||
| // If the total difficulty is higher than our known, add it to the canonical chain | ||
| // Second clause in the if statement reduces the vulnerability to selfish mining. | ||
| // Please refer to http://www.cs.cornell.edu/~ie53/publications/btcProcFC.pdf | ||
|
|
@@ -1375,21 +1408,13 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types. | |
| return NonStatTy, err | ||
| } | ||
| } | ||
| // Write the positional metadata for transaction/receipt lookups and preimages | ||
| rawdb.WriteTxLookupEntries(batch, block) | ||
| rawdb.WritePreimages(batch, state.Preimages()) | ||
|
|
||
| status = CanonStatTy | ||
| } else { | ||
| status = SideStatTy | ||
| } | ||
| if err := batch.Write(); err != nil { | ||
| return NonStatTy, err | ||
| } | ||
|
|
||
| // Set new head. | ||
| if status == CanonStatTy { | ||
| bc.insert(block) | ||
| bc.writeHeadBlock(block) | ||
| } | ||
| bc.futureBlocks.Remove(block.Hash()) | ||
| return status, nil | ||
|
|
@@ -1959,20 +1984,19 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error { | |
| // taking care of the proper incremental order. | ||
| for i := len(newChain) - 1; i >= 1; i-- { | ||
| // Insert the block in the canonical way, re-writing history | ||
| bc.insert(newChain[i]) | ||
| bc.writeHeadBlock(newChain[i]) | ||
|
|
||
| // Collect reborn logs due to chain reorg | ||
| collectLogs(newChain[i].Hash(), false) | ||
|
|
||
| // Write lookup entries for hash based transaction/receipt searches | ||
| rawdb.WriteTxLookupEntries(bc.db, newChain[i]) | ||
| // Collect the new added transactions. | ||
| addedTxs = append(addedTxs, newChain[i].Transactions()...) | ||
| } | ||
| // When transactions get deleted from the database, the receipts that were | ||
| // created in the fork must also be deleted | ||
| batch := bc.db.NewBatch() | ||
| // Delete useless indexes right now which includes the non-canonical | ||
| // transaction indexes, canonical chain indexes which above the head. | ||
| indexesBatch := bc.db.NewBatch() | ||
| for _, tx := range types.TxDifference(deletedTxs, addedTxs) { | ||
| rawdb.DeleteTxLookupEntry(batch, tx.Hash()) | ||
| rawdb.DeleteTxLookupEntry(indexesBatch, tx.Hash()) | ||
| } | ||
| // Delete any canonical number assignments above the new head | ||
| number := bc.CurrentBlock().NumberU64() | ||
|
|
@@ -1981,9 +2005,11 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error { | |
| if hash == (common.Hash{}) { | ||
| break | ||
| } | ||
| rawdb.DeleteCanonicalHash(batch, i) | ||
| rawdb.DeleteCanonicalHash(indexesBatch, i) | ||
| } | ||
| if err := indexesBatch.Write(); err != nil { | ||
| log.Crit("Failed to delete useless indexes", "err", err) | ||
| } | ||
| batch.Write() | ||
| // If any logs need to be fired, do it now. In theory we could avoid creating | ||
| // this goroutine if there are no events to fire, but realistcally that only | ||
| // ever happens if we're reorging empty blocks, which will only happen on idle | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.