Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 37 additions & 5 deletions statediff/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,10 @@ func (api *PublicStateDiffAPI) Stream(ctx context.Context, params Params) (*rpc.
for {
select {
case payload := <-payloadChannel:
if notifyErr := notifier.Notify(rpcSub.ID, payload); notifyErr != nil {
log.Error("Failed to send state diff packet; error: " + notifyErr.Error())
unSubErr := api.sds.Unsubscribe(rpcSub.ID)
if unSubErr != nil {
log.Error("Failed to unsubscribe from the state diff service; error: " + unSubErr.Error())
if err := notifier.Notify(rpcSub.ID, payload); err != nil {
log.Error("Failed to send state diff packet; error: " + err.Error())
if err := api.sds.Unsubscribe(rpcSub.ID); err != nil {
log.Error("Failed to unsubscribe from the state diff service; error: " + err.Error())
}
return
}
Expand Down Expand Up @@ -99,3 +98,36 @@ func (api *PublicStateDiffAPI) StateDiffAt(ctx context.Context, blockNumber uint
func (api *PublicStateDiffAPI) StateTrieAt(ctx context.Context, blockNumber uint64, params Params) (*Payload, error) {
return api.sds.StateTrieAt(blockNumber, params)
}

// StreamCodeAndCodeHash writes all of the codehash=>code pairs out to a websocket channel
func (api *PublicStateDiffAPI) StreamCodeAndCodeHash(ctx context.Context, blockNumber uint64) (*rpc.Subscription, error) {
// ensure that the RPC connection supports subscriptions
notifier, supported := rpc.NotifierFromContext(ctx)
if !supported {
return nil, rpc.ErrNotificationsUnsupported
}

// create subscription and start waiting for events
rpcSub := notifier.CreateSubscription()
payloadChan := make(chan CodeAndCodeHash, chainEventChanSize)
quitChan := make(chan bool)
api.sds.StreamCodeAndCodeHash(blockNumber, payloadChan, quitChan)
go func() {
for {
select {
case payload := <-payloadChan:
if err := notifier.Notify(rpcSub.ID, payload); err != nil {
log.Error("Failed to send code and codehash packet", "err", err)
return
}
case err := <-rpcSub.Err():
log.Error("State diff service rpcSub error", "err", err)
return
case <-quitChan:
return
}
}
}()

return rpcSub, nil
}
10 changes: 4 additions & 6 deletions statediff/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,10 +124,9 @@ func (sdb *builder) buildStateTrie(it trie.NodeIterator) ([]StateNode, []CodeAnd
node.StorageNodes = storageNodes
// emit codehash => code mappings for cod
codeHash := common.BytesToHash(account.CodeHash)
addrHash := common.BytesToHash(leafKey)
code, err := sdb.stateCache.ContractCode(addrHash, codeHash)
code, err := sdb.stateCache.ContractCode(common.Hash{}, codeHash)
if err != nil {
return nil, nil, fmt.Errorf("failed to retrieve code for codehash %s for account with leafkey %s\r\n error: %v", codeHash.String(), addrHash.String(), err)
return nil, nil, fmt.Errorf("failed to retrieve code for codehash %s\r\n error: %v", codeHash.String(), err)
}
codeAndCodeHashes = append(codeAndCodeHashes, CodeAndCodeHash{
Hash: codeHash,
Expand Down Expand Up @@ -509,10 +508,9 @@ func (sdb *builder) buildAccountCreations(accounts AccountMap, watchedStorageKey
diff.StorageNodes = storageDiffs
// emit codehash => code mappings for cod
codeHash := common.BytesToHash(val.Account.CodeHash)
addrHash := common.BytesToHash(val.LeafKey)
code, err := sdb.stateCache.ContractCode(addrHash, codeHash)
code, err := sdb.stateCache.ContractCode(common.Hash{}, codeHash)
if err != nil {
return nil, nil, fmt.Errorf("failed to retrieve code for codehash %s for account with leafkey %s\r\n error: %v", codeHash.String(), addrHash.String(), err)
return nil, nil, fmt.Errorf("failed to retrieve code for codehash %s\r\n error: %v", codeHash.String(), err)
}
codeAndCodeHashes = append(codeAndCodeHashes, CodeAndCodeHash{
Hash: codeHash,
Expand Down
44 changes: 44 additions & 0 deletions statediff/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/event"
Expand All @@ -33,6 +34,7 @@ import (
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/rpc"
"github.com/ethereum/go-ethereum/trie"
)

const chainEventChanSize = 20000
Expand All @@ -44,6 +46,7 @@ type blockChain interface {
GetReceiptsByHash(hash common.Hash) types.Receipts
GetTdByHash(hash common.Hash) *big.Int
UnlockTrie(root common.Hash)
StateCache() state.Database
}

// IService is the state-diffing service interface
Expand All @@ -60,6 +63,8 @@ type IService interface {
StateDiffAt(blockNumber uint64, params Params) (*Payload, error)
// Method to get state trie object at specific block
StateTrieAt(blockNumber uint64, params Params) (*Payload, error)
// Method to stream out all code and codehash pairs
StreamCodeAndCodeHash(blockNumber uint64, outChan chan<- CodeAndCodeHash, quitChan chan<- bool)
}

// Service is the underlying struct for the state diffing service
Expand Down Expand Up @@ -361,3 +366,42 @@ func sendNonBlockingQuit(id rpc.ID, sub Subscription) {
log.Info("unable to close subscription %s; channel has no receiver", id)
}
}

// StreamCodeAndCodeHash subscription method for extracting all the codehash=>code mappings that exist in the trie at the provided height
func (sds *Service) StreamCodeAndCodeHash(blockNumber uint64, outChan chan<- CodeAndCodeHash, quitChan chan<- bool) {
current := sds.BlockChain.GetBlockByNumber(blockNumber)
log.Info(fmt.Sprintf("sending code and codehash at block %d", blockNumber))
currentTrie, err := sds.BlockChain.StateCache().OpenTrie(current.Root())
if err != nil {
log.Error("error creating trie for block", "number", current.Number(), "err", err)
close(quitChan)
return
}
it := currentTrie.NodeIterator([]byte{})
leafIt := trie.NewIterator(it)
go func() {
defer close(quitChan)
for leafIt.Next() {
select {
case <-sds.QuitChan:
return
default:
}
account := new(state.Account)
if err := rlp.DecodeBytes(leafIt.Value, account); err != nil {
log.Error("error decoding state account", "err", err)
return
}
codeHash := common.BytesToHash(account.CodeHash)
code, err := sds.BlockChain.StateCache().ContractCode(common.Hash{}, codeHash)
if err != nil {
log.Error("error collecting contract code", "err", err)
return
}
outChan <- CodeAndCodeHash{
Hash: codeHash,
Code: code,
}
}
}()
}
6 changes: 6 additions & 0 deletions statediff/testhelpers/mocks/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"math/big"
"time"

"github.com/ethereum/go-ethereum/core/state"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types"
Expand Down Expand Up @@ -126,3 +128,7 @@ func (blockChain *BlockChain) SetTdByHash(hash common.Hash, td *big.Int) {
}

func (blockChain *BlockChain) UnlockTrie(root common.Hash) {}

func (BlockChain *BlockChain) StateCache() state.Database {
return nil
}
4 changes: 4 additions & 0 deletions statediff/testhelpers/mocks/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,10 @@ func (sds *MockStateDiffService) closeType(subType common.Hash) {
delete(sds.SubscriptionTypes, subType)
}

func (sds *MockStateDiffService) StreamCodeAndCodeHash(blockNumber uint64, outChan chan<- statediff.CodeAndCodeHash, quitChan chan<- bool) {
panic("implement me")
}

func sendNonBlockingQuit(id rpc.ID, sub statediff.Subscription) {
select {
case sub.QuitChan <- true:
Expand Down
4 changes: 2 additions & 2 deletions statediff/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,8 @@ type StateObject struct {
// CodeAndCodeHash struct for holding codehash => code mappings
// we can't use an actual map because they are not rlp serializable
type CodeAndCodeHash struct {
Hash common.Hash
Code []byte
Hash common.Hash `json:"codeHash"`
Code []byte `json:"code"`
}

// StateNode holds the data for a single state diff node
Expand Down