Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ jobs:
- name: Run Fee market spec tests
run: go run ./tests/fee-market

- name: Run Initial sync tests
run: go run ./tests/initial-sync

- name: Collect docker logs on failure
if: failure()
uses: jwalton/gh-docker-logs@v1
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ require (
github.com/protolambda/ztyp v0.2.1
github.com/prysmaticlabs/prysm/v3 v3.1.1
github.com/wealdtech/go-bytesutil v1.1.1
golang.org/x/sync v0.0.0-20220601150217-0de741cfad7f
google.golang.org/grpc v1.40.0
gopkg.in/yaml.v2 v2.4.0
)
Expand Down Expand Up @@ -179,7 +180,6 @@ require (
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4 // indirect
golang.org/x/net v0.0.0-20220624214902-1bab6f366d9e // indirect
golang.org/x/oauth2 v0.0.0-20220223155221-ee480838109b // indirect
golang.org/x/sync v0.0.0-20220601150217-0de741cfad7f // indirect
golang.org/x/sys v0.0.0-20220818161305-2296e01440c6 // indirect
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211 // indirect
golang.org/x/text v0.3.7 // indirect
Expand Down
3 changes: 2 additions & 1 deletion shared/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ var (
BeaconRPC = "localhost:4000"
BeaconGatewayGRPC = "localhost:3500"
BeaconMultiAddress = "/ip4/0.0.0.0/tcp/13000"
BeaconFollowerRPC = "http://localhost:3501"
BeaconFollowerRPC = "localhost:4001"
BeaconGatewayFollowerGRPC = "localhost:3501"
BeaconFollowerMultiAddress = "/ip4/0.0.0.0/tcp/13001"
ValidatorRPC = "http://localhost:7500"
)
255 changes: 25 additions & 230 deletions tests/blobtx/main.go
Original file line number Diff line number Diff line change
@@ -1,37 +1,20 @@
package main

import (
"bytes"
"context"
"errors"
"fmt"
"io"
"log"
"math/big"
"strings"
"time"

"github.com/Inphi/eip4844-interop/shared"
"github.com/Inphi/eip4844-interop/tests/ctrl"
"github.com/Inphi/eip4844-interop/tests/util"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/holiman/uint256"
"github.com/libp2p/go-libp2p"
libp2pcore "github.com/libp2p/go-libp2p-core"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/protocol"
ma "github.com/multiformats/go-multiaddr"
"github.com/protolambda/ztyp/view"
"github.com/prysmaticlabs/prysm/v3/beacon-chain/p2p"
"github.com/prysmaticlabs/prysm/v3/beacon-chain/p2p/encoder"
"github.com/prysmaticlabs/prysm/v3/beacon-chain/sync"
consensustypes "github.com/prysmaticlabs/prysm/v3/consensus-types/primitives"
ethpbv1 "github.com/prysmaticlabs/prysm/v3/proto/eth/v1"
ethpbv2 "github.com/prysmaticlabs/prysm/v3/proto/eth/v2"
ethpb "github.com/prysmaticlabs/prysm/v3/proto/prysm/v1alpha1"
)

func GetBlobs() types.Blobs {
Expand All @@ -47,61 +30,42 @@ func main() {
ctrl.InitE2ETest()
ctrl.WaitForShardingFork()
ctrl.WaitForEip4844ForkEpoch()
env := ctrl.GetEnv()

ctx, cancel := context.WithTimeout(context.Background(), time.Minute*20)
defer cancel()

ethClient, err := ctrl.GetExecutionClient(ctx)
if err != nil {
log.Fatalf("unable to get execution client: %v", err)
}
beaconClient, err := ctrl.GetBeaconNodeClient(ctx)
if err != nil {
log.Fatalf("unable to get beacon client: %v", err)
}

blobs := GetBlobs()

// Retrieve the current slot to being our blobs search on the beacon chain
startSlot := GetHeadSlot(ctx)
startSlot := util.GetHeadSlot(ctx, beaconClient)

UploadBlobs(ctx, blobs)
WaitForNextSlot(ctx)
slot := FindBlobSlot(ctx, startSlot)
UploadBlobs(ctx, ethClient, blobs)
util.WaitForNextSlots(ctx, beaconClient, 1)
slot := util.FindBlobSlot(ctx, beaconClient, startSlot)

log.Printf("checking blob from beacon node")
downloadedData := DownloadBlobs(ctx, slot, 1, shared.BeaconMultiAddress)
downloadedData := util.DownloadBlobs(ctx, slot, 1, shared.BeaconMultiAddress)
downloadedBlobs := shared.EncodeBlobs(downloadedData)
AssertBlobsEquals(blobs, downloadedBlobs)
util.AssertBlobsEquals(blobs, downloadedBlobs)

log.Printf("checking blob from beacon node follower")
time.Sleep(time.Second * 2 * time.Duration(ctrl.Env.BeaconChainConfig.SecondsPerSlot)) // wait a bit for sync
downloadedData = DownloadBlobs(ctx, slot, 1, shared.BeaconFollowerMultiAddress)
time.Sleep(time.Second * 2 * time.Duration(env.BeaconChainConfig.SecondsPerSlot)) // wait a bit for sync
downloadedData = util.DownloadBlobs(ctx, slot, 1, shared.BeaconFollowerMultiAddress)
downloadedBlobs = shared.EncodeBlobs(downloadedData)
AssertBlobsEquals(blobs, downloadedBlobs)
util.AssertBlobsEquals(blobs, downloadedBlobs)
}

func AssertBlobsEquals(a, b types.Blobs) {
// redundant for nice for debugging
if len(a) != len(b) {
log.Fatalf("data length mismatch (%d != %d)", len(a), len(b))
}
for i, _ := range a {
for j := 0; j < params.FieldElementsPerBlob; j++ {
if !bytes.Equal(a[i][j][:], b[i][j][:]) {
log.Fatal("blobs data mismatch")
}
}
}
}

func WaitForNextSlot(ctx context.Context) {
if err := ctrl.WaitForSlot(ctx, GetHeadSlot(ctx).Add(1)); err != nil {
log.Fatalf("error waiting for next slot: %v", err)
}
}

func GetHeadSlot(ctx context.Context) consensustypes.Slot {
req := &ethpbv1.BlockRequest{BlockId: []byte("head")}
header, err := ctrl.Env.BeaconChainClient.GetBlockHeader(ctx, req)
if err != nil {
log.Fatalf("unable to get beacon chain head: %v", err)
}
return header.Data.Header.Message.Slot
}

func UploadBlobs(ctx context.Context, blobs types.Blobs) {
func UploadBlobs(ctx context.Context, client *ethclient.Client, blobs types.Blobs) {
chainId := big.NewInt(1)
signer := types.NewDankSigner(chainId)

Expand All @@ -110,7 +74,7 @@ func UploadBlobs(ctx context.Context, blobs types.Blobs) {
log.Fatalf("Failed to load private key: %v", err)
}

nonce, err := ctrl.Env.EthClient.PendingNonceAt(ctx, crypto.PubkeyToAddress(key.PublicKey))
nonce, err := client.PendingNonceAt(ctx, crypto.PubkeyToAddress(key.PublicKey))
if err != nil {
log.Fatalf("Error getting nonce: %v", err)
}
Expand Down Expand Up @@ -143,183 +107,14 @@ func UploadBlobs(ctx context.Context, blobs types.Blobs) {
if err != nil {
log.Fatalf("Error signing tx: %v", err)
}
err = ctrl.Env.EthClient.SendTransaction(ctx, tx)
err = client.SendTransaction(ctx, tx)
if err != nil {
log.Fatalf("Error sending tx: %v", err)
}
log.Printf("Transaction submitted. hash=%v", tx.Hash())

log.Printf("Waiting for transaction (%v) to be included...", tx.Hash())
if _, err := shared.WaitForReceipt(ctx, ctrl.Env.EthClient, tx.Hash()); err != nil {
if _, err := shared.WaitForReceipt(ctx, client, tx.Hash()); err != nil {
log.Fatalf("Error waiting for transaction receipt %v: %v", tx.Hash(), err)
}
}

func FindBlobSlot(ctx context.Context, startSlot consensustypes.Slot) consensustypes.Slot {
slot := startSlot
endSlot := GetHeadSlot(ctx)
for {
if slot == endSlot {
log.Fatalf("Unable to find beacon block containing blobs")
}

blockID := fmt.Sprintf("%d", uint64(slot))
req := &ethpbv2.BlockRequestV2{BlockId: []byte(blockID)}
block, err := ctrl.Env.BeaconChainClient.GetBlockV2(ctx, req)
if err != nil {
log.Fatalf("beaconchainclient.GetBlock: %v", err)
}
eip4844, ok := block.Data.Message.(*ethpbv2.SignedBeaconBlockContainer_Eip4844Block)
if ok {
if len(eip4844.Eip4844Block.Body.BlobKzgs) != 0 {
return eip4844.Eip4844Block.Slot
}
}

slot = slot.Add(1)
}
}

func DownloadBlobs(ctx context.Context, startSlot consensustypes.Slot, count uint64, beaconMA string) []byte {
// TODO: Use Beacon gRPC to download blobs rather than p2p RPC
log.Print("downloading blobs...")

req := &ethpb.BlobsSidecarsByRangeRequest{
StartSlot: startSlot,
Count: count,
}

h, err := libp2p.New()
if err != nil {
log.Fatalf("failed to create libp2p context: %v", err)
}
defer func() {
_ = h.Close()
}()

multiaddr, err := getMultiaddr(ctx, h, beaconMA)
if err != nil {
log.Fatalf("getMultiAddr: %v", err)
}

addrInfo, err := peer.AddrInfoFromP2pAddr(multiaddr)
if err != nil {
log.Fatal(err)
}

err = h.Connect(ctx, *addrInfo)
if err != nil {
log.Fatalf("libp2p host connect: %v", err)
}

sidecars, err := sendBlobsSidecarsByRangeRequest(ctx, h, encoder.SszNetworkEncoder{}, addrInfo.ID, req)
if err != nil {
log.Fatalf("failed to send blobs p2p request: %v", err)
}

anyBlobs := false
blobsBuffer := new(bytes.Buffer)
for _, sidecar := range sidecars {
if sidecar.Blobs == nil || len(sidecar.Blobs) == 0 {
continue
}
anyBlobs = true
for _, blob := range sidecar.Blobs {
data := shared.DecodeBlob(blob.Blob)
_, _ = blobsBuffer.Write(data)
}

// stop after the first sidecar with blobs:
break
}

if !anyBlobs {
log.Fatalf("No blobs found in requested slots, sidecar count: %d", len(sidecars))
}

return blobsBuffer.Bytes()
}

func getMultiaddr(ctx context.Context, h host.Host, addr string) (ma.Multiaddr, error) {
multiaddr, err := ma.NewMultiaddr(addr)
if err != nil {
return nil, err
}
_, id := peer.SplitAddr(multiaddr)
if id != "" {
return multiaddr, nil
}
// peer ID wasn't provided, look it up
id, err = retrievePeerID(ctx, h, addr)
if err != nil {
return nil, err
}
return ma.NewMultiaddr(fmt.Sprintf("%s/p2p/%s", addr, string(id)))
}

// Helper for retrieving the peer ID from a security error... obviously don't use this in production!
// See https://github.com/libp2p/go-libp2p-noise/blob/v0.3.0/handshake.go#L250
func retrievePeerID(ctx context.Context, h host.Host, addr string) (peer.ID, error) {
incorrectPeerID := "16Uiu2HAmSifdT5QutTsaET8xqjWAMPp4obrQv7LN79f2RMmBe3nY"
addrInfo, err := peer.AddrInfoFromString(fmt.Sprintf("%s/p2p/%s", addr, incorrectPeerID))
if err != nil {
return "", err
}
err = h.Connect(ctx, *addrInfo)
if err == nil {
return "", errors.New("unexpected successful connection")
}
if strings.Contains(err.Error(), "but remote key matches") {
split := strings.Split(err.Error(), " ")
return peer.ID(split[len(split)-1]), nil
}
return "", err
}

func sendBlobsSidecarsByRangeRequest(ctx context.Context, h host.Host, encoding encoder.NetworkEncoding, pid peer.ID, req *ethpb.BlobsSidecarsByRangeRequest) ([]*ethpb.BlobsSidecar, error) {
topic := fmt.Sprintf("%s%s", p2p.RPCBlobsSidecarsByRangeTopicV1, encoding.ProtocolSuffix())

stream, err := h.NewStream(ctx, pid, protocol.ID(topic))
if err != nil {
return nil, err
}
defer func() {
_ = stream.Close()
}()

if _, err := encoding.EncodeWithMaxLength(stream, req); err != nil {
_ = stream.Reset()
return nil, err
}

if err := stream.CloseWrite(); err != nil {
_ = stream.Reset()
return nil, err
}

var blobsSidecars []*ethpb.BlobsSidecar
for {
blobs, err := readChunkedBlobsSidecar(stream, encoding)
if errors.Is(err, io.EOF) {
break
}
if err != nil {
return nil, err
}
blobsSidecars = append(blobsSidecars, blobs)
}
return blobsSidecars, nil
}

func readChunkedBlobsSidecar(stream libp2pcore.Stream, encoding encoder.NetworkEncoding) (*ethpb.BlobsSidecar, error) {
code, errMsg, err := sync.ReadStatusCode(stream, encoding)
if err != nil {
return nil, err
}
if code != 0 {
return nil, errors.New(errMsg)
}
sidecar := new(ethpb.BlobsSidecar)
err = encoding.DecodeWithMaxLength(stream, sidecar)
return sidecar, err
}
Loading