Skip to content

Commit cb6f840

Browse files
elizabethengelmani-norden
authored andcommitted
Statediffing geth
* Write state diff to CSV (#2) * port statediff from https://github.com/jpmorganchase/quorum/blob/9b7fd9af8082795eeeb6863d9746f12b82dd5078/statediff/statediff.go; minor fixes * integrating state diff extracting, building, and persisting into geth processes * work towards persisting created statediffs in ipfs; based off github.com/vulcanize/eth-block-extractor * Add a state diff service * Remove diff extractor from blockchain * Update imports * Move statediff on/off check to geth cmd config * Update starting state diff service * Add debugging logs for creating diff * Add statediff extractor and builder tests and small refactoring * Start to write statediff to a CSV * Restructure statediff directory * Pull CSV publishing methods into their own file * Reformatting due to go fmt * Add gomega to vendor dir * Remove testing focuses * Update statediff tests to use golang test pkg instead of ginkgo - builder_test - extractor_test - publisher_test * Use hexutil.Encode instead of deprecated common.ToHex * Remove OldValue from DiffBigInt and DiffUint64 fields * Update builder test * Remove old storage value from updated accounts * Remove old values from created/deleted accounts * Update publisher to account for only storing current account values * Update service loop and fetching previous block * Update testing - remove statediff ginkgo test suite file - move mocks to their own dir * Updates per go fmt * Updates to tests * Pass statediff mode and path in through cli * Return filename from publisher * Remove some duplication in builder * Remove code field from state diff output this is the contract byte code, and it can still be obtained by querying the db by the codeHash * Consolidate acct diff structs for updated & updated/deleted accts * Include block number in csv filename * Clean up error logging * Cleanup formatting, spelling, etc * Address PR comments * Add contract address and storage value to csv * Refactor accumulating account row in csv publisher * Add DiffStorage struct * Add storage key to csv * Address PR comments * Fix publisher to include rows for accounts that don't have store updates * Update builder test after merging in release/1.8 * Update test contract to include storage on contract intialization - so that we're able to test that storage diffing works for created and deleted accounts (not just updated accounts). * Factor out a common trie iterator method in builder * Apply goimports to statediff * Apply gosimple changes to statediff * Gracefully exit geth command(#4) * Statediff for full node (#6) * Open a trie from the in-memory database * Use a node's LeafKey as an identifier instead of the address It was proving difficult to find look the address up from a given path with a full node (sometimes the value wouldn't exist in the disk db). So, instead, for now we are using the node's LeafKey with is a Keccak256 hash of the address, so if we know the address we can figure out which LeafKey it matches up to. * Make sure that statediff has been processed before pruning * Use blockchain stateCache.OpenTrie for storage diffs * Clean up log lines and remove unnecessary fields from builder * Apply go fmt changes * Add a sleep to the blockchain test * Address PR comments * Address PR comments * refactoring/reorganizing packages * refactoring statediff builder and types and adjusted to relay proofs and paths (still need to make this optional) * refactoring state diff service and adding api which allows for streaming state diff payloads over an rpc websocket subscription * make proofs and paths optional + compress service loop into single for loop (may be missing something here) * option to process intermediate nodes * make state diff rlp serializable * cli parameter to limit statediffing to select account addresses + test * review fixes and fixes for issues ran into in integration * review fixes; proper method signature for api; adjust service so that statediff processing is halted/paused until there is at least one subscriber listening for the results * adjust buffering to improve stability; doc.go; fix notifier err handling * relay receipts with the rest of the data + review fixes/changes * rpc method to get statediff at specific block; requires archival node or the block be within the pruning range * review fixes * fixes after rebase * statediff verison meta * fix linter issues * include total difficulty to the payload * fix state diff builder: emit actual leaf nodes instead of value nodes; diff on the leaf not on the value; emit correct path for intermediate nodes * adjust statediff builder tests to changes and extend to test intermediate nodes; golint * add genesis block to test; handle block 0 in StateDiffAt * rlp files for mainnet blocks 0-3, for tests * builder test on mainnet blocks * common.BytesToHash(path) => crypto.Keaccak256(hash) in builder; BytesToHash produces same hash for e.g. []byte{} and []byte{\x00} - prefix \x00 steps are inconsequential to the hash result * complete tests for early mainnet blocks * diff type for representing deleted accounts * fix builder so that we handle account deletions properly and properly diff storage when an account is moved to a new path; update params * remove cli params; moving them to subscriber defined * remove unneeded bc methods * update service and api; statediffing params are now defined by user through api rather than by service provider by cli * update top level tests * add ability to watch specific storage slots (leaf keys) only * comments; explain logic * update mainnet blocks test * update api_test.go * storage leafkey filter test * cleanup chain maker * adjust chain maker for tests to add an empty account in block1 and switch to EIP-158 afterwards (now we just need to generate enough accounts until one causes the empty account to be touched and removed post-EIP-158 so we can simulate and test that process...); also added 2 new blocks where more contract storage is set and old slots are set to zero so they are removed so we can test that * found an account whose creation causes the empty account to be moved to a new path; this should count as 'touching; the empty account and cause it to be removed according to eip-158... but it doesn't * use new contract in unit tests that has self-destruct ability, so we can test eip-158 since simply moving an account to new path doesn't count as 'touchin' it * handle storage deletions * tests for eip-158 account removal and storage value deletions; there is one edge case left to test where we remove 1 account when only two exist such that the remaining account is moved up and replaces the root branch node * finish testing known edge cases * add endpoint to fetch all state and storage nodes at a given blockheight; useful for generating a recent atate cache/snapshot that we can diff forward from rather than needing to collect all diffs from genesis * test for state trie builder * minor changes/fixes * update version meta * if statediffing is on, lock tries in triedb until the statediffing service signals they are done using them * update version meta * fix mock blockchain; golint; bump patch * increase maxRequestContentLength; bump patch * log the sizes of the state objects we are sending * CI build (#20) * CI: run build on PR and on push to master * CI: debug building geth * CI: fix coping file * CI: fix coping file v2 * CI: temporary upload file to release asset * CI: get release upload_url by tag, upload asset to current relase * CI: fix tag name * fix ci build on statediff_at_anyblock-1.9.11 branch * fix publishing assets in release * bump version meta * use context deadline for timeout in eth_call * collect and emit codehash=>code mappings for state objects * subscription endpoint for retrieving all the codehash=>code mappings that exist at provided height * bump version meta * Implement WriteStateDiffAt * Writes state diffs directly to postgres * Adds CLI flags to configure PG * Refactors builder output with callbacks * Copies refactored postgres handling code from ipld-eth-indexer * rename PostgresCIDWriter.{index->upsert}* * less ambiguous * go.mod update * rm unused * cleanup * output code & codehash iteratively * had to rf some types for this * prometheus metrics output * duplicate recent eth-indexer changes * migrations and metrics... * [wip] prom.Init() here? another CLI flag? * cleanup * tidy & DRY * statediff WriteLoop service + CLI flag * [wip] update test mocks * todo - do something meaningful to test write loop * logging * use geth log * port tests to go testing * drop ginkgo/gomega * fix and cleanup tests * fail before defer statement * delete vendor/ dir * unused * bump version meta
1 parent 8c2f271 commit cb6f840

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

73 files changed

+11191
-23
lines changed

.github/workflows/build.yml

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
name: Docker Build
2+
3+
on: [pull_request]
4+
5+
jobs:
6+
build:
7+
name: Run docker build
8+
runs-on: ubuntu-latest
9+
steps:
10+
- uses: actions/checkout@v2
11+
- name: Run docker build
12+
run: docker build -t vulcanize/go-ethereum -f Dockerfile.amd64 .

.github/workflows/on-master.yaml

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
name: Docker Build and publish to Github
2+
3+
on:
4+
push:
5+
branches:
6+
- statediff_at_anyblock-1.9.11
7+
8+
jobs:
9+
build:
10+
name: Run docker build and publish
11+
runs-on: ubuntu-latest
12+
steps:
13+
- uses: actions/checkout@v2
14+
- name: Run docker build
15+
run: docker build -t vulcanize/go-ethereum -f Dockerfile.amd64 .
16+
- name: Get the version
17+
id: vars
18+
run: echo ::set-output name=sha::$(echo ${GITHUB_SHA:0:7})
19+
- name: Tag docker image
20+
run: docker tag vulcanize/go-ethereum docker.pkg.github.com/vulcanize/go-ethereum/go-ethereum:${{steps.vars.outputs.sha}}
21+
- name: Docker Login
22+
run: echo ${{ secrets.GITHUB_TOKEN }} | docker login https://docker.pkg.github.com -u vulcanize --password-stdin
23+
- name: Docker Push
24+
run: docker push docker.pkg.github.com/vulcanize/go-ethereum/go-ethereum:${{steps.vars.outputs.sha}}
25+

.github/workflows/publish.yaml

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
name: Publish geth to release
2+
on:
3+
release:
4+
types: [published]
5+
jobs:
6+
push_to_registries:
7+
name: Publish assets to Release
8+
runs-on: ubuntu-latest
9+
steps:
10+
- name: Get the version
11+
id: vars
12+
run: |
13+
echo ::set-output name=sha::$(echo ${GITHUB_SHA:0:7})
14+
- name: Docker Login to Github Registry
15+
run: echo ${{ secrets.GITHUB_TOKEN }} | docker login https://docker.pkg.github.com -u vulcanize --password-stdin
16+
- name: Docker Pull
17+
run: docker pull docker.pkg.github.com/vulcanize/go-ethereum/go-ethereum:${{steps.vars.outputs.sha}}
18+
- name: Copy ethereum binary file
19+
run: docker run --rm --entrypoint cat docker.pkg.github.com/vulcanize/go-ethereum/go-ethereum:${{steps.vars.outputs.sha}} /go-ethereum/build/bin/geth > geth-linux-amd64
20+
- name: Get release
21+
id: get_release
22+
uses: bruceadams/[email protected]
23+
env:
24+
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
25+
- name: Upload Release Asset
26+
id: upload-release-asset
27+
uses: actions/upload-release-asset@v1
28+
env:
29+
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
30+
with:
31+
upload_url: ${{ steps.get_release.outputs.upload_url }}
32+
asset_path: geth-linux-amd64
33+
asset_name: geth-linux-amd64
34+
asset_content_type: application/octet-stream

Dockerfile.amd64

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
# Build Geth in a stock Go builder container
2+
FROM golang:1.13 as builder
3+
4+
#RUN apk add --no-cache make gcc musl-dev linux-headers git
5+
6+
ADD . /go-ethereum
7+
RUN cd /go-ethereum && make geth

cmd/geth/config.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,9 @@ func makeConfigNode(ctx *cli.Context) (*node.Node, gethConfig) {
145145
cfg.Ethstats.URL = ctx.GlobalString(utils.EthStatsURLFlag.Name)
146146
}
147147
utils.SetShhConfig(ctx, stack)
148+
if ctx.GlobalBool(utils.StateDiffFlag.Name) {
149+
cfg.Eth.Diffing = true
150+
}
148151

149152
return stack, cfg
150153
}
@@ -165,6 +168,26 @@ func makeFullNode(ctx *cli.Context) (*node.Node, ethapi.Backend) {
165168
backend := utils.RegisterEthService(stack, &cfg.Eth)
166169

167170
checkWhisper(ctx)
171+
172+
if ctx.GlobalBool(utils.StateDiffFlag.Name) {
173+
var dbParams *[3]string
174+
if ctx.GlobalIsSet(utils.StateDiffDBFlag.Name) {
175+
dbParams = new([3]string)
176+
dbParams[0] = ctx.GlobalString(utils.StateDiffDBFlag.Name)
177+
if ctx.GlobalIsSet(utils.StateDiffDBNodeIDFlag.Name) {
178+
dbParams[1] = ctx.GlobalString(utils.StateDiffDBNodeIDFlag.Name)
179+
} else {
180+
utils.Fatalf("Must specify node ID for statediff DB output")
181+
}
182+
if ctx.GlobalIsSet(utils.StateDiffDBClientNameFlag.Name) {
183+
dbParams[2] = ctx.GlobalString(utils.StateDiffDBClientNameFlag.Name)
184+
} else {
185+
utils.Fatalf("Must specify client name for statediff DB output")
186+
}
187+
}
188+
utils.RegisterStateDiffService(stack, dbParams, ctx.GlobalBool(utils.StateDiffWritingFlag.Name))
189+
}
190+
168191
// Configure GraphQL if requested
169192
if ctx.GlobalIsSet(utils.GraphQLEnabledFlag.Name) {
170193
utils.RegisterGraphQLService(stack, backend, cfg.Node)

cmd/geth/main.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,11 @@ var (
157157
utils.GpoMaxGasPriceFlag,
158158
utils.EWASMInterpreterFlag,
159159
utils.EVMInterpreterFlag,
160+
utils.StateDiffFlag,
161+
utils.StateDiffDBFlag,
162+
utils.StateDiffDBNodeIDFlag,
163+
utils.StateDiffDBClientNameFlag,
164+
utils.StateDiffWritingFlag,
160165
configFileFlag,
161166
}
162167

cmd/geth/usage.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -234,6 +234,16 @@ var AppHelpFlagGroups = []flags.FlagGroup{
234234
utils.LegacyGraphQLPortFlag,
235235
}, debug.DeprecatedFlags...),
236236
},
237+
{
238+
Name: "STATE DIFF",
239+
Flags: []cli.Flag{
240+
utils.StateDiffFlag,
241+
utils.StateDiffDBFlag,
242+
utils.StateDiffDBNodeIDFlag,
243+
utils.StateDiffDBClientNameFlag,
244+
utils.StateDiffWritingFlag,
245+
},
246+
},
237247
{
238248
Name: "MISC",
239249
Flags: []cli.Flag{

cmd/utils/flags.go

Lines changed: 43 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@ import (
3131
"text/template"
3232
"time"
3333

34+
cli "gopkg.in/urfave/cli.v1"
35+
3436
"github.com/ethereum/go-ethereum/accounts"
3537
"github.com/ethereum/go-ethereum/accounts/keystore"
3638
"github.com/ethereum/go-ethereum/common"
@@ -63,8 +65,9 @@ import (
6365
"github.com/ethereum/go-ethereum/p2p/nat"
6466
"github.com/ethereum/go-ethereum/p2p/netutil"
6567
"github.com/ethereum/go-ethereum/params"
68+
"github.com/ethereum/go-ethereum/statediff"
69+
6670
pcsclite "github.com/gballet/go-libpcsclite"
67-
cli "gopkg.in/urfave/cli.v1"
6871
)
6972

7073
func init() {
@@ -722,6 +725,27 @@ var (
722725
Usage: "External EVM configuration (default = built-in interpreter)",
723726
Value: "",
724727
}
728+
729+
StateDiffFlag = cli.BoolFlag{
730+
Name: "statediff",
731+
Usage: "Enables the processing of state diffs between each block",
732+
}
733+
StateDiffDBFlag = cli.StringFlag{
734+
Name: "statediff.db",
735+
Usage: "PostgreSQL database connection string for writing state diffs",
736+
}
737+
StateDiffDBNodeIDFlag = cli.StringFlag{
738+
Name: "statediff.dbnodeid",
739+
Usage: "Node ID to use when writing state diffs to database",
740+
}
741+
StateDiffDBClientNameFlag = cli.StringFlag{
742+
Name: "statediff.dbclientname",
743+
Usage: "Client name to use when writing state diffs to database",
744+
}
745+
StateDiffWritingFlag = cli.BoolFlag{
746+
Name: "statediff.writing",
747+
Usage: "Activates progressive writing of state diffs to database as new block are synced",
748+
}
725749
)
726750

727751
// MakeDataDir retrieves the currently requested data directory, terminating
@@ -987,6 +1011,9 @@ func setWS(ctx *cli.Context, cfg *node.Config) {
9871011
if ctx.GlobalIsSet(WSApiFlag.Name) {
9881012
cfg.WSModules = SplitAndTrim(ctx.GlobalString(WSApiFlag.Name))
9891013
}
1014+
if ctx.GlobalBool(StateDiffFlag.Name) {
1015+
cfg.WSModules = append(cfg.WSModules, "statediff")
1016+
}
9901017
}
9911018

9921019
// setIPC creates an IPC path configuration from the set command line flags,
@@ -1715,6 +1742,21 @@ func RegisterGraphQLService(stack *node.Node, backend ethapi.Backend, cfg node.C
17151742
}
17161743
}
17171744

1745+
// RegisterStateDiffService configures and registers a service to stream state diff data over RPC
1746+
// dbParams are: Postgres connection URI, Node ID, client name
1747+
func RegisterStateDiffService(stack *node.Node, dbParams *[3]string, startWriteLoop bool) {
1748+
if err := stack.Register(func(ctx *node.ServiceContext) (node.Service, error) {
1749+
var ethServ *eth.Ethereum
1750+
err := ctx.Service(&ethServ)
1751+
if err != nil {
1752+
return nil, err
1753+
}
1754+
return statediff.NewStateDiffService(ethServ, dbParams, startWriteLoop)
1755+
}); err != nil {
1756+
Fatalf("Failed to register State Diff Service", err)
1757+
}
1758+
}
1759+
17181760
func SetupMetrics(ctx *cli.Context) {
17191761
if metrics.Enabled {
17201762
log.Info("Enabling metrics collection")

core/blockchain.go

Lines changed: 45 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,7 @@ type CacheConfig struct {
131131
SnapshotLimit int // Memory allowance (MB) to use for caching snapshot entries in memory
132132

133133
SnapshotWait bool // Wait for snapshot construction on startup. TODO(karalabe): This is a dirty hack for testing, nuke it
134+
StateDiffing bool // Whether or not the statediffing service is running
134135
}
135136

136137
// defaultCacheConfig are the default caching values if none are specified by the
@@ -210,6 +211,10 @@ type BlockChain struct {
210211
badBlocks *lru.Cache // Bad block cache
211212
shouldPreserve func(*types.Block) bool // Function used to determine whether should preserve the given block.
212213
terminateInsert func(common.Hash, uint64) bool // Testing hook used to terminate ancient receipt chain insertion.
214+
215+
// Locked roots and their mutex
216+
trieLock sync.Mutex
217+
lockedRoots map[common.Hash]bool
213218
}
214219

215220
// NewBlockChain returns a fully initialised block chain using information
@@ -226,7 +231,6 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par
226231
txLookupCache, _ := lru.New(txLookupCacheLimit)
227232
futureBlocks, _ := lru.New(maxFutureBlocks)
228233
badBlocks, _ := lru.New(badBlockLimit)
229-
230234
bc := &BlockChain{
231235
chainConfig: chainConfig,
232236
cacheConfig: cacheConfig,
@@ -244,6 +248,7 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par
244248
engine: engine,
245249
vmConfig: vmConfig,
246250
badBlocks: badBlocks,
251+
lockedRoots: make(map[common.Hash]bool),
247252
}
248253
bc.validator = NewBlockValidator(chainConfig, bc, engine)
249254
bc.prefetcher = newStatePrefetcher(chainConfig, bc, engine)
@@ -967,7 +972,10 @@ func (bc *BlockChain) Stop() {
967972
}
968973
}
969974
for !bc.triegc.Empty() {
970-
triedb.Dereference(bc.triegc.PopItem().(common.Hash))
975+
pruneRoot := bc.triegc.PopItem().(common.Hash)
976+
if !bc.TrieLocked(pruneRoot) {
977+
triedb.Dereference(pruneRoot)
978+
}
971979
}
972980
if size, _ := triedb.Size(); size != 0 {
973981
log.Error("Dangling trie nodes after full cleanup")
@@ -1473,6 +1481,11 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.
14731481
triedb.Reference(root, common.Hash{}) // metadata reference to keep trie alive
14741482
bc.triegc.Push(root, -int64(block.NumberU64()))
14751483

1484+
// If we are statediffing, lock the trie until the statediffing service is done using it
1485+
if bc.cacheConfig.StateDiffing {
1486+
bc.LockTrie(root)
1487+
}
1488+
14761489
if current := block.NumberU64(); current > TriesInMemory {
14771490
// If we exceeded our memory allowance, flush matured singleton nodes to disk
14781491
var (
@@ -1511,7 +1524,11 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.
15111524
bc.triegc.Push(root, number)
15121525
break
15131526
}
1514-
triedb.Dereference(root.(common.Hash))
1527+
pruneRoot := root.(common.Hash)
1528+
if !bc.TrieLocked(pruneRoot) {
1529+
log.Debug("Dereferencing", "root", root.(common.Hash).Hex())
1530+
triedb.Dereference(pruneRoot)
1531+
}
15151532
}
15161533
}
15171534
}
@@ -2481,3 +2498,28 @@ func (bc *BlockChain) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscript
24812498
func (bc *BlockChain) SubscribeBlockProcessingEvent(ch chan<- bool) event.Subscription {
24822499
return bc.scope.Track(bc.blockProcFeed.Subscribe(ch))
24832500
}
2501+
2502+
// TrieLocked returns whether the trie associated with the provided root is locked for use
2503+
func (bc *BlockChain) TrieLocked(root common.Hash) bool {
2504+
bc.trieLock.Lock()
2505+
locked, ok := bc.lockedRoots[root]
2506+
bc.trieLock.Unlock()
2507+
if !ok {
2508+
return false
2509+
}
2510+
return locked
2511+
}
2512+
2513+
// LockTrie prevents dereferencing of the provided root
2514+
func (bc *BlockChain) LockTrie(root common.Hash) {
2515+
bc.trieLock.Lock()
2516+
bc.lockedRoots[root] = true
2517+
bc.trieLock.Unlock()
2518+
}
2519+
2520+
// UnlockTrie allows dereferencing of the provided root- provided it was previously locked
2521+
func (bc *BlockChain) UnlockTrie(root common.Hash) {
2522+
bc.trieLock.Lock()
2523+
bc.lockedRoots[root] = false
2524+
bc.trieLock.Unlock()
2525+
}

eth/backend.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,7 @@ func New(stack *node.Node, config *Config) (*Ethereum, error) {
169169
TrieDirtyDisabled: config.NoPruning,
170170
TrieTimeLimit: config.TrieTimeout,
171171
SnapshotLimit: config.SnapshotCache,
172+
StateDiffing: config.Diffing,
172173
}
173174
)
174175
eth.blockchain, err = core.NewBlockChain(chainDb, cacheConfig, chainConfig, eth.engine, vmConfig, eth.shouldPreserve, &config.TxLookupLimit)

0 commit comments

Comments
 (0)