Skip to content
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
### Bug Fixes

* [#1679](https://github.com/crypto-org-chain/cronos/pull/1679) Include no trace detail on insufficient balance fix.
* [#1685](https://github.com/crypto-org-chain/cronos/pull/1685) Add command to fix versiondb corrupted data.

### Improvements

Expand Down
4 changes: 4 additions & 0 deletions app/versiondb.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ func (app *App) setupVersionDB(
for _, key := range keys {
exposedKeys = append(exposedKeys, key)
}

// see: https://github.com/crypto-org-chain/cronos/issues/1683
versionDB.SetSkipVersionZero(true)

app.CommitMultiStore().AddListeners(exposedKeys)

// register in app streaming manager
Expand Down
1 change: 1 addition & 0 deletions versiondb/client/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ func ChangeSetGroupCmd(opts Options) *cobra.Command {
ChangeSetToVersionDBCmd(),
RestoreAppDBCmd(opts),
RestoreVersionDBCmd(),
FixDataCmd(opts.DefaultStores),
)
return cmd
}
29 changes: 29 additions & 0 deletions versiondb/client/fixdata.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package client

import (
"github.com/crypto-org-chain/cronos/versiondb/tsrocksdb"
"github.com/spf13/cobra"
)

func FixDataCmd(stores []string) *cobra.Command {
cmd := &cobra.Command{
Use: "fixdata <dir>",
Args: cobra.ExactArgs(1),
Short: "Fix wrong data in versiondb, see: https://github.com/crypto-org-chain/cronos/issues/1683",
RunE: func(cmd *cobra.Command, args []string) error {
dir := args[0]
versionDB, err := tsrocksdb.NewStore(dir)
if err != nil {
return err
}

// see: https://github.com/crypto-org-chain/cronos/issues/1683
if err := versionDB.FixData(stores); err != nil {
return err
}

return nil
},
}
return cmd
}
44 changes: 36 additions & 8 deletions versiondb/tsrocksdb/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package tsrocksdb

import (
"bytes"
"encoding/binary"

"cosmossdk.io/store/types"
"github.com/linxGnu/grocksdb"
Expand All @@ -12,11 +13,14 @@ type rocksDBIterator struct {
prefix, start, end []byte
isReverse bool
isInvalid bool

// see: https://github.com/crypto-org-chain/cronos/issues/1683
skipVersionZero bool
}

var _ types.Iterator = (*rocksDBIterator)(nil)

func newRocksDBIterator(source *grocksdb.Iterator, prefix, start, end []byte, isReverse bool) *rocksDBIterator {
func newRocksDBIterator(source *grocksdb.Iterator, prefix, start, end []byte, isReverse bool, skipVersionZero bool) *rocksDBIterator {
if isReverse {
if end == nil {
source.SeekToLast()
Expand All @@ -39,14 +43,18 @@ func newRocksDBIterator(source *grocksdb.Iterator, prefix, start, end []byte, is
source.Seek(start)
}
}
return &rocksDBIterator{
source: source,
prefix: prefix,
start: start,
end: end,
isReverse: isReverse,
isInvalid: false,
it := &rocksDBIterator{
source: source,
prefix: prefix,
start: start,
end: end,
isReverse: isReverse,
isInvalid: false,
skipVersionZero: skipVersionZero,
}

it.trySkipNonZeroVersion()
return it
}

// Domain implements Iterator.
Expand Down Expand Up @@ -114,6 +122,22 @@ func (itr rocksDBIterator) Next() {
} else {
itr.source.Next()
}

itr.trySkipNonZeroVersion()
}

func (itr rocksDBIterator) timestamp() uint64 {
ts := itr.source.Timestamp()
defer ts.Free()
return binary.LittleEndian.Uint64(ts.Data())
}

func (itr rocksDBIterator) trySkipNonZeroVersion() {
if itr.skipVersionZero {
for itr.Valid() && itr.timestamp() == 0 {
itr.Next()
}
}
}

// Error implements Iterator.
Expand All @@ -137,6 +161,10 @@ func (itr *rocksDBIterator) assertIsValid() {
// This function can be applied on *Slice returned from Key() and Value()
// of an Iterator, because they are marked as freed.
func moveSliceToBytes(s *grocksdb.Slice) []byte {
if s == nil {
return nil
}

defer s.Free()
if !s.Exists() {
return nil
Expand Down
97 changes: 85 additions & 12 deletions versiondb/tsrocksdb/store.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package tsrocksdb

import (
"bytes"
"encoding/binary"
"errors"
"fmt"
Expand Down Expand Up @@ -38,6 +39,9 @@ func init() {
type Store struct {
db *grocksdb.DB
cfHandle *grocksdb.ColumnFamilyHandle

// see: https://github.com/crypto-org-chain/cronos/issues/1683
skipVersionZero bool
}

func NewStore(dir string) (Store, error) {
Expand All @@ -58,6 +62,10 @@ func NewStoreWithDB(db *grocksdb.DB, cfHandle *grocksdb.ColumnFamilyHandle) Stor
}
}

func (s *Store) SetSkipVersionZero(skip bool) {
s.skipVersionZero = skip
}

func (s Store) SetLatestVersion(version int64) error {
var ts [TimestampSize]byte
binary.LittleEndian.PutUint64(ts[:], uint64(version))
Expand Down Expand Up @@ -86,11 +94,23 @@ func (s Store) PutAtVersion(version int64, changeSet []*types.StoreKVPair) error
}

func (s Store) GetAtVersionSlice(storeKey string, key []byte, version *int64) (*grocksdb.Slice, error) {
return s.db.GetCF(
value, ts, err := s.db.GetCFWithTS(
newTSReadOptions(version),
s.cfHandle,
prependStoreKey(storeKey, key),
)
if err != nil {
return nil, err
}
defer ts.Free()

if value.Exists() && s.skipVersionZero {
if binary.LittleEndian.Uint64(ts.Data()) == 0 {
return nil, nil
}
}

return value, err
}

// GetAtVersion implements VersionStore interface
Expand Down Expand Up @@ -128,28 +148,24 @@ func (s Store) GetLatestVersion() (int64, error) {

// IteratorAtVersion implements VersionStore interface
func (s Store) IteratorAtVersion(storeKey string, start, end []byte, version *int64) (types.Iterator, error) {
if (start != nil && len(start) == 0) || (end != nil && len(end) == 0) {
return nil, errKeyEmpty
}

prefix := storePrefix(storeKey)
start, end = iterateWithPrefix(prefix, start, end)

itr := s.db.NewIteratorCF(newTSReadOptions(version), s.cfHandle)
return newRocksDBIterator(itr, prefix, start, end, false), nil
return s.iteratorAtVersion(storeKey, start, end, version, false)
}

// ReverseIteratorAtVersion implements VersionStore interface
func (s Store) ReverseIteratorAtVersion(storeKey string, start, end []byte, version *int64) (types.Iterator, error) {
return s.iteratorAtVersion(storeKey, start, end, version, true)
}

func (s Store) iteratorAtVersion(storeKey string, start, end []byte, version *int64, reverse bool) (types.Iterator, error) {
if (start != nil && len(start) == 0) || (end != nil && len(end) == 0) {
return nil, errKeyEmpty
}

prefix := storePrefix(storeKey)
start, end = iterateWithPrefix(storePrefix(storeKey), start, end)
start, end = iterateWithPrefix(prefix, start, end)

itr := s.db.NewIteratorCF(newTSReadOptions(version), s.cfHandle)
return newRocksDBIterator(itr, prefix, start, end, true), nil
return newRocksDBIterator(itr, prefix, start, end, reverse, s.skipVersionZero), nil
}

// FeedChangeSet is used to migrate legacy change sets into versiondb
Expand Down Expand Up @@ -216,6 +232,63 @@ func (s Store) Flush() error {
)
}

// FixData fixes wrong data written in versiondb due to rocksdb upgrade, the operation is idempotent.
// see: https://github.com/crypto-org-chain/cronos/issues/1683
// call this before `SetSkipVersionZero(true)`.
func (s Store) FixData(storeNames []string) error {
for _, storeName := range storeNames {
if err := s.fixDataStore(storeName); err != nil {
return err
}
}

return nil
}

// fixDataStore iterate the wrong data at version 0, parse the timestamp from the key and write it again.
func (s Store) fixDataStore(storeName string) error {
var version int64
iter, err := s.IteratorAtVersion(storeName, nil, nil, &version)
if err != nil {
return err
}
defer iter.Close()

batch := grocksdb.NewWriteBatch()
defer batch.Destroy()

prefix := storePrefix(storeName)
for ; iter.Valid(); iter.Next() {
key := iter.Key()
if len(key) < TimestampSize {
return fmt.Errorf("invalid key length: %X, store: %s", key, storeName)
}

ts := key[len(key)-TimestampSize:]
key = key[:len(key)-TimestampSize]
realKey := cloneAppend(prefix, key)

readOpts := grocksdb.NewDefaultReadOptions()
readOpts.SetTimestamp(ts)
oldValue, err := s.db.GetCF(readOpts, s.cfHandle, realKey)
if err != nil {
return err
}
readOpts.Destroy()

clean := bytes.Equal(oldValue.Data(), iter.Value())
oldValue.Free()

if clean {
continue
}

batch.PutCFWithTS(s.cfHandle, realKey, ts, iter.Value())
}

return s.db.Write(defaultSyncWriteOpts, batch)
}

func newTSReadOptions(version *int64) *grocksdb.ReadOptions {
var ver uint64
if version == nil {
Expand Down
87 changes: 87 additions & 0 deletions versiondb/tsrocksdb/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"encoding/binary"
"testing"

"cosmossdk.io/store/types"
dbm "github.com/cosmos/cosmos-db"
"github.com/crypto-org-chain/cronos/versiondb"
"github.com/linxGnu/grocksdb"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -153,3 +155,88 @@ func TestUserTimestampPruning(t *testing.T) {
require.Equal(t, []byte{100}, bz.Data())
bz.Free()
}

func TestSkipVersionZero(t *testing.T) {
storeKey := types.NewKVStoreKey("test")

var wrongTz [8]byte
binary.LittleEndian.PutUint64(wrongTz[:], 100)

key1 := []byte("hello1")
key2 := []byte("hello2")
key2Wrong := cloneAppend(key2, wrongTz[:])
key3 := []byte("hello3")

store, err := NewStore(t.TempDir())
require.NoError(t, err)

err = store.PutAtVersion(0, []*types.StoreKVPair{
{StoreKey: storeKey.Name(), Key: key2Wrong, Value: []byte{2}},
})
require.NoError(t, err)
err = store.PutAtVersion(100, []*types.StoreKVPair{
{StoreKey: storeKey.Name(), Key: key1, Value: []byte{1}},
})
require.NoError(t, err)
err = store.PutAtVersion(100, []*types.StoreKVPair{
{StoreKey: storeKey.Name(), Key: key3, Value: []byte{3}},
})
require.NoError(t, err)

i := int64(999)
bz, err := store.GetAtVersion(storeKey.Name(), key2Wrong, &i)
require.NoError(t, err)
require.Equal(t, []byte{2}, bz)

it, err := store.IteratorAtVersion(storeKey.Name(), nil, nil, &i)
require.NoError(t, err)
require.Equal(t,
[]kvPair{
{Key: key1, Value: []byte{1}},
{Key: key2Wrong, Value: []byte{2}},
{Key: key3, Value: []byte{3}},
},
consumeIterator(it),
)

store.SetSkipVersionZero(true)

bz, err = store.GetAtVersion(storeKey.Name(), key2Wrong, &i)
require.NoError(t, err)
require.Empty(t, bz)
bz, err = store.GetAtVersion(storeKey.Name(), key1, &i)
require.NoError(t, err)
require.Equal(t, []byte{1}, bz)

it, err = store.IteratorAtVersion(storeKey.Name(), nil, nil, &i)
require.NoError(t, err)
require.Equal(t,
[]kvPair{
{Key: key1, Value: []byte{1}},
{Key: key3, Value: []byte{3}},
},
consumeIterator(it),
)

store.SetSkipVersionZero(false)
err = store.FixData([]types.StoreKey{storeKey})
require.NoError(t, err)

bz, err = store.GetAtVersion(storeKey.Name(), key2, &i)
require.NoError(t, err)
require.Equal(t, []byte{2}, bz)
}

type kvPair struct {
Key []byte
Value []byte
}

func consumeIterator(it dbm.Iterator) []kvPair {
var result []kvPair
for ; it.Valid(); it.Next() {
result = append(result, kvPair{it.Key(), it.Value()})
}
it.Close()
return result
}
Loading