81 changes: 47 additions & 34 deletions api/service/synchronize/stagedstreamsync/stage_blockhashes.go
@@ -30,7 +30,6 @@ type StageBlockHashesCfg struct {
db kv.RwDB
concurrency int
protocol syncProtocol
cachedb kv.RwDB
logProgress bool
logger zerolog.Logger
}
@@ -106,20 +105,31 @@ func (bh *StageBlockHashes) Exec(ctx context.Context, firstCycle bool, invalidBl
return err
}
currProgress = currentHead
} else if currProgress >= targetHeight {
//TODO: validate hashes (from currentHead+1 to targetHeight)
// update progress in db
if err := s.Update(tx, currProgress); err != nil {
return err
}
} else if currProgress > currentHead {
// Validate that hashes from currentHead+1 to currProgress actually exist in DB
if err := bh.validateHashesExist(ctx, tx, currentHead+1, currProgress); err != nil {
bh.configs.logger.Warn().
Err(err).
Uint64("currentHead", currentHead).
Uint64("currProgress", currProgress).
Msg("[STAGED_STREAM_SYNC] hashes validation failed, resetting progress to currentHead")
if err := bh.clearBlockHashesBucket(tx); err != nil {
return err
}
currProgress = currentHead
// update progress in db
if err := s.Update(tx, currProgress); err != nil {
return err
}
}
}
// if all hashes exist and are validated, return
if currProgress >= targetHeight {
return nil
} else if currProgress > currentHead && currProgress < targetHeight {
// TODO: validate hashes (from currentHead to currProgress)

// key := strconv.FormatUint(currProgress, 10)
// bucketName := bh.configs.blockDBs
// currHash := []byte{}
// if currHash, err = etx.GetOne(bucketName, []byte(key)); err != nil || len(currHash[:]) == 0 {
// //TODO: currProgress and DB don't match. Either re-download all or verify db and set currProgress to last
// return err
// }
// startHash.SetBytes(currHash[:])
}

startTime := time.Now()
Expand Down Expand Up @@ -512,21 +522,32 @@ func (bh *StageBlockHashes) clearBlockHashesBucket(tx kv.RwTx) error {
return nil
}

// clearCache removes block hashes from cache db
func (bh *StageBlockHashes) clearCache() error {
tx, err := bh.configs.cachedb.BeginRw(context.Background())
if err != nil {
return nil
// validateHashesExist checks if block hashes exist in the database for the given range
func (bh *StageBlockHashes) validateHashesExist(ctx context.Context, tx kv.Tx, startBlock, endBlock uint64) error {
useInternalTx := tx == nil
var err error
if useInternalTx {
tx, err = bh.configs.db.BeginRw(ctx)
if err != nil {
return err
}
defer tx.Rollback()
}
defer tx.Rollback()
if err := tx.ClearBucket(BlockHashesBucket); err != nil {
return nil
for blockNum := startBlock; blockNum <= endBlock; blockNum++ {
blkKey := marshalData(blockNum)
hash, err := tx.GetOne(BlockHashesBucket, blkKey)
if err != nil {
return fmt.Errorf("[STAGED_STREAM_SYNC] validateHashesExist: failed to get hash for block %d: %w", blockNum, err)
}
if len(hash) == 0 {
return fmt.Errorf("[STAGED_STREAM_SYNC] validateHashesExist: hash not found for block %d", blockNum)
}
}

if err := tx.Commit(); err != nil {
return err
if useInternalTx {
if err = tx.Commit(); err != nil {
return err
}
}

return nil
}
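
The helper above only opens and commits its own transaction when the caller passes `tx == nil`; otherwise it reuses the caller's transaction and leaves commit/rollback to the caller. Below is a minimal, self-contained sketch of that idiom with stand-in types (the real stage uses erigon-lib's `kv.RwDB`/`kv.Tx`; `withOptionalTx`, `fakeDB`, and `fakeTx` are illustrative names, not part of the PR):

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// txn is a minimal stand-in for a kv transaction; the real code uses
// erigon-lib's kv.Tx / kv.RwTx types instead.
type txn interface {
	Rollback()
	Commit() error
}

type fakeTx struct{}

func (t *fakeTx) Rollback()     {}
func (t *fakeTx) Commit() error { return nil }

type fakeDB struct{}

func (fakeDB) BeginRw(ctx context.Context) (txn, error) { return &fakeTx{}, nil }

// withOptionalTx mirrors the useInternalTx idiom: reuse the caller's
// transaction when one is supplied; otherwise open one, roll it back on
// failure (the deferred Rollback is a no-op after a successful Commit),
// and commit only on success.
func withOptionalTx(ctx context.Context, db fakeDB, tx txn, work func(txn) error) error {
	useInternalTx := tx == nil
	if useInternalTx {
		var err error
		if tx, err = db.BeginRw(ctx); err != nil {
			return err
		}
		defer tx.Rollback()
	}
	if err := work(tx); err != nil {
		return err
	}
	if useInternalTx {
		return tx.Commit()
	}
	return nil
}

func main() {
	fmt.Println(withOptionalTx(context.Background(), fakeDB{}, nil, func(txn) error { return nil }))
	fmt.Println(withOptionalTx(context.Background(), fakeDB{}, nil, func(txn) error { return errors.New("missing hash") }))
}
```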

@@ -548,14 +569,6 @@ func (bh *StageBlockHashes) Revert(ctx context.Context, firstCycle bool, u *Reve
return err
}

// clean cache db as well
if err := bh.clearCache(); err != nil {
bh.configs.logger.Error().
Err(err).
Msgf("[STAGED_STREAM_SYNC] clear block hashes cache failed")
return err
}

// save progress
currentHead := bh.configs.bc.CurrentBlock().NumberU64()
if err = s.Update(tx, currentHead); err != nil {
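
Taken together, the `Exec` changes above make the block-hashes stage distrust a recorded progress value that runs ahead of the chain head unless every hash in the gap is actually present in the DB; on the first gap it clears the bucket and restarts from the head. A minimal sketch of that decision rule, using an in-memory map in place of `BlockHashesBucket` (the names `hashStore`, `validateRange`, and `reconcileProgress` are illustrative, not taken from the package):

```go
package main

import "fmt"

// hashStore is a stand-in for the BlockHashesBucket contents; the real stage
// reads hashes out of a kv transaction.
type hashStore map[uint64][]byte

// validateRange checks that every block in [start, end] has a non-empty hash
// recorded, reporting the first gap it finds.
func validateRange(store hashStore, start, end uint64) error {
	for blockNum := start; blockNum <= end; blockNum++ {
		if len(store[blockNum]) == 0 {
			return fmt.Errorf("hash not found for block %d", blockNum)
		}
	}
	return nil
}

// reconcileProgress applies the recovery rule from the diff: progress at or
// below the head is pinned to the head, and progress beyond the head is kept
// only if every hash in between can be verified; otherwise fall back to the
// head (the real stage also clears the bucket at that point).
func reconcileProgress(store hashStore, currentHead, currProgress uint64) uint64 {
	if currProgress <= currentHead {
		return currentHead
	}
	if err := validateRange(store, currentHead+1, currProgress); err != nil {
		return currentHead
	}
	return currProgress
}

func main() {
	store := hashStore{11: {0xaa}, 12: {0xbb}}    // hash for block 13 is missing
	fmt.Println(reconcileProgress(store, 10, 13)) // 10: falls back to the head
	fmt.Println(reconcileProgress(store, 10, 12)) // 12: verified, progress kept
}
```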
35 changes: 34 additions & 1 deletion api/service/synchronize/stagedstreamsync/stage_bodies.go
@@ -105,13 +105,40 @@ func (b *StageBodies) Exec(ctx context.Context, firstCycle bool, invalidBlockRev
return errV
}

if currProgress <= currentHead {
// if currProgress is 0, reset to currentHead
if currProgress == 0 {
currProgress = currentHead
// update progress in db
if err := s.Update(tx, currProgress); err != nil {
return err
}
}

// if currProgress is not equal to currentHead, clean all block DBs
// because it means the stage was interrupted and we need to start from scratch
// this is to prevent the case where the block bodies are not saved in the cache databases or are corrupted
// we can't validate the block bodies from currentHead+1 to currProgress because the block bodies are per stream and
// download details are not available
if currProgress != currentHead {
b.configs.logger.Info().
Uint64("currProgress", currProgress).
Uint64("currentHead", currentHead).
Msg("[STAGED_STREAM_SYNC] block bodies validation failed, clearing all block DBs and resetting progress to currentHead")
if err := b.cleanAllBlockDBs(ctx); err != nil {
b.configs.logger.Error().
Err(err).
Msg("[STAGED_STREAM_SYNC] clear all block DBs failed")
return err
}
currProgress = currentHead
// update progress in db
if err := s.Update(tx, currProgress); err != nil {
return err
}
}

// currProgress is already equal to currentHead
// so if it's already caught up to targetHeight, it must skip the download loop
if currProgress >= targetHeight {
return nil
}
@@ -787,6 +814,9 @@ func (b *StageBodies) Revert(ctx context.Context, firstCycle bool, u *RevertStat

//clean all blocks DBs
if err := b.cleanAllBlockDBs(ctx); err != nil {
b.configs.logger.Error().
Err(err).
Msgf("[STAGED_STREAM_SYNC] StageBodies Revert: clean all block DBs after revert failed")
return err
}

@@ -827,6 +857,9 @@ func (b *StageBodies) Revert(ctx context.Context, firstCycle bool, u *RevertStat
func (b *StageBodies) CleanUp(ctx context.Context, firstCycle bool, p *CleanUpState, tx kv.RwTx) (err error) {
//clean all blocks DBs
if err := b.cleanAllBlockDBs(ctx); err != nil {
b.configs.logger.Error().
Err(err).
Msgf("[STAGED_STREAM_SYNC] StageBodies CleanUp: clean all block DBs after cleanup failed")
return err
}

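
The `StageBodies.Exec` hunk applies a blunter rule than the hashes stage: because downloaded bodies are tracked per stream and cannot be re-validated afterwards, any recorded progress that differs from the chain head forces a full wipe of the cached block DBs. A rough sketch of that control flow under stand-in names (`bodiesProgress` and `reconcile` are illustrative, not from the package):

```go
package main

import "fmt"

// bodiesProgress mimics the bookkeeping the StageBodies hunk touches; the
// field and method names here are illustrative only.
type bodiesProgress struct {
	currProgress uint64
	cachesWiped  bool
}

// reconcile applies the rules from the Exec hunk: an unset progress is pinned
// to the chain head, and any progress that diverges from the head forces a
// wipe of the cached block DBs, since partially downloaded bodies cannot be
// re-validated per stream. It reports whether the download loop should run.
func (p *bodiesProgress) reconcile(currentHead, targetHeight uint64) bool {
	if p.currProgress == 0 {
		p.currProgress = currentHead
	}
	if p.currProgress != currentHead {
		// Equivalent to cleanAllBlockDBs: drop the caches and restart the
		// stage from the canonical head.
		p.cachesWiped = true
		p.currProgress = currentHead
	}
	return p.currProgress < targetHeight
}

func main() {
	p := &bodiesProgress{currProgress: 150}
	fmt.Println(p.reconcile(100, 200), p.cachesWiped) // true true: wiped, must download
	q := &bodiesProgress{}
	fmt.Println(q.reconcile(200, 200), q.cachesWiped) // false false: already caught up
}
```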
6 changes: 6 additions & 0 deletions api/service/synchronize/stagedstreamsync/stage_states.go
@@ -178,6 +178,12 @@ func (stg *StageStates) Exec(ctx context.Context, firstCycle bool, invalidBlockR
if blk := stg.configs.bc.GetBlock(block.Hash(), block.NumberU64()); blk != nil {
if blk.NumberU64() == block.NumberU64() && blk.Hash() == block.Hash() {
stg.configs.bc.CurrentHeader().SetNumber(block.Number())
gbm.MarkBlockCompleted(i)
if invalidBlockRevert {
if s.state.invalidBlock.Number == i {
s.state.invalidBlock.resolve()
}
}
continue
}
}
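
The `StageStates` addition handles blocks that turn out to be already imported: they are marked as completed in the block download manager and, if an invalid-block revert was pending for that exact height, the marker is resolved so the cycle does not keep re-flagging it. A small illustrative sketch with stub types (`downloadTracker`, `invalidBlock`, and `skipExistingBlock` are assumptions for illustration; the real code uses `gbm` and `s.state.invalidBlock`):

```go
package main

import "fmt"

// downloadTracker stands in for the gbm block-download manager used by
// StageStates; MarkBlockCompleted simply records the height here.
type downloadTracker struct{ completed map[uint64]bool }

func (d *downloadTracker) MarkBlockCompleted(n uint64) { d.completed[n] = true }

// invalidBlock mirrors the revert bookkeeping: a flagged height that should be
// cleared once the block turns out to be present and canonical.
type invalidBlock struct {
	Active bool
	Number uint64
}

func (ib *invalidBlock) resolve() { ib.Active = false }

// skipExistingBlock applies the added logic: a block that is already imported
// is marked completed, and a pending invalid-block revert for that same height
// is resolved so the cycle does not keep re-flagging it.
func skipExistingBlock(d *downloadTracker, ib *invalidBlock, height uint64, revertInProgress bool) {
	d.MarkBlockCompleted(height)
	if revertInProgress && ib.Number == height {
		ib.resolve()
	}
}

func main() {
	gbm := &downloadTracker{completed: map[uint64]bool{}}
	ib := &invalidBlock{Active: true, Number: 42}
	skipExistingBlock(gbm, ib, 42, true)
	fmt.Println(gbm.completed[42], ib.Active) // true false
}
```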