diff --git a/download/main.go b/download/main.go index de42d84..3895130 100644 --- a/download/main.go +++ b/download/main.go @@ -84,7 +84,7 @@ func main() { } anyBlobs = true for _, blob := range sidecar.Blobs { - data := shared.DecodeBlob(blob.Blob) + data := shared.DecodeBlobs(blob.Blob) _, _ = os.Stdout.Write(data) } diff --git a/prysm b/prysm index 4f14a4f..e110f21 160000 --- a/prysm +++ b/prysm @@ -1 +1 @@ -Subproject commit 4f14a4fa50aae708254155a26b93cb30b91f45af +Subproject commit e110f216ea484166f5df1073fc83474cd50ce028 diff --git a/shared/blobs.go b/shared/blobs.go index f498c9d..daec421 100644 --- a/shared/blobs.go +++ b/shared/blobs.go @@ -25,11 +25,28 @@ func EncodeBlobs(data []byte) types.Blobs { return blobs } -func DecodeBlob(blob [][]byte) []byte { +func DecodeBlobs(blobs [][]byte) []byte { var data []byte - for _, b := range blob { + for _, b := range blobs { + // ignore the last byte in every 32-byte blob (see encoding in EncodeBlobs) data = append(data, b[0:31]...) } + return TrimArray(data) +} + +func DecodeBlob(blob []byte) []byte { + var data []byte + for i, b := range blob { + // ignore the last byte in every 32-byte block (see encoding in EncodeBlobs) + if (i+1)%32 == 0 { + continue + } + data = append(data, b) + } + return TrimArray(data) +} + +func TrimArray(data []byte) []byte { // XXX: the following removes trailing 0s, which could be unexpected for certain blobs i := len(data) - 1 for ; i >= 0; i-- { diff --git a/shared/config.go b/shared/config.go index 81b5299..cf8f1c7 100644 --- a/shared/config.go +++ b/shared/config.go @@ -40,12 +40,11 @@ func UpdateChainConfig(config *params.ChainConfig) error { } var ( - GethRPC = "http://localhost:8545" - PrivateKey = "45a915e4d060149eb4365960e6a7a45f334393093061116b197e3240065ff2d8" - BeaconRPC = "localhost:4000" - BeaconGatewayGRPC = "localhost:3500" - BeaconMultiAddress = "/ip4/0.0.0.0/tcp/13000" - BeaconFollowerRPC = "http://localhost:3501" - BeaconFollowerMultiAddress = "/ip4/0.0.0.0/tcp/13001" - ValidatorRPC = "http://localhost:7500" + GethRPC = "http://localhost:8545" + PrivateKey = "45a915e4d060149eb4365960e6a7a45f334393093061116b197e3240065ff2d8" + BeaconGRPC = "localhost:4000" + BeaconGateway = "http://localhost:3500" + BeaconFollowerGRPC = "localhost:4001" + BeaconFollowerGateway = "http://localhost:3501" + ValidatorRPC = "http://localhost:7500" ) diff --git a/tests/blobtx/main.go b/tests/blobtx/main.go index 2546091..bbe3611 100644 --- a/tests/blobtx/main.go +++ b/tests/blobtx/main.go @@ -3,12 +3,12 @@ package main import ( "bytes" "context" - "errors" "fmt" - "io" + beaconservice "github.com/prysmaticlabs/prysm/v3/proto/eth/service" "log" "math/big" - "strings" + "os" + "strconv" "time" "github.com/Inphi/eip4844-interop/shared" @@ -18,25 +18,19 @@ import ( "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/params" "github.com/holiman/uint256" - "github.com/libp2p/go-libp2p" - libp2pcore "github.com/libp2p/go-libp2p-core" - "github.com/libp2p/go-libp2p-core/host" - "github.com/libp2p/go-libp2p-core/peer" - "github.com/libp2p/go-libp2p-core/protocol" - ma "github.com/multiformats/go-multiaddr" "github.com/protolambda/ztyp/view" - "github.com/prysmaticlabs/prysm/v3/beacon-chain/p2p" - "github.com/prysmaticlabs/prysm/v3/beacon-chain/p2p/encoder" - "github.com/prysmaticlabs/prysm/v3/beacon-chain/sync" consensustypes "github.com/prysmaticlabs/prysm/v3/consensus-types/primitives" ethpbv1 "github.com/prysmaticlabs/prysm/v3/proto/eth/v1" ethpbv2 "github.com/prysmaticlabs/prysm/v3/proto/eth/v2" - ethpb "github.com/prysmaticlabs/prysm/v3/proto/prysm/v1alpha1" ) func GetBlobs() types.Blobs { // dummy data for the test - return shared.EncodeBlobs([]byte("EKANS")) + dat, err := os.ReadFile("./eth.png") + if err != nil { + log.Fatalf("error reading blobs file: %v", err) + } + return shared.EncodeBlobs(dat) } // 1. Uploads blobs @@ -61,13 +55,13 @@ func main() { slot := FindBlobSlot(ctx, startSlot) log.Printf("checking blob from beacon node") - downloadedData := DownloadBlobs(ctx, slot, 1, shared.BeaconMultiAddress) + downloadedData := DownloadBlobs(ctx, slot, ctrl.Env.BeaconChainClient) downloadedBlobs := shared.EncodeBlobs(downloadedData) AssertBlobsEquals(blobs, downloadedBlobs) log.Printf("checking blob from beacon node follower") time.Sleep(time.Second * 2 * time.Duration(ctrl.Env.BeaconChainConfig.SecondsPerSlot)) // wait a bit for sync - downloadedData = DownloadBlobs(ctx, slot, 1, shared.BeaconFollowerMultiAddress) + downloadedData = DownloadBlobs(ctx, slot, ctrl.Env.BeaconChainFollowerClient) downloadedBlobs = shared.EncodeBlobs(downloadedData) AssertBlobsEquals(blobs, downloadedBlobs) } @@ -77,7 +71,7 @@ func AssertBlobsEquals(a, b types.Blobs) { if len(a) != len(b) { log.Fatalf("data length mismatch (%d != %d)", len(a), len(b)) } - for i, _ := range a { + for i := range a { for j := 0; j < params.FieldElementsPerBlob; j++ { if !bytes.Equal(a[i][j][:], b[i][j][:]) { log.Fatal("blobs data mismatch") @@ -179,146 +173,20 @@ func FindBlobSlot(ctx context.Context, startSlot consensustypes.Slot) consensust } } -func DownloadBlobs(ctx context.Context, startSlot consensustypes.Slot, count uint64, beaconMA string) []byte { - // TODO: Use Beacon gRPC to download blobs rather than p2p RPC +func DownloadBlobs(ctx context.Context, startSlot consensustypes.Slot, client beaconservice.BeaconChainClient) []byte { log.Print("downloading blobs...") - req := ðpb.BlobsSidecarsByRangeRequest{ - StartSlot: startSlot, - Count: count, - } - - h, err := libp2p.New() - if err != nil { - log.Fatalf("failed to create libp2p context: %v", err) - } - defer func() { - _ = h.Close() - }() - - multiaddr, err := getMultiaddr(ctx, h, beaconMA) + req := ethpbv1.BlobsRequest{BlockId: []byte(strconv.FormatUint(uint64(startSlot), 10))} + sidecar, err := client.GetBlobsSidecar(ctx, &req) if err != nil { - log.Fatalf("getMultiAddr: %v", err) + log.Fatalf("failed to send blobs sidecar request: %v", err) } - addrInfo, err := peer.AddrInfoFromP2pAddr(multiaddr) - if err != nil { - log.Fatal(err) - } - - err = h.Connect(ctx, *addrInfo) - if err != nil { - log.Fatalf("libp2p host connect: %v", err) - } - - sidecars, err := sendBlobsSidecarsByRangeRequest(ctx, h, encoder.SszNetworkEncoder{}, addrInfo.ID, req) - if err != nil { - log.Fatalf("failed to send blobs p2p request: %v", err) - } - - anyBlobs := false blobsBuffer := new(bytes.Buffer) - for _, sidecar := range sidecars { - if sidecar.Blobs == nil || len(sidecar.Blobs) == 0 { - continue - } - anyBlobs = true - for _, blob := range sidecar.Blobs { - data := shared.DecodeBlob(blob.Blob) - _, _ = blobsBuffer.Write(data) - } - - // stop after the first sidecar with blobs: - break - } - - if !anyBlobs { - log.Fatalf("No blobs found in requested slots, sidecar count: %d", len(sidecars)) + for _, blob := range sidecar.Blobs { + data := shared.DecodeBlob(blob.Data) + _, _ = blobsBuffer.Write(data) } return blobsBuffer.Bytes() } - -func getMultiaddr(ctx context.Context, h host.Host, addr string) (ma.Multiaddr, error) { - multiaddr, err := ma.NewMultiaddr(addr) - if err != nil { - return nil, err - } - _, id := peer.SplitAddr(multiaddr) - if id != "" { - return multiaddr, nil - } - // peer ID wasn't provided, look it up - id, err = retrievePeerID(ctx, h, addr) - if err != nil { - return nil, err - } - return ma.NewMultiaddr(fmt.Sprintf("%s/p2p/%s", addr, string(id))) -} - -// Helper for retrieving the peer ID from a security error... obviously don't use this in production! -// See https://github.com/libp2p/go-libp2p-noise/blob/v0.3.0/handshake.go#L250 -func retrievePeerID(ctx context.Context, h host.Host, addr string) (peer.ID, error) { - incorrectPeerID := "16Uiu2HAmSifdT5QutTsaET8xqjWAMPp4obrQv7LN79f2RMmBe3nY" - addrInfo, err := peer.AddrInfoFromString(fmt.Sprintf("%s/p2p/%s", addr, incorrectPeerID)) - if err != nil { - return "", err - } - err = h.Connect(ctx, *addrInfo) - if err == nil { - return "", errors.New("unexpected successful connection") - } - if strings.Contains(err.Error(), "but remote key matches") { - split := strings.Split(err.Error(), " ") - return peer.ID(split[len(split)-1]), nil - } - return "", err -} - -func sendBlobsSidecarsByRangeRequest(ctx context.Context, h host.Host, encoding encoder.NetworkEncoding, pid peer.ID, req *ethpb.BlobsSidecarsByRangeRequest) ([]*ethpb.BlobsSidecar, error) { - topic := fmt.Sprintf("%s%s", p2p.RPCBlobsSidecarsByRangeTopicV1, encoding.ProtocolSuffix()) - - stream, err := h.NewStream(ctx, pid, protocol.ID(topic)) - if err != nil { - return nil, err - } - defer func() { - _ = stream.Close() - }() - - if _, err := encoding.EncodeWithMaxLength(stream, req); err != nil { - _ = stream.Reset() - return nil, err - } - - if err := stream.CloseWrite(); err != nil { - _ = stream.Reset() - return nil, err - } - - var blobsSidecars []*ethpb.BlobsSidecar - for { - blobs, err := readChunkedBlobsSidecar(stream, encoding) - if errors.Is(err, io.EOF) { - break - } - if err != nil { - return nil, err - } - blobsSidecars = append(blobsSidecars, blobs) - } - return blobsSidecars, nil -} - -func readChunkedBlobsSidecar(stream libp2pcore.Stream, encoding encoder.NetworkEncoding) (*ethpb.BlobsSidecar, error) { - code, errMsg, err := sync.ReadStatusCode(stream, encoding) - if err != nil { - return nil, err - } - if code != 0 { - return nil, errors.New(errMsg) - } - sidecar := new(ethpb.BlobsSidecar) - err = encoding.DecodeWithMaxLength(stream, sidecar) - return sidecar, err -} diff --git a/tests/ctrl/bootstrap.go b/tests/ctrl/bootstrap.go index 5624aae..cfa4a51 100644 --- a/tests/ctrl/bootstrap.go +++ b/tests/ctrl/bootstrap.go @@ -44,12 +44,12 @@ func WaitForGeth(ctx context.Context) error { func WaitForBeaconNode(ctx context.Context) error { log.Printf("waiting for prysm beacon node") - return WaitForService(ctx, fmt.Sprintf("%s/eth/v1/beacon/genesis", fmt.Sprintf("http://%s", shared.BeaconGatewayGRPC))) + return WaitForService(ctx, fmt.Sprintf("%s/eth/v1/beacon/genesis", shared.BeaconGateway)) } func WaitForBeaconNodeFollower(ctx context.Context) error { log.Printf("waiting for prysm beacon node follower") - return WaitForService(ctx, fmt.Sprintf("%s/eth/v1/beacon/genesis", shared.BeaconFollowerRPC)) + return WaitForService(ctx, fmt.Sprintf("%s/eth/v1/beacon/genesis", shared.BeaconFollowerGateway)) } func WaitForValidator(ctx context.Context) error { @@ -180,10 +180,11 @@ type BeaconChainConfig struct { } type TestEnvironment struct { - GethChainConfig *params.ChainConfig - BeaconChainConfig *BeaconChainConfig - EthClient *ethclient.Client - BeaconChainClient beaconservice.BeaconChainClient + GethChainConfig *params.ChainConfig + BeaconChainConfig *BeaconChainConfig + EthClient *ethclient.Client + BeaconChainClient beaconservice.BeaconChainClient + BeaconChainFollowerClient beaconservice.BeaconChainClient } func newTestEnvironment() *TestEnvironment { @@ -192,15 +193,20 @@ func newTestEnvironment() *TestEnvironment { if err != nil { log.Fatalf("Failed to connect to the Ethereum client: %v", err) } - beaconGRPCConn, err := grpc.DialContext(ctx, shared.BeaconRPC, grpc.WithInsecure()) + beaconGRPCConn, err := grpc.DialContext(ctx, shared.BeaconGRPC, grpc.WithInsecure()) if err != nil { - log.Fatalf("Failed to dial beacon grpc", err) + log.Fatalf("Failed to dial beacon grpc: %v", err) + } + beaconFollowerGRPCConn, err := grpc.DialContext(ctx, shared.BeaconFollowerGRPC, grpc.WithInsecure()) + if err != nil { + log.Fatalf("Failed to dial beacon grpc: %v", err) } return &TestEnvironment{ - GethChainConfig: ReadGethChainConfig(), - BeaconChainConfig: ReadBeaconChainConfig(), - EthClient: eclient, - BeaconChainClient: beaconservice.NewBeaconChainClient(beaconGRPCConn), + GethChainConfig: ReadGethChainConfig(), + BeaconChainConfig: ReadBeaconChainConfig(), + EthClient: eclient, + BeaconChainClient: beaconservice.NewBeaconChainClient(beaconGRPCConn), + BeaconChainFollowerClient: beaconservice.NewBeaconChainClient(beaconFollowerGRPCConn), } } diff --git a/tests/fee-market/main.go b/tests/fee-market/main.go index 148d806..0d79431 100644 --- a/tests/fee-market/main.go +++ b/tests/fee-market/main.go @@ -3,13 +3,12 @@ package main import ( "bytes" "context" - "errors" "fmt" - "io" + beaconservice "github.com/prysmaticlabs/prysm/v3/proto/eth/service" "log" "math/big" "sort" - "strings" + "strconv" "sync" "time" @@ -20,20 +19,10 @@ import ( "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/params" "github.com/holiman/uint256" - "github.com/libp2p/go-libp2p" - libp2pcore "github.com/libp2p/go-libp2p-core" - "github.com/libp2p/go-libp2p-core/host" - "github.com/libp2p/go-libp2p-core/peer" - "github.com/libp2p/go-libp2p-core/protocol" - ma "github.com/multiformats/go-multiaddr" "github.com/protolambda/ztyp/view" - "github.com/prysmaticlabs/prysm/v3/beacon-chain/p2p" - "github.com/prysmaticlabs/prysm/v3/beacon-chain/p2p/encoder" - beaconchainsync "github.com/prysmaticlabs/prysm/v3/beacon-chain/sync" consensustypes "github.com/prysmaticlabs/prysm/v3/consensus-types/primitives" ethpbv1 "github.com/prysmaticlabs/prysm/v3/proto/eth/v1" ethpbv2 "github.com/prysmaticlabs/prysm/v3/proto/eth/v2" - ethpb "github.com/prysmaticlabs/prysm/v3/proto/prysm/v1alpha1" ) func GetBlob() types.Blobs { @@ -72,7 +61,7 @@ func main() { log.Printf("checking blob from beacon node") var downloadedData []byte for _, b := range blocks { - data := DownloadBlobs(ctx, b.Slot, 1, shared.BeaconMultiAddress) + data := DownloadBlobs(ctx, b.Slot, ctrl.Env.BeaconChainClient) downloadedData = append(downloadedData, data...) } @@ -87,7 +76,7 @@ func main() { downloadedData = nil for _, b := range blocks { - data := DownloadBlobs(ctx, b.Slot, 1, shared.BeaconFollowerMultiAddress) + data := DownloadBlobs(ctx, b.Slot, ctrl.Env.BeaconChainFollowerClient) downloadedData = append(downloadedData, data...) } if !bytes.Equal(flatBlobs, downloadedData) { @@ -103,7 +92,7 @@ func FlattenBlobs(blobsData []types.Blobs) []byte { for i := range blob { rawBlob[i] = blob[i][:] } - decoded := shared.DecodeBlob(rawBlob) + decoded := shared.DecodeBlobs(rawBlob) out = append(out, decoded...) } } @@ -273,149 +262,20 @@ func FindBlocksWithBlobs(ctx context.Context, startSlot consensustypes.Slot) []* return blocks } -func DownloadBlobs(ctx context.Context, startSlot consensustypes.Slot, count uint64, beaconMA string) []byte { - // TODO: Use Beacon gRPC to download blobs rather than p2p RPC +func DownloadBlobs(ctx context.Context, startSlot consensustypes.Slot, client beaconservice.BeaconChainClient) []byte { log.Print("downloading blobs...") - req := ðpb.BlobsSidecarsByRangeRequest{ - StartSlot: startSlot, - Count: count, - } - - h, err := libp2p.New() - if err != nil { - log.Fatalf("failed to create libp2p context: %v", err) - } - defer func() { - _ = h.Close() - }() - - multiaddr, err := getMultiaddr(ctx, h, beaconMA) - if err != nil { - log.Fatalf("getMultiAddr: %v", err) - } - - addrInfo, err := peer.AddrInfoFromP2pAddr(multiaddr) - if err != nil { - log.Fatal(err) - } - - err = h.Connect(ctx, *addrInfo) - if err != nil { - log.Fatalf("libp2p host connect: %v", err) - } - - // Hack to ensure that we are able to download blob chunks with larger chunk sizes (which is 10 MiB post-bellatrix) - encoder.MaxChunkSize = 10 << 20 - sidecars, err := sendBlobsSidecarsByRangeRequest(ctx, h, encoder.SszNetworkEncoder{}, addrInfo.ID, req) + req := ethpbv1.BlobsRequest{BlockId: []byte(strconv.FormatUint(uint64(startSlot), 10))} + sidecar, err := client.GetBlobsSidecar(ctx, &req) if err != nil { - log.Fatalf("failed to send blobs p2p request: %v", err) + log.Fatalf("failed to send blobs sidecar request: %v", err) } - anyBlobs := false blobsBuffer := new(bytes.Buffer) - for _, sidecar := range sidecars { - log.Printf("found sidecar with %d blobs", len(sidecar.Blobs)) - if sidecar.Blobs == nil || len(sidecar.Blobs) == 0 { - continue - } - anyBlobs = true - for _, blob := range sidecar.Blobs { - data := shared.DecodeBlob(blob.Blob) - _, _ = blobsBuffer.Write(data) - } - - // stop after the first sidecar with blobs: - break - } - - if !anyBlobs { - log.Fatalf("No blobs found in requested slots, sidecar count: %d", len(sidecars)) + for _, blob := range sidecar.Blobs { + data := shared.DecodeBlob(blob.Data) + _, _ = blobsBuffer.Write(data) } return blobsBuffer.Bytes() } - -func getMultiaddr(ctx context.Context, h host.Host, addr string) (ma.Multiaddr, error) { - multiaddr, err := ma.NewMultiaddr(addr) - if err != nil { - return nil, err - } - _, id := peer.SplitAddr(multiaddr) - if id != "" { - return multiaddr, nil - } - // peer ID wasn't provided, look it up - id, err = retrievePeerID(ctx, h, addr) - if err != nil { - return nil, err - } - return ma.NewMultiaddr(fmt.Sprintf("%s/p2p/%s", addr, string(id))) -} - -// Helper for retrieving the peer ID from a security error... obviously don't use this in production! -// See https://github.com/libp2p/go-libp2p-noise/blob/v0.3.0/handshake.go#L250 -func retrievePeerID(ctx context.Context, h host.Host, addr string) (peer.ID, error) { - incorrectPeerID := "16Uiu2HAmSifdT5QutTsaET8xqjWAMPp4obrQv7LN79f2RMmBe3nY" - addrInfo, err := peer.AddrInfoFromString(fmt.Sprintf("%s/p2p/%s", addr, incorrectPeerID)) - if err != nil { - return "", err - } - err = h.Connect(ctx, *addrInfo) - if err == nil { - return "", errors.New("unexpected successful connection") - } - if strings.Contains(err.Error(), "but remote key matches") { - split := strings.Split(err.Error(), " ") - return peer.ID(split[len(split)-1]), nil - } - return "", err -} - -func sendBlobsSidecarsByRangeRequest(ctx context.Context, h host.Host, encoding encoder.NetworkEncoding, pid peer.ID, req *ethpb.BlobsSidecarsByRangeRequest) ([]*ethpb.BlobsSidecar, error) { - topic := fmt.Sprintf("%s%s", p2p.RPCBlobsSidecarsByRangeTopicV1, encoding.ProtocolSuffix()) - - stream, err := h.NewStream(ctx, pid, protocol.ID(topic)) - if err != nil { - return nil, err - } - defer func() { - _ = stream.Close() - }() - - if _, err := encoding.EncodeWithMaxLength(stream, req); err != nil { - _ = stream.Reset() - return nil, err - } - - if err := stream.CloseWrite(); err != nil { - _ = stream.Reset() - return nil, err - } - - var blobsSidecars []*ethpb.BlobsSidecar - for { - blobs, err := readChunkedBlobsSidecar(stream, encoding) - if errors.Is(err, io.EOF) { - break - } - if err != nil { - return nil, err - } - blobsSidecars = append(blobsSidecars, blobs) - } - return blobsSidecars, nil -} - -func readChunkedBlobsSidecar(stream libp2pcore.Stream, encoding encoder.NetworkEncoding) (*ethpb.BlobsSidecar, error) { - code, errMsg, err := beaconchainsync.ReadStatusCode(stream, encoding) - if err != nil { - return nil, err - } - if code != 0 { - return nil, errors.New(errMsg) - } - sidecar := new(ethpb.BlobsSidecar) - err = encoding.DecodeWithMaxLength(stream, sidecar) - return sidecar, err -}