Skip to content

Commit 4e56433

Browse files
jottoindanielo
andauthored
Fix new_heads Events Emission on Block Forks (#10072)
TL;DR: on a reorg, the common ancestor block is not being published to subscribers of newHeads #### Expected behavior if the reorg's common ancestor is 2, I expect 2 to be republished 1, 2, **2**, **3**, **4** #### Actual behavior 2 is not republished, and 3's parentHash points to a 2 header that was never received 1, 2, **3**, **4** This PR is the same thing as #9738 except with a test. Note... the test passes, but **this does not actually work in production** (for Ethereum mainnet with prysm as external CL). Why? Because in production, `h.sync.PrevUnwindPoint()` is always nil: https://github.com/ledgerwatch/erigon/blob/a5270bccf5e69a6beaaab9a0663bdad80e989505/turbo/stages/stageloop.go#L291 which means the initial "if block" is never entered, and thus we have **no control** of increment/decrement `notifyFrom` during reorgs https://github.com/ledgerwatch/erigon/blob/a5270bccf5e69a6beaaab9a0663bdad80e989505/eth/stagedsync/stage_finish.go#L137-L146 I don't know why `h.sync.PrevUnwindPoint()` is seemingly always nil, or how the test can pass if it fails in prod. I'm hoping to pass the baton to someone who might. Thank you @indanielo for original fix. If we can figure this bug out, it closes #8848 and closes #9568 and closes #10056 --------- Co-authored-by: Daniel Gimenez <25278291+indanielo@users.noreply.github.com>
1 parent aee77ab commit 4e56433

2 files changed

Lines changed: 54 additions & 9 deletions

File tree

eth/stagedsync/stage_finish.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -143,8 +143,8 @@ func NotifyNewHeaders(ctx context.Context, finishStageBeforeSync uint64, finishS
143143
heightSpan = 1024
144144
}
145145
notifyFrom = finishStageAfterSync - heightSpan
146+
notifyFrom++
146147
}
147-
notifyFrom++
148148

149149
var notifyTo = notifyFrom
150150
var notifyToHash libcommon.Hash

turbo/jsonrpc/eth_subscribe_test.go

Lines changed: 53 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ package jsonrpc
22

33
import (
44
"context"
5-
"fmt"
5+
"math/big"
66
"testing"
77

88
"github.com/stretchr/testify/require"
@@ -25,14 +25,22 @@ import (
2525
"github.com/ledgerwatch/erigon/turbo/stages/mock"
2626
)
2727

28-
func TestEthSubscribe(t *testing.T) {
29-
m, require := mock.Mock(t), require.New(t)
30-
chain, err := core.GenerateChain(m.ChainConfig, m.Genesis, m.Engine, m.DB, 7, func(i int, b *core.BlockGen) {
31-
b.SetCoinbase(libcommon.Address{1})
28+
func sendBlock(t *testing.T, require *require.Assertions, m *mock.MockSentry, chain *core.ChainPack) {
29+
// Send NewBlock message
30+
b, err := rlp.EncodeToBytes(&eth.NewBlockPacket{
31+
Block: chain.TopBlock,
32+
TD: big.NewInt(1), // This is ignored anyway
3233
})
33-
require.NoError(err)
34+
if err != nil {
35+
t.Fatal(err)
36+
}
37+
m.ReceiveWg.Add(1)
38+
for _, err = range m.Send(&sentry.InboundMessage{Id: sentry.MessageId_NEW_BLOCK_66, Data: b, PeerId: m.PeerId}) {
39+
require.NoError(err)
40+
}
3441

35-
b, err := rlp.EncodeToBytes(&eth.BlockHeadersPacket66{
42+
// Send all the headers
43+
b, err = rlp.EncodeToBytes(&eth.BlockHeadersPacket66{
3644
RequestId: 1,
3745
BlockHeadersPacket: chain.Headers,
3846
})
@@ -43,6 +51,16 @@ func TestEthSubscribe(t *testing.T) {
4351
require.NoError(err)
4452
}
4553
m.ReceiveWg.Wait() // Wait for all messages to be processed before we proceeed
54+
}
55+
56+
func TestEthSubscribe(t *testing.T) {
57+
m, require := mock.Mock(t), require.New(t)
58+
chain, err := core.GenerateChain(m.ChainConfig, m.Genesis, m.Engine, m.DB, 7, func(i int, b *core.BlockGen) {
59+
b.SetCoinbase(libcommon.Address{1})
60+
})
61+
require.NoError(err)
62+
63+
sendBlock(t, require, m, chain)
4664

4765
ctx := context.Background()
4866
logger := log.New()
@@ -64,7 +82,34 @@ func TestEthSubscribe(t *testing.T) {
6482

6583
for i := uint64(1); i <= highestSeenHeader; i++ {
6684
header := <-newHeads
67-
fmt.Printf("Got header %d\n", header.Number.Uint64())
6885
require.Equal(i, header.Number.Uint64())
86+
require.Equal(chain.Blocks[i-1].Hash(), header.Hash())
87+
}
88+
89+
// create reorg chain starting with common ancestor of 3, 4 will be first block with different coinbase
90+
m2 := mock.Mock(t)
91+
chain, err = core.GenerateChain(m2.ChainConfig, m2.Genesis, m2.Engine, m2.DB, 9, func(i int, b *core.BlockGen) {
92+
// i starts from 0, so this means everything under block 4 will have coinbase 1, and 4 and above will have coinbase 2
93+
if i < 3 {
94+
b.SetCoinbase(libcommon.Address{1})
95+
} else {
96+
b.SetCoinbase(libcommon.Address{2})
97+
}
98+
})
99+
require.NoError(err)
100+
101+
sendBlock(t, require, m, chain)
102+
103+
if err := stages.StageLoopIteration(m.Ctx, m.DB, wrap.TxContainer{}, m.Sync, initialCycle, logger, m.BlockReader, hook, false); err != nil {
104+
t.Fatal(err)
105+
}
106+
107+
highestSeenHeader = chain.TopBlock.NumberU64()
108+
109+
// since common ancestor of reorg is 3 the first new header we will see should be 3
110+
for i := uint64(3); i <= highestSeenHeader; i++ {
111+
header := <-newHeads
112+
require.Equal(i, header.Number.Uint64())
113+
require.Equal(chain.Blocks[i-1].Hash(), header.Hash())
69114
}
70115
}

0 commit comments

Comments
 (0)