Skip to content

Commit 2779b08

Browse files
authored
Merge pull request #780 from gzliudan/better_reorg
core: refactor the function reorg
2 parents adba311 + cbdf0e6 commit 2779b08

2 files changed

Lines changed: 177 additions & 88 deletions

File tree

core/blockchain.go

Lines changed: 159 additions & 88 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,8 @@ var (
8484

8585
errInsertionInterrupted = errors.New("insertion is interrupted")
8686
errChainStopped = errors.New("blockchain is stopped")
87+
errInvalidOldChain = errors.New("invalid old chain")
88+
errInvalidNewChain = errors.New("invalid new chain")
8789

8890
CheckpointCh = make(chan int)
8991
)
@@ -1483,7 +1485,7 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.
14831485
if reorg {
14841486
// Reorganise the chain if the parent is not the head block
14851487
if block.ParentHash() != currentBlock.Hash() {
1486-
if err := bc.reorg(currentBlock, block); err != nil {
1488+
if err := bc.reorg(currentBlock.Header(), block.Header()); err != nil {
14871489
return NonStatTy, err
14881490
}
14891491
}
@@ -1498,9 +1500,8 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.
14981500
bc.writeHeadBlock(block, false)
14991501
// prepare set of masternodes for the next epoch
15001502
if bc.chainConfig.XDPoS != nil && ((block.NumberU64() % bc.chainConfig.XDPoS.Epoch) == (bc.chainConfig.XDPoS.Epoch - bc.chainConfig.XDPoS.Gap)) {
1501-
err := bc.UpdateM1()
1502-
if err != nil {
1503-
log.Crit("Error when update masternodes set. Stopping node", "err", err, "blockNum", block.NumberU64())
1503+
if err := bc.UpdateM1(); err != nil {
1504+
log.Crit("Fail to update masternodes during writeBlockWithState", "number", block.Number, "hash", block.Hash().Hex(), "err", err)
15041505
}
15051506
}
15061507
}
@@ -2282,153 +2283,223 @@ func (bc *BlockChain) collectLogs(b *types.Block, removed bool) []*types.Log {
22822283
// reorg takes two blocks, an old chain and a new chain and will reconstruct the
22832284
// blocks and inserts them to be part of the new canonical chain and accumulates
22842285
// potential missing transactions and post an event about them.
2285-
func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error {
2286+
func (bc *BlockChain) reorg(oldHead, newHead *types.Header) error {
2287+
log.Warn("Reorg", "OldHash", oldHead.Hash().Hex(), "OldNum", oldHead.Number, "NewHash", newHead.Hash().Hex(), "NewNum", newHead.Number)
2288+
22862289
var (
2287-
newChain types.Blocks
2288-
oldChain types.Blocks
2289-
commonBlock *types.Block
2290-
deletedTxs types.Transactions
2291-
addedTxs types.Transactions
2292-
deletedLogs []*types.Log
2290+
newChain []*types.Header
2291+
oldChain []*types.Header
2292+
commonBlock *types.Header
22932293
)
2294-
log.Warn("Reorg", "oldBlock hash", oldBlock.Hash().Hex(), "number", oldBlock.NumberU64(), "newBlock hash", newBlock.Hash().Hex(), "number", newBlock.NumberU64())
2295-
2296-
// first reduce whoever is higher bound
2297-
if oldBlock.NumberU64() > newBlock.NumberU64() {
2298-
// reduce old chain
2299-
for ; oldBlock != nil && oldBlock.NumberU64() != newBlock.NumberU64(); oldBlock = bc.GetBlock(oldBlock.ParentHash(), oldBlock.NumberU64()-1) {
2300-
oldChain = append(oldChain, oldBlock)
2301-
deletedTxs = append(deletedTxs, oldBlock.Transactions()...)
2302-
if logs := bc.collectLogs(oldBlock, true); len(logs) > 0 {
2303-
deletedLogs = append(deletedLogs, logs...)
2304-
}
2294+
2295+
// Reduce the longer chain to the same number as the shorter one
2296+
if oldHead.Number.Uint64() > newHead.Number.Uint64() {
2297+
// Old chain is longer, gather all transactions and logs as deleted ones
2298+
for ; oldHead != nil && oldHead.Number.Uint64() != newHead.Number.Uint64(); oldHead = bc.GetHeader(oldHead.ParentHash, oldHead.Number.Uint64()-1) {
2299+
oldChain = append(oldChain, oldHead)
23052300
}
23062301
} else {
2307-
// reduce new chain and append new chain blocks for inserting later on
2308-
for ; newBlock != nil && newBlock.NumberU64() != oldBlock.NumberU64(); newBlock = bc.GetBlock(newBlock.ParentHash(), newBlock.NumberU64()-1) {
2309-
newChain = append(newChain, newBlock)
2302+
// New chain is longer, stash all blocks away for subsequent insertion
2303+
for ; newHead != nil && newHead.Number.Uint64() != oldHead.Number.Uint64(); newHead = bc.GetHeader(newHead.ParentHash, newHead.Number.Uint64()-1) {
2304+
newChain = append(newChain, newHead)
23102305
}
23112306
}
2312-
if oldBlock == nil {
2313-
return errors.New("invalid old chain")
2307+
if oldHead == nil {
2308+
return errInvalidOldChain
23142309
}
2315-
if newBlock == nil {
2316-
return errors.New("invalid new chain")
2310+
if newHead == nil {
2311+
return errInvalidNewChain
23172312
}
23182313

2314+
// Both sides of the reorg are at the same number, reduce both until the common
2315+
// ancestor is found
23192316
for {
2320-
if oldBlock.Hash() == newBlock.Hash() {
2321-
commonBlock = oldBlock
2317+
// If the common ancestor was found, bail out
2318+
if oldHead.Hash() == newHead.Hash() {
2319+
commonBlock = oldHead
23222320
break
23232321
}
2322+
// Remove an old block as well as stash away a new block
2323+
oldChain = append(oldChain, oldHead)
2324+
newChain = append(newChain, newHead)
23242325

2325-
oldChain = append(oldChain, oldBlock)
2326-
newChain = append(newChain, newBlock)
2327-
deletedTxs = append(deletedTxs, oldBlock.Transactions()...)
2328-
if logs := bc.collectLogs(oldBlock, true); len(logs) > 0 {
2329-
deletedLogs = append(deletedLogs, logs...)
2326+
// Step back with both chains
2327+
oldHead = bc.GetHeader(oldHead.ParentHash, oldHead.Number.Uint64()-1)
2328+
if oldHead == nil {
2329+
return errInvalidOldChain
23302330
}
2331-
2332-
oldBlock, newBlock = bc.GetBlock(oldBlock.ParentHash(), oldBlock.NumberU64()-1), bc.GetBlock(newBlock.ParentHash(), newBlock.NumberU64()-1)
2333-
if oldBlock == nil {
2334-
return errors.New("invalid old chain")
2335-
}
2336-
if newBlock == nil {
2337-
return errors.New("invalid new chain")
2331+
newHead = bc.GetHeader(newHead.ParentHash, newHead.Number.Uint64()-1)
2332+
if newHead == nil {
2333+
return errInvalidNewChain
23382334
}
23392335
}
23402336

23412337
// Ensure XDPoS engine committed block will be not reverted
23422338
if xdpos, ok := bc.Engine().(*XDPoS.XDPoS); ok {
23432339
latestCommittedBlock := xdpos.EngineV2.GetLatestCommittedBlockInfo()
23442340
if latestCommittedBlock != nil {
2345-
currentBlock := bc.CurrentBlock()
2346-
currentBlock.Number().Cmp(latestCommittedBlock.Number)
2347-
cmp := commonBlock.Number().Cmp(latestCommittedBlock.Number)
2341+
cmp := commonBlock.Number.Cmp(latestCommittedBlock.Number)
23482342
if cmp < 0 {
23492343
for _, oldBlock := range oldChain {
2350-
if oldBlock.Number().Cmp(latestCommittedBlock.Number) == 0 {
2344+
if oldBlock.Number.Cmp(latestCommittedBlock.Number) == 0 {
23512345
if oldBlock.Hash() != latestCommittedBlock.Hash {
2352-
log.Error("Impossible reorg, please file an issue", "oldnum", oldBlock.Number(), "oldhash", oldBlock.Hash(), "committed hash", latestCommittedBlock.Hash)
2346+
log.Error("Impossible reorg, please file an issue", "OldNum", oldBlock.Number, "OldHash", oldBlock.Hash().Hex(), "LatestCommittedHash", latestCommittedBlock.Hash.Hex())
23532347
} else {
2354-
log.Warn("Stop reorg, blockchain is under forking attack", "old committed num", oldBlock.Number(), "old committed hash", oldBlock.Hash())
2355-
return fmt.Errorf("stop reorg, blockchain is under forking attack. old committed num %d, hash %x", oldBlock.Number(), oldBlock.Hash())
2348+
log.Warn("Stop reorg, blockchain is under forking attack", "OldCommittedNum", oldBlock.Number, "OldCommittedHash", oldBlock.Hash().Hex())
2349+
return fmt.Errorf("stop reorg, blockchain is under forking attack. OldCommitted num %d, hash %s", oldBlock.Number, oldBlock.Hash().Hex())
23562350
}
23572351
}
23582352
}
23592353
} else if cmp == 0 {
23602354
if commonBlock.Hash() != latestCommittedBlock.Hash {
2361-
log.Error("Impossible reorg, please file an issue", "oldnum", commonBlock.Number(), "oldhash", commonBlock.Hash(), "committed hash", latestCommittedBlock.Hash)
2355+
log.Error("Impossible reorg, please file an issue", "OldNum", commonBlock.Number.Uint64(), "OldHash", commonBlock.Hash().Hex(), "LatestCommittedHash", latestCommittedBlock.Hash.Hex())
23622356
}
23632357
}
23642358
}
23652359
}
23662360

23672361
// Ensure the user sees large reorgs
23682362
if len(oldChain) > 0 && len(newChain) > 0 {
2369-
logFn := log.Warn
2363+
logFn := log.Info
2364+
msg := "Chain reorg detected"
23702365
if len(oldChain) > 63 {
2366+
msg = "Large chain reorg detected"
23712367
logFn = log.Warn
23722368
}
2373-
logFn("Chain split detected", "number", commonBlock.Number(), "hash", commonBlock.Hash(),
2374-
"drop", len(oldChain), "dropfrom", oldChain[0].Hash(), "add", len(newChain), "addfrom", newChain[0].Hash())
2369+
logFn(msg, "number", commonBlock.Number, "hash", commonBlock.Hash().Hex(),
2370+
"drop", len(oldChain), "dropfrom", oldChain[0].Hash().Hex(), "add", len(newChain), "addfrom", newChain[0].Hash().Hex())
23752371
blockReorgAddMeter.Mark(int64(len(newChain)))
23762372
blockReorgDropMeter.Mark(int64(len(oldChain)))
23772373
blockReorgMeter.Mark(1)
2374+
} else if len(newChain) > 0 {
2375+
// Special case happens in the post merge stage that current head is
2376+
// the ancestor of new head while these two blocks are not consecutive
2377+
log.Info("Extend chain", "add", len(newChain), "number", newChain[0].Number, "hash", newChain[0].Hash())
2378+
blockReorgAddMeter.Mark(int64(len(newChain)))
23782379
} else {
2379-
log.Error("Impossible reorg, please file an issue", "oldnum", oldBlock.Number(), "oldhash", oldBlock.Hash(), "newnum", newBlock.Number(), "newhash", newBlock.Hash())
2380+
// len(newChain) == 0 && len(oldChain) > 0
2381+
// rewind the canonical chain to a lower point.
2382+
log.Error("Impossible reorg, please file an issue", "oldnum", oldHead.Number, "oldhash", oldHead.Hash(), "oldblocks", len(oldChain), "newnum", newHead.Number, "newhash", newHead.Hash(), "newblocks", len(newChain))
23802383
}
23812384

2382-
// Insert the new chain(except the head block(reverse order)),
2383-
// taking care of the proper incremental order.
2384-
for i := len(newChain) - 1; i >= 0; i-- {
2385-
// insert the block in the canonical way, re-writing history
2386-
bc.writeHeadBlock(newChain[i], true)
2385+
// Acquire the tx-lookup lock before mutation. This step is essential
2386+
// as the txlookups should be changed atomically, and all subsequent
2387+
// reads should be blocked until the mutation is complete.
2388+
// bc.txLookupLock.Lock()
2389+
2390+
// Reorg can be executed, start reducing the chain's old blocks and appending
2391+
// the new blocks
2392+
var (
2393+
deletedTxs []common.Hash
2394+
rebirthTxs []common.Hash
2395+
2396+
deletedLogs []*types.Log
2397+
rebirthLogs []*types.Log
2398+
)
2399+
2400+
// Deleted log emission on the API uses forward order, which is borked, but
2401+
// we'll leave it in for legacy reasons.
2402+
//
2403+
// TODO(karalabe): This should be nuked out, no idea how, deprecate some APIs?
2404+
{
2405+
for i := len(oldChain) - 1; i >= 0; i-- {
2406+
block := bc.GetBlock(oldChain[i].Hash(), oldChain[i].Number.Uint64())
2407+
if block == nil {
2408+
return errInvalidOldChain // Corrupt database, mostly here to avoid weird panics
2409+
}
2410+
if logs := bc.collectLogs(block, true); len(logs) > 0 {
2411+
deletedLogs = append(deletedLogs, logs...)
2412+
}
2413+
if len(deletedLogs) > 512 {
2414+
go bc.rmLogsFeed.Send(RemovedLogsEvent{deletedLogs})
2415+
deletedLogs = nil
2416+
}
2417+
// TODO(daniel): remove chainSideFeed, reference PR #30601
2418+
// Also send event for blocks removed from the canon chain.
2419+
// bc.chainSideFeed.Send(ChainSideEvent{Block: block})
2420+
}
2421+
if len(deletedLogs) > 0 {
2422+
go bc.rmLogsFeed.Send(RemovedLogsEvent{deletedLogs})
2423+
}
2424+
}
23872425

2388-
// Collect the new added transactions.
2389-
addedTxs = append(addedTxs, newChain[i].Transactions()...)
2426+
// Undo old blocks in reverse order
2427+
for i := 0; i < len(oldChain); i++ {
2428+
// Collect all the deleted transactions
2429+
block := bc.GetBlock(oldChain[i].Hash(), oldChain[i].Number.Uint64())
2430+
if block == nil {
2431+
return errInvalidOldChain // Corrupt database, mostly here to avoid weird panics
2432+
}
2433+
for _, tx := range block.Transactions() {
2434+
deletedTxs = append(deletedTxs, tx.Hash())
2435+
}
2436+
// Collect deleted logs and emit them for new integrations
2437+
// if logs := bc.collectLogs(block, true); len(logs) > 0 {
2438+
// slices.Reverse(logs) // Emit revertals latest first, older then
2439+
// }
2440+
}
23902441

2442+
// Apply new blocks in forward order
2443+
for i := len(newChain) - 1; i >= 0; i-- {
2444+
// Collect all the included transactions
2445+
block := bc.GetBlock(newChain[i].Hash(), newChain[i].Number.Uint64())
2446+
if block == nil {
2447+
return errInvalidNewChain // Corrupt database, mostly here to avoid weird panics
2448+
}
2449+
for _, tx := range block.Transactions() {
2450+
rebirthTxs = append(rebirthTxs, tx.Hash())
2451+
}
2452+
// Collect inserted logs and emit them
2453+
if logs := bc.collectLogs(block, false); len(logs) > 0 {
2454+
rebirthLogs = append(rebirthLogs, logs...)
2455+
}
2456+
if len(rebirthLogs) > 512 {
2457+
bc.logsFeed.Send(rebirthLogs)
2458+
rebirthLogs = nil
2459+
}
2460+
// Update the head block
2461+
bc.writeHeadBlock(block, true)
23912462
// prepare set of masternodes for the next epoch
2392-
if bc.chainConfig.XDPoS != nil && ((newChain[i].NumberU64() % bc.chainConfig.XDPoS.Epoch) == (bc.chainConfig.XDPoS.Epoch - bc.chainConfig.XDPoS.Gap)) {
2393-
err := bc.UpdateM1()
2394-
if err != nil {
2395-
log.Crit("Error when update masternodes set. Stopping node", "err", err, "blockNumber", newChain[i].NumberU64())
2463+
if bc.chainConfig.XDPoS != nil && ((block.NumberU64() % bc.chainConfig.XDPoS.Epoch) == (bc.chainConfig.XDPoS.Epoch - bc.chainConfig.XDPoS.Gap)) {
2464+
if err := bc.UpdateM1(); err != nil {
2465+
log.Crit("Fail to update masternodes during reorg", "number", block.Number, "hash", block.Hash().Hex(), "err", err)
23962466
}
23972467
}
23982468
}
2469+
if len(rebirthLogs) > 0 {
2470+
bc.logsFeed.Send(rebirthLogs)
2471+
}
23992472

24002473
// Delete useless indexes right now which includes the non-canonical
24012474
// transaction indexes, canonical chain indexes which above the head.
2402-
indexesBatch := bc.db.NewBatch()
2403-
for _, tx := range types.TxDifference(deletedTxs, addedTxs) {
2404-
rawdb.DeleteTxLookupEntry(indexesBatch, tx.Hash())
2475+
batch := bc.db.NewBatch()
2476+
for _, tx := range types.HashDifference(deletedTxs, rebirthTxs) {
2477+
rawdb.DeleteTxLookupEntry(batch, tx)
2478+
}
2479+
// Delete all hash markers that are not part of the new canonical chain.
2480+
// Because the reorg function handles new chain head, all hash
2481+
// markers greater than new chain head should be deleted.
2482+
number := commonBlock.Number
2483+
if len(newChain) > 0 {
2484+
number = newChain[0].Number
24052485
}
2406-
// Delete any canonical number assignments above the new head
2407-
number := bc.CurrentBlock().NumberU64()
2408-
for i := number + 1; ; i++ {
2486+
for i := number.Uint64() + 1; ; i++ {
24092487
hash := rawdb.ReadCanonicalHash(bc.db, i)
24102488
if hash == (common.Hash{}) {
24112489
break
24122490
}
2413-
rawdb.DeleteCanonicalHash(indexesBatch, i)
2491+
rawdb.DeleteCanonicalHash(batch, i)
24142492
}
2415-
if err := indexesBatch.Write(); err != nil {
2493+
if err := batch.Write(); err != nil {
24162494
log.Crit("Failed to delete useless indexes", "err", err)
24172495
}
2418-
// If any logs need to be fired, do it now. In theory we could avoid creating
2419-
// this goroutine if there are no events to fire, but realistcally that only
2420-
// ever happens if we're reorging empty blocks, which will only happen on idle
2421-
// networks where performance is not an issue either way.
2422-
if len(deletedLogs) > 0 {
2423-
go bc.rmLogsFeed.Send(RemovedLogsEvent{deletedLogs})
2424-
}
2425-
if len(oldChain) > 0 {
2426-
go func() {
2427-
for i := len(oldChain) - 1; i >= 0; i-- {
2428-
bc.chainSideFeed.Send(ChainSideEvent{Block: oldChain[i]})
2429-
}
2430-
}()
2431-
}
2496+
2497+
// Reset the tx lookup cache to clear stale txlookup cache.
2498+
// bc.txLookupCache.Purge()
2499+
2500+
// Release the tx-lookup lock after mutation.
2501+
// bc.txLookupLock.Unlock()
2502+
24322503
return nil
24332504
}
24342505

core/types/transaction.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -677,6 +677,24 @@ func TxDifference(a, b Transactions) (keep Transactions) {
677677
return keep
678678
}
679679

680+
// HashDifference returns a new set of hashes that are present in a but not in b.
681+
func HashDifference(a, b []common.Hash) []common.Hash {
682+
keep := make([]common.Hash, 0, len(a))
683+
684+
remove := make(map[common.Hash]struct{})
685+
for _, hash := range b {
686+
remove[hash] = struct{}{}
687+
}
688+
689+
for _, hash := range a {
690+
if _, ok := remove[hash]; !ok {
691+
keep = append(keep, hash)
692+
}
693+
}
694+
695+
return keep
696+
}
697+
680698
// TxByNonce implements the sort interface to allow sorting a list of transactions
681699
// by their nonces. This is usually only useful for sorting transactions from a
682700
// single account, otherwise a nonce comparison doesn't make much sense.

0 commit comments

Comments
 (0)