From a508a141add4640e1cea8a1efca78ef0f6c43059 Mon Sep 17 00:00:00 2001 From: GheisMohammadi Date: Mon, 13 Oct 2025 22:10:17 +0800 Subject: [PATCH 1/5] remove unused cache in stage block hashes --- .../stagedstreamsync/stage_blockhashes.go | 27 ------------------- 1 file changed, 27 deletions(-) diff --git a/api/service/synchronize/stagedstreamsync/stage_blockhashes.go b/api/service/synchronize/stagedstreamsync/stage_blockhashes.go index 1529a930b5..3eafb4ff79 100644 --- a/api/service/synchronize/stagedstreamsync/stage_blockhashes.go +++ b/api/service/synchronize/stagedstreamsync/stage_blockhashes.go @@ -30,7 +30,6 @@ type StageBlockHashesCfg struct { db kv.RwDB concurrency int protocol syncProtocol - cachedb kv.RwDB logProgress bool logger zerolog.Logger } @@ -512,24 +511,6 @@ func (bh *StageBlockHashes) clearBlockHashesBucket(tx kv.RwTx) error { return nil } -// clearCache removes block hashes from cache db -func (bh *StageBlockHashes) clearCache() error { - tx, err := bh.configs.cachedb.BeginRw(context.Background()) - if err != nil { - return nil - } - defer tx.Rollback() - if err := tx.ClearBucket(BlockHashesBucket); err != nil { - return nil - } - - if err := tx.Commit(); err != nil { - return err - } - - return nil -} - func (bh *StageBlockHashes) Revert(ctx context.Context, firstCycle bool, u *RevertState, s *StageState, tx kv.RwTx) (err error) { useInternalTx := tx == nil if useInternalTx { @@ -548,14 +529,6 @@ func (bh *StageBlockHashes) Revert(ctx context.Context, firstCycle bool, u *Reve return err } - // clean cache db as well - if err := bh.clearCache(); err != nil { - bh.configs.logger.Error(). - Err(err). - Msgf("[STAGED_STREAM_SYNC] clear block hashes cache failed") - return err - } - // save progress currentHead := bh.configs.bc.CurrentBlock().NumberU64() if err = s.Update(tx, currentHead); err != nil { From 2db7e47d1d4c1363c0c42bbbc0c7d6d5e5d1b107 Mon Sep 17 00:00:00 2001 From: GheisMohammadi Date: Mon, 13 Oct 2025 23:16:03 +0800 Subject: [PATCH 2/5] validate block hashes in cache before using them --- .../stagedstreamsync/stage_blockhashes.go | 66 +++++++++++++++---- 1 file changed, 53 insertions(+), 13 deletions(-) diff --git a/api/service/synchronize/stagedstreamsync/stage_blockhashes.go b/api/service/synchronize/stagedstreamsync/stage_blockhashes.go index 3eafb4ff79..d4a302a2df 100644 --- a/api/service/synchronize/stagedstreamsync/stage_blockhashes.go +++ b/api/service/synchronize/stagedstreamsync/stage_blockhashes.go @@ -105,20 +105,31 @@ func (bh *StageBlockHashes) Exec(ctx context.Context, firstCycle bool, invalidBl return err } currProgress = currentHead - } else if currProgress >= targetHeight { - //TODO: validate hashes (from currentHead+1 to targetHeight) + // update progress in db + if err := s.Update(tx, currProgress); err != nil { + return err + } + } else if currProgress > currentHead { + // Validate that hashes from currentHead+1 to currProgress actually exist in DB + if err := bh.validateHashesExist(ctx, tx, currentHead+1, currProgress); err != nil { + bh.configs.logger.Warn(). + Err(err). + Uint64("currentHead", currentHead). + Uint64("currProgress", currProgress). + Msg("[STAGED_STREAM_SYNC] hashes validation failed, resetting progress to currentHead") + if err := bh.clearBlockHashesBucket(tx); err != nil { + return err + } + currProgress = currentHead + // update progress in db + if err := s.Update(tx, currProgress); err != nil { + return err + } + } + } + // if all hashes are exist and validated, return + if currProgress >= targetHeight { return nil - } else if currProgress > currentHead && currProgress < targetHeight { - // TODO: validate hashes (from currentHead to currProgress) - - // key := strconv.FormatUint(currProgress, 10) - // bucketName := bh.configs.blockDBs - // currHash := []byte{} - // if currHash, err = etx.GetOne(bucketName, []byte(key)); err != nil || len(currHash[:]) == 0 { - // //TODO: currProgress and DB don't match. Either re-download all or verify db and set currProgress to last - // return err - // } - // startHash.SetBytes(currHash[:]) } startTime := time.Now() @@ -511,6 +522,35 @@ func (bh *StageBlockHashes) clearBlockHashesBucket(tx kv.RwTx) error { return nil } +// validateHashesExist checks if block hashes exist in the database for the given range +func (bh *StageBlockHashes) validateHashesExist(ctx context.Context, tx kv.Tx, startBlock, endBlock uint64) error { + useInternalTx := tx == nil + var err error + if useInternalTx { + tx, err = bh.configs.db.BeginRw(ctx) + if err != nil { + return err + } + defer tx.Rollback() + } + for blockNum := startBlock; blockNum <= endBlock; blockNum++ { + blkKey := marshalData(blockNum) + hash, err := tx.GetOne(BlockHashesBucket, blkKey) + if err != nil { + return fmt.Errorf("[STAGED_STREAM_SYNC] validateHashesExist: failed to get hash for block %d: %w", blockNum, err) + } + if len(hash) == 0 { + return fmt.Errorf("[STAGED_STREAM_SYNC] validateHashesExist: hash not found for block %d", blockNum) + } + } + if useInternalTx { + if err = tx.Commit(); err != nil { + return err + } + } + return nil +} + func (bh *StageBlockHashes) Revert(ctx context.Context, firstCycle bool, u *RevertState, s *StageState, tx kv.RwTx) (err error) { useInternalTx := tx == nil if useInternalTx { From 3d37e3203018d4ca22f9f3bf92fdf2ab3b1c4960 Mon Sep 17 00:00:00 2001 From: GheisMohammadi Date: Tue, 14 Oct 2025 11:38:35 +0800 Subject: [PATCH 3/5] clean block caches db if block data are corrupted or not saved --- .../stagedstreamsync/stage_bodies.go | 24 ++++++++++++++++++- 1 file changed, 23 insertions(+), 1 deletion(-) diff --git a/api/service/synchronize/stagedstreamsync/stage_bodies.go b/api/service/synchronize/stagedstreamsync/stage_bodies.go index 2f4cbce54e..1a2ec23613 100644 --- a/api/service/synchronize/stagedstreamsync/stage_bodies.go +++ b/api/service/synchronize/stagedstreamsync/stage_bodies.go @@ -105,11 +105,27 @@ func (b *StageBodies) Exec(ctx context.Context, firstCycle bool, invalidBlockRev return errV } - if currProgress <= currentHead { + // if currProgress is not equal to currentHead, clean all block DBs + // because it means stage was interrupted and we need to start from scratch + // this is to prevent the case where the block bodies are not saved in the cache databases or are corrupted + // we can't validate the block bodies from currentHead+1 to currProgress because the block bodies are per stream and + // download details are not available + if currProgress > 0 && currProgress != currentHead { + b.configs.logger.Info(). + Uint64("currProgress", currProgress). + Uint64("currentHead", currentHead). + Msg("[STAGED_STREAM_SYNC] block bodies validation failed, clearing all block DBs and resetting progress to currentHead") if err := b.cleanAllBlockDBs(ctx); err != nil { + b.configs.logger.Error(). + Err(err). + Msg("[STAGED_STREAM_SYNC] clear all block DBs failed") return err } currProgress = currentHead + // update progress in db + if err := s.Update(tx, currProgress); err != nil { + return err + } } if currProgress >= targetHeight { @@ -787,6 +803,9 @@ func (b *StageBodies) Revert(ctx context.Context, firstCycle bool, u *RevertStat //clean all blocks DBs if err := b.cleanAllBlockDBs(ctx); err != nil { + b.configs.logger.Error(). + Err(err). + Msgf("[STAGED_STREAM_SYNC] StageBodies Revert: clean all block DBs after revert failed") return err } @@ -827,6 +846,9 @@ func (b *StageBodies) Revert(ctx context.Context, firstCycle bool, u *RevertStat func (b *StageBodies) CleanUp(ctx context.Context, firstCycle bool, p *CleanUpState, tx kv.RwTx) (err error) { //clean all blocks DBs if err := b.cleanAllBlockDBs(ctx); err != nil { + b.configs.logger.Error(). + Err(err). + Msgf("[STAGED_STREAM_SYNC] StageBodies CleanUp: clean all block DBs after cleanup failed") return err } From 35059d18ad8e3ebae5e08a669f26614461a3a42a Mon Sep 17 00:00:00 2001 From: GheisMohammadi Date: Wed, 15 Oct 2025 09:15:51 +0800 Subject: [PATCH 4/5] staged sync block_bodies: readjust cuurProgress if it is not saved or corrupted --- .../synchronize/stagedstreamsync/stage_bodies.go | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/api/service/synchronize/stagedstreamsync/stage_bodies.go b/api/service/synchronize/stagedstreamsync/stage_bodies.go index 1a2ec23613..ccd0c65a8a 100644 --- a/api/service/synchronize/stagedstreamsync/stage_bodies.go +++ b/api/service/synchronize/stagedstreamsync/stage_bodies.go @@ -105,12 +105,21 @@ func (b *StageBodies) Exec(ctx context.Context, firstCycle bool, invalidBlockRev return errV } + // if currProgress is 0, reset to currentHead + if currProgress == 0 { + currProgress = currentHead + // update progress in db + if err := s.Update(tx, currProgress); err != nil { + return err + } + } + // if currProgress is not equal to currentHead, clean all block DBs // because it means stage was interrupted and we need to start from scratch // this is to prevent the case where the block bodies are not saved in the cache databases or are corrupted // we can't validate the block bodies from currentHead+1 to currProgress because the block bodies are per stream and // download details are not available - if currProgress > 0 && currProgress != currentHead { + if currProgress != currentHead { b.configs.logger.Info(). Uint64("currProgress", currProgress). Uint64("currentHead", currentHead). @@ -128,6 +137,8 @@ func (b *StageBodies) Exec(ctx context.Context, firstCycle bool, invalidBlockRev } } + // currProgress is already equal to currentHead + // so if it's already caught up to targetHeight, it must skip the download loop if currProgress >= targetHeight { return nil } From 38d4fb7a6b0f961e87f43d7bb8918eff1612e302 Mon Sep 17 00:00:00 2001 From: GheisMohammadi Date: Wed, 15 Oct 2025 09:16:08 +0800 Subject: [PATCH 5/5] staged sync: mark bad block as completed if already exist in chain --- api/service/synchronize/stagedstreamsync/stage_states.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/api/service/synchronize/stagedstreamsync/stage_states.go b/api/service/synchronize/stagedstreamsync/stage_states.go index df0aacba1f..788f5f6a95 100644 --- a/api/service/synchronize/stagedstreamsync/stage_states.go +++ b/api/service/synchronize/stagedstreamsync/stage_states.go @@ -178,6 +178,12 @@ func (stg *StageStates) Exec(ctx context.Context, firstCycle bool, invalidBlockR if blk := stg.configs.bc.GetBlock(block.Hash(), block.NumberU64()); blk != nil { if blk.NumberU64() == block.NumberU64() && blk.Hash() == block.Hash() { stg.configs.bc.CurrentHeader().SetNumber(block.Number()) + gbm.MarkBlockCompleted(i) + if invalidBlockRevert { + if s.state.invalidBlock.Number == i { + s.state.invalidBlock.resolve() + } + } continue } }