Merged
247 changes: 159 additions & 88 deletions core/blockchain.go
@@ -84,6 +84,8 @@ var (

 	errInsertionInterrupted = errors.New("insertion is interrupted")
 	errChainStopped         = errors.New("blockchain is stopped")
+	errInvalidOldChain      = errors.New("invalid old chain")
+	errInvalidNewChain      = errors.New("invalid new chain")
 
 	CheckpointCh = make(chan int)
 )
@@ -1483,7 +1485,7 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.
 	if reorg {
 		// Reorganise the chain if the parent is not the head block
 		if block.ParentHash() != currentBlock.Hash() {
-			if err := bc.reorg(currentBlock, block); err != nil {
+			if err := bc.reorg(currentBlock.Header(), block.Header()); err != nil {
 				return NonStatTy, err
 			}
 		}
@@ -1498,9 +1500,8 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.
 	bc.writeHeadBlock(block, false)
 	// prepare set of masternodes for the next epoch
 	if bc.chainConfig.XDPoS != nil && ((block.NumberU64() % bc.chainConfig.XDPoS.Epoch) == (bc.chainConfig.XDPoS.Epoch - bc.chainConfig.XDPoS.Gap)) {
-		err := bc.UpdateM1()
-		if err != nil {
-			log.Crit("Error when update masternodes set. Stopping node", "err", err, "blockNum", block.NumberU64())
+		if err := bc.UpdateM1(); err != nil {
+			log.Crit("Failed to update masternodes during writeBlockWithState", "number", block.Number(), "hash", block.Hash().Hex(), "err", err)
 		}
 	}
 }
@@ -2282,153 +2283,223 @@ func (bc *BlockChain) collectLogs(b *types.Block, removed bool) []*types.Log {
 // reorg takes two blocks, an old chain and a new chain, and will reconstruct the
 // blocks and insert them to be part of the new canonical chain, accumulating
 // potential missing transactions and posting an event about them.
-func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error {
+func (bc *BlockChain) reorg(oldHead, newHead *types.Header) error {
+	log.Warn("Reorg", "OldHash", oldHead.Hash().Hex(), "OldNum", oldHead.Number, "NewHash", newHead.Hash().Hex(), "NewNum", newHead.Number)
+
 	var (
-		newChain    types.Blocks
-		oldChain    types.Blocks
-		commonBlock *types.Block
-		deletedTxs  types.Transactions
-		addedTxs    types.Transactions
-		deletedLogs []*types.Log
+		newChain    []*types.Header
+		oldChain    []*types.Header
+		commonBlock *types.Header
 	)
-	log.Warn("Reorg", "oldBlock hash", oldBlock.Hash().Hex(), "number", oldBlock.NumberU64(), "newBlock hash", newBlock.Hash().Hex(), "number", newBlock.NumberU64())

-	// first reduce whoever is higher bound
-	if oldBlock.NumberU64() > newBlock.NumberU64() {
-		// reduce old chain
-		for ; oldBlock != nil && oldBlock.NumberU64() != newBlock.NumberU64(); oldBlock = bc.GetBlock(oldBlock.ParentHash(), oldBlock.NumberU64()-1) {
-			oldChain = append(oldChain, oldBlock)
-			deletedTxs = append(deletedTxs, oldBlock.Transactions()...)
-			if logs := bc.collectLogs(oldBlock, true); len(logs) > 0 {
-				deletedLogs = append(deletedLogs, logs...)
-			}
+	// Reduce the longer chain to the same number as the shorter one
+	if oldHead.Number.Uint64() > newHead.Number.Uint64() {
+		// Old chain is longer, gather all transactions and logs as deleted ones
+		for ; oldHead != nil && oldHead.Number.Uint64() != newHead.Number.Uint64(); oldHead = bc.GetHeader(oldHead.ParentHash, oldHead.Number.Uint64()-1) {
+			oldChain = append(oldChain, oldHead)
 		}
 	} else {
-		// reduce new chain and append new chain blocks for inserting later on
-		for ; newBlock != nil && newBlock.NumberU64() != oldBlock.NumberU64(); newBlock = bc.GetBlock(newBlock.ParentHash(), newBlock.NumberU64()-1) {
-			newChain = append(newChain, newBlock)
+		// New chain is longer, stash all blocks away for subsequent insertion
+		for ; newHead != nil && newHead.Number.Uint64() != oldHead.Number.Uint64(); newHead = bc.GetHeader(newHead.ParentHash, newHead.Number.Uint64()-1) {
+			newChain = append(newChain, newHead)
 		}
 	}
-	if oldBlock == nil {
-		return errors.New("invalid old chain")
+	if oldHead == nil {
+		return errInvalidOldChain
 	}
-	if newBlock == nil {
-		return errors.New("invalid new chain")
+	if newHead == nil {
+		return errInvalidNewChain
 	}

 	// Both sides of the reorg are at the same number, reduce both until the common
 	// ancestor is found
 	for {
-		if oldBlock.Hash() == newBlock.Hash() {
-			commonBlock = oldBlock
+		// If the common ancestor was found, bail out
+		if oldHead.Hash() == newHead.Hash() {
+			commonBlock = oldHead
 			break
 		}
+		// Remove an old block as well as stash away a new block
+		oldChain = append(oldChain, oldHead)
+		newChain = append(newChain, newHead)
 
-		oldChain = append(oldChain, oldBlock)
-		newChain = append(newChain, newBlock)
-		deletedTxs = append(deletedTxs, oldBlock.Transactions()...)
-		if logs := bc.collectLogs(oldBlock, true); len(logs) > 0 {
-			deletedLogs = append(deletedLogs, logs...)
+		// Step back with both chains
+		oldHead = bc.GetHeader(oldHead.ParentHash, oldHead.Number.Uint64()-1)
+		if oldHead == nil {
+			return errInvalidOldChain
 		}
 
-		oldBlock, newBlock = bc.GetBlock(oldBlock.ParentHash(), oldBlock.NumberU64()-1), bc.GetBlock(newBlock.ParentHash(), newBlock.NumberU64()-1)
-		if oldBlock == nil {
-			return errors.New("invalid old chain")
-		}
-		if newBlock == nil {
-			return errors.New("invalid new chain")
+		newHead = bc.GetHeader(newHead.ParentHash, newHead.Number.Uint64()-1)
+		if newHead == nil {
+			return errInvalidNewChain
 		}
 	}

 	// Ensure the XDPoS engine's committed block is not reverted
 	if xdpos, ok := bc.Engine().(*XDPoS.XDPoS); ok {
 		latestCommittedBlock := xdpos.EngineV2.GetLatestCommittedBlockInfo()
 		if latestCommittedBlock != nil {
-			currentBlock := bc.CurrentBlock()
-			currentBlock.Number().Cmp(latestCommittedBlock.Number)
-			cmp := commonBlock.Number().Cmp(latestCommittedBlock.Number)
+			cmp := commonBlock.Number.Cmp(latestCommittedBlock.Number)
 			if cmp < 0 {
 				for _, oldBlock := range oldChain {
-					if oldBlock.Number().Cmp(latestCommittedBlock.Number) == 0 {
+					if oldBlock.Number.Cmp(latestCommittedBlock.Number) == 0 {
 						if oldBlock.Hash() != latestCommittedBlock.Hash {
-							log.Error("Impossible reorg, please file an issue", "oldnum", oldBlock.Number(), "oldhash", oldBlock.Hash(), "committed hash", latestCommittedBlock.Hash)
+							log.Error("Impossible reorg, please file an issue", "OldNum", oldBlock.Number, "OldHash", oldBlock.Hash().Hex(), "LatestCommittedHash", latestCommittedBlock.Hash.Hex())
 						} else {
-							log.Warn("Stop reorg, blockchain is under forking attack", "old committed num", oldBlock.Number(), "old committed hash", oldBlock.Hash())
-							return fmt.Errorf("stop reorg, blockchain is under forking attack. old committed num %d, hash %x", oldBlock.Number(), oldBlock.Hash())
+							log.Warn("Stop reorg, blockchain is under forking attack", "OldCommittedNum", oldBlock.Number, "OldCommittedHash", oldBlock.Hash().Hex())
+							return fmt.Errorf("stop reorg, blockchain is under forking attack. OldCommitted num %d, hash %s", oldBlock.Number, oldBlock.Hash().Hex())
 						}
 					}
 				}
 			} else if cmp == 0 {
 				if commonBlock.Hash() != latestCommittedBlock.Hash {
-					log.Error("Impossible reorg, please file an issue", "oldnum", commonBlock.Number(), "oldhash", commonBlock.Hash(), "committed hash", latestCommittedBlock.Hash)
+					log.Error("Impossible reorg, please file an issue", "OldNum", commonBlock.Number.Uint64(), "OldHash", commonBlock.Hash().Hex(), "LatestCommittedHash", latestCommittedBlock.Hash.Hex())
 				}
 			}
 		}
 	}

 	// Ensure the user sees large reorgs
 	if len(oldChain) > 0 && len(newChain) > 0 {
-		logFn := log.Warn
+		logFn := log.Info
+		msg := "Chain reorg detected"
 		if len(oldChain) > 63 {
+			msg = "Large chain reorg detected"
 			logFn = log.Warn
 		}
-		logFn("Chain split detected", "number", commonBlock.Number(), "hash", commonBlock.Hash(),
-			"drop", len(oldChain), "dropfrom", oldChain[0].Hash(), "add", len(newChain), "addfrom", newChain[0].Hash())
+		logFn(msg, "number", commonBlock.Number, "hash", commonBlock.Hash().Hex(),
+			"drop", len(oldChain), "dropfrom", oldChain[0].Hash().Hex(), "add", len(newChain), "addfrom", newChain[0].Hash().Hex())
+		blockReorgAddMeter.Mark(int64(len(newChain)))
+		blockReorgDropMeter.Mark(int64(len(oldChain)))
+		blockReorgMeter.Mark(1)
+	} else if len(newChain) > 0 {
+		// Special case happens in the post merge stage that current head is
+		// the ancestor of new head while these two blocks are not consecutive
+		log.Info("Extend chain", "add", len(newChain), "number", newChain[0].Number, "hash", newChain[0].Hash())
+		blockReorgAddMeter.Mark(int64(len(newChain)))
 	} else {
-		log.Error("Impossible reorg, please file an issue", "oldnum", oldBlock.Number(), "oldhash", oldBlock.Hash(), "newnum", newBlock.Number(), "newhash", newBlock.Hash())
+		// len(newChain) == 0 && len(oldChain) > 0
+		// rewind the canonical chain to a lower point.
+		log.Error("Impossible reorg, please file an issue", "oldnum", oldHead.Number, "oldhash", oldHead.Hash(), "oldblocks", len(oldChain), "newnum", newHead.Number, "newhash", newHead.Hash(), "newblocks", len(newChain))
 	}

-	// Insert the new chain(except the head block(reverse order)),
-	// taking care of the proper incremental order.
-	for i := len(newChain) - 1; i >= 0; i-- {
-		// insert the block in the canonical way, re-writing history
-		bc.writeHeadBlock(newChain[i], true)
-
-		// Collect the new added transactions.
-		addedTxs = append(addedTxs, newChain[i].Transactions()...)
+	// Acquire the tx-lookup lock before mutation. This step is essential
+	// as the txlookups should be changed atomically, and all subsequent
+	// reads should be blocked until the mutation is complete.
+	// bc.txLookupLock.Lock()
+
+	// Reorg can be executed, start reducing the chain's old blocks and appending
+	// the new blocks
+	var (
+		deletedTxs []common.Hash
+		rebirthTxs []common.Hash
+
+		deletedLogs []*types.Log
+		rebirthLogs []*types.Log
+	)
+
+	// Deleted log emission on the API uses forward order, which is borked, but
+	// we'll leave it in for legacy reasons.
+	//
+	// TODO(karalabe): This should be nuked out, no idea how, deprecate some APIs?
+	{
+		for i := len(oldChain) - 1; i >= 0; i-- {
+			block := bc.GetBlock(oldChain[i].Hash(), oldChain[i].Number.Uint64())
+			if block == nil {
+				return errInvalidOldChain // Corrupt database, mostly here to avoid weird panics
+			}
+			if logs := bc.collectLogs(block, true); len(logs) > 0 {
+				deletedLogs = append(deletedLogs, logs...)
+			}
+			if len(deletedLogs) > 512 {
+				go bc.rmLogsFeed.Send(RemovedLogsEvent{deletedLogs})
+				deletedLogs = nil
+			}
+			// TODO(daniel): remove chainSideFeed, reference PR #30601
+			// Also send event for blocks removed from the canon chain.
+			// bc.chainSideFeed.Send(ChainSideEvent{Block: block})
+		}
+		if len(deletedLogs) > 0 {
+			go bc.rmLogsFeed.Send(RemovedLogsEvent{deletedLogs})
+		}
+	}
+
+	// Undo old blocks in reverse order
+	for i := 0; i < len(oldChain); i++ {
+		// Collect all the deleted transactions
+		block := bc.GetBlock(oldChain[i].Hash(), oldChain[i].Number.Uint64())
+		if block == nil {
+			return errInvalidOldChain // Corrupt database, mostly here to avoid weird panics
+		}
+		for _, tx := range block.Transactions() {
+			deletedTxs = append(deletedTxs, tx.Hash())
+		}
+		// Collect deleted logs and emit them for new integrations
+		// if logs := bc.collectLogs(block, true); len(logs) > 0 {
+		// 	slices.Reverse(logs) // Emit revertals latest first, older then
+		// }
+	}
+
+	// Apply new blocks in forward order
+	for i := len(newChain) - 1; i >= 0; i-- {
+		// Collect all the included transactions
+		block := bc.GetBlock(newChain[i].Hash(), newChain[i].Number.Uint64())
+		if block == nil {
+			return errInvalidNewChain // Corrupt database, mostly here to avoid weird panics
+		}
+		for _, tx := range block.Transactions() {
+			rebirthTxs = append(rebirthTxs, tx.Hash())
+		}
+		// Collect inserted logs and emit them
+		if logs := bc.collectLogs(block, false); len(logs) > 0 {
+			rebirthLogs = append(rebirthLogs, logs...)
+		}
+		if len(rebirthLogs) > 512 {
+			bc.logsFeed.Send(rebirthLogs)
+			rebirthLogs = nil
+		}
+		// Update the head block
+		bc.writeHeadBlock(block, true)
 		// prepare set of masternodes for the next epoch
-		if bc.chainConfig.XDPoS != nil && ((newChain[i].NumberU64() % bc.chainConfig.XDPoS.Epoch) == (bc.chainConfig.XDPoS.Epoch - bc.chainConfig.XDPoS.Gap)) {
-			err := bc.UpdateM1()
-			if err != nil {
-				log.Crit("Error when update masternodes set. Stopping node", "err", err, "blockNumber", newChain[i].NumberU64())
+		if bc.chainConfig.XDPoS != nil && ((block.NumberU64() % bc.chainConfig.XDPoS.Epoch) == (bc.chainConfig.XDPoS.Epoch - bc.chainConfig.XDPoS.Gap)) {
+			if err := bc.UpdateM1(); err != nil {
+				log.Crit("Failed to update masternodes during reorg", "number", block.Number(), "hash", block.Hash().Hex(), "err", err)
 			}
 		}
 	}
+	if len(rebirthLogs) > 0 {
+		bc.logsFeed.Send(rebirthLogs)
+	}
 
 	// Delete useless indexes right now which includes the non-canonical
 	// transaction indexes and canonical chain indexes above the head.
-	indexesBatch := bc.db.NewBatch()
-	for _, tx := range types.TxDifference(deletedTxs, addedTxs) {
-		rawdb.DeleteTxLookupEntry(indexesBatch, tx.Hash())
+	batch := bc.db.NewBatch()
+	for _, tx := range types.HashDifference(deletedTxs, rebirthTxs) {
+		rawdb.DeleteTxLookupEntry(batch, tx)
 	}
+	// Delete all hash markers that are not part of the new canonical chain.
+	// Because the reorg function handles the new chain head, all hash
+	// markers greater than the new chain head should be deleted.
+	number := commonBlock.Number
+	if len(newChain) > 0 {
+		number = newChain[0].Number
+	}
-	// Delete any canonical number assignments above the new head
-	number := bc.CurrentBlock().NumberU64()
-	for i := number + 1; ; i++ {
+	for i := number.Uint64() + 1; ; i++ {
 		hash := rawdb.ReadCanonicalHash(bc.db, i)
 		if hash == (common.Hash{}) {
 			break
 		}
-		rawdb.DeleteCanonicalHash(indexesBatch, i)
+		rawdb.DeleteCanonicalHash(batch, i)
 	}
-	if err := indexesBatch.Write(); err != nil {
+	if err := batch.Write(); err != nil {
 		log.Crit("Failed to delete useless indexes", "err", err)
 	}
-	// If any logs need to be fired, do it now. In theory we could avoid creating
-	// this goroutine if there are no events to fire, but realistically that only
-	// ever happens if we're reorging empty blocks, which will only happen on idle
-	// networks where performance is not an issue either way.
-	if len(deletedLogs) > 0 {
-		go bc.rmLogsFeed.Send(RemovedLogsEvent{deletedLogs})
-	}
-	if len(oldChain) > 0 {
-		go func() {
-			for i := len(oldChain) - 1; i >= 0; i-- {
-				bc.chainSideFeed.Send(ChainSideEvent{Block: oldChain[i]})
-			}
-		}()
-	}
+	// Reset the tx lookup cache to clear stale txlookup entries.
+	// bc.txLookupCache.Purge()
+
+	// Release the tx-lookup lock after mutation.
+	// bc.txLookupLock.Unlock()
 
 	return nil
 }
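The heart of the change is the ancestor walk, which now touches only headers and defers loading full blocks until transactions and logs are actually needed. The following self-contained sketch illustrates that walk: the header struct and the map-backed store are simplified stand-ins for types.Header and BlockChain.GetHeader, not the real APIs.

package main

import (
	"errors"
	"fmt"
)

// Simplified stand-ins for core/types.Header and the chain database.
type header struct {
	Number     uint64
	Hash       string
	ParentHash string
}

type chain map[string]*header // hash -> header

func (c chain) getHeader(hash string) *header { return c[hash] }

// findCommonAncestor mirrors the reorg walk in the diff: first reduce the
// longer side to the shorter side's height, then step both sides back in
// lockstep until the hashes match.
func findCommonAncestor(c chain, oldHead, newHead *header) (common *header, oldChain, newChain []*header, err error) {
	// Reduce the longer chain to the same number as the shorter one.
	for oldHead != nil && oldHead.Number > newHead.Number {
		oldChain = append(oldChain, oldHead)
		oldHead = c.getHeader(oldHead.ParentHash)
	}
	for newHead != nil && oldHead != nil && newHead.Number > oldHead.Number {
		newChain = append(newChain, newHead)
		newHead = c.getHeader(newHead.ParentHash)
	}
	// Step back with both chains until the common ancestor is found.
	for {
		if oldHead == nil {
			return nil, nil, nil, errors.New("invalid old chain")
		}
		if newHead == nil {
			return nil, nil, nil, errors.New("invalid new chain")
		}
		if oldHead.Hash == newHead.Hash {
			return oldHead, oldChain, newChain, nil
		}
		oldChain = append(oldChain, oldHead)
		newChain = append(newChain, newHead)
		oldHead = c.getHeader(oldHead.ParentHash)
		newHead = c.getHeader(newHead.ParentHash)
	}
}

func main() {
	// Genesis g, canonical chain g<-a1<-a2, fork g<-b1<-b2<-b3.
	c := chain{
		"g":  {0, "g", ""},
		"a1": {1, "a1", "g"},
		"a2": {2, "a2", "a1"},
		"b1": {1, "b1", "g"},
		"b2": {2, "b2", "b1"},
		"b3": {3, "b3", "b2"},
	}
	common, drop, add, _ := findCommonAncestor(c, c["a2"], c["b3"])
	fmt.Println(common.Hash, len(drop), len(add)) // g 2 3
}

Running it prints g 2 3: the fork point, plus two headers to drop and three to apply, stored newest first exactly as reorg accumulates them.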

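The XDPoS committed-block guard can be distilled the same way. This hypothetical helper uses simplified types (the real code type-asserts the engine to *XDPoS.XDPoS and reads EngineV2.GetLatestCommittedBlockInfo()): a reorg entirely above the committed block passes, a hash mismatch at the committed height is only logged as an impossible reorg, and reverting the committed block itself aborts as a forking attack, matching the diff's behavior.

package main

import (
	"fmt"
	"math/big"
)

// blockInfo is a simplified stand-in for the engine's committed block info.
type blockInfo struct {
	Number *big.Int
	Hash   string
}

// guardCommittedBlock rejects a reorg that would roll back past the latest
// block committed by the BFT engine.
func guardCommittedBlock(commonNumber *big.Int, commonHash string, oldChain []blockInfo, committed blockInfo) error {
	switch commonNumber.Cmp(committed.Number) {
	case -1: // Common ancestor is below the committed block, so it sits on the old chain
		for _, old := range oldChain {
			if old.Number.Cmp(committed.Number) == 0 {
				if old.Hash != committed.Hash {
					// The real code only logs this inconsistency ("Impossible reorg, please file an issue").
					fmt.Println("impossible reorg: old hash", old.Hash, "committed hash", committed.Hash)
				} else {
					// Reverting a committed block: abort, the chain is under a forking attack.
					return fmt.Errorf("stop reorg, blockchain is under forking attack: num %d, hash %s", old.Number, old.Hash)
				}
			}
		}
	case 0: // Common ancestor sits at the committed height, so the hashes must agree
		if commonHash != committed.Hash {
			fmt.Println("impossible reorg: common hash", commonHash, "committed hash", committed.Hash)
		}
	}
	return nil // Common ancestor is at or above the committed block: safe
}

func main() {
	committed := blockInfo{big.NewInt(5), "c5"}
	old := []blockInfo{{big.NewInt(6), "f6"}, {big.NewInt(5), "c5"}}
	fmt.Println(guardCommittedBlock(big.NewInt(4), "a4", old, committed))
}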
18 changes: 18 additions & 0 deletions core/types/transaction.go
@@ -677,6 +677,24 @@ func TxDifference(a, b Transactions) (keep Transactions) {
 	return keep
 }
 
+// HashDifference returns a new set of hashes that are present in a but not in b.
+func HashDifference(a, b []common.Hash) []common.Hash {
+	keep := make([]common.Hash, 0, len(a))
+
+	remove := make(map[common.Hash]struct{})
+	for _, hash := range b {
+		remove[hash] = struct{}{}
+	}
+
+	for _, hash := range a {
+		if _, ok := remove[hash]; !ok {
+			keep = append(keep, hash)
+		}
+	}
+
+	return keep
+}
+
 // TxByNonce implements the sort interface to allow sorting a list of transactions
 // by their nonces. This is usually only useful for sorting transactions from a
 // single account, otherwise a nonce comparison doesn't make much sense.
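To make the set semantics concrete, here is a small self-contained sketch (a local hash type stands in for common.Hash) of the property reorg relies on: only transactions that were dropped with the old chain and not re-included by the new one lose their lookup entries.

package main

import "fmt"

type hash string

// hashDifference mirrors types.HashDifference from the diff: return the
// hashes present in a but not in b.
func hashDifference(a, b []hash) []hash {
	keep := make([]hash, 0, len(a))
	remove := make(map[hash]struct{}, len(b))
	for _, h := range b {
		remove[h] = struct{}{}
	}
	for _, h := range a {
		if _, ok := remove[h]; !ok {
			keep = append(keep, h)
		}
	}
	return keep
}

func main() {
	deleted := []hash{"0xaa", "0xbb", "0xcc"} // txs dropped with the old chain
	rebirth := []hash{"0xbb"}                 // txs re-included by the new chain
	// Only 0xaa and 0xcc lose their lookup entries; 0xbb stays indexed.
	fmt.Println(hashDifference(deleted, rebirth)) // [0xaa 0xcc]
}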