diff --git a/polygon/sync/db.go b/polygon/sync/db.go new file mode 100644 index 00000000000..560ab2bc1dd --- /dev/null +++ b/polygon/sync/db.go @@ -0,0 +1,8 @@ +package sync + +import "github.com/ledgerwatch/erigon/core/types" + +//go:generate mockgen -destination=./mock/db_mock.go -package=mock . DB +type DB interface { + WriteHeaders(headers []*types.Header) error +} diff --git a/polygon/sync/header_downloader.go b/polygon/sync/header_downloader.go new file mode 100644 index 00000000000..76a8f29f33a --- /dev/null +++ b/polygon/sync/header_downloader.go @@ -0,0 +1,214 @@ +package sync + +import ( + "context" + "fmt" + "math" + "sort" + "sync" + "time" + + lru "github.com/hashicorp/golang-lru/v2" + "github.com/ledgerwatch/log/v3" + + "github.com/ledgerwatch/erigon-lib/common" + "github.com/ledgerwatch/erigon/core/types" + "github.com/ledgerwatch/erigon/polygon/sync/peerinfo" +) + +const headerDownloaderLogPrefix = "HeaderDownloader" + +func NewHeaderDownloader(logger log.Logger, sentry Sentry, db DB, heimdall Heimdall, verify HeaderVerifier) *HeaderDownloader { + statePointHeadersMemo, err := lru.New[common.Hash, []*types.Header](sentry.MaxPeers()) + if err != nil { + panic(err) + } + + return &HeaderDownloader{ + logger: logger, + sentry: sentry, + db: db, + heimdall: heimdall, + verify: verify, + statePointHeadersMemo: statePointHeadersMemo, + } +} + +type HeaderDownloader struct { + logger log.Logger + sentry Sentry + db DB + heimdall Heimdall + verify HeaderVerifier + statePointHeadersMemo *lru.Cache[common.Hash, []*types.Header] // statePoint.rootHash->[headers part of state point] +} + +func (hd *HeaderDownloader) DownloadUsingCheckpoints(ctx context.Context, start uint64) error { + checkpoints, err := hd.heimdall.FetchCheckpoints(ctx, start) + if err != nil { + return err + } + + err = hd.downloadUsingStatePoints(ctx, statePointsFromCheckpoints(checkpoints)) + if err != nil { + return err + } + + return nil +} + +func (hd *HeaderDownloader) DownloadUsingMilestones(ctx context.Context, start uint64) error { + milestones, err := hd.heimdall.FetchMilestones(ctx, start) + if err != nil { + return err + } + + err = hd.downloadUsingStatePoints(ctx, statePointsFromMilestones(milestones)) + if err != nil { + return err + } + + return nil +} + +func (hd *HeaderDownloader) downloadUsingStatePoints(ctx context.Context, statePoints statePoints) error { + for len(statePoints) > 0 { + allPeers := hd.sentry.PeersWithBlockNumInfo() + if len(allPeers) == 0 { + hd.logger.Warn(fmt.Sprintf("[%s] zero peers, will try again", headerDownloaderLogPrefix)) + continue + } + + sort.Sort(allPeers) // sort by block num in asc order + peers := hd.choosePeers(allPeers, statePoints) + if len(peers) == 0 { + hd.logger.Warn( + fmt.Sprintf("[%s] can't use any peers to sync, will try again", headerDownloaderLogPrefix), + "start", statePoints[0].startBlock, + "end", statePoints[len(statePoints)-1].endBlock, + "minPeerBlockNum", allPeers[0].BlockNum, + "minPeerID", allPeers[0].ID, + ) + continue + } + + peerCount := len(peers) + statePointsBatch := statePoints[:peerCount] + hd.logger.Info( + fmt.Sprintf("[%s] downloading headers", headerDownloaderLogPrefix), + "start", statePointsBatch[0].startBlock, + "end", statePointsBatch[len(statePointsBatch)-1].endBlock, + "kind", statePointsBatch[0].kind, + "peerCount", peerCount, + ) + + headerBatches := make([][]*types.Header, len(statePointsBatch)) + maxStatePointLength := float64(0) + wg := sync.WaitGroup{} + for i, point := range statePointsBatch { + maxStatePointLength = math.Max(float64(point.length()), maxStatePointLength) + wg.Add(1) + go func(i int, statePoint *statePoint, peerID string) { + defer wg.Done() + + if headers, ok := hd.statePointHeadersMemo.Get(statePoint.rootHash); ok { + headerBatches[i] = headers + return + } + + headers, err := hd.sentry.DownloadHeaders(ctx, statePoint.startBlock, statePoint.endBlock, peerID) + if err != nil { + hd.logger.Debug( + fmt.Sprintf("[%s] issue downloading headers, will try again", headerDownloaderLogPrefix), + "err", err, + "start", statePoint.startBlock, + "end", statePoint.endBlock, + "rootHash", statePoint.rootHash, + "kind", statePoint.kind, + "peerID", peerID, + ) + return + } + + if err := hd.verify(statePoint, headers); err != nil { + hd.logger.Debug( + fmt.Sprintf( + "[%s] bad headers received from peer for state point - penalizing and will try again", + headerDownloaderLogPrefix, + ), + "start", statePoint.startBlock, + "end", statePoint.endBlock, + "rootHash", statePoint.rootHash, + "kind", statePoint.kind, + "peerID", peerID, + ) + + hd.sentry.Penalize(peerID) + return + } + + hd.statePointHeadersMemo.Add(statePoint.rootHash, headers) + headerBatches[i] = headers + }(i, point, peers[i].ID) + } + + wg.Wait() + headers := make([]*types.Header, 0, int(maxStatePointLength)*peerCount) + gapIndex := -1 + for i, headerBatch := range headerBatches { + if len(headerBatch) == 0 { + hd.logger.Debug( + fmt.Sprintf("[%s] no headers, will try again", headerDownloaderLogPrefix), + "start", statePointsBatch[i].startBlock, + "end", statePointsBatch[i].endBlock, + "rootHash", statePointsBatch[i].rootHash, + "kind", statePointsBatch[i].kind, + ) + + gapIndex = i + break + } + + headers = append(headers, headerBatch...) + } + + if gapIndex >= 0 { + statePoints = statePoints[gapIndex:] + } else { + statePoints = statePoints[len(statePointsBatch):] + } + + dbWriteStartTime := time.Now() + if err := hd.db.WriteHeaders(headers); err != nil { + return err + } + + hd.logger.Debug( + fmt.Sprintf("[%s] wrote headers to db", headerDownloaderLogPrefix), + "numHeaders", len(headers), + "time", time.Since(dbWriteStartTime), + ) + } + + return nil +} + +// choosePeers assumes peers are sorted in ascending order based on block num +func (hd *HeaderDownloader) choosePeers(peers peerinfo.PeersWithBlockNumInfo, statePoints statePoints) peerinfo.PeersWithBlockNumInfo { + var peersIdx int + chosenPeers := make(peerinfo.PeersWithBlockNumInfo, 0, len(peers)) + for _, statePoint := range statePoints { + if peersIdx >= len(peers) { + break + } + + peer := peers[peersIdx] + if peer.BlockNum.Cmp(statePoint.endBlock) > -1 { + chosenPeers = append(chosenPeers, peer) + } + + peersIdx++ + } + + return chosenPeers +} diff --git a/polygon/sync/header_downloader_test.go b/polygon/sync/header_downloader_test.go new file mode 100644 index 00000000000..f60ef0c6557 --- /dev/null +++ b/polygon/sync/header_downloader_test.go @@ -0,0 +1,284 @@ +package sync + +import ( + "context" + "errors" + "fmt" + "math" + "math/big" + "testing" + + "github.com/golang/mock/gomock" + "github.com/ledgerwatch/log/v3" + "github.com/stretchr/testify/require" + + "github.com/ledgerwatch/erigon-lib/common" + "github.com/ledgerwatch/erigon/consensus/bor/heimdall/checkpoint" + "github.com/ledgerwatch/erigon/consensus/bor/heimdall/milestone" + "github.com/ledgerwatch/erigon/core/types" + "github.com/ledgerwatch/erigon/polygon/sync/mock" + "github.com/ledgerwatch/erigon/polygon/sync/peerinfo" + "github.com/ledgerwatch/erigon/turbo/testlog" +) + +func newHeaderDownloaderTest(t *testing.T) *headerDownloaderTest { + return newHeaderDownloaderTestWithOpts(t, headerDownloaderTestOpts{}) +} + +func newHeaderDownloaderTestWithOpts(t *testing.T, opts headerDownloaderTestOpts) *headerDownloaderTest { + ctrl := gomock.NewController(t) + heimdall := mock.NewMockHeimdall(ctrl) + sentry := mock.NewMockSentry(ctrl) + sentry.EXPECT().MaxPeers().Return(100).Times(1) + db := mock.NewMockDB(ctrl) + logger := testlog.Logger(t, log.LvlDebug) + headerVerifier := opts.getOrCreateDefaultHeaderVerifier() + headerDownloader := NewHeaderDownloader(logger, sentry, db, heimdall, headerVerifier) + return &headerDownloaderTest{ + heimdall: heimdall, + sentry: sentry, + db: db, + headerDownloader: headerDownloader, + } +} + +type headerDownloaderTestOpts struct { + headerVerifier HeaderVerifier +} + +func (opts headerDownloaderTestOpts) getOrCreateDefaultHeaderVerifier() HeaderVerifier { + if opts.headerVerifier == nil { + return func(_ *statePoint, _ []*types.Header) error { + return nil + } + } + + return opts.headerVerifier +} + +type headerDownloaderTest struct { + heimdall *mock.MockHeimdall + sentry *mock.MockSentry + db *mock.MockDB + headerDownloader *HeaderDownloader +} + +func (hdt headerDownloaderTest) fakePeers(count int, blockNums ...*big.Int) peerinfo.PeersWithBlockNumInfo { + peers := make(peerinfo.PeersWithBlockNumInfo, count) + for i := range peers { + var blockNum *big.Int + if i < len(blockNums) { + blockNum = blockNums[i] + } else { + blockNum = new(big.Int).SetUint64(math.MaxUint64) + } + + peers[i] = &peerinfo.PeerWithBlockNumInfo{ + ID: fmt.Sprintf("peer%d", i+1), + BlockNum: blockNum, + } + } + + return peers +} + +func (hdt headerDownloaderTest) fakeCheckpoints(count int) []*checkpoint.Checkpoint { + checkpoints := make([]*checkpoint.Checkpoint, count) + for i := range checkpoints { + num := i + 1 + checkpoints[i] = &checkpoint.Checkpoint{ + StartBlock: big.NewInt(int64(num)), + EndBlock: big.NewInt(int64(num)), + RootHash: common.BytesToHash([]byte(fmt.Sprintf("0x%d", num))), + } + } + + return checkpoints +} + +func (hdt headerDownloaderTest) fakeMilestones(count int) []*milestone.Milestone { + milestones := make([]*milestone.Milestone, count) + for i := range milestones { + num := i + 1 + milestones[i] = &milestone.Milestone{ + StartBlock: big.NewInt(int64(num)), + EndBlock: big.NewInt(int64(num)), + Hash: common.BytesToHash([]byte(fmt.Sprintf("0x%d", num))), + } + } + + return milestones +} + +type downloadHeadersMock func(context.Context, *big.Int, *big.Int, string) ([]*types.Header, error) + +func (hdt headerDownloaderTest) defaultDownloadHeadersMock() downloadHeadersMock { + return func(ctx context.Context, start *big.Int, end *big.Int, peerID string) ([]*types.Header, error) { + res := make([]*types.Header, new(big.Int).Sub(end, start).Uint64()+1) + for i := new(big.Int).Set(start); i.Cmp(end) < 1; i.Add(i, new(big.Int).SetUint64(1)) { + res[new(big.Int).Sub(i, start).Uint64()] = &types.Header{Number: new(big.Int).Set(i)} + } + return res, nil + } +} + +func (hdt headerDownloaderTest) defaultWriteHeadersMock(capture *[]*types.Header) func([]*types.Header) error { + return func(headers []*types.Header) error { + *capture = append(*capture, headers...) + return nil + } +} + +func TestHeaderDownloadUsingMilestones(t *testing.T) { + test := newHeaderDownloaderTest(t) + test.heimdall.EXPECT(). + FetchMilestones(gomock.Any(), gomock.Any()). + Return(test.fakeMilestones(4), nil). + Times(1) + test.sentry.EXPECT(). + PeersWithBlockNumInfo(). + Return(test.fakePeers(8)). + Times(1) + test.sentry.EXPECT(). + DownloadHeaders(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). + DoAndReturn(test.defaultDownloadHeadersMock()). + Times(4) + var persistedHeaders []*types.Header + test.db.EXPECT(). + WriteHeaders(gomock.Any()). + DoAndReturn(test.defaultWriteHeadersMock(&persistedHeaders)). + Times(1) + + err := test.headerDownloader.DownloadUsingMilestones(context.Background(), 1) + require.NoError(t, err) + require.Len(t, persistedHeaders, 4) + // check headers are written in order + require.Equal(t, uint64(1), persistedHeaders[0].Number.Uint64()) + require.Equal(t, uint64(2), persistedHeaders[1].Number.Uint64()) + require.Equal(t, uint64(3), persistedHeaders[2].Number.Uint64()) + require.Equal(t, uint64(4), persistedHeaders[3].Number.Uint64()) +} + +func TestHeaderDownloadUsingCheckpoints(t *testing.T) { + test := newHeaderDownloaderTest(t) + test.heimdall.EXPECT(). + FetchCheckpoints(gomock.Any(), gomock.Any()). + Return(test.fakeCheckpoints(8), nil). + Times(1) + test.sentry.EXPECT(). + PeersWithBlockNumInfo(). + Return(test.fakePeers(2)). + Times(4) + test.sentry.EXPECT(). + DownloadHeaders(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). + DoAndReturn(test.defaultDownloadHeadersMock()). + Times(8) + var persistedHeaders []*types.Header + test.db.EXPECT(). + WriteHeaders(gomock.Any()). + DoAndReturn(test.defaultWriteHeadersMock(&persistedHeaders)). + Times(4) + + err := test.headerDownloader.DownloadUsingCheckpoints(context.Background(), 1) + require.NoError(t, err) + require.Len(t, persistedHeaders, 8) + // check headers are written in order + require.Equal(t, uint64(1), persistedHeaders[0].Number.Uint64()) + require.Equal(t, uint64(2), persistedHeaders[1].Number.Uint64()) + require.Equal(t, uint64(3), persistedHeaders[2].Number.Uint64()) + require.Equal(t, uint64(4), persistedHeaders[3].Number.Uint64()) + require.Equal(t, uint64(5), persistedHeaders[4].Number.Uint64()) + require.Equal(t, uint64(6), persistedHeaders[5].Number.Uint64()) + require.Equal(t, uint64(7), persistedHeaders[6].Number.Uint64()) + require.Equal(t, uint64(8), persistedHeaders[7].Number.Uint64()) +} + +func TestHeaderDownloadWhenInvalidStateThenPenalizePeerAndReDownload(t *testing.T) { + var firstTimeInvalidReturned bool + firstTimeInvalidReturnedPtr := &firstTimeInvalidReturned + test := newHeaderDownloaderTestWithOpts(t, headerDownloaderTestOpts{ + headerVerifier: func(statePoint *statePoint, headers []*types.Header) error { + if statePoint.startBlock.Cmp(new(big.Int).SetUint64(2)) == 0 && !*firstTimeInvalidReturnedPtr { + *firstTimeInvalidReturnedPtr = true + return errors.New("invalid checkpoint") + } + return nil + }, + }) + test.heimdall.EXPECT(). + FetchCheckpoints(gomock.Any(), gomock.Any()). + Return(test.fakeCheckpoints(6), nil). + Times(1) + test.sentry.EXPECT(). + PeersWithBlockNumInfo(). + Return(test.fakePeers(3)). + Times(3) + test.sentry.EXPECT(). + DownloadHeaders(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). + DoAndReturn(test.defaultDownloadHeadersMock()). + // request 1,2,3 in parallel + // -> 2 fails + // requests 2,3,4 in parallel + // 3 is cached + // requests 5,6 in parallel + // in total 6 requests + 1 request for re-requesting checkpoint 2 + // total = 7 (note this also tests caching works) + Times(7) + test.sentry.EXPECT(). + Penalize(gomock.Eq("peer2")). + Times(1) + var persistedHeadersFirstTime, persistedHeadersRemaining []*types.Header + gomock.InOrder( + test.db.EXPECT(). + WriteHeaders(gomock.Any()). + DoAndReturn(test.defaultWriteHeadersMock(&persistedHeadersFirstTime)). + Times(1), + test.db.EXPECT(). + WriteHeaders(gomock.Any()). + DoAndReturn(test.defaultWriteHeadersMock(&persistedHeadersRemaining)). + Times(2), + ) + + err := test.headerDownloader.DownloadUsingCheckpoints(context.Background(), 1) + require.NoError(t, err) + require.Len(t, persistedHeadersFirstTime, 1) + require.Len(t, persistedHeadersRemaining, 5) +} + +func TestHeaderDownloadWhenZeroPeersTriesAgain(t *testing.T) { + test := newHeaderDownloaderTest(t) + test.heimdall.EXPECT(). + FetchCheckpoints(gomock.Any(), gomock.Any()). + Return(test.fakeCheckpoints(8), nil). + Times(1) + test.sentry.EXPECT(). + DownloadHeaders(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). + DoAndReturn(test.defaultDownloadHeadersMock()). + Times(8) + var persistedHeaders []*types.Header + test.db.EXPECT(). + WriteHeaders(gomock.Any()). + DoAndReturn(test.defaultWriteHeadersMock(&persistedHeaders)). + Times(4) + gomock.InOrder( + // first, no peers at all + test.sentry.EXPECT(). + PeersWithBlockNumInfo(). + Return(nil). + Times(1), + // second, 2 peers but not synced enough for us to use + test.sentry.EXPECT(). + PeersWithBlockNumInfo(). + Return(test.fakePeers(2, new(big.Int).SetUint64(0), new(big.Int).SetUint64(0))). + Times(1), + // then, 2 fully synced peers that we can use + test.sentry.EXPECT(). + PeersWithBlockNumInfo(). + Return(test.fakePeers(2)). + Times(4), + ) + + err := test.headerDownloader.DownloadUsingCheckpoints(context.Background(), 1) + require.NoError(t, err) + require.Len(t, persistedHeaders, 8) +} diff --git a/polygon/sync/header_verifier.go b/polygon/sync/header_verifier.go new file mode 100644 index 00000000000..6898f384926 --- /dev/null +++ b/polygon/sync/header_verifier.go @@ -0,0 +1,5 @@ +package sync + +import "github.com/ledgerwatch/erigon/core/types" + +type HeaderVerifier func(statePoint *statePoint, headers []*types.Header) error diff --git a/polygon/sync/heimdall.go b/polygon/sync/heimdall.go index 59a9d22b57b..9dc9161dd06 100644 --- a/polygon/sync/heimdall.go +++ b/polygon/sync/heimdall.go @@ -16,6 +16,8 @@ import ( ) // Heimdall is a wrapper of Heimdall HTTP API +// +//go:generate mockgen -destination=./mock/heimdall_mock.go -package=mock . Heimdall type Heimdall interface { FetchCheckpoints(ctx context.Context, start uint64) ([]*checkpoint.Checkpoint, error) FetchMilestones(ctx context.Context, start uint64) ([]*milestone.Milestone, error) diff --git a/polygon/sync/heimdall_test.go b/polygon/sync/heimdall_test.go index 0286e689532..2036feb84d5 100644 --- a/polygon/sync/heimdall_test.go +++ b/polygon/sync/heimdall_test.go @@ -7,7 +7,6 @@ import ( "time" "github.com/golang/mock/gomock" - "github.com/ledgerwatch/log/v3" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" diff --git a/polygon/sync/mock/db_mock.go b/polygon/sync/mock/db_mock.go new file mode 100644 index 00000000000..22e6fa6b482 --- /dev/null +++ b/polygon/sync/mock/db_mock.go @@ -0,0 +1,49 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: github.com/ledgerwatch/erigon/polygon/sync (interfaces: DB) + +// Package mock is a generated GoMock package. +package mock + +import ( + reflect "reflect" + + gomock "github.com/golang/mock/gomock" + types "github.com/ledgerwatch/erigon/core/types" +) + +// MockDB is a mock of DB interface. +type MockDB struct { + ctrl *gomock.Controller + recorder *MockDBMockRecorder +} + +// MockDBMockRecorder is the mock recorder for MockDB. +type MockDBMockRecorder struct { + mock *MockDB +} + +// NewMockDB creates a new mock instance. +func NewMockDB(ctrl *gomock.Controller) *MockDB { + mock := &MockDB{ctrl: ctrl} + mock.recorder = &MockDBMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockDB) EXPECT() *MockDBMockRecorder { + return m.recorder +} + +// WriteHeaders mocks base method. +func (m *MockDB) WriteHeaders(arg0 []*types.Header) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "WriteHeaders", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// WriteHeaders indicates an expected call of WriteHeaders. +func (mr *MockDBMockRecorder) WriteHeaders(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WriteHeaders", reflect.TypeOf((*MockDB)(nil).WriteHeaders), arg0) +} diff --git a/polygon/sync/mock/heimdall_mock.go b/polygon/sync/mock/heimdall_mock.go new file mode 100644 index 00000000000..c38947dc559 --- /dev/null +++ b/polygon/sync/mock/heimdall_mock.go @@ -0,0 +1,97 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: github.com/ledgerwatch/erigon/polygon/sync (interfaces: Heimdall) + +// Package mock is a generated GoMock package. +package mock + +import ( + context "context" + reflect "reflect" + + gomock "github.com/golang/mock/gomock" + checkpoint "github.com/ledgerwatch/erigon/consensus/bor/heimdall/checkpoint" + milestone "github.com/ledgerwatch/erigon/consensus/bor/heimdall/milestone" + span "github.com/ledgerwatch/erigon/consensus/bor/heimdall/span" +) + +// MockHeimdall is a mock of Heimdall interface. +type MockHeimdall struct { + ctrl *gomock.Controller + recorder *MockHeimdallMockRecorder +} + +// MockHeimdallMockRecorder is the mock recorder for MockHeimdall. +type MockHeimdallMockRecorder struct { + mock *MockHeimdall +} + +// NewMockHeimdall creates a new mock instance. +func NewMockHeimdall(ctrl *gomock.Controller) *MockHeimdall { + mock := &MockHeimdall{ctrl: ctrl} + mock.recorder = &MockHeimdallMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockHeimdall) EXPECT() *MockHeimdallMockRecorder { + return m.recorder +} + +// FetchCheckpoints mocks base method. +func (m *MockHeimdall) FetchCheckpoints(arg0 context.Context, arg1 uint64) ([]*checkpoint.Checkpoint, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "FetchCheckpoints", arg0, arg1) + ret0, _ := ret[0].([]*checkpoint.Checkpoint) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// FetchCheckpoints indicates an expected call of FetchCheckpoints. +func (mr *MockHeimdallMockRecorder) FetchCheckpoints(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchCheckpoints", reflect.TypeOf((*MockHeimdall)(nil).FetchCheckpoints), arg0, arg1) +} + +// FetchMilestones mocks base method. +func (m *MockHeimdall) FetchMilestones(arg0 context.Context, arg1 uint64) ([]*milestone.Milestone, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "FetchMilestones", arg0, arg1) + ret0, _ := ret[0].([]*milestone.Milestone) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// FetchMilestones indicates an expected call of FetchMilestones. +func (mr *MockHeimdallMockRecorder) FetchMilestones(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchMilestones", reflect.TypeOf((*MockHeimdall)(nil).FetchMilestones), arg0, arg1) +} + +// FetchSpan mocks base method. +func (m *MockHeimdall) FetchSpan(arg0 context.Context, arg1 uint64) (*span.HeimdallSpan, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "FetchSpan", arg0, arg1) + ret0, _ := ret[0].(*span.HeimdallSpan) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// FetchSpan indicates an expected call of FetchSpan. +func (mr *MockHeimdallMockRecorder) FetchSpan(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchSpan", reflect.TypeOf((*MockHeimdall)(nil).FetchSpan), arg0, arg1) +} + +// OnMilestoneEvent mocks base method. +func (m *MockHeimdall) OnMilestoneEvent(arg0 context.Context, arg1 func(*milestone.Milestone)) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "OnMilestoneEvent", arg0, arg1) + ret0, _ := ret[0].(error) + return ret0 +} + +// OnMilestoneEvent indicates an expected call of OnMilestoneEvent. +func (mr *MockHeimdallMockRecorder) OnMilestoneEvent(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OnMilestoneEvent", reflect.TypeOf((*MockHeimdall)(nil).OnMilestoneEvent), arg0, arg1) +} diff --git a/polygon/sync/mock/sentry_mock.go b/polygon/sync/mock/sentry_mock.go new file mode 100644 index 00000000000..09da633586b --- /dev/null +++ b/polygon/sync/mock/sentry_mock.go @@ -0,0 +1,93 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: github.com/ledgerwatch/erigon/polygon/sync (interfaces: Sentry) + +// Package mock is a generated GoMock package. +package mock + +import ( + context "context" + big "math/big" + reflect "reflect" + + gomock "github.com/golang/mock/gomock" + types "github.com/ledgerwatch/erigon/core/types" + peerinfo "github.com/ledgerwatch/erigon/polygon/sync/peerinfo" +) + +// MockSentry is a mock of Sentry interface. +type MockSentry struct { + ctrl *gomock.Controller + recorder *MockSentryMockRecorder +} + +// MockSentryMockRecorder is the mock recorder for MockSentry. +type MockSentryMockRecorder struct { + mock *MockSentry +} + +// NewMockSentry creates a new mock instance. +func NewMockSentry(ctrl *gomock.Controller) *MockSentry { + mock := &MockSentry{ctrl: ctrl} + mock.recorder = &MockSentryMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockSentry) EXPECT() *MockSentryMockRecorder { + return m.recorder +} + +// DownloadHeaders mocks base method. +func (m *MockSentry) DownloadHeaders(arg0 context.Context, arg1, arg2 *big.Int, arg3 string) ([]*types.Header, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "DownloadHeaders", arg0, arg1, arg2, arg3) + ret0, _ := ret[0].([]*types.Header) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// DownloadHeaders indicates an expected call of DownloadHeaders. +func (mr *MockSentryMockRecorder) DownloadHeaders(arg0, arg1, arg2, arg3 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DownloadHeaders", reflect.TypeOf((*MockSentry)(nil).DownloadHeaders), arg0, arg1, arg2, arg3) +} + +// MaxPeers mocks base method. +func (m *MockSentry) MaxPeers() int { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "MaxPeers") + ret0, _ := ret[0].(int) + return ret0 +} + +// MaxPeers indicates an expected call of MaxPeers. +func (mr *MockSentryMockRecorder) MaxPeers() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "MaxPeers", reflect.TypeOf((*MockSentry)(nil).MaxPeers)) +} + +// PeersWithBlockNumInfo mocks base method. +func (m *MockSentry) PeersWithBlockNumInfo() peerinfo.PeersWithBlockNumInfo { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "PeersWithBlockNumInfo") + ret0, _ := ret[0].(peerinfo.PeersWithBlockNumInfo) + return ret0 +} + +// PeersWithBlockNumInfo indicates an expected call of PeersWithBlockNumInfo. +func (mr *MockSentryMockRecorder) PeersWithBlockNumInfo() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PeersWithBlockNumInfo", reflect.TypeOf((*MockSentry)(nil).PeersWithBlockNumInfo)) +} + +// Penalize mocks base method. +func (m *MockSentry) Penalize(arg0 string) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "Penalize", arg0) +} + +// Penalize indicates an expected call of Penalize. +func (mr *MockSentryMockRecorder) Penalize(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Penalize", reflect.TypeOf((*MockSentry)(nil).Penalize), arg0) +} diff --git a/polygon/sync/peerinfo/peer_with_block_num_info.go b/polygon/sync/peerinfo/peer_with_block_num_info.go new file mode 100644 index 00000000000..643aa078deb --- /dev/null +++ b/polygon/sync/peerinfo/peer_with_block_num_info.go @@ -0,0 +1,22 @@ +package peerinfo + +import "math/big" + +type PeerWithBlockNumInfo struct { + ID string + BlockNum *big.Int +} + +type PeersWithBlockNumInfo []*PeerWithBlockNumInfo + +func (peers PeersWithBlockNumInfo) Len() int { + return len(peers) +} + +func (peers PeersWithBlockNumInfo) Less(i int, j int) bool { + return peers[i].BlockNum.Cmp(peers[j].BlockNum) < 1 +} + +func (peers PeersWithBlockNumInfo) Swap(i int, j int) { + peers[i], peers[j] = peers[j], peers[i] +} diff --git a/polygon/sync/sentry.go b/polygon/sync/sentry.go new file mode 100644 index 00000000000..aa8e5198cc6 --- /dev/null +++ b/polygon/sync/sentry.go @@ -0,0 +1,17 @@ +package sync + +import ( + "context" + "math/big" + + "github.com/ledgerwatch/erigon/core/types" + "github.com/ledgerwatch/erigon/polygon/sync/peerinfo" +) + +//go:generate mockgen -destination=./mock/sentry_mock.go -package=mock . Sentry +type Sentry interface { + MaxPeers() int + PeersWithBlockNumInfo() peerinfo.PeersWithBlockNumInfo + DownloadHeaders(ctx context.Context, start *big.Int, end *big.Int, peerID string) ([]*types.Header, error) + Penalize(peerID string) +} diff --git a/polygon/sync/state_point.go b/polygon/sync/state_point.go new file mode 100644 index 00000000000..dfd61da3858 --- /dev/null +++ b/polygon/sync/state_point.go @@ -0,0 +1,47 @@ +package sync + +import ( + "math/big" + + "github.com/ledgerwatch/erigon-lib/common" + "github.com/ledgerwatch/erigon/consensus/bor/heimdall/checkpoint" + "github.com/ledgerwatch/erigon/consensus/bor/heimdall/milestone" +) + +func statePointFromCheckpoint(checkpoint *checkpoint.Checkpoint) *statePoint { + return &statePoint{ + proposer: checkpoint.Proposer, + startBlock: new(big.Int).Set(checkpoint.StartBlock), + endBlock: new(big.Int).Set(checkpoint.EndBlock), + rootHash: checkpoint.RootHash, + chainId: checkpoint.BorChainID, + timestamp: checkpoint.Timestamp, + kind: checkpointKind, + } +} + +func statePointFromMilestone(milestone *milestone.Milestone) *statePoint { + return &statePoint{ + proposer: milestone.Proposer, + startBlock: new(big.Int).Set(milestone.StartBlock), + endBlock: new(big.Int).Set(milestone.EndBlock), + rootHash: milestone.Hash, + chainId: milestone.BorChainID, + timestamp: milestone.Timestamp, + kind: milestoneKind, + } +} + +type statePoint struct { + proposer common.Address + startBlock *big.Int + endBlock *big.Int + rootHash common.Hash + chainId string + timestamp uint64 + kind statePointKind +} + +func (sp *statePoint) length() int { + return int(new(big.Int).Sub(sp.endBlock, sp.startBlock).Int64() + 1) +} diff --git a/polygon/sync/state_point_kind.go b/polygon/sync/state_point_kind.go new file mode 100644 index 00000000000..c61cb5e84bc --- /dev/null +++ b/polygon/sync/state_point_kind.go @@ -0,0 +1,8 @@ +package sync + +type statePointKind string + +const ( + checkpointKind = statePointKind("checkpoint") + milestoneKind = statePointKind("milestone") +) diff --git a/polygon/sync/state_points.go b/polygon/sync/state_points.go new file mode 100644 index 00000000000..5577f24d2f4 --- /dev/null +++ b/polygon/sync/state_points.go @@ -0,0 +1,26 @@ +package sync + +import ( + "github.com/ledgerwatch/erigon/consensus/bor/heimdall/checkpoint" + "github.com/ledgerwatch/erigon/consensus/bor/heimdall/milestone" +) + +func statePointsFromCheckpoints(checkpoints []*checkpoint.Checkpoint) statePoints { + statePoints := make(statePoints, len(checkpoints)) + for i, checkpoint := range checkpoints { + statePoints[i] = statePointFromCheckpoint(checkpoint) + } + + return statePoints +} + +func statePointsFromMilestones(milestones []*milestone.Milestone) statePoints { + statePoints := make(statePoints, len(milestones)) + for i, milestone := range milestones { + statePoints[i] = statePointFromMilestone(milestone) + } + + return statePoints +} + +type statePoints []*statePoint