Skip to content

Commit f6ff20e

Browse files
author
Abdul Rabbani
committed
Handle All Gaps within Geth
Including an updated doc which keeps track of events in this PR.
1 parent 1a3a63d commit f6ff20e

File tree

10 files changed

+313
-33
lines changed

10 files changed

+313
-33
lines changed

statediff/docs/KnownGaps.md

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
# Overview
2+
3+
This document will provide some insight into the `known_gaps` table, their use cases, and implementation. Please refer to the [following PR](https://github.com/vulcanize/go-ethereum/pull/217) and the [following epic](https://github.com/vulcanize/ops/issues/143) to grasp their inception.
4+
5+
![known gaps](diagrams/KnownGapsProcess.png)
6+
7+
# Use Cases
8+
9+
The known gaps table is updated when the following events occur:
10+
11+
1. At start up we check the latest block from the `eth.headers_cid` table. We compare the first block that we are processing with the latest block from the DB. If they are not one unit of expectedDifference away from each other, add the gap between the two blocks.
12+
2. If there is any error in processing a block (db connection, deadlock, etc), add that block to the knownErrorBlocks slice, when the next block is successfully written, write this slice into the DB.
13+
3. If the last processed block is not one unit of expectedDifference away from the current block being processed. This can be due to any unknown or unhandled errors in geth.
14+
15+
# Glossary
16+
17+
1. `expectedDifference (number)` - This number indicates what the difference between two blocks should be. If we are capturing all events on a geth node then this number would be `1`. But once we scale nodes, the `expectedDifference` might be `2` or greater.
18+
2. `processingKey (number)` - This number can be used to keep track of different geth nodes and their specific `expectedDifference`.

statediff/docs/README.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
# Overview
2+
3+
This folder keeps tracks of random documents as they relate to the `statediff` service.
52.4 KB
Loading

statediff/indexer/database/dump/indexer.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -500,3 +500,6 @@ func (sdi *StateDiffIndexer) Close() error {
500500
func (sdi *StateDiffIndexer) FindAndUpdateGaps(latestBlockOnChain *big.Int, expectedDifference *big.Int, processingKey int64) error {
501501
return nil
502502
}
503+
func (sdi *StateDiffIndexer) PushKnownGaps(startingBlockNumber *big.Int, endingBlockNumber *big.Int, checkedOut bool, processingKey int64) error {
504+
return nil
505+
}

statediff/indexer/database/file/indexer.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -482,3 +482,7 @@ func (sdi *StateDiffIndexer) Close() error {
482482
func (sdi *StateDiffIndexer) FindAndUpdateGaps(latestBlockOnChain *big.Int, expectedDifference *big.Int, processingKey int64) error {
483483
return nil
484484
}
485+
486+
func (sdi *StateDiffIndexer) PushKnownGaps(startingBlockNumber *big.Int, endingBlockNumber *big.Int, checkedOut bool, processingKey int64) error {
487+
return nil
488+
}

statediff/indexer/database/sql/indexer.go

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -555,7 +555,10 @@ func (sdi *StateDiffIndexer) Close() error {
555555
}
556556

557557
// Update the known gaps table with the gap information.
558-
func (sdi *StateDiffIndexer) pushKnownGaps(startingBlockNumber *big.Int, endingBlockNumber *big.Int, checkedOut bool, processingKey int64) error {
558+
func (sdi *StateDiffIndexer) PushKnownGaps(startingBlockNumber *big.Int, endingBlockNumber *big.Int, checkedOut bool, processingKey int64) error {
559+
if startingBlockNumber.Cmp(endingBlockNumber) != -1 {
560+
return fmt.Errorf("Starting Block %d, is greater than ending block %d", startingBlockNumber, endingBlockNumber)
561+
}
559562
knownGap := models.KnownGapsModel{
560563
StartingBlockNumber: startingBlockNumber.String(),
561564
EndingBlockNumber: endingBlockNumber.String(),
@@ -573,7 +576,7 @@ func (sdi *StateDiffIndexer) QueryDb(queryString string) (string, error) {
573576
var ret string
574577
err := sdi.dbWriter.db.QueryRow(context.Background(), queryString).Scan(&ret)
575578
if err != nil {
576-
log.Error("Can't properly query the DB for query: ", queryString)
579+
log.Error(fmt.Sprint("Can't properly query the DB for query: ", queryString))
577580
return "", err
578581
}
579582
return ret, nil
@@ -589,7 +592,7 @@ func (sdi *StateDiffIndexer) QueryDbToBigInt(queryString string) (*big.Int, erro
589592
}
590593
ret, ok := ret.SetString(res, 10)
591594
if !ok {
592-
log.Error("Can't turn the res ", res, "into a bigInt")
595+
log.Error(fmt.Sprint("Can't turn the res ", res, "into a bigInt"))
593596
return ret, fmt.Errorf("Can't turn %s into a bigInt", res)
594597
}
595598
return ret, nil
@@ -611,6 +614,9 @@ func isGap(latestBlockInDb *big.Int, latestBlockOnChain *big.Int, expectedDiffer
611614
// This function will check for Gaps and update the DB if gaps are found.
612615
// The processingKey will currently be set to 0, but as we start to leverage horizontal scaling
613616
// It might be a useful parameter to update depending on the geth node.
617+
// TODO:
618+
// REmove the return value
619+
// Write to file if err in writing to DB
614620
func (sdi *StateDiffIndexer) FindAndUpdateGaps(latestBlockOnChain *big.Int, expectedDifference *big.Int, processingKey int64) error {
615621
dbQueryString := "SELECT MAX(block_number) FROM eth.header_cids"
616622
latestBlockInDb, err := sdi.QueryDbToBigInt(dbQueryString)
@@ -625,8 +631,13 @@ func (sdi *StateDiffIndexer) FindAndUpdateGaps(latestBlockOnChain *big.Int, expe
625631
startBlock.Add(latestBlockInDb, expectedDifference)
626632
endBlock.Sub(latestBlockOnChain, expectedDifference)
627633

628-
log.Warn("Found Gaps starting at, ", startBlock, " and ending at, ", endBlock)
629-
sdi.pushKnownGaps(startBlock, endBlock, false, processingKey)
634+
log.Warn(fmt.Sprint("Found Gaps starting at, ", startBlock, " and ending at, ", endBlock))
635+
err := sdi.PushKnownGaps(startBlock, endBlock, false, processingKey)
636+
if err != nil {
637+
// Write to file SQL file instead!!!
638+
// If write to SQL file fails, write to disk. Handle this within the write to SQL file function!
639+
return err
640+
}
630641
}
631642

632643
return nil

statediff/indexer/database/sql/mainnet_tests/indexer_test.go

Lines changed: 0 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -116,31 +116,3 @@ func tearDown(t *testing.T) {
116116
err = ind.Close()
117117
require.NoError(t, err)
118118
}
119-
120-
//func TestKnownGapsUpsert(t *testing.T) {
121-
// var startBlockNumber int64 = 111
122-
// var endBlockNumber int64 = 121
123-
// ind, err := setupDb(t)
124-
// if err != nil {
125-
// t.Fatal(err)
126-
// }
127-
// require.NoError(t, err)
128-
//
129-
// testKnownGapsUpsert(t, startBlockNumber, endBlockNumber, ind)
130-
// //str, err := ind.QueryDb("SELECT MAX(block_number) FROM eth.header_cids") // Figure out the string.
131-
// queryString := fmt.Sprintf("SELECT starting_block_number from eth.known_gaps WHERE starting_block_number = %d AND ending_block_number = %d", startBlockNumber, endBlockNumber)
132-
// _, queryErr := ind.QueryDb(queryString) // Figure out the string.
133-
// require.NoError(t, queryErr)
134-
//
135-
//}
136-
//func testKnownGapsUpsert(t *testing.T, startBlockNumber int64, endBlockNumber int64, ind interfaces.StateDiffIndexer) {
137-
// startBlock := big.NewInt(startBlockNumber)
138-
// endBlock := big.NewInt(endBlockNumber)
139-
//
140-
// processGapError := ind.PushKnownGaps(startBlock, endBlock, false, 1)
141-
// if processGapError != nil {
142-
// t.Fatal(processGapError)
143-
// }
144-
// require.NoError(t, processGapError)
145-
//}
146-
//

statediff/indexer/interfaces/interfaces.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ type StateDiffIndexer interface {
3333
PushCodeAndCodeHash(tx Batch, codeAndCodeHash sdtypes.CodeAndCodeHash) error
3434
ReportDBMetrics(delay time.Duration, quit <-chan bool)
3535
FindAndUpdateGaps(latestBlockOnChain *big.Int, expectedDifference *big.Int, processingKey int64) error
36+
PushKnownGaps(startingBlockNumber *big.Int, endingBlockNumber *big.Int, checkedOut bool, processingKey int64) error
3637
io.Closer
3738
}
3839

statediff/service.go

Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package statediff
1818

1919
import (
2020
"bytes"
21+
"fmt"
2122
"math/big"
2223
"strconv"
2324
"strings"
@@ -123,6 +124,8 @@ type Service struct {
123124
BackendAPI ethapi.Backend
124125
// Should the statediff service wait for geth to sync to head?
125126
WaitForSync bool
127+
// Used to signal if we should check for KnownGaps
128+
KnownGaps KnownGaps
126129
// Whether or not we have any subscribers; only if we do, do we processes state diffs
127130
subscribers int32
128131
// Interface for publishing statediffs as PG-IPLD objects
@@ -135,6 +138,91 @@ type Service struct {
135138
maxRetry uint
136139
}
137140

141+
type KnownGaps struct {
142+
// Should we check for gaps by looking at the DB and comparing the latest block with head
143+
checkForGaps bool
144+
// Arbitrary processingKey that can be used down the line to differentiate different geth nodes.
145+
processingKey int64
146+
// This number indicates the expected difference between blocks.
147+
// Currently, this is 1 since the geth node processes each block. But down the road this can be used in
148+
// Tandom with the processingKey to differentiate block processing logic.
149+
expectedDifference *big.Int
150+
// Indicates if Geth is in an error state
151+
// This is used to indicate the right time to upserts
152+
errorState bool
153+
// This array keeps track of errorBlocks as they occur.
154+
// When the errorState is false again, we can process these blocks.
155+
// Do we need a list, can we have /KnownStartErrorBlock and knownEndErrorBlock ints instead?
156+
knownErrorBlocks []*big.Int
157+
// The last processed block keeps track of the last processed block.
158+
// Its used to make sure we didn't skip over any block!
159+
lastProcessedBlock *big.Int
160+
}
161+
162+
// This function will capture any missed blocks that were not captured in sds.KnownGaps.knownErrorBlocks.
163+
// It is invoked when the sds.KnownGaps.lastProcessed block is not one unit
164+
// away from sds.KnownGaps.expectedDifference
165+
// Essentially, if geth ever misses blocks but doesn't output an error, we are covered.
166+
func (sds *Service) capturedMissedBlocks(currentBlock *big.Int, knownErrorBlocks []*big.Int, lastProcessedBlock *big.Int) {
167+
// last processed: 110
168+
// current block: 125
169+
if len(knownErrorBlocks) > 0 {
170+
// 115
171+
startErrorBlock := new(big.Int).Set(knownErrorBlocks[0])
172+
// 120
173+
endErrorBlock := new(big.Int).Set(knownErrorBlocks[len(knownErrorBlocks)-1])
174+
175+
// 111
176+
expectedStartErrorBlock := big.NewInt(0).Add(lastProcessedBlock, sds.KnownGaps.expectedDifference)
177+
// 124
178+
expectedEndErrorBlock := big.NewInt(0).Sub(currentBlock, sds.KnownGaps.expectedDifference)
179+
180+
if (expectedStartErrorBlock == startErrorBlock) &&
181+
(expectedEndErrorBlock == endErrorBlock) {
182+
log.Info("All Gaps already captured in knownErrorBlocks")
183+
}
184+
185+
if expectedEndErrorBlock.Cmp(endErrorBlock) == 1 {
186+
log.Warn(fmt.Sprint("There are gaps in the knownErrorBlocks list: ", knownErrorBlocks))
187+
log.Warn("But there are gaps that were also not added there.")
188+
log.Warn(fmt.Sprint("Last Block in knownErrorBlocks: ", endErrorBlock))
189+
log.Warn(fmt.Sprint("Last processed Block: ", lastProcessedBlock))
190+
log.Warn(fmt.Sprint("Current Block: ", currentBlock))
191+
//120 + 1 == 121
192+
startBlock := big.NewInt(0).Add(endErrorBlock, sds.KnownGaps.expectedDifference)
193+
// 121 to 124
194+
log.Warn(fmt.Sprintf("Adding the following block range to known_gaps table: %d - %d", startBlock, expectedEndErrorBlock))
195+
sds.indexer.PushKnownGaps(startBlock, expectedEndErrorBlock, false, sds.KnownGaps.processingKey)
196+
}
197+
198+
if expectedStartErrorBlock.Cmp(startErrorBlock) == -1 {
199+
log.Warn(fmt.Sprint("There are gaps in the knownErrorBlocks list: ", knownErrorBlocks))
200+
log.Warn("But there are gaps that were also not added there.")
201+
log.Warn(fmt.Sprint("First Block in knownErrorBlocks: ", startErrorBlock))
202+
log.Warn(fmt.Sprint("Last processed Block: ", lastProcessedBlock))
203+
// 115 - 1 == 114
204+
endBlock := big.NewInt(0).Sub(startErrorBlock, sds.KnownGaps.expectedDifference)
205+
// 111 to 114
206+
log.Warn(fmt.Sprintf("Adding the following block range to known_gaps table: %d - %d", expectedStartErrorBlock, endBlock))
207+
sds.indexer.PushKnownGaps(expectedStartErrorBlock, endBlock, false, sds.KnownGaps.processingKey)
208+
}
209+
210+
log.Warn(fmt.Sprint("The following Gaps were found: ", knownErrorBlocks))
211+
log.Warn(fmt.Sprint("Updating known Gaps table from ", startErrorBlock, " to ", endErrorBlock, " with processing key, ", sds.KnownGaps.processingKey))
212+
sds.indexer.PushKnownGaps(startErrorBlock, endErrorBlock, false, sds.KnownGaps.processingKey)
213+
214+
} else {
215+
log.Warn("We missed blocks without any errors.")
216+
// 110 + 1 == 111
217+
startBlock := big.NewInt(0).Add(lastProcessedBlock, sds.KnownGaps.expectedDifference)
218+
// 125 - 1 == 124
219+
endBlock := big.NewInt(0).Sub(currentBlock, sds.KnownGaps.expectedDifference)
220+
log.Warn(fmt.Sprint("Missed blocks starting from: ", startBlock))
221+
log.Warn(fmt.Sprint("Missed blocks ending at: ", endBlock))
222+
sds.indexer.PushKnownGaps(startBlock, endBlock, false, sds.KnownGaps.processingKey)
223+
}
224+
}
225+
138226
// BlockCache caches the last block for safe access from different service loops
139227
type BlockCache struct {
140228
sync.Mutex
@@ -174,6 +262,14 @@ func New(stack *node.Node, ethServ *eth.Ethereum, cfg *ethconfig.Config, params
174262
if workers == 0 {
175263
workers = 1
176264
}
265+
// If we ever have multiple processingKeys we can update them here
266+
// along with the expectedDifference
267+
knownGaps := &KnownGaps{
268+
checkForGaps: true,
269+
processingKey: 0,
270+
expectedDifference: big.NewInt(1),
271+
errorState: false,
272+
}
177273
sds := &Service{
178274
Mutex: sync.Mutex{},
179275
BlockChain: blockChain,
@@ -184,6 +280,7 @@ func New(stack *node.Node, ethServ *eth.Ethereum, cfg *ethconfig.Config, params
184280
BlockCache: NewBlockCache(workers),
185281
BackendAPI: backend,
186282
WaitForSync: params.WaitForSync,
283+
KnownGaps: *knownGaps,
187284
indexer: indexer,
188285
enableWriteLoop: params.EnableWriteLoop,
189286
numWorkers: workers,
@@ -308,12 +405,40 @@ func (sds *Service) writeLoopWorker(params workerParams) {
308405
sds.writeGenesisStateDiff(parentBlock, params.id)
309406
}
310407

408+
// If for any reason we need to check for gaps,
409+
// Check and update the gaps table.
410+
if sds.KnownGaps.checkForGaps && !sds.KnownGaps.errorState {
411+
log.Info("Checking for Gaps at current block: ", currentBlock.Number())
412+
go sds.indexer.FindAndUpdateGaps(currentBlock.Number(), sds.KnownGaps.expectedDifference, sds.KnownGaps.processingKey)
413+
sds.KnownGaps.checkForGaps = false
414+
}
415+
311416
log.Info("Writing state diff", "block height", currentBlock.Number().Uint64(), "worker", params.id)
312417
err := sds.writeStateDiffWithRetry(currentBlock, parentBlock.Root(), writeLoopParams)
313418
if err != nil {
314419
log.Error("statediff.Service.WriteLoop: processing error", "block height", currentBlock.Number().Uint64(), "error", err.Error(), "worker", params.id)
420+
sds.KnownGaps.errorState = true
421+
sds.KnownGaps.knownErrorBlocks = append(sds.KnownGaps.knownErrorBlocks, currentBlock.Number())
422+
log.Warn("Updating the following block to knownErrorBlocks to be inserted into knownGaps table: ", currentBlock.Number())
423+
// Write object to startdiff
315424
continue
316425
}
426+
sds.KnownGaps.errorState = false
427+
// Understand what the last block that should have been processed is
428+
previousExpectedBlock := big.NewInt(0).Sub(currentBlock.Number(), sds.KnownGaps.expectedDifference)
429+
// If we last block which should have been processed is not
430+
// the actual lastProcessedBlock, add it to known gaps table.
431+
if previousExpectedBlock != sds.KnownGaps.lastProcessedBlock && sds.KnownGaps.lastProcessedBlock != nil {
432+
// We must pass in parameters by VALUE not reference.
433+
// If we pass them in my reference, the references can change before the computation is complete!
434+
staticKnownErrorBlocks := make([]*big.Int, len(sds.KnownGaps.knownErrorBlocks))
435+
copy(staticKnownErrorBlocks, sds.KnownGaps.knownErrorBlocks)
436+
staticLastProcessedBlock := new(big.Int).Set(sds.KnownGaps.lastProcessedBlock)
437+
go sds.capturedMissedBlocks(currentBlock.Number(), staticKnownErrorBlocks, staticLastProcessedBlock)
438+
sds.KnownGaps.knownErrorBlocks = nil
439+
}
440+
sds.KnownGaps.lastProcessedBlock = currentBlock.Number()
441+
317442
// TODO: how to handle with concurrent workers
318443
statediffMetrics.lastStatediffHeight.Update(int64(currentBlock.Number().Uint64()))
319444
case <-sds.QuitChan:

0 commit comments

Comments
 (0)