Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions core/rawdb/accessors_chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,14 @@ func WriteTxIndexTail(db ethdb.KeyValueWriter, number uint64) {
}
}

// DeleteTxIndexTail deletes the number of oldest indexed block
// from database.
func DeleteTxIndexTail(db ethdb.KeyValueWriter) {
if err := db.Delete(txIndexTailKey); err != nil {
log.Crit("Failed to delete the transaction index tail", "err", err)
}
}

// ReadHeaderRange returns the rlp-encoded headers, starting at 'number', and going
// backwards towards genesis. This method assumes that the caller already has
// placed a cap on count, to prevent DoS issues.
Expand Down
48 changes: 40 additions & 8 deletions core/rawdb/accessors_indexes.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,8 @@ import (
"github.com/ethereum/go-ethereum/rlp"
)

// ReadTxLookupEntry retrieves the positional metadata associated with a transaction
// hash to allow retrieving the transaction or receipt by hash.
func ReadTxLookupEntry(db ethdb.Reader, hash common.Hash) *uint64 {
data, _ := db.Get(txLookupKey(hash))
if len(data) == 0 {
return nil
}
// DecodeTxLookupEntry decodes the supplied tx lookup data.
func DecodeTxLookupEntry(data []byte, db ethdb.Reader) *uint64 {
// Database v6 tx lookup just stores the block number
if len(data) < common.HashLength {
number := new(big.Int).SetBytes(data).Uint64()
Expand All @@ -49,12 +44,22 @@ func ReadTxLookupEntry(db ethdb.Reader, hash common.Hash) *uint64 {
// Finally try database v3 tx lookup format
var entry LegacyTxLookupEntry
if err := rlp.DecodeBytes(data, &entry); err != nil {
log.Error("Invalid transaction lookup entry RLP", "hash", hash, "blob", data, "err", err)
log.Error("Invalid transaction lookup entry RLP", "blob", data, "err", err)
return nil
}
return &entry.BlockIndex
}

// ReadTxLookupEntry retrieves the positional metadata associated with a transaction
// hash to allow retrieving the transaction or receipt by hash.
func ReadTxLookupEntry(db ethdb.Reader, hash common.Hash) *uint64 {
data, _ := db.Get(txLookupKey(hash))
if len(data) == 0 {
return nil
}
return DecodeTxLookupEntry(data, db)
}

// writeTxLookupEntry stores a positional metadata for a transaction,
// enabling hash based transaction and receipt lookups.
func writeTxLookupEntry(db ethdb.KeyValueWriter, hash common.Hash, numberBytes []byte) {
Expand Down Expand Up @@ -95,6 +100,33 @@ func DeleteTxLookupEntries(db ethdb.KeyValueWriter, hashes []common.Hash) {
}
}

// DeleteAllTxLookupEntries purges all the transaction indexes in the database.
// If condition is specified, only the entry with condition as True will be
// removed; If condition is not specified, the entry is deleted.
func DeleteAllTxLookupEntries(db ethdb.KeyValueStore, condition func([]byte) bool) {
iter := NewKeyLengthIterator(db.NewIterator(txLookupPrefix, nil), common.HashLength+len(txLookupPrefix))
defer iter.Release()

batch := db.NewBatch()
for iter.Next() {
if condition == nil || condition(iter.Value()) {
batch.Delete(iter.Key())
}
if batch.ValueSize() >= ethdb.IdealBatchSize {
if err := batch.Write(); err != nil {
log.Crit("Failed to delete transaction lookup entries", "err", err)
}
batch.Reset()
}
}
if batch.ValueSize() > 0 {
if err := batch.Write(); err != nil {
log.Crit("Failed to delete transaction lookup entries", "err", err)
}
batch.Reset()
}
}

// ReadTransaction retrieves a specific transaction from the database, along with
// its added positional metadata.
func ReadTransaction(db ethdb.Reader, hash common.Hash) (*types.Transaction, common.Hash, uint64, uint64) {
Expand Down
151 changes: 120 additions & 31 deletions core/txindexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,11 @@ type txIndexer struct {
// * 0: means the entire chain should be indexed
// * N: means the latest N blocks [HEAD-N+1, HEAD] should be indexed
// and all others shouldn't.
limit uint64
limit uint64

// cutoff denotes the block number before which the chain segment should
// be pruned and not available locally.
cutoff uint64
db ethdb.Database
progress chan chan TxIndexProgress
term chan chan struct{}
Expand All @@ -55,6 +59,7 @@ type txIndexer struct {
func newTxIndexer(limit uint64, chain *BlockChain) *txIndexer {
indexer := &txIndexer{
limit: limit,
cutoff: chain.HistoryPruningCutoff(),
db: chain.db,
progress: make(chan chan TxIndexProgress),
term: make(chan chan struct{}),
Expand All @@ -64,7 +69,11 @@ func newTxIndexer(limit uint64, chain *BlockChain) *txIndexer {

var msg string
if limit == 0 {
msg = "entire chain"
if indexer.cutoff == 0 {
msg = "entire chain"
} else {
msg = fmt.Sprintf("blocks since #%d", indexer.cutoff)
}
} else {
msg = fmt.Sprintf("last %d blocks", limit)
}
Expand All @@ -74,49 +83,114 @@ func newTxIndexer(limit uint64, chain *BlockChain) *txIndexer {
}

// run executes the scheduled indexing/unindexing task in a separate thread.
// If the stop channel is closed, the task should be terminated as soon as
// possible, the done channel will be closed once the task is finished.
func (indexer *txIndexer) run(tail *uint64, head uint64, stop chan struct{}, done chan struct{}) {
// If the stop channel is closed, the task should terminate as soon as possible.
// The done channel will be closed once the task is complete.
//
// Existing transaction indexes are assumed to be valid, with both the head
// and tail above the configured cutoff.
func (indexer *txIndexer) run(head uint64, stop chan struct{}, done chan struct{}) {
defer func() { close(done) }()

// Short circuit if chain is empty and nothing to index.
if head == 0 {
// Short circuit if the chain is either empty, or entirely below the
// cutoff point.
if head == 0 || head < indexer.cutoff {
return
}
// The tail flag is not existent, it means the node is just initialized
// and all blocks in the chain (part of them may from ancient store) are
// not indexed yet, index the chain according to the configured limit.
tail := rawdb.ReadTxIndexTail(indexer.db)
if tail == nil {
// Determine the first block for transaction indexing, taking the
// configured cutoff point into account.
from := uint64(0)
if indexer.limit != 0 && head >= indexer.limit {
from = head - indexer.limit + 1
}
from = max(from, indexer.cutoff)
rawdb.IndexTransactions(indexer.db, from, head+1, stop, true)
return
}
// The tail flag is existent (which means indexes in [tail, head] should be
// present), while the whole chain are requested for indexing.
if indexer.limit == 0 || head < indexer.limit {
if *tail > 0 {
// It can happen when chain is rewound to a historical point which
// is even lower than the indexes tail, recap the indexing target
// to new head to avoid reading non-existent block bodies.
end := *tail
if end > head+1 {
end = head + 1
}
rawdb.IndexTransactions(indexer.db, 0, end, stop, true)
from := max(uint64(0), indexer.cutoff)
rawdb.IndexTransactions(indexer.db, from, *tail, stop, true)
}
return
}
// The tail flag is existent, adjust the index range according to configured
// limit and the latest chain head.
if head-indexer.limit+1 < *tail {
from := head - indexer.limit + 1
from = max(from, indexer.cutoff)
if from < *tail {
// Reindex a part of missing indices and rewind index tail to HEAD-limit
rawdb.IndexTransactions(indexer.db, head-indexer.limit+1, *tail, stop, true)
rawdb.IndexTransactions(indexer.db, from, *tail, stop, true)
} else {
// Unindex a part of stale indices and forward index tail to HEAD-limit
rawdb.UnindexTransactions(indexer.db, *tail, head-indexer.limit+1, stop, false)
rawdb.UnindexTransactions(indexer.db, *tail, from, stop, false)
}
}

// repair ensures that transaction indexes are in a valid state and invalidates
// them if they are not. The following cases are considered invalid:
// * The index tail is higher than the chain head.
// * The chain head is below the configured cutoff, but the index tail is not empty.
// * The index tail is below the configured cutoff, but it is not empty.
func (indexer *txIndexer) repair(head uint64) {
// If the transactions haven't been indexed yet, nothing to repair
tail := rawdb.ReadTxIndexTail(indexer.db)
if tail == nil {
return
}
// The transaction index tail is higher than the chain head, which may occur
// when the chain is rewound to a historical height below the index tail.
// Purge the transaction indexes from the database. **It's not a common case
// to rewind the chain head below the index tail**.
if *tail > head {
// A crash may occur between the two delete operations,
// potentially leaving dangling indexes in the database.
// However, this is considered acceptable.
rawdb.DeleteTxIndexTail(indexer.db)
rawdb.DeleteAllTxLookupEntries(indexer.db, nil)
log.Warn("Purge transaction indexes", "head", head, "tail", *tail)
return
}

// If the entire chain is below the configured cutoff point,
// removing the tail of transaction indexing and purges the
// transaction indexes. **It's not a common case, as the cutoff
// is usually defined below the chain head**.
if head < indexer.cutoff {
// A crash may occur between the two delete operations,
// potentially leaving dangling indexes in the database.
// However, this is considered acceptable.
//
// The leftover indexes can't be unindexed by scanning
// the blocks as they are not guaranteed to be available.
// Traversing the database directly within the transaction
// index namespace might be slow and expensive, but we
// have no choice.
rawdb.DeleteTxIndexTail(indexer.db)
rawdb.DeleteAllTxLookupEntries(indexer.db, nil)
log.Warn("Purge transaction indexes", "head", head, "cutoff", indexer.cutoff)
return
}

// The chain head is above the cutoff while the tail is below the
// cutoff. Shift the tail to the cutoff point and remove the indexes
// below.
if *tail < indexer.cutoff {
// A crash may occur between the two delete operations,
// potentially leaving dangling indexes in the database.
// However, this is considered acceptable.
rawdb.WriteTxIndexTail(indexer.db, indexer.cutoff)
rawdb.DeleteAllTxLookupEntries(indexer.db, func(blob []byte) bool {
n := rawdb.DecodeTxLookupEntry(blob, indexer.db)
return n != nil && *n < indexer.cutoff
})
log.Warn("Purge transaction indexes below cutoff", "tail", *tail, "cutoff", indexer.cutoff)
}
}

Expand All @@ -127,39 +201,39 @@ func (indexer *txIndexer) loop(chain *BlockChain) {

// Listening to chain events and manipulate the transaction indexes.
var (
stop chan struct{} // Non-nil if background routine is active.
done chan struct{} // Non-nil if background routine is active.
lastHead uint64 // The latest announced chain head (whose tx indexes are assumed created)
lastTail = rawdb.ReadTxIndexTail(indexer.db) // The oldest indexed block, nil means nothing indexed
stop chan struct{} // Non-nil if background routine is active
done chan struct{} // Non-nil if background routine is active
head = rawdb.ReadHeadBlock(indexer.db).NumberU64() // The latest announced chain head

headCh = make(chan ChainHeadEvent)
sub = chain.SubscribeChainHeadEvent(headCh)
)
defer sub.Unsubscribe()

// Validate the transaction indexes and repair if necessary
indexer.repair(head)

// Launch the initial processing if chain is not empty (head != genesis).
// This step is useful in these scenarios that chain has no progress.
if head := rawdb.ReadHeadBlock(indexer.db); head != nil && head.Number().Uint64() != 0 {
if head != 0 {
stop = make(chan struct{})
done = make(chan struct{})
lastHead = head.Number().Uint64()
go indexer.run(rawdb.ReadTxIndexTail(indexer.db), head.NumberU64(), stop, done)
go indexer.run(head, stop, done)
}
for {
select {
case head := <-headCh:
case h := <-headCh:
if done == nil {
stop = make(chan struct{})
done = make(chan struct{})
go indexer.run(rawdb.ReadTxIndexTail(indexer.db), head.Header.Number.Uint64(), stop, done)
go indexer.run(h.Header.Number.Uint64(), stop, done)
}
lastHead = head.Header.Number.Uint64()
head = h.Header.Number.Uint64()
case <-done:
stop = nil
done = nil
lastTail = rawdb.ReadTxIndexTail(indexer.db)
case ch := <-indexer.progress:
ch <- indexer.report(lastHead, lastTail)
ch <- indexer.report(head)
case ch := <-indexer.term:
if stop != nil {
close(stop)
Expand All @@ -175,12 +249,27 @@ func (indexer *txIndexer) loop(chain *BlockChain) {
}

// report returns the tx indexing progress.
func (indexer *txIndexer) report(head uint64, tail *uint64) TxIndexProgress {
func (indexer *txIndexer) report(head uint64) TxIndexProgress {
// Special case if the head is even below the cutoff,
// nothing to index.
if head < indexer.cutoff {
Copy link

@gulatinavneet gulatinavneet Jun 6, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we check for head == 0 too here because if someone calls some json rpc api methods of getting receipt on an empty blockchain, they will get error txn indexing is in progress which is a bit misleading because there is no indexing on genesis block @s1na

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have you seen this error in practice?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Umm yes, I have observed it when writing unit tests with a chain that starts from genesis. When I commit the first transaction to the chain, and wait for its receipt, I keep on getting error txn indexing is in progress and it keeps on going until the first block is committed. After that getting transaction receipt is smooth and fast

return TxIndexProgress{
Indexed: 0,
Remaining: 0,
}
}
// Compute how many blocks are supposed to be indexed
total := indexer.limit
if indexer.limit == 0 || total > head {
total = head + 1 // genesis included
}
length := head - indexer.cutoff + 1 // all available chain for indexing
if total > length {
total = length
}
// Compute how many blocks have been indexed
var indexed uint64
tail := rawdb.ReadTxIndexTail(indexer.db)
if tail != nil {
indexed = head - *tail + 1
}
Expand Down
Loading