Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 39 additions & 14 deletions pkg/fork/fork.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@ import (
"encoding/binary"
"errors"
"fmt"
"os"
"runtime"
"sort"
"strconv"
"sync"
"time"

Expand Down Expand Up @@ -69,6 +71,29 @@ var log = logging.Logger("fork")

var ErrExpensiveFork = errors.New("refusing explicit call due to state fork at epoch")

var (
MigrationMaxWorkerCount int
EnvMigrationMaxWorkerCount = "VENUS_MIGRATION_MAX_WORKER_COUNT"
)

func init() {
// the default calculation used for migration worker count
MigrationMaxWorkerCount = runtime.NumCPU()
// check if an alternative value was request by environment
if mwcs := os.Getenv(EnvMigrationMaxWorkerCount); mwcs != "" {
mwc, err := strconv.ParseInt(mwcs, 10, 32)
if err != nil {
log.Warnf("invalid value for %s (%s) defaulting to %d: %s", EnvMigrationMaxWorkerCount, mwcs, MigrationMaxWorkerCount, err)
return
}
// use value from environment
log.Infof("migration worker cound set from %s (%d)", EnvMigrationMaxWorkerCount, mwc)
MigrationMaxWorkerCount = int(mwc)
return
}
log.Infof("migration worker count: %d", MigrationMaxWorkerCount)
}

// MigrationCache can be used to cache information used by a migration. This is primarily useful to
// "pre-compute" some migration state ahead of time, and make it accessible in the migration itself.
type MigrationCache interface {
Expand Down Expand Up @@ -1603,7 +1628,7 @@ func terminateActor(ctx context.Context, tree *vmstate.State, addr address.Addre

func (c *ChainFork) UpgradeActorsV3(ctx context.Context, cache MigrationCache, root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet) (cid.Cid, error) {
// Use all the CPUs except 3.
workerCount := runtime.NumCPU() - 3
workerCount := MigrationMaxWorkerCount - 3
if workerCount <= 0 {
workerCount = 1
}
Expand Down Expand Up @@ -1642,7 +1667,7 @@ func (c *ChainFork) UpgradeActorsV3(ctx context.Context, cache MigrationCache, r
func (c *ChainFork) PreUpgradeActorsV3(ctx context.Context, cache MigrationCache, root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet) error {
log.Info("PreUpgradeActorsV3 ......")
// Use half the CPUs for pre-migration, but leave at least 3.
workerCount := runtime.NumCPU()
workerCount := MigrationMaxWorkerCount
if workerCount <= 4 {
workerCount = 1
} else {
Expand Down Expand Up @@ -1706,7 +1731,7 @@ func (c *ChainFork) upgradeActorsV3Common(

func (c *ChainFork) UpgradeActorsV4(ctx context.Context, cache MigrationCache, root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet) (cid.Cid, error) {
// Use all the CPUs except 3.
workerCount := runtime.NumCPU() - 3
workerCount := MigrationMaxWorkerCount - 3
if workerCount <= 0 {
workerCount = 1
}
Expand All @@ -1728,7 +1753,7 @@ func (c *ChainFork) UpgradeActorsV4(ctx context.Context, cache MigrationCache, r

func (c *ChainFork) PreUpgradeActorsV4(ctx context.Context, cache MigrationCache, root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet) error {
// Use half the CPUs for pre-migration, but leave at least 3.
workerCount := runtime.NumCPU()
workerCount := MigrationMaxWorkerCount
if workerCount <= 4 {
workerCount = 1
} else {
Expand Down Expand Up @@ -1792,7 +1817,7 @@ func (c *ChainFork) upgradeActorsV4Common(

func (c *ChainFork) UpgradeActorsV5(ctx context.Context, cache MigrationCache, root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet) (cid.Cid, error) {
// Use all the CPUs except 3.
workerCount := runtime.NumCPU() - 3
workerCount := MigrationMaxWorkerCount - 3
if workerCount <= 0 {
workerCount = 1
}
Expand All @@ -1814,7 +1839,7 @@ func (c *ChainFork) UpgradeActorsV5(ctx context.Context, cache MigrationCache, r

func (c *ChainFork) PreUpgradeActorsV5(ctx context.Context, cache MigrationCache, root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet) error {
// Use half the CPUs for pre-migration, but leave at least 3.
workerCount := runtime.NumCPU()
workerCount := MigrationMaxWorkerCount
if workerCount <= 4 {
workerCount = 1
} else {
Expand Down Expand Up @@ -1878,7 +1903,7 @@ func (c *ChainFork) upgradeActorsV5Common(

func (c *ChainFork) UpgradeActorsV6(ctx context.Context, cache MigrationCache, root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet) (cid.Cid, error) {
// Use all the CPUs except 3.
workerCount := runtime.NumCPU() - 3
workerCount := MigrationMaxWorkerCount - 3
if workerCount <= 0 {
workerCount = 1
}
Expand All @@ -1900,7 +1925,7 @@ func (c *ChainFork) UpgradeActorsV6(ctx context.Context, cache MigrationCache, r

func (c *ChainFork) PreUpgradeActorsV6(ctx context.Context, cache MigrationCache, root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet) error {
// Use half the CPUs for pre-migration, but leave at least 3.
workerCount := runtime.NumCPU()
workerCount := MigrationMaxWorkerCount
if workerCount <= 4 {
workerCount = 1
} else {
Expand Down Expand Up @@ -1966,7 +1991,7 @@ func (c *ChainFork) upgradeActorsV6Common(

func (c *ChainFork) UpgradeActorsV7(ctx context.Context, cache MigrationCache, root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet) (cid.Cid, error) {
// Use all the CPUs except 3.
workerCount := runtime.NumCPU() - 3
workerCount := MigrationMaxWorkerCount - 3
if workerCount <= 0 {
workerCount = 1
}
Expand All @@ -1988,7 +2013,7 @@ func (c *ChainFork) UpgradeActorsV7(ctx context.Context, cache MigrationCache, r

func (c *ChainFork) PreUpgradeActorsV7(ctx context.Context, cache MigrationCache, root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet) error {
// Use half the CPUs for pre-migration, but leave at least 3.
workerCount := runtime.NumCPU()
workerCount := MigrationMaxWorkerCount
if workerCount <= 4 {
workerCount = 1
} else {
Expand Down Expand Up @@ -2063,7 +2088,7 @@ func (c *ChainFork) upgradeActorsV7Common(

func (c *ChainFork) UpgradeActorsV8(ctx context.Context, cache MigrationCache, root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet) (cid.Cid, error) {
// Use all the CPUs except 3.
workerCount := runtime.NumCPU() - 3
workerCount := MigrationMaxWorkerCount - 3
if workerCount <= 0 {
workerCount = 1
}
Expand All @@ -2087,7 +2112,7 @@ func (c *ChainFork) UpgradeActorsV8(ctx context.Context, cache MigrationCache, r

func (c *ChainFork) PreUpgradeActorsV8(ctx context.Context, cache MigrationCache, root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet) error {
// Use half the CPUs for pre-migration, but leave at least 3.
workerCount := runtime.NumCPU()
workerCount := MigrationMaxWorkerCount
if workerCount <= 4 {
workerCount = 1
} else {
Expand Down Expand Up @@ -2174,7 +2199,7 @@ func (c *ChainFork) upgradeActorsV8Common(

func (c *ChainFork) UpgradeActorsV9(ctx context.Context, cache MigrationCache, root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet) (cid.Cid, error) {
// Use all the CPUs except 3.
workerCount := runtime.NumCPU() - 3
workerCount := MigrationMaxWorkerCount - 3
if workerCount <= 0 {
workerCount = 1
}
Expand All @@ -2201,7 +2226,7 @@ func (c *ChainFork) PreUpgradeActorsV9(ctx context.Context,
ts *types.TipSet,
) error {
// Use half the CPUs for pre-migration, but leave at least 3.
workerCount := runtime.NumCPU()
workerCount := MigrationMaxWorkerCount
if workerCount <= 4 {
workerCount = 1
} else {
Expand Down