Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion download/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func main() {
}
anyBlobs = true
for _, blob := range sidecar.Blobs {
data := shared.DecodeBlob(blob.Blob)
data := shared.DecodeBlobs(blob.Blob)
_, _ = os.Stdout.Write(data)
}

Expand Down
21 changes: 19 additions & 2 deletions shared/blobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,28 @@ func EncodeBlobs(data []byte) types.Blobs {
return blobs
}

func DecodeBlob(blob [][]byte) []byte {
func DecodeBlobs(blobs [][]byte) []byte {
var data []byte
for _, b := range blob {
for _, b := range blobs {
// ignore the last byte in every 32-byte blob (see encoding in EncodeBlobs)
data = append(data, b[0:31]...)
}
return TrimArray(data)
}

func DecodeBlob(blob []byte) []byte {
var data []byte
for i, b := range blob {
// ignore the last byte in every 32-byte block (see encoding in EncodeBlobs)
if (i+1)%32 == 0 {
continue
}
data = append(data, b)
}
return TrimArray(data)
}

func TrimArray(data []byte) []byte {
// XXX: the following removes trailing 0s, which could be unexpected for certain blobs
i := len(data) - 1
for ; i >= 0; i-- {
Expand Down
15 changes: 7 additions & 8 deletions shared/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,11 @@ func UpdateChainConfig(config *params.ChainConfig) error {
}

var (
GethRPC = "http://localhost:8545"
PrivateKey = "45a915e4d060149eb4365960e6a7a45f334393093061116b197e3240065ff2d8"
BeaconRPC = "localhost:4000"
BeaconGatewayGRPC = "localhost:3500"
BeaconMultiAddress = "/ip4/0.0.0.0/tcp/13000"
BeaconFollowerRPC = "http://localhost:3501"
BeaconFollowerMultiAddress = "/ip4/0.0.0.0/tcp/13001"
ValidatorRPC = "http://localhost:7500"
GethRPC = "http://localhost:8545"
PrivateKey = "45a915e4d060149eb4365960e6a7a45f334393093061116b197e3240065ff2d8"
BeaconGRPC = "localhost:4000"
BeaconGateway = "http://localhost:3500"
BeaconFollowerGRPC = "localhost:4001"
BeaconFollowerGateway = "http://localhost:3501"
ValidatorRPC = "http://localhost:7500"
)
168 changes: 18 additions & 150 deletions tests/blobtx/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@ package main
import (
"bytes"
"context"
"errors"
"fmt"
"io"
beaconservice "github.com/prysmaticlabs/prysm/v3/proto/eth/service"
"log"
"math/big"
"strings"
"os"
"strconv"
"time"

"github.com/Inphi/eip4844-interop/shared"
Expand All @@ -18,25 +18,19 @@ import (
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/params"
"github.com/holiman/uint256"
"github.com/libp2p/go-libp2p"
libp2pcore "github.com/libp2p/go-libp2p-core"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/protocol"
ma "github.com/multiformats/go-multiaddr"
"github.com/protolambda/ztyp/view"
"github.com/prysmaticlabs/prysm/v3/beacon-chain/p2p"
"github.com/prysmaticlabs/prysm/v3/beacon-chain/p2p/encoder"
"github.com/prysmaticlabs/prysm/v3/beacon-chain/sync"
consensustypes "github.com/prysmaticlabs/prysm/v3/consensus-types/primitives"
ethpbv1 "github.com/prysmaticlabs/prysm/v3/proto/eth/v1"
ethpbv2 "github.com/prysmaticlabs/prysm/v3/proto/eth/v2"
ethpb "github.com/prysmaticlabs/prysm/v3/proto/prysm/v1alpha1"
)

func GetBlobs() types.Blobs {
// dummy data for the test
return shared.EncodeBlobs([]byte("EKANS"))
dat, err := os.ReadFile("./eth.png")
if err != nil {
log.Fatalf("error reading blobs file: %v", err)
}
return shared.EncodeBlobs(dat)
}

// 1. Uploads blobs
Expand All @@ -61,13 +55,13 @@ func main() {
slot := FindBlobSlot(ctx, startSlot)

log.Printf("checking blob from beacon node")
downloadedData := DownloadBlobs(ctx, slot, 1, shared.BeaconMultiAddress)
downloadedData := DownloadBlobs(ctx, slot, ctrl.Env.BeaconChainClient)
downloadedBlobs := shared.EncodeBlobs(downloadedData)
AssertBlobsEquals(blobs, downloadedBlobs)

log.Printf("checking blob from beacon node follower")
time.Sleep(time.Second * 2 * time.Duration(ctrl.Env.BeaconChainConfig.SecondsPerSlot)) // wait a bit for sync
downloadedData = DownloadBlobs(ctx, slot, 1, shared.BeaconFollowerMultiAddress)
downloadedData = DownloadBlobs(ctx, slot, ctrl.Env.BeaconChainFollowerClient)
downloadedBlobs = shared.EncodeBlobs(downloadedData)
AssertBlobsEquals(blobs, downloadedBlobs)
}
Expand All @@ -77,7 +71,7 @@ func AssertBlobsEquals(a, b types.Blobs) {
if len(a) != len(b) {
log.Fatalf("data length mismatch (%d != %d)", len(a), len(b))
}
for i, _ := range a {
for i := range a {
for j := 0; j < params.FieldElementsPerBlob; j++ {
if !bytes.Equal(a[i][j][:], b[i][j][:]) {
log.Fatal("blobs data mismatch")
Expand Down Expand Up @@ -179,146 +173,20 @@ func FindBlobSlot(ctx context.Context, startSlot consensustypes.Slot) consensust
}
}

func DownloadBlobs(ctx context.Context, startSlot consensustypes.Slot, count uint64, beaconMA string) []byte {
// TODO: Use Beacon gRPC to download blobs rather than p2p RPC
func DownloadBlobs(ctx context.Context, startSlot consensustypes.Slot, client beaconservice.BeaconChainClient) []byte {
log.Print("downloading blobs...")

req := &ethpb.BlobsSidecarsByRangeRequest{
StartSlot: startSlot,
Count: count,
}

h, err := libp2p.New()
if err != nil {
log.Fatalf("failed to create libp2p context: %v", err)
}
defer func() {
_ = h.Close()
}()

multiaddr, err := getMultiaddr(ctx, h, beaconMA)
req := ethpbv1.BlobsRequest{BlockId: []byte(strconv.FormatUint(uint64(startSlot), 10))}
sidecar, err := client.GetBlobsSidecar(ctx, &req)
if err != nil {
log.Fatalf("getMultiAddr: %v", err)
log.Fatalf("failed to send blobs sidecar request: %v", err)
}

addrInfo, err := peer.AddrInfoFromP2pAddr(multiaddr)
if err != nil {
log.Fatal(err)
}

err = h.Connect(ctx, *addrInfo)
if err != nil {
log.Fatalf("libp2p host connect: %v", err)
}

sidecars, err := sendBlobsSidecarsByRangeRequest(ctx, h, encoder.SszNetworkEncoder{}, addrInfo.ID, req)
if err != nil {
log.Fatalf("failed to send blobs p2p request: %v", err)
}

anyBlobs := false
blobsBuffer := new(bytes.Buffer)
for _, sidecar := range sidecars {
if sidecar.Blobs == nil || len(sidecar.Blobs) == 0 {
continue
}
anyBlobs = true
for _, blob := range sidecar.Blobs {
data := shared.DecodeBlob(blob.Blob)
_, _ = blobsBuffer.Write(data)
}

// stop after the first sidecar with blobs:
break
}

if !anyBlobs {
log.Fatalf("No blobs found in requested slots, sidecar count: %d", len(sidecars))
for _, blob := range sidecar.Blobs {
data := shared.DecodeBlob(blob.Data)
_, _ = blobsBuffer.Write(data)
}

return blobsBuffer.Bytes()
}

func getMultiaddr(ctx context.Context, h host.Host, addr string) (ma.Multiaddr, error) {
multiaddr, err := ma.NewMultiaddr(addr)
if err != nil {
return nil, err
}
_, id := peer.SplitAddr(multiaddr)
if id != "" {
return multiaddr, nil
}
// peer ID wasn't provided, look it up
id, err = retrievePeerID(ctx, h, addr)
if err != nil {
return nil, err
}
return ma.NewMultiaddr(fmt.Sprintf("%s/p2p/%s", addr, string(id)))
}

// Helper for retrieving the peer ID from a security error... obviously don't use this in production!
// See https://github.com/libp2p/go-libp2p-noise/blob/v0.3.0/handshake.go#L250
func retrievePeerID(ctx context.Context, h host.Host, addr string) (peer.ID, error) {
incorrectPeerID := "16Uiu2HAmSifdT5QutTsaET8xqjWAMPp4obrQv7LN79f2RMmBe3nY"
addrInfo, err := peer.AddrInfoFromString(fmt.Sprintf("%s/p2p/%s", addr, incorrectPeerID))
if err != nil {
return "", err
}
err = h.Connect(ctx, *addrInfo)
if err == nil {
return "", errors.New("unexpected successful connection")
}
if strings.Contains(err.Error(), "but remote key matches") {
split := strings.Split(err.Error(), " ")
return peer.ID(split[len(split)-1]), nil
}
return "", err
}

func sendBlobsSidecarsByRangeRequest(ctx context.Context, h host.Host, encoding encoder.NetworkEncoding, pid peer.ID, req *ethpb.BlobsSidecarsByRangeRequest) ([]*ethpb.BlobsSidecar, error) {
topic := fmt.Sprintf("%s%s", p2p.RPCBlobsSidecarsByRangeTopicV1, encoding.ProtocolSuffix())

stream, err := h.NewStream(ctx, pid, protocol.ID(topic))
if err != nil {
return nil, err
}
defer func() {
_ = stream.Close()
}()

if _, err := encoding.EncodeWithMaxLength(stream, req); err != nil {
_ = stream.Reset()
return nil, err
}

if err := stream.CloseWrite(); err != nil {
_ = stream.Reset()
return nil, err
}

var blobsSidecars []*ethpb.BlobsSidecar
for {
blobs, err := readChunkedBlobsSidecar(stream, encoding)
if errors.Is(err, io.EOF) {
break
}
if err != nil {
return nil, err
}
blobsSidecars = append(blobsSidecars, blobs)
}
return blobsSidecars, nil
}

func readChunkedBlobsSidecar(stream libp2pcore.Stream, encoding encoder.NetworkEncoding) (*ethpb.BlobsSidecar, error) {
code, errMsg, err := sync.ReadStatusCode(stream, encoding)
if err != nil {
return nil, err
}
if code != 0 {
return nil, errors.New(errMsg)
}
sidecar := new(ethpb.BlobsSidecar)
err = encoding.DecodeWithMaxLength(stream, sidecar)
return sidecar, err
}
30 changes: 18 additions & 12 deletions tests/ctrl/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,12 @@ func WaitForGeth(ctx context.Context) error {

func WaitForBeaconNode(ctx context.Context) error {
log.Printf("waiting for prysm beacon node")
return WaitForService(ctx, fmt.Sprintf("%s/eth/v1/beacon/genesis", fmt.Sprintf("http://%s", shared.BeaconGatewayGRPC)))
return WaitForService(ctx, fmt.Sprintf("%s/eth/v1/beacon/genesis", shared.BeaconGateway))
}

func WaitForBeaconNodeFollower(ctx context.Context) error {
log.Printf("waiting for prysm beacon node follower")
return WaitForService(ctx, fmt.Sprintf("%s/eth/v1/beacon/genesis", shared.BeaconFollowerRPC))
return WaitForService(ctx, fmt.Sprintf("%s/eth/v1/beacon/genesis", shared.BeaconFollowerGateway))
}

func WaitForValidator(ctx context.Context) error {
Expand Down Expand Up @@ -180,10 +180,11 @@ type BeaconChainConfig struct {
}

type TestEnvironment struct {
GethChainConfig *params.ChainConfig
BeaconChainConfig *BeaconChainConfig
EthClient *ethclient.Client
BeaconChainClient beaconservice.BeaconChainClient
GethChainConfig *params.ChainConfig
BeaconChainConfig *BeaconChainConfig
EthClient *ethclient.Client
BeaconChainClient beaconservice.BeaconChainClient
BeaconChainFollowerClient beaconservice.BeaconChainClient
}

func newTestEnvironment() *TestEnvironment {
Expand All @@ -192,15 +193,20 @@ func newTestEnvironment() *TestEnvironment {
if err != nil {
log.Fatalf("Failed to connect to the Ethereum client: %v", err)
}
beaconGRPCConn, err := grpc.DialContext(ctx, shared.BeaconRPC, grpc.WithInsecure())
beaconGRPCConn, err := grpc.DialContext(ctx, shared.BeaconGRPC, grpc.WithInsecure())
if err != nil {
log.Fatalf("Failed to dial beacon grpc", err)
log.Fatalf("Failed to dial beacon grpc: %v", err)
}
beaconFollowerGRPCConn, err := grpc.DialContext(ctx, shared.BeaconFollowerGRPC, grpc.WithInsecure())
if err != nil {
log.Fatalf("Failed to dial beacon grpc: %v", err)
}

return &TestEnvironment{
GethChainConfig: ReadGethChainConfig(),
BeaconChainConfig: ReadBeaconChainConfig(),
EthClient: eclient,
BeaconChainClient: beaconservice.NewBeaconChainClient(beaconGRPCConn),
GethChainConfig: ReadGethChainConfig(),
BeaconChainConfig: ReadBeaconChainConfig(),
EthClient: eclient,
BeaconChainClient: beaconservice.NewBeaconChainClient(beaconGRPCConn),
BeaconChainFollowerClient: beaconservice.NewBeaconChainClient(beaconFollowerGRPCConn),
}
}
Loading