Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
### Bug Fixes

* [#1679](https://github.com/crypto-org-chain/cronos/pull/1679) Include no trace detail on insufficient balance fix.
* [#1685](https://github.com/crypto-org-chain/cronos/pull/1685) Fix versiondb corrupted data on startup.

### Improvements

Expand Down
16 changes: 12 additions & 4 deletions app/versiondb.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,24 @@
return nil, err
}

// always listen for all keys to simplify configuration
exposedKeys := make([]storetypes.StoreKey, 0, len(keys))
for _, key := range keys {
exposedKeys = append(exposedKeys, key)
}

versionDB, err := tsrocksdb.NewStore(dataDir)
if err != nil {
return nil, err
}

// always listen for all keys to simplify configuration
exposedKeys := make([]storetypes.StoreKey, 0, len(keys))
for _, key := range keys {
exposedKeys = append(exposedKeys, key)
// see: https://github.com/crypto-org-chain/cronos/issues/1683
if err := versionDB.FixData(exposedKeys); err != nil {
return nil, err

Check warning on line 40 in app/versiondb.go

View check run for this annotation

Codecov / codecov/patch

app/versiondb.go#L40

Added line #L40 was not covered by tests
}

versionDB.SetSkipVersionZero(true)

app.CommitMultiStore().AddListeners(exposedKeys)

// register in app streaming manager
Expand Down
44 changes: 36 additions & 8 deletions versiondb/tsrocksdb/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package tsrocksdb

import (
"bytes"
"encoding/binary"

"cosmossdk.io/store/types"
"github.com/linxGnu/grocksdb"
Expand All @@ -12,11 +13,14 @@ type rocksDBIterator struct {
prefix, start, end []byte
isReverse bool
isInvalid bool

// see: https://github.com/crypto-org-chain/cronos/issues/1683
skipVersionZero bool
}

var _ types.Iterator = (*rocksDBIterator)(nil)

func newRocksDBIterator(source *grocksdb.Iterator, prefix, start, end []byte, isReverse bool) *rocksDBIterator {
func newRocksDBIterator(source *grocksdb.Iterator, prefix, start, end []byte, isReverse bool, skipVersionZero bool) *rocksDBIterator {
if isReverse {
if end == nil {
source.SeekToLast()
Expand All @@ -39,14 +43,18 @@ func newRocksDBIterator(source *grocksdb.Iterator, prefix, start, end []byte, is
source.Seek(start)
}
}
return &rocksDBIterator{
source: source,
prefix: prefix,
start: start,
end: end,
isReverse: isReverse,
isInvalid: false,
it := &rocksDBIterator{
source: source,
prefix: prefix,
start: start,
end: end,
isReverse: isReverse,
isInvalid: false,
skipVersionZero: skipVersionZero,
}

it.trySkipNonZeroVersion()
return it
}

// Domain implements Iterator.
Expand Down Expand Up @@ -114,6 +122,22 @@ func (itr rocksDBIterator) Next() {
} else {
itr.source.Next()
}

itr.trySkipNonZeroVersion()
}

func (itr rocksDBIterator) timestamp() uint64 {
ts := itr.source.Timestamp()
defer ts.Free()
return binary.LittleEndian.Uint64(ts.Data())
}

func (itr rocksDBIterator) trySkipNonZeroVersion() {
if itr.skipVersionZero {
for itr.Valid() && itr.timestamp() == 0 {
itr.Next()
}
}
}

// Error implements Iterator.
Expand All @@ -137,6 +161,10 @@ func (itr *rocksDBIterator) assertIsValid() {
// This function can be applied on *Slice returned from Key() and Value()
// of an Iterator, because they are marked as freed.
func moveSliceToBytes(s *grocksdb.Slice) []byte {
if s == nil {
return nil
}

defer s.Free()
if !s.Exists() {
return nil
Expand Down
96 changes: 84 additions & 12 deletions versiondb/tsrocksdb/store.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package tsrocksdb

import (
"bytes"
"encoding/binary"
"errors"
"fmt"
Expand Down Expand Up @@ -38,6 +39,9 @@
type Store struct {
db *grocksdb.DB
cfHandle *grocksdb.ColumnFamilyHandle

// see: https://github.com/crypto-org-chain/cronos/issues/1683
skipVersionZero bool
}

func NewStore(dir string) (Store, error) {
Expand All @@ -58,6 +62,10 @@
}
}

func (s *Store) SetSkipVersionZero(skip bool) {
s.skipVersionZero = skip
}

func (s Store) SetLatestVersion(version int64) error {
var ts [TimestampSize]byte
binary.LittleEndian.PutUint64(ts[:], uint64(version))
Expand Down Expand Up @@ -86,11 +94,23 @@
}

func (s Store) GetAtVersionSlice(storeKey string, key []byte, version *int64) (*grocksdb.Slice, error) {
return s.db.GetCF(
value, ts, err := s.db.GetCFWithTS(
newTSReadOptions(version),
s.cfHandle,
prependStoreKey(storeKey, key),
)
if err != nil {
return nil, err
}

Check warning on line 104 in versiondb/tsrocksdb/store.go

View check run for this annotation

Codecov / codecov/patch

versiondb/tsrocksdb/store.go#L103-L104

Added lines #L103 - L104 were not covered by tests
defer ts.Free()

if value.Exists() && s.skipVersionZero {
if binary.LittleEndian.Uint64(ts.Data()) == 0 {
return nil, nil
}
}

return value, err
}

// GetAtVersion implements VersionStore interface
Expand Down Expand Up @@ -128,28 +148,24 @@

// IteratorAtVersion implements VersionStore interface
func (s Store) IteratorAtVersion(storeKey string, start, end []byte, version *int64) (types.Iterator, error) {
if (start != nil && len(start) == 0) || (end != nil && len(end) == 0) {
return nil, errKeyEmpty
}

prefix := storePrefix(storeKey)
start, end = iterateWithPrefix(prefix, start, end)

itr := s.db.NewIteratorCF(newTSReadOptions(version), s.cfHandle)
return newRocksDBIterator(itr, prefix, start, end, false), nil
return s.iteratorAtVersion(storeKey, start, end, version, false)
}

// ReverseIteratorAtVersion implements VersionStore interface
func (s Store) ReverseIteratorAtVersion(storeKey string, start, end []byte, version *int64) (types.Iterator, error) {
return s.iteratorAtVersion(storeKey, start, end, version, true)
}

func (s Store) iteratorAtVersion(storeKey string, start, end []byte, version *int64, reverse bool) (types.Iterator, error) {
if (start != nil && len(start) == 0) || (end != nil && len(end) == 0) {
return nil, errKeyEmpty
}

prefix := storePrefix(storeKey)
start, end = iterateWithPrefix(storePrefix(storeKey), start, end)
start, end = iterateWithPrefix(prefix, start, end)

itr := s.db.NewIteratorCF(newTSReadOptions(version), s.cfHandle)
return newRocksDBIterator(itr, prefix, start, end, true), nil
return newRocksDBIterator(itr, prefix, start, end, reverse, s.skipVersionZero), nil
}

// FeedChangeSet is used to migrate legacy change sets into versiondb
Expand Down Expand Up @@ -216,6 +232,62 @@
)
}

// FixData fixes wrong data written in versiondb due to rocksdb upgrade, the operation is idempotent.
// see: https://github.com/crypto-org-chain/cronos/issues/1683
// call this before `SetSkipVersionZero(true)`.
func (s Store) FixData(storeKeys []types.StoreKey) error {
for _, storeKey := range storeKeys {
if err := s.fixDataStore(storeKey.Name()); err != nil {
return err
}

Check warning on line 242 in versiondb/tsrocksdb/store.go

View check run for this annotation

Codecov / codecov/patch

versiondb/tsrocksdb/store.go#L241-L242

Added lines #L241 - L242 were not covered by tests
}

return nil
}

// fixDataStore iterate the wrong data at version 0, parse the timestamp from the key and write it again.
func (s Store) fixDataStore(storeName string) error {
var version int64
iter, err := s.IteratorAtVersion(storeName, nil, nil, &version)
if err != nil {
return err
}

Check warning on line 254 in versiondb/tsrocksdb/store.go

View check run for this annotation

Codecov / codecov/patch

versiondb/tsrocksdb/store.go#L253-L254

Added lines #L253 - L254 were not covered by tests
defer iter.Close()

batch := grocksdb.NewWriteBatch()
defer batch.Destroy()

prefix := storePrefix(storeName)
for ; iter.Valid(); iter.Next() {
key := iter.Key()
if len(key) < TimestampSize {
return fmt.Errorf("invalid key length: %X", key)
}

Check warning on line 265 in versiondb/tsrocksdb/store.go

View check run for this annotation

Codecov / codecov/patch

versiondb/tsrocksdb/store.go#L264-L265

Added lines #L264 - L265 were not covered by tests

ts := key[len(key)-TimestampSize:]
key = key[:len(key)-TimestampSize]
realKey := cloneAppend(prefix, key)

readOpts := grocksdb.NewDefaultReadOptions()
readOpts.SetTimestamp(ts)
oldValue, err := s.db.GetCF(readOpts, s.cfHandle, realKey)
if err != nil {
return err
}

Check warning on line 276 in versiondb/tsrocksdb/store.go

View check run for this annotation

Codecov / codecov/patch

versiondb/tsrocksdb/store.go#L275-L276

Added lines #L275 - L276 were not covered by tests

clean := bytes.Equal(oldValue.Data(), iter.Value())
oldValue.Free()

if clean {
continue

Check warning on line 282 in versiondb/tsrocksdb/store.go

View check run for this annotation

Codecov / codecov/patch

versiondb/tsrocksdb/store.go#L282

Added line #L282 was not covered by tests
}

batch.PutCFWithTS(s.cfHandle, realKey, ts, iter.Value())
}

return s.db.Write(defaultSyncWriteOpts, batch)
}

func newTSReadOptions(version *int64) *grocksdb.ReadOptions {
var ver uint64
if version == nil {
Expand Down
87 changes: 87 additions & 0 deletions versiondb/tsrocksdb/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"encoding/binary"
"testing"

"cosmossdk.io/store/types"
dbm "github.com/cosmos/cosmos-db"
"github.com/crypto-org-chain/cronos/versiondb"
"github.com/linxGnu/grocksdb"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -153,3 +155,88 @@ func TestUserTimestampPruning(t *testing.T) {
require.Equal(t, []byte{100}, bz.Data())
bz.Free()
}

func TestSkipVersionZero(t *testing.T) {
storeKey := types.NewKVStoreKey("test")

var wrongTz [8]byte
binary.LittleEndian.PutUint64(wrongTz[:], 100)

key1 := []byte("hello1")
key2 := []byte("hello2")
key2Wrong := cloneAppend(key2, wrongTz[:])
key3 := []byte("hello3")

store, err := NewStore(t.TempDir())
require.NoError(t, err)

err = store.PutAtVersion(0, []*types.StoreKVPair{
{StoreKey: storeKey.Name(), Key: key2Wrong, Value: []byte{2}},
})
require.NoError(t, err)
err = store.PutAtVersion(100, []*types.StoreKVPair{
{StoreKey: storeKey.Name(), Key: key1, Value: []byte{1}},
})
require.NoError(t, err)
err = store.PutAtVersion(100, []*types.StoreKVPair{
{StoreKey: storeKey.Name(), Key: key3, Value: []byte{3}},
})
require.NoError(t, err)

i := int64(999)
bz, err := store.GetAtVersion(storeKey.Name(), key2Wrong, &i)
require.NoError(t, err)
require.Equal(t, []byte{2}, bz)

it, err := store.IteratorAtVersion(storeKey.Name(), nil, nil, &i)
require.NoError(t, err)
require.Equal(t,
[]kvPair{
{Key: key1, Value: []byte{1}},
{Key: key2Wrong, Value: []byte{2}},
{Key: key3, Value: []byte{3}},
},
consumeIterator(it),
)

store.SetSkipVersionZero(true)

bz, err = store.GetAtVersion(storeKey.Name(), key2Wrong, &i)
require.NoError(t, err)
require.Empty(t, bz)
bz, err = store.GetAtVersion(storeKey.Name(), key1, &i)
require.NoError(t, err)
require.Equal(t, []byte{1}, bz)

it, err = store.IteratorAtVersion(storeKey.Name(), nil, nil, &i)
require.NoError(t, err)
require.Equal(t,
[]kvPair{
{Key: key1, Value: []byte{1}},
{Key: key3, Value: []byte{3}},
},
consumeIterator(it),
)

store.SetSkipVersionZero(false)
err = store.FixData([]types.StoreKey{storeKey})
require.NoError(t, err)

bz, err = store.GetAtVersion(storeKey.Name(), key2, &i)
require.NoError(t, err)
require.Equal(t, []byte{2}, bz)
}

type kvPair struct {
Key []byte
Value []byte
}

func consumeIterator(it dbm.Iterator) []kvPair {
var result []kvPair
for ; it.Valid(); it.Next() {
result = append(result, kvPair{it.Key(), it.Value()})
}
it.Close()
return result
}
Loading