-
-
Notifications
You must be signed in to change notification settings - Fork 446
feat: regen to consume state cache reload api #6456
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -10,29 +10,34 @@ import { | |
| stateTransition, | ||
| } from "@lodestar/state-transition"; | ||
| import {IForkChoice, ProtoBlock} from "@lodestar/fork-choice"; | ||
| import {sleep} from "@lodestar/utils"; | ||
| import {Logger, sleep} from "@lodestar/utils"; | ||
| import {SLOTS_PER_EPOCH} from "@lodestar/params"; | ||
| import {ChainForkConfig} from "@lodestar/config"; | ||
| import {Metrics} from "../../metrics/index.js"; | ||
| import {IBeaconDb} from "../../db/index.js"; | ||
| import {CheckpointStateCache, StateContextCache} from "../stateCache/index.js"; | ||
| import {getCheckpointFromState} from "../blocks/utils/checkpoint.js"; | ||
| import {ChainEvent, ChainEventEmitter} from "../emitter.js"; | ||
| import {CheckpointStateCache, BlockStateCache} from "../stateCache/types.js"; | ||
| import {IStateRegeneratorInternal, RegenCaller, StateCloneOpts} from "./interface.js"; | ||
| import {RegenError, RegenErrorCode} from "./errors.js"; | ||
|
|
||
| export type RegenModules = { | ||
| db: IBeaconDb; | ||
| forkChoice: IForkChoice; | ||
| stateCache: StateContextCache; | ||
| stateCache: BlockStateCache; | ||
| checkpointStateCache: CheckpointStateCache; | ||
| config: ChainForkConfig; | ||
| emitter: ChainEventEmitter; | ||
| logger: Logger; | ||
| metrics: Metrics | null; | ||
| }; | ||
|
|
||
| /** | ||
| * Regenerates states that have already been processed by the fork choice | ||
| * Since Feb 2024, we support reloading checkpoint state from disk via shouldReload flag. Due to its performance impact | ||
| * this flag is only set to true in this case: | ||
| * - getPreState: this is for block processing, it's important to reload state in unfinality time | ||
| * - updateHeadState: rarely happen, but it's important to make sure we always can regen head state | ||
| */ | ||
| export class StateRegenerator implements IStateRegeneratorInternal { | ||
| constructor(private readonly modules: RegenModules) {} | ||
|
|
@@ -41,6 +46,7 @@ export class StateRegenerator implements IStateRegeneratorInternal { | |
| * Get the state to run with `block`. May be: | ||
| * - If parent is in same epoch -> Exact state at `block.parentRoot` | ||
| * - If parent is in prev epoch -> State after `block.parentRoot` dialed forward through epoch transition | ||
| * - reload state if needed in this flow | ||
| */ | ||
| async getPreState( | ||
| block: allForks.BeaconBlock, | ||
|
|
@@ -57,18 +63,19 @@ export class StateRegenerator implements IStateRegeneratorInternal { | |
|
|
||
| const parentEpoch = computeEpochAtSlot(parentBlock.slot); | ||
| const blockEpoch = computeEpochAtSlot(block.slot); | ||
| const shouldReload = true; | ||
|
|
||
| // This may save us at least one epoch transition. | ||
| // If the requested state crosses an epoch boundary | ||
| // then we may use the checkpoint state before the block | ||
| // We may have the checkpoint state with parent root inside the checkpoint state cache | ||
| // through gossip validation. | ||
| if (parentEpoch < blockEpoch) { | ||
| return this.getCheckpointState({root: block.parentRoot, epoch: blockEpoch}, opts, rCaller); | ||
| return this.getCheckpointState({root: block.parentRoot, epoch: blockEpoch}, opts, rCaller, shouldReload); | ||
| } | ||
|
|
||
| // Otherwise, get the state normally. | ||
| return this.getState(parentBlock.stateRoot, rCaller); | ||
| return this.getState(parentBlock.stateRoot, rCaller, shouldReload); | ||
| } | ||
|
|
||
| /** | ||
|
|
@@ -77,20 +84,23 @@ export class StateRegenerator implements IStateRegeneratorInternal { | |
| async getCheckpointState( | ||
| cp: phase0.Checkpoint, | ||
| opts: StateCloneOpts, | ||
| rCaller: RegenCaller | ||
| rCaller: RegenCaller, | ||
| shouldReload = false | ||
|
||
| ): Promise<CachedBeaconStateAllForks> { | ||
| const checkpointStartSlot = computeStartSlotAtEpoch(cp.epoch); | ||
| return this.getBlockSlotState(toHexString(cp.root), checkpointStartSlot, opts, rCaller); | ||
| return this.getBlockSlotState(toHexString(cp.root), checkpointStartSlot, opts, rCaller, shouldReload); | ||
| } | ||
|
|
||
| /** | ||
| * Get state after block `blockRoot` dialed forward to `slot` | ||
| * - shouldReload should be used with care, as it will cause the state to be reloaded from disk | ||
|
||
| */ | ||
| async getBlockSlotState( | ||
| blockRoot: RootHex, | ||
| slot: Slot, | ||
| opts: StateCloneOpts, | ||
| rCaller: RegenCaller | ||
| rCaller: RegenCaller, | ||
twoeths marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| shouldReload = false | ||
|
||
| ): Promise<CachedBeaconStateAllForks> { | ||
| const block = this.modules.forkChoice.getBlockHex(blockRoot); | ||
| if (!block) { | ||
|
|
@@ -108,26 +118,31 @@ export class StateRegenerator implements IStateRegeneratorInternal { | |
| }); | ||
| } | ||
|
|
||
| const latestCheckpointStateCtx = this.modules.checkpointStateCache.getLatest(blockRoot, computeEpochAtSlot(slot)); | ||
| const {checkpointStateCache} = this.modules; | ||
| const getLatestApi = shouldReload | ||
| ? checkpointStateCache.getOrReloadLatest.bind(checkpointStateCache) | ||
| : checkpointStateCache.getLatest.bind(checkpointStateCache); | ||
| const latestCheckpointStateCtx = await getLatestApi(blockRoot, computeEpochAtSlot(slot)); | ||
twoeths marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
| // If a checkpoint state exists with the given checkpoint root, it either is in requested epoch | ||
| // or needs to have empty slots processed until the requested epoch | ||
| if (latestCheckpointStateCtx) { | ||
| return processSlotsByCheckpoint(this.modules, latestCheckpointStateCtx, slot, opts); | ||
| return processSlotsByCheckpoint(this.modules, latestCheckpointStateCtx, slot, rCaller, opts); | ||
| } | ||
|
|
||
| // Otherwise, use the fork choice to get the stateRoot from block at the checkpoint root | ||
| // regenerate that state, | ||
| // then process empty slots until the requested epoch | ||
| const blockStateCtx = await this.getState(block.stateRoot, rCaller); | ||
| return processSlotsByCheckpoint(this.modules, blockStateCtx, slot, opts); | ||
| const blockStateCtx = await this.getState(block.stateRoot, rCaller, shouldReload); | ||
| return processSlotsByCheckpoint(this.modules, blockStateCtx, slot, rCaller, opts); | ||
| } | ||
|
|
||
| /** | ||
| * Get state by exact root. If not in cache directly, requires finding the block that references the state from the | ||
| * forkchoice and replaying blocks to get to it. | ||
| * - shouldReload should be used with care, as it will cause the state to be reloaded from disk | ||
| */ | ||
| async getState(stateRoot: RootHex, _rCaller: RegenCaller): Promise<CachedBeaconStateAllForks> { | ||
| async getState(stateRoot: RootHex, _rCaller: RegenCaller, shouldReload = false): Promise<CachedBeaconStateAllForks> { | ||
| // Trivial case, state at stateRoot is already cached | ||
| const cachedStateCtx = this.modules.stateCache.get(stateRoot); | ||
| if (cachedStateCtx) { | ||
|
|
@@ -143,15 +158,17 @@ export class StateRegenerator implements IStateRegeneratorInternal { | |
| // gets reversed when replayed | ||
| const blocksToReplay = [block]; | ||
| let state: CachedBeaconStateAllForks | null = null; | ||
| for (const b of this.modules.forkChoice.iterateAncestorBlocks(block.parentRoot)) { | ||
| const {checkpointStateCache} = this.modules; | ||
| const getLatestApi = shouldReload | ||
| ? checkpointStateCache.getOrReloadLatest.bind(checkpointStateCache) | ||
| : checkpointStateCache.getLatest.bind(checkpointStateCache); | ||
| // iterateAncestorBlocks only returns ancestor blocks, not the block itself | ||
| for (const b of this.modules.forkChoice.iterateAncestorBlocks(block.blockRoot)) { | ||
| state = this.modules.stateCache.get(b.stateRoot); | ||
| if (state) { | ||
| break; | ||
| } | ||
| state = this.modules.checkpointStateCache.getLatest( | ||
| b.blockRoot, | ||
| computeEpochAtSlot(blocksToReplay[blocksToReplay.length - 1].slot - 1) | ||
| ); | ||
| state = await getLatestApi(b.blockRoot, computeEpochAtSlot(blocksToReplay[blocksToReplay.length - 1].slot - 1)); | ||
twoeths marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| if (state) { | ||
| break; | ||
| } | ||
|
|
@@ -172,6 +189,8 @@ export class StateRegenerator implements IStateRegeneratorInternal { | |
| }); | ||
| } | ||
|
|
||
| const replaySlots = blocksToReplay.map((b) => b.slot).join(","); | ||
| this.modules.logger.debug("Replaying blocks to get state", {stateRoot, replaySlots}); | ||
| for (const b of blocksToReplay.reverse()) { | ||
| const block = await this.modules.db.block.get(fromHexString(b.blockRoot)); | ||
| if (!block) { | ||
|
|
@@ -195,11 +214,23 @@ export class StateRegenerator implements IStateRegeneratorInternal { | |
| verifyProposer: false, | ||
| verifySignatures: false, | ||
| }, | ||
| null | ||
| this.modules.metrics | ||
| ); | ||
|
|
||
| // TODO: Persist states, note that regen could be triggered by old states. | ||
| // Should those take a place in the cache? | ||
| const stateRoot = toHexString(state.hashTreeRoot()); | ||
| if (b.stateRoot !== stateRoot) { | ||
| throw new RegenError({ | ||
| slot: b.slot, | ||
| code: RegenErrorCode.INVALID_STATE_ROOT, | ||
| actual: stateRoot, | ||
| expected: b.stateRoot, | ||
| }); | ||
| } | ||
|
|
||
| if (shouldReload) { | ||
twoeths marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| // also with shouldReload flag, we "reload" it to the state cache too | ||
| this.modules.stateCache.add(state); | ||
| } | ||
|
|
||
| // this avoids keeping our node busy processing blocks | ||
| await sleep(0); | ||
|
|
@@ -210,13 +241,14 @@ export class StateRegenerator implements IStateRegeneratorInternal { | |
| }); | ||
| } | ||
| } | ||
| this.modules.logger.debug("Replayed blocks to get state", {stateRoot, replaySlots}); | ||
|
|
||
| return state; | ||
| } | ||
|
|
||
| private findFirstStateBlock(stateRoot: RootHex): ProtoBlock { | ||
| for (const block of this.modules.forkChoice.forwarditerateAncestorBlocks()) { | ||
| if (block !== undefined) { | ||
| if (block.stateRoot === stateRoot) { | ||
twoeths marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| return block; | ||
| } | ||
| } | ||
|
|
@@ -237,9 +269,10 @@ async function processSlotsByCheckpoint( | |
| modules: {checkpointStateCache: CheckpointStateCache; metrics: Metrics | null; emitter: ChainEventEmitter}, | ||
| preState: CachedBeaconStateAllForks, | ||
| slot: Slot, | ||
| rCaller: RegenCaller, | ||
| opts: StateCloneOpts | ||
| ): Promise<CachedBeaconStateAllForks> { | ||
| let postState = await processSlotsToNearestCheckpoint(modules, preState, slot, opts); | ||
| let postState = await processSlotsToNearestCheckpoint(modules, preState, slot, rCaller, opts); | ||
| if (postState.slot < slot) { | ||
| postState = processSlots(postState, slot, opts, modules.metrics); | ||
| } | ||
|
|
@@ -257,6 +290,7 @@ async function processSlotsToNearestCheckpoint( | |
| modules: {checkpointStateCache: CheckpointStateCache; metrics: Metrics | null; emitter: ChainEventEmitter}, | ||
| preState: CachedBeaconStateAllForks, | ||
| slot: Slot, | ||
| rCaller: RegenCaller, | ||
| opts: StateCloneOpts | ||
| ): Promise<CachedBeaconStateAllForks> { | ||
| const preSlot = preState.slot; | ||
|
|
@@ -272,12 +306,16 @@ async function processSlotsToNearestCheckpoint( | |
| ) { | ||
| // processSlots calls .clone() before mutating | ||
| postState = processSlots(postState, nextEpochSlot, opts, metrics); | ||
| modules.metrics?.epochTransitionByCaller.inc({caller: rCaller}); | ||
|
|
||
| // Cache state to preserve epoch transition work | ||
| // this is usually added when we prepare for next slot or validate gossip block | ||
| // then when we process the 1st block of epoch, we don't have to do state transition again | ||
| // This adds Previous Root Checkpoint State to the checkpoint state cache | ||
| // This may becomes the "official" checkpoint state if the 1st block of epoch is skipped | ||
| const checkpointState = postState; | ||
| const cp = getCheckpointFromState(checkpointState); | ||
| checkpointStateCache.add(cp, checkpointState); | ||
| emitter.emit(ChainEvent.checkpoint, cp, checkpointState); | ||
| emitter.emit(ChainEvent.checkpoint, cp, checkpointState.clone()); | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why add the costly
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
in fact I will add more |
||
|
|
||
| // this avoids keeping our node busy processing blocks | ||
| await sleep(0); | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.