Skip to content

Commit b0b74cf

Browse files
s1narjl493456442fjl
authored andcommitted
core: respect history cutoff in txindexer (ethereum#31393)
In ethereum#31384 we unindex TXes prior to the merge block. However when the node starts up it will try to re-index those back if the config is to index the whole chain. This change makes the indexer aware of the history cutoff block, avoiding reindexing in that segment. --------- Co-authored-by: Gary Rong <[email protected]> Co-authored-by: Felix Lange <[email protected]>
1 parent 93bf722 commit b0b74cf

File tree

4 files changed

+541
-200
lines changed

4 files changed

+541
-200
lines changed

core/rawdb/accessors_chain.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -277,6 +277,14 @@ func WriteTxIndexTail(db ethdb.KeyValueWriter, number uint64) {
277277
}
278278
}
279279

280+
// DeleteTxIndexTail deletes the number of oldest indexed block
281+
// from database.
282+
func DeleteTxIndexTail(db ethdb.KeyValueWriter) {
283+
if err := db.Delete(txIndexTailKey); err != nil {
284+
log.Crit("Failed to delete the transaction index tail", "err", err)
285+
}
286+
}
287+
280288
// ReadHeaderRange returns the rlp-encoded headers, starting at 'number', and going
281289
// backwards towards genesis. This method assumes that the caller already has
282290
// placed a cap on count, to prevent DoS issues.

core/rawdb/accessors_indexes.go

Lines changed: 40 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -30,13 +30,8 @@ import (
3030
"github.com/ethereum/go-ethereum/rlp"
3131
)
3232

33-
// ReadTxLookupEntry retrieves the positional metadata associated with a transaction
34-
// hash to allow retrieving the transaction or receipt by hash.
35-
func ReadTxLookupEntry(db ethdb.Reader, hash common.Hash) *uint64 {
36-
data, _ := db.Get(txLookupKey(hash))
37-
if len(data) == 0 {
38-
return nil
39-
}
33+
// DecodeTxLookupEntry decodes the supplied tx lookup data.
34+
func DecodeTxLookupEntry(data []byte, db ethdb.Reader) *uint64 {
4035
// Database v6 tx lookup just stores the block number
4136
if len(data) < common.HashLength {
4237
number := new(big.Int).SetBytes(data).Uint64()
@@ -49,12 +44,22 @@ func ReadTxLookupEntry(db ethdb.Reader, hash common.Hash) *uint64 {
4944
// Finally try database v3 tx lookup format
5045
var entry LegacyTxLookupEntry
5146
if err := rlp.DecodeBytes(data, &entry); err != nil {
52-
log.Error("Invalid transaction lookup entry RLP", "hash", hash, "blob", data, "err", err)
47+
log.Error("Invalid transaction lookup entry RLP", "blob", data, "err", err)
5348
return nil
5449
}
5550
return &entry.BlockIndex
5651
}
5752

53+
// ReadTxLookupEntry retrieves the positional metadata associated with a transaction
54+
// hash to allow retrieving the transaction or receipt by hash.
55+
func ReadTxLookupEntry(db ethdb.Reader, hash common.Hash) *uint64 {
56+
data, _ := db.Get(txLookupKey(hash))
57+
if len(data) == 0 {
58+
return nil
59+
}
60+
return DecodeTxLookupEntry(data, db)
61+
}
62+
5863
// writeTxLookupEntry stores a positional metadata for a transaction,
5964
// enabling hash based transaction and receipt lookups.
6065
func writeTxLookupEntry(db ethdb.KeyValueWriter, hash common.Hash, numberBytes []byte) {
@@ -95,6 +100,33 @@ func DeleteTxLookupEntries(db ethdb.KeyValueWriter, hashes []common.Hash) {
95100
}
96101
}
97102

103+
// DeleteAllTxLookupEntries purges all the transaction indexes in the database.
104+
// If condition is specified, only the entry with condition as True will be
105+
// removed; If condition is not specified, the entry is deleted.
106+
func DeleteAllTxLookupEntries(db ethdb.KeyValueStore, condition func([]byte) bool) {
107+
iter := NewKeyLengthIterator(db.NewIterator(txLookupPrefix, nil), common.HashLength+len(txLookupPrefix))
108+
defer iter.Release()
109+
110+
batch := db.NewBatch()
111+
for iter.Next() {
112+
if condition == nil || condition(iter.Value()) {
113+
batch.Delete(iter.Key())
114+
}
115+
if batch.ValueSize() >= ethdb.IdealBatchSize {
116+
if err := batch.Write(); err != nil {
117+
log.Crit("Failed to delete transaction lookup entries", "err", err)
118+
}
119+
batch.Reset()
120+
}
121+
}
122+
if batch.ValueSize() > 0 {
123+
if err := batch.Write(); err != nil {
124+
log.Crit("Failed to delete transaction lookup entries", "err", err)
125+
}
126+
batch.Reset()
127+
}
128+
}
129+
98130
// ReadTransaction retrieves a specific transaction from the database, along with
99131
// its added positional metadata.
100132
func ReadTransaction(db ethdb.Reader, hash common.Hash) (*types.Transaction, common.Hash, uint64, uint64) {

core/txindexer.go

Lines changed: 120 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,11 @@ type txIndexer struct {
4444
// * 0: means the entire chain should be indexed
4545
// * N: means the latest N blocks [HEAD-N+1, HEAD] should be indexed
4646
// and all others shouldn't.
47-
limit uint64
47+
limit uint64
48+
49+
// cutoff denotes the block number before which the chain segment should
50+
// be pruned and not available locally.
51+
cutoff uint64
4852
db ethdb.Database
4953
progress chan chan TxIndexProgress
5054
term chan chan struct{}
@@ -55,6 +59,7 @@ type txIndexer struct {
5559
func newTxIndexer(limit uint64, chain *BlockChain) *txIndexer {
5660
indexer := &txIndexer{
5761
limit: limit,
62+
cutoff: chain.HistoryPruningCutoff(),
5863
db: chain.db,
5964
progress: make(chan chan TxIndexProgress),
6065
term: make(chan chan struct{}),
@@ -64,7 +69,11 @@ func newTxIndexer(limit uint64, chain *BlockChain) *txIndexer {
6469

6570
var msg string
6671
if limit == 0 {
67-
msg = "entire chain"
72+
if indexer.cutoff == 0 {
73+
msg = "entire chain"
74+
} else {
75+
msg = fmt.Sprintf("blocks since #%d", indexer.cutoff)
76+
}
6877
} else {
6978
msg = fmt.Sprintf("last %d blocks", limit)
7079
}
@@ -74,49 +83,114 @@ func newTxIndexer(limit uint64, chain *BlockChain) *txIndexer {
7483
}
7584

7685
// run executes the scheduled indexing/unindexing task in a separate thread.
77-
// If the stop channel is closed, the task should be terminated as soon as
78-
// possible, the done channel will be closed once the task is finished.
79-
func (indexer *txIndexer) run(tail *uint64, head uint64, stop chan struct{}, done chan struct{}) {
86+
// If the stop channel is closed, the task should terminate as soon as possible.
87+
// The done channel will be closed once the task is complete.
88+
//
89+
// Existing transaction indexes are assumed to be valid, with both the head
90+
// and tail above the configured cutoff.
91+
func (indexer *txIndexer) run(head uint64, stop chan struct{}, done chan struct{}) {
8092
defer func() { close(done) }()
8193

82-
// Short circuit if chain is empty and nothing to index.
83-
if head == 0 {
94+
// Short circuit if the chain is either empty, or entirely below the
95+
// cutoff point.
96+
if head == 0 || head < indexer.cutoff {
8497
return
8598
}
8699
// The tail flag is not existent, it means the node is just initialized
87100
// and all blocks in the chain (part of them may from ancient store) are
88101
// not indexed yet, index the chain according to the configured limit.
102+
tail := rawdb.ReadTxIndexTail(indexer.db)
89103
if tail == nil {
104+
// Determine the first block for transaction indexing, taking the
105+
// configured cutoff point into account.
90106
from := uint64(0)
91107
if indexer.limit != 0 && head >= indexer.limit {
92108
from = head - indexer.limit + 1
93109
}
110+
from = max(from, indexer.cutoff)
94111
rawdb.IndexTransactions(indexer.db, from, head+1, stop, true)
95112
return
96113
}
97114
// The tail flag is existent (which means indexes in [tail, head] should be
98115
// present), while the whole chain are requested for indexing.
99116
if indexer.limit == 0 || head < indexer.limit {
100117
if *tail > 0 {
101-
// It can happen when chain is rewound to a historical point which
102-
// is even lower than the indexes tail, recap the indexing target
103-
// to new head to avoid reading non-existent block bodies.
104-
end := *tail
105-
if end > head+1 {
106-
end = head + 1
107-
}
108-
rawdb.IndexTransactions(indexer.db, 0, end, stop, true)
118+
from := max(uint64(0), indexer.cutoff)
119+
rawdb.IndexTransactions(indexer.db, from, *tail, stop, true)
109120
}
110121
return
111122
}
112123
// The tail flag is existent, adjust the index range according to configured
113124
// limit and the latest chain head.
114-
if head-indexer.limit+1 < *tail {
125+
from := head - indexer.limit + 1
126+
from = max(from, indexer.cutoff)
127+
if from < *tail {
115128
// Reindex a part of missing indices and rewind index tail to HEAD-limit
116-
rawdb.IndexTransactions(indexer.db, head-indexer.limit+1, *tail, stop, true)
129+
rawdb.IndexTransactions(indexer.db, from, *tail, stop, true)
117130
} else {
118131
// Unindex a part of stale indices and forward index tail to HEAD-limit
119-
rawdb.UnindexTransactions(indexer.db, *tail, head-indexer.limit+1, stop, false)
132+
rawdb.UnindexTransactions(indexer.db, *tail, from, stop, false)
133+
}
134+
}
135+
136+
// repair ensures that transaction indexes are in a valid state and invalidates
137+
// them if they are not. The following cases are considered invalid:
138+
// * The index tail is higher than the chain head.
139+
// * The chain head is below the configured cutoff, but the index tail is not empty.
140+
// * The index tail is below the configured cutoff, but it is not empty.
141+
func (indexer *txIndexer) repair(head uint64) {
142+
// If the transactions haven't been indexed yet, nothing to repair
143+
tail := rawdb.ReadTxIndexTail(indexer.db)
144+
if tail == nil {
145+
return
146+
}
147+
// The transaction index tail is higher than the chain head, which may occur
148+
// when the chain is rewound to a historical height below the index tail.
149+
// Purge the transaction indexes from the database. **It's not a common case
150+
// to rewind the chain head below the index tail**.
151+
if *tail > head {
152+
// A crash may occur between the two delete operations,
153+
// potentially leaving dangling indexes in the database.
154+
// However, this is considered acceptable.
155+
rawdb.DeleteTxIndexTail(indexer.db)
156+
rawdb.DeleteAllTxLookupEntries(indexer.db, nil)
157+
log.Warn("Purge transaction indexes", "head", head, "tail", *tail)
158+
return
159+
}
160+
161+
// If the entire chain is below the configured cutoff point,
162+
// removing the tail of transaction indexing and purges the
163+
// transaction indexes. **It's not a common case, as the cutoff
164+
// is usually defined below the chain head**.
165+
if head < indexer.cutoff {
166+
// A crash may occur between the two delete operations,
167+
// potentially leaving dangling indexes in the database.
168+
// However, this is considered acceptable.
169+
//
170+
// The leftover indexes can't be unindexed by scanning
171+
// the blocks as they are not guaranteed to be available.
172+
// Traversing the database directly within the transaction
173+
// index namespace might be slow and expensive, but we
174+
// have no choice.
175+
rawdb.DeleteTxIndexTail(indexer.db)
176+
rawdb.DeleteAllTxLookupEntries(indexer.db, nil)
177+
log.Warn("Purge transaction indexes", "head", head, "cutoff", indexer.cutoff)
178+
return
179+
}
180+
181+
// The chain head is above the cutoff while the tail is below the
182+
// cutoff. Shift the tail to the cutoff point and remove the indexes
183+
// below.
184+
if *tail < indexer.cutoff {
185+
// A crash may occur between the two delete operations,
186+
// potentially leaving dangling indexes in the database.
187+
// However, this is considered acceptable.
188+
rawdb.WriteTxIndexTail(indexer.db, indexer.cutoff)
189+
rawdb.DeleteAllTxLookupEntries(indexer.db, func(blob []byte) bool {
190+
n := rawdb.DecodeTxLookupEntry(blob, indexer.db)
191+
return n != nil && *n < indexer.cutoff
192+
})
193+
log.Warn("Purge transaction indexes below cutoff", "tail", *tail, "cutoff", indexer.cutoff)
120194
}
121195
}
122196

@@ -127,39 +201,39 @@ func (indexer *txIndexer) loop(chain *BlockChain) {
127201

128202
// Listening to chain events and manipulate the transaction indexes.
129203
var (
130-
stop chan struct{} // Non-nil if background routine is active.
131-
done chan struct{} // Non-nil if background routine is active.
132-
lastHead uint64 // The latest announced chain head (whose tx indexes are assumed created)
133-
lastTail = rawdb.ReadTxIndexTail(indexer.db) // The oldest indexed block, nil means nothing indexed
204+
stop chan struct{} // Non-nil if background routine is active
205+
done chan struct{} // Non-nil if background routine is active
206+
head = rawdb.ReadHeadBlock(indexer.db).NumberU64() // The latest announced chain head
134207

135208
headCh = make(chan ChainHeadEvent)
136209
sub = chain.SubscribeChainHeadEvent(headCh)
137210
)
138211
defer sub.Unsubscribe()
139212

213+
// Validate the transaction indexes and repair if necessary
214+
indexer.repair(head)
215+
140216
// Launch the initial processing if chain is not empty (head != genesis).
141217
// This step is useful in these scenarios that chain has no progress.
142-
if head := rawdb.ReadHeadBlock(indexer.db); head != nil && head.Number().Uint64() != 0 {
218+
if head != 0 {
143219
stop = make(chan struct{})
144220
done = make(chan struct{})
145-
lastHead = head.Number().Uint64()
146-
go indexer.run(rawdb.ReadTxIndexTail(indexer.db), head.NumberU64(), stop, done)
221+
go indexer.run(head, stop, done)
147222
}
148223
for {
149224
select {
150-
case head := <-headCh:
225+
case h := <-headCh:
151226
if done == nil {
152227
stop = make(chan struct{})
153228
done = make(chan struct{})
154-
go indexer.run(rawdb.ReadTxIndexTail(indexer.db), head.Header.Number.Uint64(), stop, done)
229+
go indexer.run(h.Header.Number.Uint64(), stop, done)
155230
}
156-
lastHead = head.Header.Number.Uint64()
231+
head = h.Header.Number.Uint64()
157232
case <-done:
158233
stop = nil
159234
done = nil
160-
lastTail = rawdb.ReadTxIndexTail(indexer.db)
161235
case ch := <-indexer.progress:
162-
ch <- indexer.report(lastHead, lastTail)
236+
ch <- indexer.report(head)
163237
case ch := <-indexer.term:
164238
if stop != nil {
165239
close(stop)
@@ -175,12 +249,27 @@ func (indexer *txIndexer) loop(chain *BlockChain) {
175249
}
176250

177251
// report returns the tx indexing progress.
178-
func (indexer *txIndexer) report(head uint64, tail *uint64) TxIndexProgress {
252+
func (indexer *txIndexer) report(head uint64) TxIndexProgress {
253+
// Special case if the head is even below the cutoff,
254+
// nothing to index.
255+
if head < indexer.cutoff {
256+
return TxIndexProgress{
257+
Indexed: 0,
258+
Remaining: 0,
259+
}
260+
}
261+
// Compute how many blocks are supposed to be indexed
179262
total := indexer.limit
180263
if indexer.limit == 0 || total > head {
181264
total = head + 1 // genesis included
182265
}
266+
length := head - indexer.cutoff + 1 // all available chain for indexing
267+
if total > length {
268+
total = length
269+
}
270+
// Compute how many blocks have been indexed
183271
var indexed uint64
272+
tail := rawdb.ReadTxIndexTail(indexer.db)
184273
if tail != nil {
185274
indexed = head - *tail + 1
186275
}

0 commit comments

Comments
 (0)