Skip to content

Commit 6c6c56d

Browse files
authored
Add historical sync test (#44)
Asserts that the beacon node follower can sync blobs during initial-sync. This patch also refactors the test harness to accommodate dynamic nodes.
1 parent 513ca8b commit 6c6c56d

File tree

11 files changed

+687
-532
lines changed

11 files changed

+687
-532
lines changed

.github/workflows/ci.yml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,9 @@ jobs:
3030
- name: Run Fee market spec tests
3131
run: go run ./tests/fee-market
3232

33+
- name: Run Initial sync tests
34+
run: go run ./tests/initial-sync
35+
3336
- name: Collect docker logs on failure
3437
if: failure()
3538
uses: jwalton/gh-docker-logs@v1

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ require (
1111
github.com/protolambda/ztyp v0.2.1
1212
github.com/prysmaticlabs/prysm/v3 v3.1.1
1313
github.com/wealdtech/go-bytesutil v1.1.1
14+
golang.org/x/sync v0.0.0-20220601150217-0de741cfad7f
1415
google.golang.org/grpc v1.40.0
1516
gopkg.in/yaml.v2 v2.4.0
1617
)
@@ -179,7 +180,6 @@ require (
179180
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4 // indirect
180181
golang.org/x/net v0.0.0-20220624214902-1bab6f366d9e // indirect
181182
golang.org/x/oauth2 v0.0.0-20220223155221-ee480838109b // indirect
182-
golang.org/x/sync v0.0.0-20220601150217-0de741cfad7f // indirect
183183
golang.org/x/sys v0.0.0-20220818161305-2296e01440c6 // indirect
184184
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211 // indirect
185185
golang.org/x/text v0.3.7 // indirect

shared/config.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,8 @@ var (
4545
BeaconRPC = "localhost:4000"
4646
BeaconGatewayGRPC = "localhost:3500"
4747
BeaconMultiAddress = "/ip4/0.0.0.0/tcp/13000"
48-
BeaconFollowerRPC = "http://localhost:3501"
48+
BeaconFollowerRPC = "localhost:4001"
49+
BeaconGatewayFollowerGRPC = "localhost:3501"
4950
BeaconFollowerMultiAddress = "/ip4/0.0.0.0/tcp/13001"
5051
ValidatorRPC = "http://localhost:7500"
5152
)

tests/blobtx/main.go

Lines changed: 25 additions & 230 deletions
Original file line numberDiff line numberDiff line change
@@ -1,37 +1,20 @@
11
package main
22

33
import (
4-
"bytes"
54
"context"
6-
"errors"
7-
"fmt"
8-
"io"
95
"log"
106
"math/big"
11-
"strings"
127
"time"
138

149
"github.com/Inphi/eip4844-interop/shared"
1510
"github.com/Inphi/eip4844-interop/tests/ctrl"
11+
"github.com/Inphi/eip4844-interop/tests/util"
1612
"github.com/ethereum/go-ethereum/common"
1713
"github.com/ethereum/go-ethereum/core/types"
1814
"github.com/ethereum/go-ethereum/crypto"
19-
"github.com/ethereum/go-ethereum/params"
15+
"github.com/ethereum/go-ethereum/ethclient"
2016
"github.com/holiman/uint256"
21-
"github.com/libp2p/go-libp2p"
22-
libp2pcore "github.com/libp2p/go-libp2p-core"
23-
"github.com/libp2p/go-libp2p-core/host"
24-
"github.com/libp2p/go-libp2p-core/peer"
25-
"github.com/libp2p/go-libp2p-core/protocol"
26-
ma "github.com/multiformats/go-multiaddr"
2717
"github.com/protolambda/ztyp/view"
28-
"github.com/prysmaticlabs/prysm/v3/beacon-chain/p2p"
29-
"github.com/prysmaticlabs/prysm/v3/beacon-chain/p2p/encoder"
30-
"github.com/prysmaticlabs/prysm/v3/beacon-chain/sync"
31-
consensustypes "github.com/prysmaticlabs/prysm/v3/consensus-types/primitives"
32-
ethpbv1 "github.com/prysmaticlabs/prysm/v3/proto/eth/v1"
33-
ethpbv2 "github.com/prysmaticlabs/prysm/v3/proto/eth/v2"
34-
ethpb "github.com/prysmaticlabs/prysm/v3/proto/prysm/v1alpha1"
3518
)
3619

3720
func GetBlobs() types.Blobs {
@@ -47,61 +30,42 @@ func main() {
4730
ctrl.InitE2ETest()
4831
ctrl.WaitForShardingFork()
4932
ctrl.WaitForEip4844ForkEpoch()
33+
env := ctrl.GetEnv()
5034

5135
ctx, cancel := context.WithTimeout(context.Background(), time.Minute*20)
5236
defer cancel()
5337

38+
ethClient, err := ctrl.GetExecutionClient(ctx)
39+
if err != nil {
40+
log.Fatalf("unable to get execution client: %v", err)
41+
}
42+
beaconClient, err := ctrl.GetBeaconNodeClient(ctx)
43+
if err != nil {
44+
log.Fatalf("unable to get beacon client: %v", err)
45+
}
46+
5447
blobs := GetBlobs()
5548

5649
// Retrieve the current slot to being our blobs search on the beacon chain
57-
startSlot := GetHeadSlot(ctx)
50+
startSlot := util.GetHeadSlot(ctx, beaconClient)
5851

59-
UploadBlobs(ctx, blobs)
60-
WaitForNextSlot(ctx)
61-
slot := FindBlobSlot(ctx, startSlot)
52+
UploadBlobs(ctx, ethClient, blobs)
53+
util.WaitForNextSlots(ctx, beaconClient, 1)
54+
slot := util.FindBlobSlot(ctx, beaconClient, startSlot)
6255

6356
log.Printf("checking blob from beacon node")
64-
downloadedData := DownloadBlobs(ctx, slot, 1, shared.BeaconMultiAddress)
57+
downloadedData := util.DownloadBlobs(ctx, slot, 1, shared.BeaconMultiAddress)
6558
downloadedBlobs := shared.EncodeBlobs(downloadedData)
66-
AssertBlobsEquals(blobs, downloadedBlobs)
59+
util.AssertBlobsEquals(blobs, downloadedBlobs)
6760

6861
log.Printf("checking blob from beacon node follower")
69-
time.Sleep(time.Second * 2 * time.Duration(ctrl.Env.BeaconChainConfig.SecondsPerSlot)) // wait a bit for sync
70-
downloadedData = DownloadBlobs(ctx, slot, 1, shared.BeaconFollowerMultiAddress)
62+
time.Sleep(time.Second * 2 * time.Duration(env.BeaconChainConfig.SecondsPerSlot)) // wait a bit for sync
63+
downloadedData = util.DownloadBlobs(ctx, slot, 1, shared.BeaconFollowerMultiAddress)
7164
downloadedBlobs = shared.EncodeBlobs(downloadedData)
72-
AssertBlobsEquals(blobs, downloadedBlobs)
65+
util.AssertBlobsEquals(blobs, downloadedBlobs)
7366
}
7467

75-
func AssertBlobsEquals(a, b types.Blobs) {
76-
// redundant for nice for debugging
77-
if len(a) != len(b) {
78-
log.Fatalf("data length mismatch (%d != %d)", len(a), len(b))
79-
}
80-
for i, _ := range a {
81-
for j := 0; j < params.FieldElementsPerBlob; j++ {
82-
if !bytes.Equal(a[i][j][:], b[i][j][:]) {
83-
log.Fatal("blobs data mismatch")
84-
}
85-
}
86-
}
87-
}
88-
89-
func WaitForNextSlot(ctx context.Context) {
90-
if err := ctrl.WaitForSlot(ctx, GetHeadSlot(ctx).Add(1)); err != nil {
91-
log.Fatalf("error waiting for next slot: %v", err)
92-
}
93-
}
94-
95-
func GetHeadSlot(ctx context.Context) consensustypes.Slot {
96-
req := &ethpbv1.BlockRequest{BlockId: []byte("head")}
97-
header, err := ctrl.Env.BeaconChainClient.GetBlockHeader(ctx, req)
98-
if err != nil {
99-
log.Fatalf("unable to get beacon chain head: %v", err)
100-
}
101-
return header.Data.Header.Message.Slot
102-
}
103-
104-
func UploadBlobs(ctx context.Context, blobs types.Blobs) {
68+
func UploadBlobs(ctx context.Context, client *ethclient.Client, blobs types.Blobs) {
10569
chainId := big.NewInt(1)
10670
signer := types.NewDankSigner(chainId)
10771

@@ -110,7 +74,7 @@ func UploadBlobs(ctx context.Context, blobs types.Blobs) {
11074
log.Fatalf("Failed to load private key: %v", err)
11175
}
11276

113-
nonce, err := ctrl.Env.EthClient.PendingNonceAt(ctx, crypto.PubkeyToAddress(key.PublicKey))
77+
nonce, err := client.PendingNonceAt(ctx, crypto.PubkeyToAddress(key.PublicKey))
11478
if err != nil {
11579
log.Fatalf("Error getting nonce: %v", err)
11680
}
@@ -143,183 +107,14 @@ func UploadBlobs(ctx context.Context, blobs types.Blobs) {
143107
if err != nil {
144108
log.Fatalf("Error signing tx: %v", err)
145109
}
146-
err = ctrl.Env.EthClient.SendTransaction(ctx, tx)
110+
err = client.SendTransaction(ctx, tx)
147111
if err != nil {
148112
log.Fatalf("Error sending tx: %v", err)
149113
}
150114
log.Printf("Transaction submitted. hash=%v", tx.Hash())
151115

152116
log.Printf("Waiting for transaction (%v) to be included...", tx.Hash())
153-
if _, err := shared.WaitForReceipt(ctx, ctrl.Env.EthClient, tx.Hash()); err != nil {
117+
if _, err := shared.WaitForReceipt(ctx, client, tx.Hash()); err != nil {
154118
log.Fatalf("Error waiting for transaction receipt %v: %v", tx.Hash(), err)
155119
}
156120
}
157-
158-
func FindBlobSlot(ctx context.Context, startSlot consensustypes.Slot) consensustypes.Slot {
159-
slot := startSlot
160-
endSlot := GetHeadSlot(ctx)
161-
for {
162-
if slot == endSlot {
163-
log.Fatalf("Unable to find beacon block containing blobs")
164-
}
165-
166-
blockID := fmt.Sprintf("%d", uint64(slot))
167-
req := &ethpbv2.BlockRequestV2{BlockId: []byte(blockID)}
168-
block, err := ctrl.Env.BeaconChainClient.GetBlockV2(ctx, req)
169-
if err != nil {
170-
log.Fatalf("beaconchainclient.GetBlock: %v", err)
171-
}
172-
eip4844, ok := block.Data.Message.(*ethpbv2.SignedBeaconBlockContainer_Eip4844Block)
173-
if ok {
174-
if len(eip4844.Eip4844Block.Body.BlobKzgs) != 0 {
175-
return eip4844.Eip4844Block.Slot
176-
}
177-
}
178-
179-
slot = slot.Add(1)
180-
}
181-
}
182-
183-
func DownloadBlobs(ctx context.Context, startSlot consensustypes.Slot, count uint64, beaconMA string) []byte {
184-
// TODO: Use Beacon gRPC to download blobs rather than p2p RPC
185-
log.Print("downloading blobs...")
186-
187-
req := &ethpb.BlobsSidecarsByRangeRequest{
188-
StartSlot: startSlot,
189-
Count: count,
190-
}
191-
192-
h, err := libp2p.New()
193-
if err != nil {
194-
log.Fatalf("failed to create libp2p context: %v", err)
195-
}
196-
defer func() {
197-
_ = h.Close()
198-
}()
199-
200-
multiaddr, err := getMultiaddr(ctx, h, beaconMA)
201-
if err != nil {
202-
log.Fatalf("getMultiAddr: %v", err)
203-
}
204-
205-
addrInfo, err := peer.AddrInfoFromP2pAddr(multiaddr)
206-
if err != nil {
207-
log.Fatal(err)
208-
}
209-
210-
err = h.Connect(ctx, *addrInfo)
211-
if err != nil {
212-
log.Fatalf("libp2p host connect: %v", err)
213-
}
214-
215-
sidecars, err := sendBlobsSidecarsByRangeRequest(ctx, h, encoder.SszNetworkEncoder{}, addrInfo.ID, req)
216-
if err != nil {
217-
log.Fatalf("failed to send blobs p2p request: %v", err)
218-
}
219-
220-
anyBlobs := false
221-
blobsBuffer := new(bytes.Buffer)
222-
for _, sidecar := range sidecars {
223-
if sidecar.Blobs == nil || len(sidecar.Blobs) == 0 {
224-
continue
225-
}
226-
anyBlobs = true
227-
for _, blob := range sidecar.Blobs {
228-
data := shared.DecodeBlob(blob.Blob)
229-
_, _ = blobsBuffer.Write(data)
230-
}
231-
232-
// stop after the first sidecar with blobs:
233-
break
234-
}
235-
236-
if !anyBlobs {
237-
log.Fatalf("No blobs found in requested slots, sidecar count: %d", len(sidecars))
238-
}
239-
240-
return blobsBuffer.Bytes()
241-
}
242-
243-
func getMultiaddr(ctx context.Context, h host.Host, addr string) (ma.Multiaddr, error) {
244-
multiaddr, err := ma.NewMultiaddr(addr)
245-
if err != nil {
246-
return nil, err
247-
}
248-
_, id := peer.SplitAddr(multiaddr)
249-
if id != "" {
250-
return multiaddr, nil
251-
}
252-
// peer ID wasn't provided, look it up
253-
id, err = retrievePeerID(ctx, h, addr)
254-
if err != nil {
255-
return nil, err
256-
}
257-
return ma.NewMultiaddr(fmt.Sprintf("%s/p2p/%s", addr, string(id)))
258-
}
259-
260-
// Helper for retrieving the peer ID from a security error... obviously don't use this in production!
261-
// See https://github.com/libp2p/go-libp2p-noise/blob/v0.3.0/handshake.go#L250
262-
func retrievePeerID(ctx context.Context, h host.Host, addr string) (peer.ID, error) {
263-
incorrectPeerID := "16Uiu2HAmSifdT5QutTsaET8xqjWAMPp4obrQv7LN79f2RMmBe3nY"
264-
addrInfo, err := peer.AddrInfoFromString(fmt.Sprintf("%s/p2p/%s", addr, incorrectPeerID))
265-
if err != nil {
266-
return "", err
267-
}
268-
err = h.Connect(ctx, *addrInfo)
269-
if err == nil {
270-
return "", errors.New("unexpected successful connection")
271-
}
272-
if strings.Contains(err.Error(), "but remote key matches") {
273-
split := strings.Split(err.Error(), " ")
274-
return peer.ID(split[len(split)-1]), nil
275-
}
276-
return "", err
277-
}
278-
279-
func sendBlobsSidecarsByRangeRequest(ctx context.Context, h host.Host, encoding encoder.NetworkEncoding, pid peer.ID, req *ethpb.BlobsSidecarsByRangeRequest) ([]*ethpb.BlobsSidecar, error) {
280-
topic := fmt.Sprintf("%s%s", p2p.RPCBlobsSidecarsByRangeTopicV1, encoding.ProtocolSuffix())
281-
282-
stream, err := h.NewStream(ctx, pid, protocol.ID(topic))
283-
if err != nil {
284-
return nil, err
285-
}
286-
defer func() {
287-
_ = stream.Close()
288-
}()
289-
290-
if _, err := encoding.EncodeWithMaxLength(stream, req); err != nil {
291-
_ = stream.Reset()
292-
return nil, err
293-
}
294-
295-
if err := stream.CloseWrite(); err != nil {
296-
_ = stream.Reset()
297-
return nil, err
298-
}
299-
300-
var blobsSidecars []*ethpb.BlobsSidecar
301-
for {
302-
blobs, err := readChunkedBlobsSidecar(stream, encoding)
303-
if errors.Is(err, io.EOF) {
304-
break
305-
}
306-
if err != nil {
307-
return nil, err
308-
}
309-
blobsSidecars = append(blobsSidecars, blobs)
310-
}
311-
return blobsSidecars, nil
312-
}
313-
314-
func readChunkedBlobsSidecar(stream libp2pcore.Stream, encoding encoder.NetworkEncoding) (*ethpb.BlobsSidecar, error) {
315-
code, errMsg, err := sync.ReadStatusCode(stream, encoding)
316-
if err != nil {
317-
return nil, err
318-
}
319-
if code != 0 {
320-
return nil, errors.New(errMsg)
321-
}
322-
sidecar := new(ethpb.BlobsSidecar)
323-
err = encoding.DecodeWithMaxLength(stream, sidecar)
324-
return sidecar, err
325-
}

0 commit comments

Comments
 (0)