diff --git a/api/service/synchronize/stagedstreamsync/default_stages.go b/api/service/synchronize/stagedstreamsync/default_stages.go index f15f9a1aec..3b0913e293 100644 --- a/api/service/synchronize/stagedstreamsync/default_stages.go +++ b/api/service/synchronize/stagedstreamsync/default_stages.go @@ -189,3 +189,29 @@ func DefaultStages(ctx context.Context, }, } } + +func EpochStages(ctx context.Context, + seCfg StageEpochCfg, + finishCfg StageFinishCfg, +) []*Stage { + + handlerStageEpochSync := NewStageEpoch(seCfg) + handlerStageFinish := NewStageFinish(finishCfg) + + return []*Stage{ + { + ID: SyncEpoch, + Description: "Sync only Last Block of Epoch", + Handler: handlerStageEpochSync, + RangeMode: OnlyShortRange, + ChainExecutionMode: OnlyEpochChain, + }, + { + ID: Finish, + Description: "Finalize Changes", + Handler: handlerStageFinish, + RangeMode: LongRangeAndShortRange, + ChainExecutionMode: AllChains, + }, + } +} diff --git a/api/service/synchronize/stagedstreamsync/downloader.go b/api/service/synchronize/stagedstreamsync/downloader.go index 21f0e70f33..aba23d61d5 100644 --- a/api/service/synchronize/stagedstreamsync/downloader.go +++ b/api/service/synchronize/stagedstreamsync/downloader.go @@ -53,6 +53,7 @@ func NewDownloader(host p2p.Host, setNodeSyncStatus func(bool)) *Downloader { config.fixValues() + isEpochChain := !isBeaconNode && bc.ShardID() == shard.BeaconChainShardID protoCfg := protocolConfig(host, bc, nodeConfig, isBeaconNode, config) sp := streamSyncProtocol.NewProtocol(*protoCfg) @@ -80,7 +81,13 @@ func NewDownloader(host p2p.Host, ctx, cancel := context.WithCancel(context.Background()) // create an instance of staged sync for the downloader - stagedSyncInstance, err := CreateStagedSync(ctx, bc, nodeConfig, consensus, dbDir, sp, config, isBeaconNode, logger, setNodeSyncStatus) + var stagedSyncInstance *StagedStreamSync + var err error + if isEpochChain { + stagedSyncInstance, err = CreateStagedEpochSync(ctx, bc, nodeConfig, consensus, dbDir, sp, config, isBeaconNode, logger, setNodeSyncStatus) + } else { + stagedSyncInstance, err = CreateStagedSync(ctx, bc, nodeConfig, consensus, dbDir, sp, config, isBeaconNode, logger, setNodeSyncStatus) + } if err != nil { cancel() return nil diff --git a/api/service/synchronize/stagedstreamsync/staged_stream_sync.go b/api/service/synchronize/stagedstreamsync/staged_stream_sync.go index 7e1af3e0f3..48905cb114 100644 --- a/api/service/synchronize/stagedstreamsync/staged_stream_sync.go +++ b/api/service/synchronize/stagedstreamsync/staged_stream_sync.go @@ -334,36 +334,41 @@ func New( setNodeSyncStatus func(bool), ) *StagedStreamSync { - forwardStages := make([]*Stage, len(StagesForwardOrder)) - for i, stageIndex := range StagesForwardOrder { + var forwardStages []*Stage + for _, stageID := range StagesForwardOrder { for _, s := range stagesList { - if s.ID == stageIndex { - forwardStages[i] = s + if s.ID == stageID { + forwardStages = append(forwardStages, s) break } } } - revertStages := make([]*Stage, len(StagesRevertOrder)) - for i, stageIndex := range StagesRevertOrder { + var revertStages []*Stage + for _, stageID := range StagesRevertOrder { for _, s := range stagesList { - if s.ID == stageIndex { - revertStages[i] = s + if s.ID == stageID { + revertStages = append(revertStages, s) break } } } - pruneStages := make([]*Stage, len(StagesCleanUpOrder)) - for i, stageIndex := range StagesCleanUpOrder { + var pruneStages []*Stage + for _, stageID := range StagesCleanUpOrder { for _, s := range stagesList { - if s.ID == stageIndex { 
-				pruneStages[i] = s
+			if s.ID == stageID {
+				pruneStages = append(pruneStages, s)
 				break
 			}
 		}
 	}
 
+	// Validate that we have at least one stage
+	if len(forwardStages) == 0 {
+		logger.Error().Msg(WrapStagedSyncMsg("no valid stages found - this will cause sync failures"))
+	}
+
 	logPrefixes := make([]string, len(stagesList))
 	for i := range stagesList {
 		logPrefixes[i] = fmt.Sprintf("%d/%d %s", i+1, len(stagesList), stagesList[i].ID)
@@ -467,6 +472,14 @@ func (sss *StagedStreamSync) Run(ctx context.Context, db kv.RwDB, tx kv.RwTx, fi
 
 		stage := sss.stages[sss.currentStage]
 
+		if stage == nil {
+			sss.logger.Error().
+				Uint("currentStage", sss.currentStage).
+				Int("totalStages", len(sss.stages)).
+				Msg(WrapStagedSyncMsg("stage is nil, aborting sync run"))
+			return fmt.Errorf("stage is nil")
+		}
+
 		if stage.Disabled {
 			sss.logger.Trace().
 				Msg(WrapStagedSyncMsg(fmt.Sprintf("%s disabled. %s", stage.ID, stage.DisabledDescription)))
diff --git a/api/service/synchronize/stagedstreamsync/syncing.go b/api/service/synchronize/stagedstreamsync/syncing.go
index e72c243645..4790699fb7 100644
--- a/api/service/synchronize/stagedstreamsync/syncing.go
+++ b/api/service/synchronize/stagedstreamsync/syncing.go
@@ -151,7 +151,6 @@ func CreateStagedSync(ctx context.Context,
 		Str("SyncMode", config.SyncMode.String()).
 		Bool("serverOnly", config.ServerOnly).
 		Int("minStreams", config.MinStreams).
-		Str("dbDir", dbDir).
 		Msg(WrapStagedSyncMsg("staged stream sync created successfully"))
 
 	return New(
@@ -172,6 +171,101 @@ func CreateStagedSync(ctx context.Context,
 	), nil
 }
+
+// CreateStagedEpochSync creates an instance of staged sync for the epoch chain
+func CreateStagedEpochSync(ctx context.Context,
+	bc core.BlockChain,
+	nodeConfig *nodeconfig.ConfigType,
+	consensus *consensus.Consensus,
+	dbDir string,
+	protocol syncProtocol,
+	config Config,
+	isBeaconNode bool,
+	logger zerolog.Logger,
+	setNodeSyncStatus func(bool),
+) (*StagedStreamSync, error) {
+
+	logger.Info().
+		Uint32("shard", bc.ShardID()).
+		Bool("isBeaconNode", isBeaconNode).
+		Bool("memdb", config.UseMemDB).
+		Str("dbDir", dbDir).
+		Bool("serverOnly", config.ServerOnly).
+		Int("minStreams", config.MinStreams).
+		Msg(WrapStagedSyncMsg("creating staged epoch sync"))
+
+	isExplorer := nodeConfig.Role() == nodeconfig.ExplorerNode
+	isValidator := nodeConfig.Role() == nodeconfig.Validator
+	isBeaconShard := true
+	isEpochChain := true
+	isBeaconValidator := false
+	joinConsensus := false
+
+	var mainDB kv.RwDB
+	if config.UseMemDB {
+		mdbPath := getEpochDbPath(dbDir)
+		logger.Info().
+			Str("path", mdbPath).
+			Msg(WrapStagedSyncMsg("creating epoch main db in memory"))
+		mainDB = mdbx.NewMDBX(log.New()).InMem(mdbPath).MustOpen()
+	} else {
+		mdbPath := getEpochDbPath(dbDir)
+		logger.Info().
+			Str("path", mdbPath).
+			Msg(WrapStagedSyncMsg("creating epoch main db on disk"))
+		mainDB = mdbx.NewMDBX(log.New()).Path(mdbPath).MustOpen()
+	}
+
+	// Initialize database buckets for epoch sync
+	// Epoch sync doesn't need sub-databases, so we pass an empty slice
+	if errInitDB := initDB(ctx, mainDB, []kv.RwDB{}); errInitDB != nil {
+		logger.Error().Err(errInitDB).Msg("create staged epoch sync instance failed")
+		return nil, errInitDB
+	}
+
+	stageSyncEpochCfg := NewStageEpochCfg(bc, mainDB, logger)
+	stageFinishCfg := NewStageFinishCfg(mainDB, logger)
+
+	// init stages order based on sync mode
+	initStagesOrder(config.SyncMode)
+
+	epochStages := EpochStages(ctx,
+		stageSyncEpochCfg,
+		stageFinishCfg,
+	)
+
+	logger.Info().
+		Uint32("shard", bc.ShardID()).
+ Bool("isEpochChain", isEpochChain). + Bool("isExplorer", isExplorer). + Bool("isValidator", isValidator). + Bool("isBeaconShard", isBeaconShard). + Bool("isBeaconValidator", isBeaconValidator). + Bool("joinConsensus", joinConsensus). + Bool("memdb", config.UseMemDB). + Str("dbDir", dbDir). + Str("SyncMode", config.SyncMode.String()). + Bool("serverOnly", config.ServerOnly). + Int("minStreams", config.MinStreams). + Msg(WrapStagedSyncMsg("staged stream epoch sync created successfully")) + + return New( + bc, + consensus, + mainDB, + epochStages, + protocol, + isEpochChain, + isBeaconShard, + isBeaconValidator, + isExplorer, + isValidator, + joinConsensus, + config, + logger, + setNodeSyncStatus, + ), nil +} + // initDB inits the sync loop main database and create buckets func initDB(ctx context.Context, mainDB kv.RwDB, dbs []kv.RwDB) error { @@ -236,6 +330,11 @@ func getBlockDbPath(shardID uint32, beacon bool, workerID int, dbDir string) str } } +// getEpochDbPath returns the path of the cache database which stores epoch blocks +func getEpochDbPath(dbDir string) string { + return filepath.Join(dbDir, "cache/epoch_db_main") +} + func (s *StagedStreamSync) Debug(source string, msg interface{}) { // only log the msg in debug mode if !s.config.DebugMode { diff --git a/core/blockchain_impl.go b/core/blockchain_impl.go index cd9a0b3068..814319c074 100644 --- a/core/blockchain_impl.go +++ b/core/blockchain_impl.go @@ -90,6 +90,10 @@ var ( blockValidationTimer = metrics.NewRegisteredTimer("chain/validation", nil) blockExecutionTimer = metrics.NewRegisteredTimer("chain/execution", nil) blockWriteTimer = metrics.NewRegisteredTimer("chain/write", nil) + + // CrossLinkPendingQueueGauge is used to monitor the current size of pending crosslink queue + CrossLinkPendingQueueGauge = metrics.NewRegisteredGauge("chain/crosslink/pending_queue_size", nil) + // ErrCrosslinkNotFound is the error when no crosslink found ErrCrosslinkNotFound = errors.New("crosslink not found") // ErrZeroBytes is the error when it reads empty crosslink @@ -2558,7 +2562,12 @@ func (bc *BlockChainImpl) ReadPendingCrossLinks() ([]types.CrossLink, error) { bc.pendingCrossLinksMutex.Lock() defer bc.pendingCrossLinksMutex.Unlock() - return bc.readPendingCrossLinks() + cls, err := bc.readPendingCrossLinks() + if err == nil { + // Update pending queue gauge with current size + bc.updatePendingCrossLinkQueueGauge(len(cls)) + } + return cls, err } func (bc *BlockChainImpl) AddPendingCrossLinks(pendingCLs []types.CrossLink) (int, error) { @@ -2568,10 +2577,18 @@ func (bc *BlockChainImpl) AddPendingCrossLinks(pendingCLs []types.CrossLink) (in cls, err := bc.readPendingCrossLinks() if err != nil || len(cls) == 0 { err := bc.CachePendingCrossLinks(pendingCLs) + if err == nil { + // Update pending queue gauge with new size + bc.updatePendingCrossLinkQueueGauge(len(pendingCLs)) + } return len(pendingCLs), err } cls = append(cls, pendingCLs...) 
 	err = bc.CachePendingCrossLinks(cls)
+	if err == nil {
+		// Update pending queue gauge with new size
+		bc.updatePendingCrossLinkQueueGauge(len(cls))
+	}
 	return len(cls), err
 }
 
@@ -2603,9 +2620,18 @@ func (bc *BlockChainImpl) DeleteFromPendingCrossLinks(crossLinks []types.CrossLi
 		pendingCLs = append(pendingCLs, cl)
 	}
 	err = bc.CachePendingCrossLinks(pendingCLs)
+	if err == nil {
+		// Update pending queue gauge with new size after deletion
+		bc.updatePendingCrossLinkQueueGauge(len(pendingCLs))
+	}
 	return len(pendingCLs), err
 }
 
+// updatePendingCrossLinkQueueGauge updates the pending crosslink queue size gauge
+func (bc *BlockChainImpl) updatePendingCrossLinkQueueGauge(size int) {
+	CrossLinkPendingQueueGauge.Update(int64(size))
+}
+
 func (bc *BlockChainImpl) IsSameLeaderAsPreviousBlock(block *types.Block) bool {
 	if IsEpochBlock(block) {
 		return false
diff --git a/node/harmony/metrics.go b/node/harmony/metrics.go
index f74fbad661..ad81df8076 100644
--- a/node/harmony/metrics.go
+++ b/node/harmony/metrics.go
@@ -70,6 +70,16 @@ var (
 		},
 	)
 
+	// CrossLinkPendingQueueGauge is used to monitor the current size of pending crosslink queue
+	CrossLinkPendingQueueGauge = prometheus.NewGauge(
+		prometheus.GaugeOpts{
+			Namespace: "hmy",
+			Subsystem: "p2p",
+			Name: "crosslink_pending_queue_size",
+			Help: "current number of crosslinks in pending queue",
+		},
+	)
+
 	onceMetrics sync.Once
 )
 
@@ -81,6 +91,7 @@ func initMetrics() {
 			nodeConsensusMessageCounterVec,
 			nodeNodeMessageCounterVec,
 			nodeCrossLinkMessageCounterVec,
+			CrossLinkPendingQueueGauge,
 		)
 	})
 }
diff --git a/node/harmony/node_cross_link.go b/node/harmony/node_cross_link.go
index 8330d71d5c..c772a9309d 100644
--- a/node/harmony/node_cross_link.go
+++ b/node/harmony/node_cross_link.go
@@ -158,15 +158,15 @@ func (node *Node) processCrossLinkHeartbeatMessage(msgPayload []byte) error {
 		return errors.New("epoch chain current block not available")
 	}
 
-	// Check if epoch chain is synced to at least the heartbeat epoch
-	currentEpoch := epochCurrentBlock.Epoch()
-	currentEpochU64 := currentEpoch.Uint64()
-	if currentEpochU64 < hb.Epoch {
+	epochChainEpoch := epochCurrentBlock.Epoch().Uint64()
+
+	// The epoch chain only needs to have reached the epoch before the heartbeat epoch
+	if epochChainEpoch+1 < hb.Epoch { // equivalent to epochChainEpoch < hb.Epoch-1 without uint64 underflow at epoch 0
 		utils.Logger().Warn().
-			Uint64("currentEpoch", currentEpochU64).
+			Uint64("epochChainEpoch", epochChainEpoch).
 			Uint64("heartbeatEpoch", hb.Epoch).
 			Msg("[ProcessCrossLinkHeartbeatMessage] epoch chain not synced to heartbeat epoch, ignoring heartbeat")
-		return errors.Errorf("epoch chain not synced to heartbeat epoch: current=%d, heartbeat=%d", currentEpochU64, hb.Epoch)
+		return errors.Errorf("epoch chain not synced to heartbeat epoch: current=%d, heartbeat=%d", epochChainEpoch, hb.Epoch)
 	}
 
 	// Outdated signal.
@@ -199,7 +199,12 @@ func (node *Node) processCrossLinkHeartbeatMessage(msgPayload []byte) error {
 
 	state, err := epochChain.ReadShardState(cur.Epoch())
 	if err != nil {
-		return errors.WithMessagef(err, "cannot read shard state for epoch %d", cur.Epoch())
+		utils.Logger().Warn().
+			Err(err).
+			Uint64("heartbeatEpoch", hb.Epoch).
+			Uint64("epochChainEpoch", epochChainEpoch).
+			Msg("[ProcessCrossLinkHeartbeatMessage] cannot read shard state for heartbeat epoch from beacon chain")
+		return errors.WithMessagef(err, "cannot read shard state for epoch %d from beacon chain", hb.Epoch)
 	}
 	committee, err := state.FindCommitteeByID(shard.BeaconChainShardID)
 	if err != nil {
@@ -279,7 +284,7 @@ func (node *Node) ProcessCrossLinkMessage(msgPayload []byte) {
 
 		// Check if cross-link already exists in pending queue
 		if _, exists := existingCLs[cl.Hash()]; exists {
-			nodeCrossLinkMessageCounterVec.With(prometheus.Labels{"type": "duplicate_crosslink"}).Inc()
+			nodeCrossLinkMessageCounterVec.With(prometheus.Labels{"type": "duplicate_crosslink_pending_queue"}).Inc()
 			utils.Logger().Debug().
 				Str("crossLinkHash", cl.Hash().Hex()).
 				Uint64("beaconEpoch", node.Blockchain().CurrentHeader().Epoch().Uint64()).
@@ -293,7 +298,7 @@
 		// Check if cross-link already exists in blockchain
 		exist, err := node.Blockchain().ReadCrossLink(cl.ShardID(), cl.Number().Uint64())
 		if err == nil && exist != nil {
-			nodeCrossLinkMessageCounterVec.With(prometheus.Labels{"type": "duplicate_crosslink"}).Inc()
+			nodeCrossLinkMessageCounterVec.With(prometheus.Labels{"type": "duplicate_crosslink_already_processed"}).Inc()
 			utils.Logger().Debug().
 				Str("crossLinkHash", cl.Hash().Hex()).
 				Uint64("beaconEpoch", node.Blockchain().CurrentHeader().Epoch().Uint64()).
@@ -310,7 +315,7 @@
 
-		// Allow processing cross-links from current epoch and earlier
-		// Cross-links from future epochs should not exist and may be malicious or maybe it is not fully synced
-		if crossLinkEpoch > localEpoch {
+		// Allow processing cross-links up to one epoch ahead of the local epoch
+		// Cross-links further in the future should not exist and may be malicious, or this node may not be fully synced yet
+		if crossLinkEpoch > localEpoch+1 {
 			utils.Logger().Debug().
 				Str("crossLinkHash", cl.Hash().Hex()).
 				Uint64("crossLinkEpoch", crossLinkEpoch).
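
Note on the stage-list construction in staged_stream_sync.go: the switch from make()+index assignment to append matters because any stage ID in the order slice that has no matching entry in stagesList would otherwise leave a nil slot, which is exactly the condition the new nil-stage guard in Run() reports. Below is a standalone sketch of that difference; the StageID/Stage types and the stage names other than SyncEpoch and Finish are illustrative stand-ins, not the repo's actual definitions.

package main

import "fmt"

type StageID string

type Stage struct{ ID StageID }

// buildOld mirrors the previous pattern: pre-sized slice, index assignment.
// IDs in the order with no registered stage leave nil entries behind.
func buildOld(order []StageID, registered []*Stage) []*Stage {
	out := make([]*Stage, len(order))
	for i, id := range order {
		for _, s := range registered {
			if s.ID == id {
				out[i] = s
				break
			}
		}
	}
	return out
}

// buildNew mirrors the patched pattern: only matched stages are appended,
// so the resulting slice never contains nil entries.
func buildNew(order []StageID, registered []*Stage) []*Stage {
	var out []*Stage
	for _, id := range order {
		for _, s := range registered {
			if s.ID == id {
				out = append(out, s)
				break
			}
		}
	}
	return out
}

func main() {
	// EpochStages registers only SyncEpoch and Finish; a full sync's forward
	// order lists more stages (names here are illustrative only).
	order := []StageID{"Heads", "SyncEpoch", "BlockBodies", "Finish"}
	registered := []*Stage{{ID: "SyncEpoch"}, {ID: "Finish"}}

	fmt.Println(len(buildOld(order, registered)), buildOld(order, registered)[0] == nil) // 4 true: nil slots present
	fmt.Println(len(buildNew(order, registered)))                                        // 2: no nil slots
}

With the append form, an epoch-chain run that is driven by the longer forward order simply gets a shorter pipeline instead of nil stages, and the guard in Run() becomes a last line of defense rather than an expected code path.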
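
Note on the heartbeat epoch check in node_cross_link.go: hb.Epoch and the epoch-chain epoch are uint64, so writing the tolerance as epochChainEpoch < hb.Epoch-1 underflows when hb.Epoch is 0 and would reject a genesis-epoch heartbeat; the patch uses the addition form instead. A minimal standalone sketch of the two forms:

package main

import "fmt"

func main() {
	var epochChainEpoch, hbEpoch uint64 = 0, 0

	// Subtraction form: hbEpoch-1 wraps around to math.MaxUint64,
	// so the comparison wrongly reports the chain as behind.
	fmt.Println(epochChainEpoch < hbEpoch-1) // true (heartbeat rejected)

	// Addition form used in the patch: no underflow for the same tolerance.
	fmt.Println(epochChainEpoch+1 < hbEpoch) // false (heartbeat accepted)
}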