diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 7ee60b9..9796a23 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -30,6 +30,9 @@ jobs: - name: Run Fee market spec tests run: go run ./tests/fee-market + - name: Run Initial sync tests + run: go run ./tests/initial-sync + - name: Collect docker logs on failure if: failure() uses: jwalton/gh-docker-logs@v1 diff --git a/go.mod b/go.mod index 89fe451..424e9c3 100644 --- a/go.mod +++ b/go.mod @@ -11,6 +11,7 @@ require ( github.com/protolambda/ztyp v0.2.1 github.com/prysmaticlabs/prysm/v3 v3.1.1 github.com/wealdtech/go-bytesutil v1.1.1 + golang.org/x/sync v0.0.0-20220601150217-0de741cfad7f google.golang.org/grpc v1.40.0 gopkg.in/yaml.v2 v2.4.0 ) @@ -179,7 +180,6 @@ require ( golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4 // indirect golang.org/x/net v0.0.0-20220624214902-1bab6f366d9e // indirect golang.org/x/oauth2 v0.0.0-20220223155221-ee480838109b // indirect - golang.org/x/sync v0.0.0-20220601150217-0de741cfad7f // indirect golang.org/x/sys v0.0.0-20220818161305-2296e01440c6 // indirect golang.org/x/term v0.0.0-20210927222741-03fcf44c2211 // indirect golang.org/x/text v0.3.7 // indirect diff --git a/shared/config.go b/shared/config.go index 363f346..2ca9505 100644 --- a/shared/config.go +++ b/shared/config.go @@ -45,7 +45,8 @@ var ( BeaconRPC = "localhost:4000" BeaconGatewayGRPC = "localhost:3500" BeaconMultiAddress = "/ip4/0.0.0.0/tcp/13000" - BeaconFollowerRPC = "http://localhost:3501" + BeaconFollowerRPC = "localhost:4001" + BeaconGatewayFollowerGRPC = "localhost:3501" BeaconFollowerMultiAddress = "/ip4/0.0.0.0/tcp/13001" ValidatorRPC = "http://localhost:7500" ) diff --git a/tests/blobtx/main.go b/tests/blobtx/main.go index 7f33767..e6b8a1c 100644 --- a/tests/blobtx/main.go +++ b/tests/blobtx/main.go @@ -1,37 +1,20 @@ package main import ( - "bytes" "context" - "errors" - "fmt" - "io" "log" "math/big" - "strings" "time" "github.com/Inphi/eip4844-interop/shared" "github.com/Inphi/eip4844-interop/tests/ctrl" + "github.com/Inphi/eip4844-interop/tests/util" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/crypto" - "github.com/ethereum/go-ethereum/params" + "github.com/ethereum/go-ethereum/ethclient" "github.com/holiman/uint256" - "github.com/libp2p/go-libp2p" - libp2pcore "github.com/libp2p/go-libp2p-core" - "github.com/libp2p/go-libp2p-core/host" - "github.com/libp2p/go-libp2p-core/peer" - "github.com/libp2p/go-libp2p-core/protocol" - ma "github.com/multiformats/go-multiaddr" "github.com/protolambda/ztyp/view" - "github.com/prysmaticlabs/prysm/v3/beacon-chain/p2p" - "github.com/prysmaticlabs/prysm/v3/beacon-chain/p2p/encoder" - "github.com/prysmaticlabs/prysm/v3/beacon-chain/sync" - consensustypes "github.com/prysmaticlabs/prysm/v3/consensus-types/primitives" - ethpbv1 "github.com/prysmaticlabs/prysm/v3/proto/eth/v1" - ethpbv2 "github.com/prysmaticlabs/prysm/v3/proto/eth/v2" - ethpb "github.com/prysmaticlabs/prysm/v3/proto/prysm/v1alpha1" ) func GetBlobs() types.Blobs { @@ -47,61 +30,42 @@ func main() { ctrl.InitE2ETest() ctrl.WaitForShardingFork() ctrl.WaitForEip4844ForkEpoch() + env := ctrl.GetEnv() ctx, cancel := context.WithTimeout(context.Background(), time.Minute*20) defer cancel() + ethClient, err := ctrl.GetExecutionClient(ctx) + if err != nil { + log.Fatalf("unable to get execution client: %v", err) + } + beaconClient, err := ctrl.GetBeaconNodeClient(ctx) + if err != nil { + log.Fatalf("unable to get beacon client: %v", err) + } + blobs := GetBlobs() // Retrieve the current slot to being our blobs search on the beacon chain - startSlot := GetHeadSlot(ctx) + startSlot := util.GetHeadSlot(ctx, beaconClient) - UploadBlobs(ctx, blobs) - WaitForNextSlot(ctx) - slot := FindBlobSlot(ctx, startSlot) + UploadBlobs(ctx, ethClient, blobs) + util.WaitForNextSlots(ctx, beaconClient, 1) + slot := util.FindBlobSlot(ctx, beaconClient, startSlot) log.Printf("checking blob from beacon node") - downloadedData := DownloadBlobs(ctx, slot, 1, shared.BeaconMultiAddress) + downloadedData := util.DownloadBlobs(ctx, slot, 1, shared.BeaconMultiAddress) downloadedBlobs := shared.EncodeBlobs(downloadedData) - AssertBlobsEquals(blobs, downloadedBlobs) + util.AssertBlobsEquals(blobs, downloadedBlobs) log.Printf("checking blob from beacon node follower") - time.Sleep(time.Second * 2 * time.Duration(ctrl.Env.BeaconChainConfig.SecondsPerSlot)) // wait a bit for sync - downloadedData = DownloadBlobs(ctx, slot, 1, shared.BeaconFollowerMultiAddress) + time.Sleep(time.Second * 2 * time.Duration(env.BeaconChainConfig.SecondsPerSlot)) // wait a bit for sync + downloadedData = util.DownloadBlobs(ctx, slot, 1, shared.BeaconFollowerMultiAddress) downloadedBlobs = shared.EncodeBlobs(downloadedData) - AssertBlobsEquals(blobs, downloadedBlobs) + util.AssertBlobsEquals(blobs, downloadedBlobs) } -func AssertBlobsEquals(a, b types.Blobs) { - // redundant for nice for debugging - if len(a) != len(b) { - log.Fatalf("data length mismatch (%d != %d)", len(a), len(b)) - } - for i, _ := range a { - for j := 0; j < params.FieldElementsPerBlob; j++ { - if !bytes.Equal(a[i][j][:], b[i][j][:]) { - log.Fatal("blobs data mismatch") - } - } - } -} - -func WaitForNextSlot(ctx context.Context) { - if err := ctrl.WaitForSlot(ctx, GetHeadSlot(ctx).Add(1)); err != nil { - log.Fatalf("error waiting for next slot: %v", err) - } -} - -func GetHeadSlot(ctx context.Context) consensustypes.Slot { - req := ðpbv1.BlockRequest{BlockId: []byte("head")} - header, err := ctrl.Env.BeaconChainClient.GetBlockHeader(ctx, req) - if err != nil { - log.Fatalf("unable to get beacon chain head: %v", err) - } - return header.Data.Header.Message.Slot -} - -func UploadBlobs(ctx context.Context, blobs types.Blobs) { +func UploadBlobs(ctx context.Context, client *ethclient.Client, blobs types.Blobs) { chainId := big.NewInt(1) signer := types.NewDankSigner(chainId) @@ -110,7 +74,7 @@ func UploadBlobs(ctx context.Context, blobs types.Blobs) { log.Fatalf("Failed to load private key: %v", err) } - nonce, err := ctrl.Env.EthClient.PendingNonceAt(ctx, crypto.PubkeyToAddress(key.PublicKey)) + nonce, err := client.PendingNonceAt(ctx, crypto.PubkeyToAddress(key.PublicKey)) if err != nil { log.Fatalf("Error getting nonce: %v", err) } @@ -143,183 +107,14 @@ func UploadBlobs(ctx context.Context, blobs types.Blobs) { if err != nil { log.Fatalf("Error signing tx: %v", err) } - err = ctrl.Env.EthClient.SendTransaction(ctx, tx) + err = client.SendTransaction(ctx, tx) if err != nil { log.Fatalf("Error sending tx: %v", err) } log.Printf("Transaction submitted. hash=%v", tx.Hash()) log.Printf("Waiting for transaction (%v) to be included...", tx.Hash()) - if _, err := shared.WaitForReceipt(ctx, ctrl.Env.EthClient, tx.Hash()); err != nil { + if _, err := shared.WaitForReceipt(ctx, client, tx.Hash()); err != nil { log.Fatalf("Error waiting for transaction receipt %v: %v", tx.Hash(), err) } } - -func FindBlobSlot(ctx context.Context, startSlot consensustypes.Slot) consensustypes.Slot { - slot := startSlot - endSlot := GetHeadSlot(ctx) - for { - if slot == endSlot { - log.Fatalf("Unable to find beacon block containing blobs") - } - - blockID := fmt.Sprintf("%d", uint64(slot)) - req := ðpbv2.BlockRequestV2{BlockId: []byte(blockID)} - block, err := ctrl.Env.BeaconChainClient.GetBlockV2(ctx, req) - if err != nil { - log.Fatalf("beaconchainclient.GetBlock: %v", err) - } - eip4844, ok := block.Data.Message.(*ethpbv2.SignedBeaconBlockContainer_Eip4844Block) - if ok { - if len(eip4844.Eip4844Block.Body.BlobKzgs) != 0 { - return eip4844.Eip4844Block.Slot - } - } - - slot = slot.Add(1) - } -} - -func DownloadBlobs(ctx context.Context, startSlot consensustypes.Slot, count uint64, beaconMA string) []byte { - // TODO: Use Beacon gRPC to download blobs rather than p2p RPC - log.Print("downloading blobs...") - - req := ðpb.BlobsSidecarsByRangeRequest{ - StartSlot: startSlot, - Count: count, - } - - h, err := libp2p.New() - if err != nil { - log.Fatalf("failed to create libp2p context: %v", err) - } - defer func() { - _ = h.Close() - }() - - multiaddr, err := getMultiaddr(ctx, h, beaconMA) - if err != nil { - log.Fatalf("getMultiAddr: %v", err) - } - - addrInfo, err := peer.AddrInfoFromP2pAddr(multiaddr) - if err != nil { - log.Fatal(err) - } - - err = h.Connect(ctx, *addrInfo) - if err != nil { - log.Fatalf("libp2p host connect: %v", err) - } - - sidecars, err := sendBlobsSidecarsByRangeRequest(ctx, h, encoder.SszNetworkEncoder{}, addrInfo.ID, req) - if err != nil { - log.Fatalf("failed to send blobs p2p request: %v", err) - } - - anyBlobs := false - blobsBuffer := new(bytes.Buffer) - for _, sidecar := range sidecars { - if sidecar.Blobs == nil || len(sidecar.Blobs) == 0 { - continue - } - anyBlobs = true - for _, blob := range sidecar.Blobs { - data := shared.DecodeBlob(blob.Blob) - _, _ = blobsBuffer.Write(data) - } - - // stop after the first sidecar with blobs: - break - } - - if !anyBlobs { - log.Fatalf("No blobs found in requested slots, sidecar count: %d", len(sidecars)) - } - - return blobsBuffer.Bytes() -} - -func getMultiaddr(ctx context.Context, h host.Host, addr string) (ma.Multiaddr, error) { - multiaddr, err := ma.NewMultiaddr(addr) - if err != nil { - return nil, err - } - _, id := peer.SplitAddr(multiaddr) - if id != "" { - return multiaddr, nil - } - // peer ID wasn't provided, look it up - id, err = retrievePeerID(ctx, h, addr) - if err != nil { - return nil, err - } - return ma.NewMultiaddr(fmt.Sprintf("%s/p2p/%s", addr, string(id))) -} - -// Helper for retrieving the peer ID from a security error... obviously don't use this in production! -// See https://github.com/libp2p/go-libp2p-noise/blob/v0.3.0/handshake.go#L250 -func retrievePeerID(ctx context.Context, h host.Host, addr string) (peer.ID, error) { - incorrectPeerID := "16Uiu2HAmSifdT5QutTsaET8xqjWAMPp4obrQv7LN79f2RMmBe3nY" - addrInfo, err := peer.AddrInfoFromString(fmt.Sprintf("%s/p2p/%s", addr, incorrectPeerID)) - if err != nil { - return "", err - } - err = h.Connect(ctx, *addrInfo) - if err == nil { - return "", errors.New("unexpected successful connection") - } - if strings.Contains(err.Error(), "but remote key matches") { - split := strings.Split(err.Error(), " ") - return peer.ID(split[len(split)-1]), nil - } - return "", err -} - -func sendBlobsSidecarsByRangeRequest(ctx context.Context, h host.Host, encoding encoder.NetworkEncoding, pid peer.ID, req *ethpb.BlobsSidecarsByRangeRequest) ([]*ethpb.BlobsSidecar, error) { - topic := fmt.Sprintf("%s%s", p2p.RPCBlobsSidecarsByRangeTopicV1, encoding.ProtocolSuffix()) - - stream, err := h.NewStream(ctx, pid, protocol.ID(topic)) - if err != nil { - return nil, err - } - defer func() { - _ = stream.Close() - }() - - if _, err := encoding.EncodeWithMaxLength(stream, req); err != nil { - _ = stream.Reset() - return nil, err - } - - if err := stream.CloseWrite(); err != nil { - _ = stream.Reset() - return nil, err - } - - var blobsSidecars []*ethpb.BlobsSidecar - for { - blobs, err := readChunkedBlobsSidecar(stream, encoding) - if errors.Is(err, io.EOF) { - break - } - if err != nil { - return nil, err - } - blobsSidecars = append(blobsSidecars, blobs) - } - return blobsSidecars, nil -} - -func readChunkedBlobsSidecar(stream libp2pcore.Stream, encoding encoder.NetworkEncoding) (*ethpb.BlobsSidecar, error) { - code, errMsg, err := sync.ReadStatusCode(stream, encoding) - if err != nil { - return nil, err - } - if code != 0 { - return nil, errors.New(errMsg) - } - sidecar := new(ethpb.BlobsSidecar) - err = encoding.DecodeWithMaxLength(stream, sidecar) - return sidecar, err -} diff --git a/tests/ctrl/bootstrap.go b/tests/ctrl/bootstrap.go index 5624aae..2d3bd49 100644 --- a/tests/ctrl/bootstrap.go +++ b/tests/ctrl/bootstrap.go @@ -6,99 +6,54 @@ import ( "fmt" "io/ioutil" "log" - "net/http" "time" "github.com/Inphi/eip4844-interop/shared" "github.com/ethereum/go-ethereum/core" - "github.com/ethereum/go-ethereum/ethclient" "github.com/ethereum/go-ethereum/params" types "github.com/prysmaticlabs/prysm/v3/consensus-types/primitives" beaconservice "github.com/prysmaticlabs/prysm/v3/proto/eth/service" ethpbv1 "github.com/prysmaticlabs/prysm/v3/proto/eth/v1" - "google.golang.org/grpc" + "golang.org/x/sync/errgroup" "gopkg.in/yaml.v2" ) -func WaitForService(ctx context.Context, url string) error { - ctx, cancel := context.WithTimeout(ctx, time.Second*60) - defer cancel() - for { - if _, err := http.Get(url); err == nil { - break - } - select { - case <-ctx.Done(): - return ctx.Err() - case <-time.After(1 * time.Second): - } - } - return nil - -} - -func WaitForGeth(ctx context.Context) error { - log.Printf("waiting for geth") - return WaitForService(ctx, shared.GethRPC) -} +var env *TestEnvironment -func WaitForBeaconNode(ctx context.Context) error { - log.Printf("waiting for prysm beacon node") - return WaitForService(ctx, fmt.Sprintf("%s/eth/v1/beacon/genesis", fmt.Sprintf("http://%s", shared.BeaconGatewayGRPC))) -} - -func WaitForBeaconNodeFollower(ctx context.Context) error { - log.Printf("waiting for prysm beacon node follower") - return WaitForService(ctx, fmt.Sprintf("%s/eth/v1/beacon/genesis", shared.BeaconFollowerRPC)) -} - -func WaitForValidator(ctx context.Context) error { - log.Printf("waiting for validator") - return WaitForService(ctx, shared.ValidatorRPC) -} - -func WaitForServices(ctx context.Context) error { - if err := WaitForGeth(ctx); err != nil { - return fmt.Errorf("%w: geth offlinev", err) - } - if err := WaitForBeaconNode(ctx); err != nil { - return fmt.Errorf("%w: beacon node offline", err) - } - if err := WaitForBeaconNodeFollower(ctx); err != nil { - return fmt.Errorf("%w: beacon node follower offline", err) - } - if err := WaitForValidator(ctx); err != nil { - return fmt.Errorf("%w: validator is offline", err) +func InitE2ETest() { + ctx := context.Background() + if err := StopDevnet(); err != nil { + log.Fatalf("unable to stop devnet: %v", err) } - return nil + env := GetEnv() + env.StartAll(ctx) } -var Env *TestEnvironment - -func InitE2ETest() { - if err := RestartDevnet(); err != nil { - log.Fatalf("unable to restart devnet: %v", err) +func GetEnv() *TestEnvironment { + if env == nil { + env = newTestEnvironment() } - if err := WaitForServices(context.Background()); err != nil { - log.Fatal(err) - } - - Env = newTestEnvironment() + return env } func WaitForShardingFork() { ctx := context.Background() - config := Env.GethChainConfig + config := env.GethChainConfig eip4844ForkBlock := config.ShardingForkBlock.Uint64() stallTimeout := 1 * time.Minute + client, err := GetExecutionClient(ctx) + if err != nil { + log.Fatalf("unable to retrive beacon node client: %v", err) + } + log.Printf("waiting for sharding fork block...") var lastBn uint64 var lastUpdate time.Time for { - bn, err := Env.EthClient.BlockNumber(ctx) + bn, err := client.BlockNumber(ctx) if err != nil { log.Fatalf("ethclient.BlockNumber: %v", err) } @@ -143,9 +98,17 @@ func ReadBeaconChainConfig() *BeaconChainConfig { } func WaitForSlot(ctx context.Context, slot types.Slot) error { + client, err := GetBeaconNodeClient(ctx) + if err != nil { + return err + } + return WaitForSlotWithClient(ctx, client, slot) +} + +func WaitForSlotWithClient(ctx context.Context, client beaconservice.BeaconChainClient, slot types.Slot) error { req := ðpbv1.BlockRequest{BlockId: []byte("head")} for { - header, err := Env.BeaconChainClient.GetBlockHeader(ctx, req) + header, err := client.GetBlockHeader(ctx, req) if err != nil { return fmt.Errorf("unable to retrieve block header: %v", err) } @@ -153,17 +116,17 @@ func WaitForSlot(ctx context.Context, slot types.Slot) error { if headSlot >= slot { break } - time.Sleep(time.Second * time.Duration(Env.BeaconChainConfig.SecondsPerSlot)) + time.Sleep(time.Second * time.Duration(env.BeaconChainConfig.SecondsPerSlot)) } return nil } func WaitForEip4844ForkEpoch() { - log.Printf("waiting for eip4844 fork epoch...") + log.Println("waiting for eip4844 fork epoch...") ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute) defer cancel() - config := Env.BeaconChainConfig + config := env.BeaconChainConfig eip4844Slot := config.Eip4844ForkEpoch * config.SlotsPerEpoch if err := WaitForSlot(ctx, types.Slot(eip4844Slot)); err != nil { log.Fatal(err) @@ -180,27 +143,43 @@ type BeaconChainConfig struct { } type TestEnvironment struct { - GethChainConfig *params.ChainConfig - BeaconChainConfig *BeaconChainConfig - EthClient *ethclient.Client - BeaconChainClient beaconservice.BeaconChainClient + GethChainConfig *params.ChainConfig + BeaconChainConfig *BeaconChainConfig + BeaconNode Service + GethNode Service + ValidatorNode Service + BeaconNodeFollower Service + GethNode2 Service } func newTestEnvironment() *TestEnvironment { - ctx := context.Background() - eclient, err := ethclient.DialContext(ctx, shared.GethRPC) - if err != nil { - log.Fatalf("Failed to connect to the Ethereum client: %v", err) - } - beaconGRPCConn, err := grpc.DialContext(ctx, shared.BeaconRPC, grpc.WithInsecure()) - if err != nil { - log.Fatalf("Failed to dial beacon grpc", err) - } - return &TestEnvironment{ - GethChainConfig: ReadGethChainConfig(), - BeaconChainConfig: ReadBeaconChainConfig(), - EthClient: eclient, - BeaconChainClient: beaconservice.NewBeaconChainClient(beaconGRPCConn), + GethChainConfig: ReadGethChainConfig(), + BeaconChainConfig: ReadBeaconChainConfig(), + BeaconNode: NewBeaconNode(), + GethNode: NewGethNode(), + ValidatorNode: NewValidatorNode(), + BeaconNodeFollower: NewBeaconNodeFollower(), + GethNode2: NewGethNode2(), } } + +func (env *TestEnvironment) StartAll(ctx context.Context) error { + g, ctx := errgroup.WithContext(ctx) + g.Go(func() error { + return env.BeaconNode.Start(ctx) + }) + g.Go(func() error { + return env.GethNode.Start(ctx) + }) + g.Go(func() error { + return env.ValidatorNode.Start(ctx) + }) + g.Go(func() error { + return env.BeaconNodeFollower.Start(ctx) + }) + g.Go(func() error { + return env.GethNode2.Start(ctx) + }) + return g.Wait() +} diff --git a/tests/ctrl/ctrl.go b/tests/ctrl/ctrl.go index 92d7a9a..96fec59 100644 --- a/tests/ctrl/ctrl.go +++ b/tests/ctrl/ctrl.go @@ -1,20 +1,14 @@ package ctrl import ( + "fmt" "log" "os" "os/exec" + "strings" + "sync" ) -var services = []string{ - "execution-node", - "execution-node-2", - "beacon-node", - "beacon-node-follower", - "validator-node", - "jaeger-tracing", -} - func Run(cmd *exec.Cmd) error { cmd.Stdout = os.Stdout cmd.Stderr = os.Stderr @@ -31,25 +25,34 @@ func Run(cmd *exec.Cmd) error { return nil } -func StartDevnet() error { - err := Run(exec.Command("/bin/sh", "-c", "docker-compose up -d")) +// guards against concurrent access to the docker daemon +var dockerMutex sync.Mutex + +func StartServices(svcs ...string) error { + dockerMutex.Lock() + defer dockerMutex.Unlock() + + svcArg := strings.Join(svcs, " ") + log.Printf("starting services %s", svcArg) + err := Run(exec.Command("/bin/sh", "-c", fmt.Sprintf("docker-compose up -d %s", svcArg))) if err != nil && err.(*exec.ExitError).ExitCode() == 127 { - err = Run(exec.Command("/bin/sh", "-c", "docker compose up -d")) + err = Run(exec.Command("/bin/sh", "-c", fmt.Sprintf("docker compose up -d %s", svcArg))) } return err } -func StopDevnet() error { - err := Run(exec.Command("/bin/sh", "-c", "docker-compose down -v")) +func StopService(svc string) error { + err := Run(exec.Command("/bin/sh", "-c", fmt.Sprintf("docker-compose stop %s", svc))) if err != nil && err.(*exec.ExitError).ExitCode() == 127 { - err = Run(exec.Command("/bin/sh", "-c", "docker compose down -v")) + err = Run(exec.Command("/bin/sh", "-c", fmt.Sprintf("docker compose stop %s", svc))) } return err } -func RestartDevnet() error { - if err := StopDevnet(); err != nil { - return err +func StopDevnet() error { + err := Run(exec.Command("/bin/sh", "-c", "docker-compose down -v")) + if err != nil && err.(*exec.ExitError).ExitCode() == 127 { + err = Run(exec.Command("/bin/sh", "-c", "docker compose down -v")) } - return StartDevnet() + return err } diff --git a/tests/ctrl/services.go b/tests/ctrl/services.go new file mode 100644 index 0000000..e193e11 --- /dev/null +++ b/tests/ctrl/services.go @@ -0,0 +1,122 @@ +package ctrl + +import ( + "context" + "fmt" + "net/http" + "time" + + "github.com/Inphi/eip4844-interop/shared" + "github.com/ethereum/go-ethereum/ethclient" + beaconservice "github.com/prysmaticlabs/prysm/v3/proto/eth/service" + "google.golang.org/grpc" +) + +type Service interface { + Start(ctx context.Context) error + Stop(ctx context.Context) error + Started() <-chan struct{} +} + +func NewBeaconNode() Service { + url := fmt.Sprintf("http://%s/eth/v1/beacon/genesis", shared.BeaconGatewayGRPC) + return newDockerService("beacon-node", url) +} + +func NewValidatorNode() Service { + return newDockerService("validator-node", shared.ValidatorRPC) +} + +func GetBeaconNodeClient(ctx context.Context) (beaconservice.BeaconChainClient, error) { + // TODO: cache conns for reuse + conn, err := grpc.DialContext(ctx, shared.BeaconRPC, grpc.WithInsecure()) + if err != nil { + return nil, fmt.Errorf("%w: failed to dial beacon grpc", err) + } + return beaconservice.NewBeaconChainClient(conn), nil +} + +func NewBeaconNodeFollower() Service { + url := fmt.Sprintf("http://%s/eth/v1/beacon/genesis", shared.BeaconGatewayFollowerGRPC) + return newDockerService("beacon-node-follower", url) +} + +func GetBeaconNodeFollowerClient(ctx context.Context) (beaconservice.BeaconChainClient, error) { + // TODO: cache conns for reuse + conn, err := grpc.DialContext(ctx, shared.BeaconFollowerRPC, grpc.WithInsecure()) + if err != nil { + return nil, fmt.Errorf("%w: failed to dial beacon follower grpc", err) + } + return beaconservice.NewBeaconChainClient(conn), nil +} + +func NewGethNode() Service { + return newDockerService("execution-node", shared.GethRPC) +} + +func NewGethNode2() Service { + return newDockerService("execution-node-2", shared.GethRPC) +} + +func GetExecutionClient(ctx context.Context) (*ethclient.Client, error) { + client, err := ethclient.DialContext(ctx, shared.GethRPC) + if err != nil { + return nil, fmt.Errorf("%w: Failed to connect to the Ethereum client", err) + } + return client, nil +} + +type dockerService struct { + started chan struct{} + svcname string + statusURL string +} + +func (s *dockerService) Start(ctx context.Context) error { + if err := StartServices(s.svcname); err != nil { + return err + } + req, err := http.NewRequestWithContext(ctx, "GET", s.statusURL, nil) + if err != nil { + return err + } + // loop until the status request returns successfully + for { + if _, err := http.DefaultClient.Do(req); err == nil { + close(s.started) + return nil + } + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(1 * time.Second): + } + } +} + +func (s *dockerService) Stop(ctx context.Context) error { + return StopService(s.svcname) +} + +func (s *dockerService) Started() <-chan struct{} { + return s.started +} + +func ServiceReady(ctx context.Context, svc Service) error { + for { + select { + case <-ctx.Done(): + return ctx.Err() + case <-svc.Started(): + return nil + } + } +} + +func newDockerService(svcname string, statusURL string) Service { + return &dockerService{ + started: make(chan struct{}), + svcname: svcname, + statusURL: statusURL, + } +} diff --git a/tests/fee-market/main.go b/tests/fee-market/main.go index 65c5f52..f2ef894 100644 --- a/tests/fee-market/main.go +++ b/tests/fee-market/main.go @@ -3,37 +3,26 @@ package main import ( "bytes" "context" - "errors" "fmt" - "io" "log" "math/big" "sort" - "strings" "sync" "time" "github.com/Inphi/eip4844-interop/shared" "github.com/Inphi/eip4844-interop/tests/ctrl" + "github.com/Inphi/eip4844-interop/tests/util" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/consensus/misc" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/ethclient" "github.com/holiman/uint256" - "github.com/libp2p/go-libp2p" - libp2pcore "github.com/libp2p/go-libp2p-core" - "github.com/libp2p/go-libp2p-core/host" - "github.com/libp2p/go-libp2p-core/peer" - "github.com/libp2p/go-libp2p-core/protocol" - ma "github.com/multiformats/go-multiaddr" "github.com/protolambda/ztyp/view" - "github.com/prysmaticlabs/prysm/v3/beacon-chain/p2p" - "github.com/prysmaticlabs/prysm/v3/beacon-chain/p2p/encoder" - beaconchainsync "github.com/prysmaticlabs/prysm/v3/beacon-chain/sync" consensustypes "github.com/prysmaticlabs/prysm/v3/consensus-types/primitives" - ethpbv1 "github.com/prysmaticlabs/prysm/v3/proto/eth/v1" + "github.com/prysmaticlabs/prysm/v3/proto/eth/service" ethpbv2 "github.com/prysmaticlabs/prysm/v3/proto/eth/v2" - ethpb "github.com/prysmaticlabs/prysm/v3/proto/prysm/v1alpha1" ) func GetBlob() types.Blobs { @@ -49,30 +38,40 @@ func main() { ctrl.InitE2ETest() ctrl.WaitForShardingFork() ctrl.WaitForEip4844ForkEpoch() + env := ctrl.GetEnv() ctx, cancel := context.WithTimeout(context.Background(), time.Minute*20) defer cancel() + ethClient, err := ctrl.GetExecutionClient(ctx) + if err != nil { + log.Fatalf("unable to get execution client: %v", err) + } + beaconClient, err := ctrl.GetBeaconNodeClient(ctx) + if err != nil { + log.Fatalf("unable to get beacon client: %v", err) + } + blobsData := make([]types.Blobs, 20) for i := range blobsData { blobsData[i] = GetBlob() } // Retrieve the current slot to being our blobs search on the beacon chain - startSlot := GetHeadSlot(ctx) + startSlot := util.GetHeadSlot(ctx, beaconClient) // Send multiple transactions at the same time to induce non-zero excess_blobs - UploadBlobsAndCheckBlockHeader(ctx, blobsData) + UploadBlobsAndCheckBlockHeader(ctx, ethClient, blobsData) - WaitForNextSlot(ctx) - WaitForNextSlot(ctx) + util.WaitForNextSlots(ctx, beaconClient, 1) + util.WaitForNextSlots(ctx, beaconClient, 1) - blocks := FindBlocksWithBlobs(ctx, startSlot) + blocks := FindBlocksWithBlobs(ctx, beaconClient, startSlot) log.Printf("checking blob from beacon node") var downloadedData []byte for _, b := range blocks { - data := DownloadBlobs(ctx, b.Slot, 1, shared.BeaconMultiAddress) + data := util.DownloadBlobs(ctx, b.Slot, 1, shared.BeaconMultiAddress) downloadedData = append(downloadedData, data...) } @@ -83,11 +82,11 @@ func main() { } log.Printf("checking blob from beacon node follower") - time.Sleep(time.Second * 2 * time.Duration(ctrl.Env.BeaconChainConfig.SecondsPerSlot)) // wait a bit for sync + time.Sleep(time.Second * 2 * time.Duration(env.BeaconChainConfig.SecondsPerSlot)) // wait a bit for sync downloadedData = nil for _, b := range blocks { - data := DownloadBlobs(ctx, b.Slot, 1, shared.BeaconFollowerMultiAddress) + data := util.DownloadBlobs(ctx, b.Slot, 1, shared.BeaconFollowerMultiAddress) downloadedData = append(downloadedData, data...) } if !bytes.Equal(flatBlobs, downloadedData) { @@ -110,22 +109,7 @@ func FlattenBlobs(blobsData []types.Blobs) []byte { return out } -func WaitForNextSlot(ctx context.Context) { - if err := ctrl.WaitForSlot(ctx, GetHeadSlot(ctx).Add(1)); err != nil { - log.Fatalf("error waiting for next slot: %v", err) - } -} - -func GetHeadSlot(ctx context.Context) consensustypes.Slot { - req := ðpbv1.BlockRequest{BlockId: []byte("head")} - header, err := ctrl.Env.BeaconChainClient.GetBlockHeader(ctx, req) - if err != nil { - log.Fatalf("unable to get beacon chain head: %v", err) - } - return header.Data.Header.Message.Slot -} - -func UploadBlobsAndCheckBlockHeader(ctx context.Context, blobsData []types.Blobs) { +func UploadBlobsAndCheckBlockHeader(ctx context.Context, client *ethclient.Client, blobsData []types.Blobs) { chainId := big.NewInt(1) signer := types.NewDankSigner(chainId) @@ -134,7 +118,7 @@ func UploadBlobsAndCheckBlockHeader(ctx context.Context, blobsData []types.Blobs log.Fatalf("Failed to load private key: %v", err) } - nonce, err := ctrl.Env.EthClient.PendingNonceAt(ctx, crypto.PubkeyToAddress(key.PublicKey)) + nonce, err := client.PendingNonceAt(ctx, crypto.PubkeyToAddress(key.PublicKey)) if err != nil { log.Fatalf("Error getting nonce: %v", err) } @@ -179,14 +163,14 @@ func UploadBlobsAndCheckBlockHeader(ctx context.Context, blobsData []types.Blobs tx := tx go func() { defer wg.Done() - err := ctrl.Env.EthClient.SendTransaction(ctx, tx) + err := client.SendTransaction(ctx, tx) if err != nil { log.Fatalf("Error sending tx: %v", err) } log.Printf("Waiting for transaction (%v) to be included...", tx.Hash()) - receipt, err := shared.WaitForReceipt(ctx, ctrl.Env.EthClient, tx.Hash()) + receipt, err := shared.WaitForReceipt(ctx, client, tx.Hash()) if err != nil { log.Fatalf("Error waiting for transaction receipt %v: %v", tx.Hash(), err) } @@ -203,7 +187,7 @@ func UploadBlobsAndCheckBlockHeader(ctx context.Context, blobsData []types.Blobs blocknum := receipt.BlockNumber.Uint64() if _, ok := blockNumbers[blocknum]; !ok { blockHash := receipt.BlockHash.Hex() - block, err := ctrl.Env.EthClient.BlockByHash(ctx, common.HexToHash(blockHash)) + block, err := client.BlockByHash(ctx, common.HexToHash(blockHash)) if err != nil { log.Fatalf("Error getting block: %v", err) } @@ -220,7 +204,7 @@ func UploadBlobsAndCheckBlockHeader(ctx context.Context, blobsData []types.Blobs }) prevExcessDataGas := new(big.Int) - parentBlock, err := ctrl.Env.EthClient.BlockByHash(ctx, blocks[0].ParentHash()) + parentBlock, err := client.BlockByHash(ctx, blocks[0].ParentHash()) if err != nil { log.Fatalf("Error getting block: %v", err) } @@ -238,9 +222,9 @@ func UploadBlobsAndCheckBlockHeader(ctx context.Context, blobsData []types.Blobs } } -func FindBlocksWithBlobs(ctx context.Context, startSlot consensustypes.Slot) []*ethpbv2.BeaconBlockEip4844 { +func FindBlocksWithBlobs(ctx context.Context, client service.BeaconChainClient, startSlot consensustypes.Slot) []*ethpbv2.BeaconBlockEip4844 { slot := startSlot - endSlot := GetHeadSlot(ctx) + endSlot := util.GetHeadSlot(ctx, client) var blocks []*ethpbv2.BeaconBlockEip4844 for { @@ -250,7 +234,7 @@ func FindBlocksWithBlobs(ctx context.Context, startSlot consensustypes.Slot) []* blockID := fmt.Sprintf("%d", uint64(slot)) req := ðpbv2.BlockRequestV2{BlockId: []byte(blockID)} - block, err := ctrl.Env.BeaconChainClient.GetBlockV2(ctx, req) + block, err := client.GetBlockV2(ctx, req) if err != nil { log.Fatalf("beaconchainclient.GetBlock: %v", err) } @@ -269,150 +253,3 @@ func FindBlocksWithBlobs(ctx context.Context, startSlot consensustypes.Slot) []* } return blocks } - -func DownloadBlobs(ctx context.Context, startSlot consensustypes.Slot, count uint64, beaconMA string) []byte { - // TODO: Use Beacon gRPC to download blobs rather than p2p RPC - log.Print("downloading blobs...") - - req := ðpb.BlobsSidecarsByRangeRequest{ - StartSlot: startSlot, - Count: count, - } - - h, err := libp2p.New() - if err != nil { - log.Fatalf("failed to create libp2p context: %v", err) - } - defer func() { - _ = h.Close() - }() - - multiaddr, err := getMultiaddr(ctx, h, beaconMA) - if err != nil { - log.Fatalf("getMultiAddr: %v", err) - } - - addrInfo, err := peer.AddrInfoFromP2pAddr(multiaddr) - if err != nil { - log.Fatal(err) - } - - err = h.Connect(ctx, *addrInfo) - if err != nil { - log.Fatalf("libp2p host connect: %v", err) - } - - // Hack to ensure that we are able to download blob chunks with larger chunk sizes (which is 10 MiB post-bellatrix) - encoder.MaxChunkSize = 10 << 20 - sidecars, err := sendBlobsSidecarsByRangeRequest(ctx, h, encoder.SszNetworkEncoder{}, addrInfo.ID, req) - if err != nil { - log.Fatalf("failed to send blobs p2p request: %v", err) - } - - anyBlobs := false - blobsBuffer := new(bytes.Buffer) - for _, sidecar := range sidecars { - log.Printf("found sidecar with %d blobs", len(sidecar.Blobs)) - if sidecar.Blobs == nil || len(sidecar.Blobs) == 0 { - continue - } - anyBlobs = true - for _, blob := range sidecar.Blobs { - data := shared.DecodeBlob(blob.Blob) - _, _ = blobsBuffer.Write(data) - } - - // stop after the first sidecar with blobs: - break - } - - if !anyBlobs { - log.Fatalf("No blobs found in requested slots, sidecar count: %d", len(sidecars)) - } - - return blobsBuffer.Bytes() -} - -func getMultiaddr(ctx context.Context, h host.Host, addr string) (ma.Multiaddr, error) { - multiaddr, err := ma.NewMultiaddr(addr) - if err != nil { - return nil, err - } - _, id := peer.SplitAddr(multiaddr) - if id != "" { - return multiaddr, nil - } - // peer ID wasn't provided, look it up - id, err = retrievePeerID(ctx, h, addr) - if err != nil { - return nil, err - } - return ma.NewMultiaddr(fmt.Sprintf("%s/p2p/%s", addr, string(id))) -} - -// Helper for retrieving the peer ID from a security error... obviously don't use this in production! -// See https://github.com/libp2p/go-libp2p-noise/blob/v0.3.0/handshake.go#L250 -func retrievePeerID(ctx context.Context, h host.Host, addr string) (peer.ID, error) { - incorrectPeerID := "16Uiu2HAmSifdT5QutTsaET8xqjWAMPp4obrQv7LN79f2RMmBe3nY" - addrInfo, err := peer.AddrInfoFromString(fmt.Sprintf("%s/p2p/%s", addr, incorrectPeerID)) - if err != nil { - return "", err - } - err = h.Connect(ctx, *addrInfo) - if err == nil { - return "", errors.New("unexpected successful connection") - } - if strings.Contains(err.Error(), "but remote key matches") { - split := strings.Split(err.Error(), " ") - return peer.ID(split[len(split)-1]), nil - } - return "", err -} - -func sendBlobsSidecarsByRangeRequest(ctx context.Context, h host.Host, encoding encoder.NetworkEncoding, pid peer.ID, req *ethpb.BlobsSidecarsByRangeRequest) ([]*ethpb.BlobsSidecar, error) { - topic := fmt.Sprintf("%s%s", p2p.RPCBlobsSidecarsByRangeTopicV1, encoding.ProtocolSuffix()) - - stream, err := h.NewStream(ctx, pid, protocol.ID(topic)) - if err != nil { - return nil, err - } - defer func() { - _ = stream.Close() - }() - - if _, err := encoding.EncodeWithMaxLength(stream, req); err != nil { - _ = stream.Reset() - return nil, err - } - - if err := stream.CloseWrite(); err != nil { - _ = stream.Reset() - return nil, err - } - - var blobsSidecars []*ethpb.BlobsSidecar - for { - blobs, err := readChunkedBlobsSidecar(stream, encoding) - if errors.Is(err, io.EOF) { - break - } - if err != nil { - return nil, err - } - blobsSidecars = append(blobsSidecars, blobs) - } - return blobsSidecars, nil -} - -func readChunkedBlobsSidecar(stream libp2pcore.Stream, encoding encoder.NetworkEncoding) (*ethpb.BlobsSidecar, error) { - code, errMsg, err := beaconchainsync.ReadStatusCode(stream, encoding) - if err != nil { - return nil, err - } - if code != 0 { - return nil, errors.New(errMsg) - } - sidecar := new(ethpb.BlobsSidecar) - err = encoding.DecodeWithMaxLength(stream, sidecar) - return sidecar, err -} diff --git a/tests/initial-sync/main.go b/tests/initial-sync/main.go new file mode 100644 index 0000000..182aff6 --- /dev/null +++ b/tests/initial-sync/main.go @@ -0,0 +1,161 @@ +package main + +import ( + "context" + "log" + "math/big" + "time" + + "github.com/Inphi/eip4844-interop/shared" + "github.com/Inphi/eip4844-interop/tests/ctrl" + "github.com/Inphi/eip4844-interop/tests/util" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/ethclient" + "github.com/holiman/uint256" + "github.com/protolambda/ztyp/view" + "golang.org/x/sync/errgroup" +) + +func GetBlobs() types.Blobs { + // dummy data for the test + return shared.EncodeBlobs([]byte("EKANS")) +} + +// Asserts blob syncing functionality during initial-sync +// 1. Start a single EL/CL node +// 2. Upload blobs +// 3. Wait for blobs to be available +// 4. Start follower EL/CL nodes +// 5. Download blobs from follower +// 6. Asserts that downloaded blobs match the upload +// 7. Asserts execution and beacon block attributes +func main() { + ctx, cancel := context.WithTimeout(context.Background(), time.Minute*20) + defer cancel() + + ctrl.StopDevnet() + env := ctrl.GetEnv() + + g, gctx := errgroup.WithContext(ctx) + g.Go(func() error { + return env.GethNode.Start(gctx) + }) + g.Go(func() error { + return env.BeaconNode.Start(gctx) + }) + g.Go(func() error { + return env.ValidatorNode.Start(gctx) + }) + if err := g.Wait(); err != nil { + log.Fatalf("failed to start services: %v", err) + } + ctrl.WaitForShardingFork() + ctrl.WaitForEip4844ForkEpoch() + + ethClient, err := ctrl.GetExecutionClient(ctx) + if err != nil { + log.Fatalf("unable to get execution client: %v", err) + } + beaconClient, err := ctrl.GetBeaconNodeClient(ctx) + if err != nil { + log.Fatalf("unable to get beacon client: %v", err) + } + + // Retrieve the current slot to being our blobs search on the beacon chain + startSlot := util.GetHeadSlot(ctx, beaconClient) + + blobs := GetBlobs() + UploadBlobs(ctx, ethClient, blobs) + util.WaitForNextSlots(ctx, beaconClient, 1) + blobSlot := util.FindBlobSlot(ctx, beaconClient, startSlot) + + // Wait a bit to induce substantial initial-sync in the beacon node follower + util.WaitForNextSlots(ctx, beaconClient, 10) + + g.Go(func() error { + return env.GethNode2.Start(ctx) + }) + g.Go(func() error { + return env.BeaconNodeFollower.Start(ctx) + }) + if err := g.Wait(); err != nil { + log.Fatalf("failed to start services: %v", err) + } + + beaconNodeFollowerClient, err := ctrl.GetBeaconNodeFollowerClient(ctx) + if err != nil { + log.Fatalf("failed to get beacon node follower client: %v", err) + } + + syncSlot := util.GetHeadSlot(ctx, beaconClient) + if err := ctrl.WaitForSlotWithClient(ctx, beaconNodeFollowerClient, syncSlot); err != nil { + log.Fatalf("unable to wait for beacon follower sync: %v", err) + } + + log.Printf("checking blob from beacon node") + downloadedData := util.DownloadBlobs(ctx, blobSlot, 1, shared.BeaconMultiAddress) + downloadedBlobs := shared.EncodeBlobs(downloadedData) + util.AssertBlobsEquals(blobs, downloadedBlobs) + + log.Printf("checking blob from beacon node follower") + time.Sleep(time.Second * 2 * time.Duration(env.BeaconChainConfig.SecondsPerSlot)) // wait a bit for sync + downloadedData = util.DownloadBlobs(ctx, blobSlot, 1, shared.BeaconFollowerMultiAddress) + downloadedBlobs = shared.EncodeBlobs(downloadedData) + util.AssertBlobsEquals(blobs, downloadedBlobs) +} + +func UploadBlobs(ctx context.Context, client *ethclient.Client, blobs types.Blobs) { + chainId := big.NewInt(1) + signer := types.NewDankSigner(chainId) + + key, err := crypto.HexToECDSA(shared.PrivateKey) + if err != nil { + log.Fatalf("Failed to load private key: %v", err) + } + + nonce, err := client.PendingNonceAt(ctx, crypto.PubkeyToAddress(key.PublicKey)) + if err != nil { + log.Fatalf("Error getting nonce: %v", err) + } + log.Printf("Nonce: %d", nonce) + + commitments, versionedHashes, aggregatedProof, err := blobs.ComputeCommitmentsAndAggregatedProof() + + to := common.HexToAddress("ffb38a7a99e3e2335be83fc74b7faa19d5531243") + txData := types.SignedBlobTx{ + Message: types.BlobTxMessage{ + ChainID: view.Uint256View(*uint256.NewInt(chainId.Uint64())), + Nonce: view.Uint64View(nonce), + Gas: 210000, + GasFeeCap: view.Uint256View(*uint256.NewInt(5000000000)), + GasTipCap: view.Uint256View(*uint256.NewInt(5000000000)), + MaxFeePerDataGas: view.Uint256View(*uint256.NewInt(3000000000)), // needs to be at least the min fee + Value: view.Uint256View(*uint256.NewInt(12345678)), + To: types.AddressOptionalSSZ{Address: (*types.AddressSSZ)(&to)}, + BlobVersionedHashes: versionedHashes, + }, + } + + wrapData := types.BlobTxWrapData{ + BlobKzgs: commitments, + Blobs: blobs, + KzgAggregatedProof: aggregatedProof, + } + tx := types.NewTx(&txData, types.WithTxWrapData(&wrapData)) + tx, err = types.SignTx(tx, signer, key) + if err != nil { + log.Fatalf("Error signing tx: %v", err) + } + err = client.SendTransaction(ctx, tx) + if err != nil { + log.Fatalf("Error sending tx: %v", err) + } + log.Printf("Transaction submitted. hash=%v", tx.Hash()) + + log.Printf("Waiting for transaction (%v) to be included...", tx.Hash()) + if _, err := shared.WaitForReceipt(ctx, client, tx.Hash()); err != nil { + log.Fatalf("Error waiting for transaction receipt %v: %v", tx.Hash(), err) + } +} diff --git a/tests/pre-4844/main.go b/tests/pre-4844/main.go index 1e05429..b02ea60 100644 --- a/tests/pre-4844/main.go +++ b/tests/pre-4844/main.go @@ -18,6 +18,7 @@ import ( // Asserts that transaction still work before the 4844 fork in execution func main() { ctrl.InitE2ETest() + env := ctrl.GetEnv() chainId := big.NewInt(1) signer := types.NewDankSigner(chainId) @@ -25,12 +26,16 @@ func main() { ctx, cancel := context.WithTimeout(context.Background(), time.Minute*20) defer cancel() + client, err := ctrl.GetExecutionClient(ctx) + if err != nil { + log.Fatalf("unable to get execution client: %v", err) + } + key, err := crypto.HexToECDSA(shared.PrivateKey) if err != nil { log.Fatalf("Failed to load private key: %v", err) } - client := ctrl.Env.EthClient nonce, err := client.PendingNonceAt(ctx, crypto.PubkeyToAddress(key.PublicKey)) if err != nil { log.Fatalf("Error getting nonce: %v", err) @@ -79,7 +84,7 @@ func main() { log.Fatalf("Error getting block: %v", err) } - eip4844Block := ctrl.Env.GethChainConfig.ShardingForkBlock.Uint64() + eip4844Block := env.GethChainConfig.ShardingForkBlock.Uint64() if receipt.BlockNumber.Uint64() > eip4844Block { // TODO: Avoid this issue by configuring the chain config at runtime log.Fatalf("Test condition violation. Transaction must be included before eip4844 fork. Check the geth chain config") diff --git a/tests/util/util.go b/tests/util/util.go new file mode 100644 index 0000000..1f0d7a2 --- /dev/null +++ b/tests/util/util.go @@ -0,0 +1,249 @@ +package util + +import ( + "bytes" + "context" + "errors" + "fmt" + "io" + "log" + "strings" + "time" + + "github.com/Inphi/eip4844-interop/shared" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/params" + "github.com/libp2p/go-libp2p" + libp2pcore "github.com/libp2p/go-libp2p-core" + "github.com/libp2p/go-libp2p-core/host" + "github.com/libp2p/go-libp2p-core/peer" + "github.com/libp2p/go-libp2p-core/protocol" + ma "github.com/multiformats/go-multiaddr" + "github.com/prysmaticlabs/prysm/v3/beacon-chain/p2p" + "github.com/prysmaticlabs/prysm/v3/beacon-chain/p2p/encoder" + "github.com/prysmaticlabs/prysm/v3/beacon-chain/sync" + consensustypes "github.com/prysmaticlabs/prysm/v3/consensus-types/primitives" + "github.com/prysmaticlabs/prysm/v3/proto/eth/service" + ethpbv1 "github.com/prysmaticlabs/prysm/v3/proto/eth/v1" + ethpbv2 "github.com/prysmaticlabs/prysm/v3/proto/eth/v2" + ethpb "github.com/prysmaticlabs/prysm/v3/proto/prysm/v1alpha1" +) + +func init() { + encoder.MaxChunkSize = 10 << 20 +} + +func WaitForSlot(ctx context.Context, client service.BeaconChainClient, slot consensustypes.Slot) error { + req := ðpbv1.BlockRequest{BlockId: []byte("head")} + for { + header, err := client.GetBlockHeader(ctx, req) + if err != nil { + return fmt.Errorf("unable to retrieve block header: %v", err) + } + headSlot := header.Data.Header.Message.Slot + if headSlot >= slot { + break + } + time.Sleep(time.Second * 1) + } + return nil +} + +func WaitForNextSlots(ctx context.Context, client service.BeaconChainClient, slots consensustypes.Slot) { + if err := WaitForSlot(ctx, client, GetHeadSlot(ctx, client).AddSlot(slots)); err != nil { + log.Fatalf("error waiting for next slot: %v", err) + } +} + +func GetHeadSlot(ctx context.Context, client service.BeaconChainClient) consensustypes.Slot { + req := ðpbv1.BlockRequest{BlockId: []byte("head")} + header, err := client.GetBlockHeader(ctx, req) + if err != nil { + log.Fatalf("unable to get beacon chain head: %v", err) + } + return header.Data.Header.Message.Slot +} + +// FindBlobSlot returns the first slot containing a blob since startSlot +// Panics if no such slot could be found +func FindBlobSlot(ctx context.Context, client service.BeaconChainClient, startSlot consensustypes.Slot) consensustypes.Slot { + slot := startSlot + endSlot := GetHeadSlot(ctx, client) + for { + if slot == endSlot { + log.Fatalf("Unable to find beacon block containing blobs") + } + + blockID := fmt.Sprintf("%d", uint64(slot)) + req := ðpbv2.BlockRequestV2{BlockId: []byte(blockID)} + block, err := client.GetBlockV2(ctx, req) + if err != nil { + log.Fatalf("beaconchainclient.GetBlock: %v", err) + } + eip4844, ok := block.Data.Message.(*ethpbv2.SignedBeaconBlockContainer_Eip4844Block) + if ok { + if len(eip4844.Eip4844Block.Body.BlobKzgs) != 0 { + return eip4844.Eip4844Block.Slot + } + } + + slot = slot.Add(1) + } +} + +func AssertBlobsEquals(a, b types.Blobs) { + if len(a) != len(b) { + log.Fatalf("data length mismatch (%d != %d)", len(a), len(b)) + } + for i, _ := range a { + for j := 0; j < params.FieldElementsPerBlob; j++ { + if !bytes.Equal(a[i][j][:], b[i][j][:]) { + log.Fatal("blobs data mismatch") + } + } + } +} + +func SendBlobsSidecarsByRangeRequest(ctx context.Context, h host.Host, encoding encoder.NetworkEncoding, pid peer.ID, req *ethpb.BlobsSidecarsByRangeRequest) ([]*ethpb.BlobsSidecar, error) { + topic := fmt.Sprintf("%s%s", p2p.RPCBlobsSidecarsByRangeTopicV1, encoding.ProtocolSuffix()) + + stream, err := h.NewStream(ctx, pid, protocol.ID(topic)) + if err != nil { + return nil, err + } + defer func() { + _ = stream.Close() + }() + + if _, err := encoding.EncodeWithMaxLength(stream, req); err != nil { + _ = stream.Reset() + return nil, err + } + + if err := stream.CloseWrite(); err != nil { + _ = stream.Reset() + return nil, err + } + + var blobsSidecars []*ethpb.BlobsSidecar + for { + blobs, err := readChunkedBlobsSidecar(stream, encoding) + if errors.Is(err, io.EOF) { + break + } + if err != nil { + return nil, err + } + blobsSidecars = append(blobsSidecars, blobs) + } + return blobsSidecars, nil +} + +func readChunkedBlobsSidecar(stream libp2pcore.Stream, encoding encoder.NetworkEncoding) (*ethpb.BlobsSidecar, error) { + code, errMsg, err := sync.ReadStatusCode(stream, encoding) + if err != nil { + return nil, err + } + if code != 0 { + return nil, errors.New(errMsg) + } + sidecar := new(ethpb.BlobsSidecar) + err = encoding.DecodeWithMaxLength(stream, sidecar) + return sidecar, err +} + +func DownloadBlobs(ctx context.Context, startSlot consensustypes.Slot, count uint64, beaconMA string) []byte { + // TODO: Use Beacon gRPC to download blobs rather than p2p RPC + log.Print("downloading blobs...") + + req := ðpb.BlobsSidecarsByRangeRequest{ + StartSlot: startSlot, + Count: count, + } + + h, err := libp2p.New() + if err != nil { + log.Fatalf("failed to create libp2p context: %v", err) + } + defer func() { + _ = h.Close() + }() + + multiaddr, err := getMultiaddr(ctx, h, beaconMA) + if err != nil { + log.Fatalf("getMultiAddr: %v", err) + } + + addrInfo, err := peer.AddrInfoFromP2pAddr(multiaddr) + if err != nil { + log.Fatal(err) + } + + err = h.Connect(ctx, *addrInfo) + if err != nil { + log.Fatalf("libp2p host connect: %v", err) + } + + sidecars, err := SendBlobsSidecarsByRangeRequest(ctx, h, encoder.SszNetworkEncoder{}, addrInfo.ID, req) + if err != nil { + log.Fatalf("failed to send blobs p2p request: %v", err) + } + + anyBlobs := false + blobsBuffer := new(bytes.Buffer) + for _, sidecar := range sidecars { + if sidecar.Blobs == nil || len(sidecar.Blobs) == 0 { + continue + } + anyBlobs = true + for _, blob := range sidecar.Blobs { + data := shared.DecodeBlob(blob.Blob) + _, _ = blobsBuffer.Write(data) + } + + // stop after the first sidecar with blobs: + break + } + + if !anyBlobs { + log.Fatalf("No blobs found in requested slots, sidecar count: %d", len(sidecars)) + } + + return blobsBuffer.Bytes() +} + +func getMultiaddr(ctx context.Context, h host.Host, addr string) (ma.Multiaddr, error) { + multiaddr, err := ma.NewMultiaddr(addr) + if err != nil { + return nil, err + } + _, id := peer.SplitAddr(multiaddr) + if id != "" { + return multiaddr, nil + } + // peer ID wasn't provided, look it up + id, err = retrievePeerID(ctx, h, addr) + if err != nil { + return nil, err + } + return ma.NewMultiaddr(fmt.Sprintf("%s/p2p/%s", addr, string(id))) +} + +// Helper for retrieving the peer ID from a security error... obviously don't use this in production! +// See https://github.com/libp2p/go-libp2p-noise/blob/v0.3.0/handshake.go#L250 +func retrievePeerID(ctx context.Context, h host.Host, addr string) (peer.ID, error) { + incorrectPeerID := "16Uiu2HAmSifdT5QutTsaET8xqjWAMPp4obrQv7LN79f2RMmBe3nY" + addrInfo, err := peer.AddrInfoFromString(fmt.Sprintf("%s/p2p/%s", addr, incorrectPeerID)) + if err != nil { + return "", err + } + err = h.Connect(ctx, *addrInfo) + if err == nil { + return "", errors.New("unexpected successful connection") + } + if strings.Contains(err.Error(), "but remote key matches") { + split := strings.Split(err.Error(), " ") + return peer.ID(split[len(split)-1]), nil + } + return "", err +}