diff --git a/CHANGELOG.md b/CHANGELOG.md index ec8fdf4aff..0a91c44429 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ * [#1748](https://github.com/crypto-org-chain/cronos/pull/1748) Query with GetCFWithTS to compare both timestamp and key to avoid run fixdata multiple times. * (versiondb) [#1751](https://github.com/crypto-org-chain/cronos/pull/1751) Add missing Destroy for read options to properly hold and release options reference. * (versiondb) [#1758](https://github.com/crypto-org-chain/cronos/pull/1758) Avoid ReadOptions mutated by reference release in iterator. +* [#1759](https://github.com/crypto-org-chain/cronos/pull/1759) Fix version mismatch happen occasionally. ### Improvements diff --git a/app/app.go b/app/app.go index 04188c7703..f375a369ea 100644 --- a/app/app.go +++ b/app/app.go @@ -959,9 +959,27 @@ func New( panic(err) } + // wire up the versiondb's `StreamingService` and `MultiStore`. + if cast.ToBool(appOpts.Get("versiondb.enable")) { + var err error + app.qms, err = app.setupVersionDB(homePath, keys, tkeys, memKeys, okeys) + if err != nil { + panic(err) + } + } + + var qmsVersion int64 + if app.qms != nil { + qmsVersion = app.qms.LatestVersion() + } + // RegisterUpgradeHandlers is used for registering any on-chain upgrades. // Make sure it's called after `app.mm` and `app.configurator` are set. - app.RegisterUpgradeHandlers(app.appCodec) + storeLoaderOverritten := app.RegisterUpgradeHandlers(app.appCodec, qmsVersion) + if !storeLoaderOverritten { + // Register the default store loader + app.SetStoreLoader(MaxVersionStoreLoader(qmsVersion)) + } // add test gRPC service for testing gRPC queries in isolation // testdata.RegisterQueryServer(app.GRPCQueryRouter(), testdata.QueryImpl{}) @@ -992,15 +1010,6 @@ func New( app.MountMemoryStores(memKeys) app.MountObjectStores(okeys) - // wire up the versiondb's `StreamingService` and `MultiStore`. - if cast.ToBool(appOpts.Get("versiondb.enable")) { - var err error - app.qms, err = app.setupVersionDB(homePath, keys, tkeys, memKeys, okeys) - if err != nil { - panic(err) - } - } - // initialize BaseApp app.SetInitChainer(app.InitChainer) app.SetPreBlocker(app.PreBlocker) @@ -1045,12 +1054,13 @@ func New( tmos.Exit(err.Error()) } - if app.qms != nil { - v1 := app.qms.LatestVersion() - v2 := app.LastBlockHeight() - if v1 > 0 && v1 < v2 { + if qmsVersion > 0 { + // it should not happens since we constraint the loaded iavl version to not exceed the versiondb version, + // still keep the check for safety. + iavlVersion := app.LastBlockHeight() + if qmsVersion < iavlVersion { // try to prevent gap being created in versiondb - tmos.Exit(fmt.Sprintf("versiondb version %d lag behind iavl version %d", v1, v2)) + tmos.Exit(fmt.Sprintf("versiondb version %d lag behind iavl version %d", qmsVersion, iavlVersion)) } } diff --git a/app/storeloader.go b/app/storeloader.go new file mode 100644 index 0000000000..4bf42cab45 --- /dev/null +++ b/app/storeloader.go @@ -0,0 +1,37 @@ +package app + +import ( + storetypes "cosmossdk.io/store/types" + upgradetypes "cosmossdk.io/x/upgrade/types" + "github.com/cosmos/cosmos-sdk/baseapp" +) + +// MaxVersionStoreLoader will be used when there's versiondb to cap the loaded iavl version +func MaxVersionStoreLoader(version int64) baseapp.StoreLoader { + if version == 0 { + return baseapp.DefaultStoreLoader + } + + return func(ms storetypes.CommitMultiStore) error { + return ms.LoadVersion(version) + } +} + +// MaxVersionUpgradeStoreLoader is used to prepare baseapp with a fixed StoreLoader +func MaxVersionUpgradeStoreLoader(version int64, upgradeHeight int64, storeUpgrades *storetypes.StoreUpgrades) baseapp.StoreLoader { + if version == 0 { + return upgradetypes.UpgradeStoreLoader(upgradeHeight, storeUpgrades) + } + + return func(ms storetypes.CommitMultiStore) error { + if upgradeHeight == ms.LastCommitID().Version+1 { + // Check if the current commit version and upgrade height matches + if len(storeUpgrades.Renamed) > 0 || len(storeUpgrades.Deleted) > 0 || len(storeUpgrades.Added) > 0 { + return ms.LoadLatestVersionAndUpgrade(storeUpgrades) + } + } + + // Otherwise load default store loader + return MaxVersionStoreLoader(version)(ms) + } +} diff --git a/app/upgrades.go b/app/upgrades.go index 61166895ee..532b1b7e71 100644 --- a/app/upgrades.go +++ b/app/upgrades.go @@ -17,7 +17,8 @@ import ( evmtypes "github.com/evmos/ethermint/x/evm/types" ) -func (app *App) RegisterUpgradeHandlers(cdc codec.BinaryCodec) { +// RegisterUpgradeHandlers returns if store loader is overridden +func (app *App) RegisterUpgradeHandlers(cdc codec.BinaryCodec, maxVersion int64) bool { planName := "v1.4" app.UpgradeKeeper.SetUpgradeHandler(planName, func(ctx context.Context, plan upgradetypes.Plan, fromVM module.VersionMap) (module.VersionMap, error) { m, err := app.ModuleManager.RunMigrations(ctx, app.configurator, fromVM) @@ -54,14 +55,18 @@ func (app *App) RegisterUpgradeHandlers(cdc codec.BinaryCodec) { } if !app.UpgradeKeeper.IsSkipHeight(upgradeInfo.Height) { if upgradeInfo.Name == planName { - app.SetStoreLoader(upgradetypes.UpgradeStoreLoader(upgradeInfo.Height, &storetypes.StoreUpgrades{ + app.SetStoreLoader(MaxVersionUpgradeStoreLoader(maxVersion, upgradeInfo.Height, &storetypes.StoreUpgrades{ Added: []string{ icahosttypes.StoreKey, }, Deleted: []string{"icaauth"}, })) + + return true } } + + return false } func UpdateExpeditedParams(ctx context.Context, gov govkeeper.Keeper) error { diff --git a/integration_tests/shell.nix b/integration_tests/shell.nix index 0eac66d989..3f028d4299 100644 --- a/integration_tests/shell.nix +++ b/integration_tests/shell.nix @@ -17,5 +17,6 @@ pkgs.mkShell { shellHook = '' mkdir ./coverage export GOCOVERDIR=./coverage + export TMPDIR=/tmp ''; } diff --git a/integration_tests/test_versiondb.py b/integration_tests/test_versiondb.py index eece5eebf7..a6b19c2ab3 100644 --- a/integration_tests/test_versiondb.py +++ b/integration_tests/test_versiondb.py @@ -5,7 +5,7 @@ from pystarport import ports from .network import Cronos -from .utils import ADDRS, send_transaction, wait_for_port +from .utils import ADDRS, send_transaction, w3_wait_for_new_blocks, wait_for_port def test_versiondb_migration(cronos: Cronos): @@ -37,6 +37,9 @@ def test_versiondb_migration(cronos: Cronos): balance1 = w3.eth.get_balance(community) block1 = w3.eth.block_number + # wait for a few blocks + w3_wait_for_new_blocks(w3, 5) + # stop the network first print("stop all nodes") print(cronos.supervisorctl("stop", "all")) @@ -45,7 +48,10 @@ def test_versiondb_migration(cronos: Cronos): changeset_dir = tempfile.mkdtemp(dir=cronos.base_dir) print("dump to:", changeset_dir) - print(cli1.changeset_dump(changeset_dir)) + + # only restore to an intermidiate version to test version mismatch behavior + print(cli1.changeset_dump(changeset_dir, end_version=block1 + 1)) + snapshot_dir = tempfile.mkdtemp(dir=cronos.base_dir) print("verify and save to snapshot:", snapshot_dir) _, commit_info = cli0.changeset_verify(changeset_dir, save_snapshot=snapshot_dir) diff --git a/memiavl/db.go b/memiavl/db.go index 02566c8a52..8ef6e7cc46 100644 --- a/memiavl/db.go +++ b/memiavl/db.go @@ -80,6 +80,9 @@ type DB struct { mtx sync.Mutex // worker goroutine IdleTimeout = 5s snapshotWriterPool *pond.WorkerPool + + // reusable write batch + wbatch wal.Batch } type Options struct { @@ -440,8 +443,13 @@ func (db *DB) checkBackgroundSnapshotRewrite() error { db.snapshotRewriteCancel = nil if result.mtree == nil { - // background snapshot rewrite failed - return fmt.Errorf("background snapshot rewriting failed: %w", result.err) + if result.err != nil { + // background snapshot rewrite failed + return fmt.Errorf("background snapshot rewriting failed: %w", result.err) + } + + // background snapshot rewrite don't success, but no error to propagate, ignore it. + return nil } // wait for potential pending wal writings to finish, to make sure we catch up to latest state. @@ -556,11 +564,17 @@ func (db *DB) Commit() (int64, error) { // async wal writing db.walChan <- &entry } else { - bz, err := entry.data.Marshal() + lastIndex, err := db.wal.LastIndex() if err != nil { return 0, err } - if err := db.wal.Write(entry.index, bz); err != nil { + + db.wbatch.Clear() + if err := writeEntry(&db.wbatch, db.logger, lastIndex, &entry); err != nil { + return 0, err + } + + if err := db.wal.WriteBatch(&db.wbatch); err != nil { return 0, err } } @@ -591,13 +605,17 @@ func (db *DB) initAsyncCommit() { break } + lastIndex, err := db.wal.LastIndex() + if err != nil { + walQuit <- err + return + } + for _, entry := range entries { - bz, err := entry.data.Marshal() - if err != nil { + if err := writeEntry(&batch, db.logger, lastIndex, entry); err != nil { walQuit <- err return } - batch.Write(entry.index, bz) } if err := db.wal.WriteBatch(&batch); err != nil { @@ -749,7 +767,8 @@ func (db *DB) rewriteSnapshotBackground() error { cloned.logger.Info("start rewriting snapshot", "version", cloned.Version()) if err := cloned.RewriteSnapshotWithContext(ctx); err != nil { - ch <- snapshotResult{err: err} + // write error log but don't stop the client, it could happen when load an old version. + cloned.logger.Error("failed to rewrite snapshot", "err", err) return } cloned.logger.Info("finished rewriting snapshot", "version", cloned.Version()) @@ -1093,3 +1112,17 @@ func channelBatchRecv[T any](ch <-chan *T) []*T { return result } + +func writeEntry(batch *wal.Batch, logger Logger, lastIndex uint64, entry *walEntry) error { + bz, err := entry.data.Marshal() + if err != nil { + return err + } + + if entry.index <= lastIndex { + logger.Error("commit old version idempotently", "lastIndex", lastIndex, "version", entry.index) + } else { + batch.Write(entry.index, bz) + } + return nil +} diff --git a/memiavl/db_test.go b/memiavl/db_test.go index a59f8c3886..ea7a39a86f 100644 --- a/memiavl/db_test.go +++ b/memiavl/db_test.go @@ -3,6 +3,7 @@ package memiavl import ( "encoding/hex" "errors" + fmt "fmt" "os" "path/filepath" "runtime/debug" @@ -497,3 +498,73 @@ func TestRepeatedApplyChangeSet(t *testing.T) { }) require.NoError(t, err) } + +func TestIdempotentWrite(t *testing.T) { + for _, asyncCommit := range []bool{false, true} { + t.Run(fmt.Sprintf("asyncCommit=%v", asyncCommit), func(t *testing.T) { + testIdempotentWrite(t, asyncCommit) + }) + } +} + +func testIdempotentWrite(t *testing.T, asyncCommit bool) { + dir := t.TempDir() + + asyncCommitBuffer := -1 + if asyncCommit { + asyncCommitBuffer = 10 + } + + db, err := Load(dir, Options{ + CreateIfMissing: true, + InitialStores: []string{"test1", "test2"}, + AsyncCommitBuffer: asyncCommitBuffer, + }) + require.NoError(t, err) + + // generate some data into db + var changes [][]*NamedChangeSet + for i := 0; i < 10; i++ { + cs := []*NamedChangeSet{ + { + Name: "test1", + Changeset: ChangeSet{Pairs: mockKVPairs("hello", fmt.Sprintf("world%d", i))}, + }, + { + Name: "test2", + Changeset: ChangeSet{Pairs: mockKVPairs("hello", fmt.Sprintf("world%d", i))}, + }, + } + changes = append(changes, cs) + } + + for _, cs := range changes { + require.NoError(t, db.ApplyChangeSets(cs)) + _, err := db.Commit() + require.NoError(t, err) + } + + commitInfo := *db.LastCommitInfo() + require.NoError(t, db.Close()) + + // reload db from disk at an intermediate version + db, err = Load(dir, Options{TargetVersion: 5}) + require.NoError(t, err) + + // replay some random writes to reach same version + for i := 0; i < 5; i++ { + require.NoError(t, db.ApplyChangeSets(changes[i+5])) + _, err := db.Commit() + require.NoError(t, err) + } + + // it should reach same result + require.Equal(t, commitInfo, *db.LastCommitInfo()) + + require.NoError(t, db.Close()) + + // reload db again, it should reach same result + db, err = Load(dir, Options{}) + require.NoError(t, err) + require.Equal(t, commitInfo, *db.LastCommitInfo()) +}