Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 27 additions & 7 deletions runtime/v2/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"reflect"
"slices"
"sort"
"sync"

gogoproto "github.com/cosmos/gogoproto/proto"
"google.golang.org/grpc"
Expand Down Expand Up @@ -204,13 +205,19 @@ func (m *MM[T]) ExportGenesisForModules(
return nil, err
}

type genesisResult struct {
moduleName string
bz json.RawMessage
err error
}

type ModuleI interface {
ExportGenesis(ctx context.Context) (json.RawMessage, error)
}

genesisData := make(map[string]json.RawMessage)
var wg sync.WaitGroup
results := make(chan genesisResult, len(modulesToExport))

// TODO: make async export genesis https://github.com/cosmos/cosmos-sdk/issues/21303
for _, moduleName := range modulesToExport {
mod := m.modules[moduleName]
var moduleI ModuleI
Expand All @@ -223,12 +230,25 @@ func (m *MM[T]) ExportGenesisForModules(
continue
}

res, err := moduleI.ExportGenesis(ctx)
if err != nil {
return nil, err
}
wg.Add(1)
go func(moduleName string, moduleI ModuleI) {
defer wg.Done()
jm, err := moduleI.ExportGenesis(ctx)
results <- genesisResult{moduleName, jm, err}
}(moduleName, moduleI)
}

go func() {
wg.Wait()
close(results)
}()

genesisData[moduleName] = res
genesisData := make(map[string]json.RawMessage)
for res := range results {
if res.err != nil {
return nil, fmt.Errorf("genesis export error in %s: %w", res.moduleName, res.err)
}
genesisData[res.moduleName] = res.bz
}

return genesisData, nil
Expand Down
37 changes: 30 additions & 7 deletions server/v2/stf/branch/writer_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package branch

import (
"fmt"
"sync"
"unsafe"

"cosmossdk.io/core/store"
Expand All @@ -25,23 +26,45 @@ type WriterMap struct {
state store.ReaderMap
branchedWriterState map[string]store.Writer
branch func(state store.Reader) store.Writer
mu sync.RWMutex // mutex to protect branchedWriterState
}

func (b WriterMap) GetReader(actor []byte) (store.Reader, error) {
return b.GetWriter(actor)
}

func (b WriterMap) GetWriter(actor []byte) (store.Writer, error) {
// Simplify and optimize state retrieval
if actorState, ok := b.branchedWriterState[unsafeString(actor)]; ok {
actorKey := unsafeString(actor)

// attempt to read the map with a read lock
b.mu.RLock()
actorState, ok := b.branchedWriterState[actorKey]
b.mu.RUnlock()

if ok {
// if the actorState is found, return it
return actorState, nil
} else if writerState, err := b.state.GetReader(actor); err != nil {
return nil, err
} else {
actorState = b.branch(writerState)
b.branchedWriterState[string(actor)] = actorState
}

// if not found, proceed with acquiring a write lock to update the map
b.mu.Lock()
defer b.mu.Unlock()

// ensure that the actorState wasn't created by another goroutine while waiting for the write lock
if actorState, ok = b.branchedWriterState[actorKey]; ok {
return actorState, nil
}

// if still not found, create the actorState and update the map
writerState, err := b.state.GetReader(actor)
if err != nil {
return nil, err
}

actorState = b.branch(writerState)
b.branchedWriterState[actorKey] = actorState

return actorState, nil
}

func (b WriterMap) ApplyStateChanges(stateChanges []store.StateChanges) error {
Expand Down
11 changes: 9 additions & 2 deletions server/v2/stf/core_store_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,24 @@ package stf

import (
"context"
"sync"

"cosmossdk.io/core/store"
)

var _ store.KVStoreService = (*storeService)(nil)

func NewKVStoreService(address []byte) store.KVStoreService {
return storeService{actor: address}
return storeService{actor: address, mu: &sync.RWMutex{}}
}

func NewMemoryStoreService(address []byte) store.MemoryStoreService {
return storeService{actor: address}
return storeService{actor: address, mu: &sync.RWMutex{}}
}

type storeService struct {
actor []byte
mu *sync.RWMutex
}

func (s storeService) OpenKVStore(ctx context.Context) store.KVStore {
Expand All @@ -26,10 +28,15 @@ func (s storeService) OpenKVStore(ctx context.Context) store.KVStore {
panic(err)
}

s.mu.Lock()

state, err := exCtx.state.GetWriter(s.actor)
if err != nil {
panic(err)
}

s.mu.Unlock()

return state
}

Expand Down