From 5780949589abd65916cd0b361f574fd5f42853c6 Mon Sep 17 00:00:00 2001 From: Jordan Ribbink Date: Mon, 17 Nov 2025 12:00:06 -0800 Subject: [PATCH 1/2] Update to flow-go ab0655178589 --- go.mod | 2 +- go.sum | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index 68e565a8..637efc0f 100644 --- a/go.mod +++ b/go.mod @@ -15,7 +15,7 @@ require ( github.com/onflow/cadence v1.8.3 github.com/onflow/crypto v0.25.3 github.com/onflow/flow-core-contracts/lib/go/templates v1.9.2 - github.com/onflow/flow-go v0.44.0-experimental-cadence-v1.8.3.0.20251114171948-8b69d4ce50bb + github.com/onflow/flow-go v0.44.0-experimental-cadence-v1.8.3.0.20251117184523-ab0655178589 github.com/onflow/flow-go-sdk v1.9.2 github.com/onflow/flow-nft/lib/go/contracts v1.3.0 github.com/onflow/flow/protobuf/go/flow v0.4.18 diff --git a/go.sum b/go.sum index 9ec37ee4..a4bf428f 100644 --- a/go.sum +++ b/go.sum @@ -768,8 +768,8 @@ github.com/onflow/flow-ft/lib/go/contracts v1.0.1 h1:Ts5ob+CoCY2EjEd0W6vdLJ7hLL3 github.com/onflow/flow-ft/lib/go/contracts v1.0.1/go.mod h1:PwsL8fC81cjnUnTfmyL/HOIyHnyaw/JA474Wfj2tl6A= github.com/onflow/flow-ft/lib/go/templates v1.0.1 h1:FDYKAiGowABtoMNusLuRCILIZDtVqJ/5tYI4VkF5zfM= github.com/onflow/flow-ft/lib/go/templates v1.0.1/go.mod h1:uQ8XFqmMK2jxyBSVrmyuwdWjTEb+6zGjRYotfDJ5pAE= -github.com/onflow/flow-go v0.44.0-experimental-cadence-v1.8.3.0.20251114171948-8b69d4ce50bb h1:pdoWUpQNdkdLUY8CzWdlV7zgEKX6c93i+TEQk0dLu7Y= -github.com/onflow/flow-go v0.44.0-experimental-cadence-v1.8.3.0.20251114171948-8b69d4ce50bb/go.mod h1:4A34mlMMd7usjw7e0r4VrltFpZRjhYkV+T+iecTbzO0= +github.com/onflow/flow-go v0.44.0-experimental-cadence-v1.8.3.0.20251117184523-ab0655178589 h1:EqsUfh8MW4vOdWfXc1D8kKr4VBhRKsqeNCBeEWbw5Ew= +github.com/onflow/flow-go v0.44.0-experimental-cadence-v1.8.3.0.20251117184523-ab0655178589/go.mod h1:4A34mlMMd7usjw7e0r4VrltFpZRjhYkV+T+iecTbzO0= github.com/onflow/flow-go-sdk v1.9.2 h1:kMw3qShgLNIASHGMgoY+faTBQ+1MnzsNLAH+oxy9eiY= github.com/onflow/flow-go-sdk v1.9.2/go.mod h1:qVuzMGXNJBMktKnIDKLjV0/k21P2XD39dOfMW+X5Bsc= github.com/onflow/flow-nft/lib/go/contracts v1.3.0 h1:DmNop+O0EMyicZvhgdWboFG57xz5t9Qp81FKlfKyqJc= From 0d04d75ce73102fc9346ea1088c1b1b5f980141d Mon Sep 17 00:00:00 2001 From: Jordan Ribbink Date: Mon, 17 Nov 2025 12:21:01 -0800 Subject: [PATCH 2/2] Use built-in GRPC retry --- server/fork_integration_test.go | 19 +++- server/server.go | 20 +++-- storage/remote/store.go | 155 +++----------------------------- utils/grpc.go | 44 +++++++++ 4 files changed, 85 insertions(+), 153 deletions(-) create mode 100644 utils/grpc.go diff --git a/server/fork_integration_test.go b/server/fork_integration_test.go index 53ede4fc..9ce86b0a 100644 --- a/server/fork_integration_test.go +++ b/server/fork_integration_test.go @@ -24,6 +24,7 @@ import ( "github.com/onflow/cadence" "github.com/onflow/flow-emulator/convert" + "github.com/onflow/flow-emulator/utils" flowsdk "github.com/onflow/flow-go-sdk" flowgo "github.com/onflow/flow-go/model/flow" flowaccess "github.com/onflow/flow/protobuf/go/flow/access" @@ -36,13 +37,18 @@ import ( func TestForkingAgainstTestnet(t *testing.T) { logger := zerolog.Nop() - // Get remote latest sealed height to pin fork - conn, err := grpc.NewClient("access.testnet.nodes.onflow.org:9000", grpc.WithTransportCredentials(insecure.NewCredentials())) + // Get remote latest sealed height to pin fork with automatic retry + conn, err := grpc.NewClient( + "access.testnet.nodes.onflow.org:9000", + grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithDefaultServiceConfig(utils.DefaultGRPCServiceConfig), + ) if err != nil { t.Fatalf("dial remote: %v", err) } defer func() { _ = conn.Close() }() remote := flowaccess.NewAccessAPIClient(conn) + rh, err := remote.GetLatestBlockHeader(context.Background(), &flowaccess.GetLatestBlockHeaderRequest{IsSealed: true}) if err != nil { t.Fatalf("get remote header: %v", err) @@ -167,13 +173,18 @@ func TestForkingAgainstTestnet(t *testing.T) { func TestForkingAgainstMainnet(t *testing.T) { logger := zerolog.Nop() - // Get remote latest sealed height to pin fork - conn, err := grpc.NewClient("access.mainnet.nodes.onflow.org:9000", grpc.WithTransportCredentials(insecure.NewCredentials())) + // Get remote latest sealed height to pin fork with automatic retry + conn, err := grpc.NewClient( + "access.mainnet.nodes.onflow.org:9000", + grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithDefaultServiceConfig(utils.DefaultGRPCServiceConfig), + ) if err != nil { t.Fatalf("dial remote: %v", err) } defer func() { _ = conn.Close() }() remote := flowaccess.NewAccessAPIClient(conn) + rh, err := remote.GetLatestBlockHeader(context.Background(), &flowaccess.GetLatestBlockHeaderRequest{IsSealed: true}) if err != nil { t.Fatalf("get remote header: %v", err) diff --git a/server/server.go b/server/server.go index 8a578101..6e2ff6dd 100644 --- a/server/server.go +++ b/server/server.go @@ -44,12 +44,13 @@ import ( "github.com/onflow/flow-emulator/emulator" "github.com/onflow/flow-emulator/server/access" "github.com/onflow/flow-emulator/server/debugger" - "github.com/onflow/flow-emulator/server/utils" + serverutils "github.com/onflow/flow-emulator/server/utils" "github.com/onflow/flow-emulator/storage" "github.com/onflow/flow-emulator/storage/checkpoint" "github.com/onflow/flow-emulator/storage/remote" "github.com/onflow/flow-emulator/storage/sqlite" "github.com/onflow/flow-emulator/storage/util" + "github.com/onflow/flow-emulator/utils" ) // EmulatorServer is a local server that runs a Flow Emulator instance. @@ -65,7 +66,7 @@ type EmulatorServer struct { storage graceland.Routine grpc *access.GRPCServer rest *access.RestServer - admin *utils.HTTPServer + admin *serverutils.HTTPServer blocks graceland.Routine debugger graceland.Routine } @@ -79,7 +80,7 @@ const ( defaultDBGCRatio = 0.5 ) -var defaultHTTPHeaders = []utils.HTTPHeader{ +var defaultHTTPHeaders = []serverutils.HTTPHeader{ { Key: "Access-Control-Allow-Origin", Value: "*", @@ -102,7 +103,7 @@ type Config struct { DebuggerPort int RESTPort int RESTDebug bool - HTTPHeaders []utils.HTTPHeader + HTTPHeaders []serverutils.HTTPHeader BlockTime time.Duration ServicePublicKey crypto.PublicKey ServicePrivateKey crypto.PrivateKey @@ -243,7 +244,7 @@ func NewEmulatorServer(logger *zerolog.Logger, conf *Config) *EmulatorServer { } accessAdapter := adapters.NewAccessAdapter(logger, emulatedBlockchain) - livenessTicker := utils.NewLivenessTicker(conf.LivenessCheckTolerance) + livenessTicker := serverutils.NewLivenessTicker(conf.LivenessCheckTolerance) grpcServer := access.NewGRPCServer(logger, emulatedBlockchain, accessAdapter, chain, conf.Host, conf.GRPCPort, conf.GRPCDebug) restServer, err := access.NewRestServer(logger, emulatedBlockchain, accessAdapter, chain, conf.Host, conf.RESTPort, conf.RESTDebug) if err != nil { @@ -264,7 +265,7 @@ func NewEmulatorServer(logger *zerolog.Logger, conf *Config) *EmulatorServer { debugger: debugger.New(logger, emulatedBlockchain, conf.DebuggerPort), } - server.admin = utils.NewAdminServer(logger, emulatedBlockchain, accessAdapter, grpcServer, livenessTicker, conf.Host, conf.AdminPort, conf.HTTPHeaders) + server.admin = serverutils.NewAdminServer(logger, emulatedBlockchain, accessAdapter, grpcServer, livenessTicker, conf.Host, conf.AdminPort, conf.HTTPHeaders) // only create blocks ticker if block time > 0 if conf.BlockTime > 0 { @@ -280,12 +281,17 @@ func NewEmulatorServer(logger *zerolog.Logger, conf *Config) *EmulatorServer { // detectRemoteChainID connects to the remote access node and fetches network parameters to obtain the chain ID. func DetectRemoteChainID(url string) (flowgo.ChainID, error) { // Expect raw host:port - conn, err := grpc.NewClient(url, grpc.WithTransportCredentials(insecure.NewCredentials())) + conn, err := grpc.NewClient( + url, + grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithDefaultServiceConfig(utils.DefaultGRPCServiceConfig), + ) if err != nil { return "", err } defer func() { _ = conn.Close() }() client := flowaccess.NewAccessAPIClient(conn) + resp, err := client.GetNetworkParameters(context.Background(), &flowaccess.GetNetworkParametersRequest{}) if err != nil { return "", err diff --git a/storage/remote/store.go b/storage/remote/store.go index 167da09e..275334f9 100644 --- a/storage/remote/store.go +++ b/storage/remote/store.go @@ -21,9 +21,6 @@ package remote import ( "context" "fmt" - "math" - "math/rand" - "time" lru "github.com/hashicorp/golang-lru/v2" "github.com/onflow/flow-go/engine/common/rpc/convert" @@ -42,114 +39,14 @@ import ( "github.com/onflow/flow-emulator/storage" "github.com/onflow/flow-emulator/storage/sqlite" "github.com/onflow/flow-emulator/types" + "github.com/onflow/flow-emulator/utils" ) -// Retry and concurrency configuration +// Configuration const ( - maxRetries = 5 - baseDelay = 100 * time.Millisecond - maxDelay = 30 * time.Second - jitterFactor = 0.1 - maxConcurrentRequests = 10 // Maximum concurrent requests to remote node - blockBuffer = 10 // Buffer to allow for block propagation + blockBuffer = 10 // Buffer to allow for block propagation ) -// isRateLimitError checks if the error is a rate limiting error -func isRateLimitError(err error) bool { - if err == nil { - return false - } - - st, ok := status.FromError(err) - if !ok { - return false - } - - // ResourceExhausted is the standard gRPC code for rate limiting - return st.Code() == codes.ResourceExhausted -} - -// exponentialBackoffWithJitter calculates delay with exponential backoff and jitter -func exponentialBackoffWithJitter(attempt int) time.Duration { - if attempt <= 0 { - return baseDelay - } - - // Calculate exponential delay: baseDelay * 2^(attempt-1) - delay := float64(baseDelay) * math.Pow(2, float64(attempt-1)) - - // Cap at maxDelay - if delay > float64(maxDelay) { - delay = float64(maxDelay) - } - - // Add jitter: ±10% random variation - jitter := delay * jitterFactor * (2*rand.Float64() - 1) - delay += jitter - - // Ensure minimum delay - if delay < float64(baseDelay) { - delay = float64(baseDelay) - } - - return time.Duration(delay) -} - -// retryWithBackoff executes a function with exponential backoff retry on rate limit errors -func (s *Store) retryWithBackoff(ctx context.Context, operation string, fn func() error) error { - // Acquire semaphore to limit concurrent requests - select { - case s.concurrencySem <- struct{}{}: - defer func() { <-s.concurrencySem }() - case <-ctx.Done(): - return ctx.Err() - } - - var lastErr error - - for attempt := 0; attempt <= maxRetries; attempt++ { - err := fn() - if err == nil { - return nil - } - - lastErr = err - - // Only retry on recognized, transient rate limit errors - if !isRateLimitError(err) { - return err - } - - // Continue with retry logic for rate limits only - if attempt == maxRetries { - s.logger.Warn(). - Str("operation", operation). - Int("attempt", attempt+1). - Err(err). - Msg("Request failed after max attempts") - return err - } - - // Calculate delay and wait - delay := exponentialBackoffWithJitter(attempt) - s.logger.Debug(). - Str("operation", operation). - Int("attempt", attempt+1). - Dur("delay", delay). - Err(err). - Msg("Rate limited, retrying with backoff") - - select { - case <-ctx.Done(): - return ctx.Err() - case <-time.After(delay): - // Continue to next attempt - } - } - - return lastErr -} - type Store struct { *sqlite.Store executionClient executiondata.ExecutionDataAPIClient @@ -159,7 +56,6 @@ type Store struct { chainID flowgo.ChainID forkHeight uint64 logger *zerolog.Logger - concurrencySem chan struct{} // Semaphore to limit concurrent requests } type Option func(*Store) @@ -207,9 +103,8 @@ func WithClient( func New(provider *sqlite.Store, logger *zerolog.Logger, options ...Option) (*Store, error) { store := &Store{ - Store: provider, - logger: logger, - concurrencySem: make(chan struct{}, maxConcurrentRequests), + Store: provider, + logger: logger, } for _, opt := range options { @@ -225,6 +120,7 @@ func New(provider *sqlite.Store, logger *zerolog.Logger, options ...Option) (*St store.host, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(1024*1024*1024)), + grpc.WithDefaultServiceConfig(utils.DefaultGRPCServiceConfig), ) if err != nil { return nil, fmt.Errorf("could not connect to rpc host: %w", err) @@ -235,12 +131,7 @@ func New(provider *sqlite.Store, logger *zerolog.Logger, options ...Option) (*St store.accessClient = access.NewAccessAPIClient(conn) } - var params *access.GetNetworkParametersResponse - err := store.retryWithBackoff(context.Background(), "GetNetworkParameters", func() error { - var err error - params, err = store.accessClient.GetNetworkParameters(context.Background(), &access.GetNetworkParametersRequest{}) - return err - }) + params, err := store.accessClient.GetNetworkParameters(context.Background(), &access.GetNetworkParametersRequest{}) if err != nil { return nil, fmt.Errorf("could not get network parameters: %w", err) } @@ -282,12 +173,7 @@ func (s *Store) initializeStartBlock(ctx context.Context) error { // use the current latest block from the rpc host if no height was provided if s.forkHeight == 0 { - var resp *access.BlockHeaderResponse - err := s.retryWithBackoff(ctx, "GetLatestBlockHeader", func() error { - var err error - resp, err = s.accessClient.GetLatestBlockHeader(ctx, &access.GetLatestBlockHeaderRequest{IsSealed: true}) - return err - }) + resp, err := s.accessClient.GetLatestBlockHeader(ctx, &access.GetLatestBlockHeaderRequest{IsSealed: true}) if err != nil { return fmt.Errorf("could not get last block height: %w", err) } @@ -322,12 +208,7 @@ func (s *Store) BlockByID(ctx context.Context, blockID flowgo.Identifier) (*flow if err == nil { height = block.Height } else if errors.Is(err, storage.ErrNotFound) { - var heightRes *access.BlockHeaderResponse - err := s.retryWithBackoff(ctx, "GetBlockHeaderByID", func() error { - var err error - heightRes, err = s.accessClient.GetBlockHeaderByID(ctx, &access.GetBlockHeaderByIDRequest{Id: blockID[:]}) - return err - }) + heightRes, err := s.accessClient.GetBlockHeaderByID(ctx, &access.GetBlockHeaderByIDRequest{Id: blockID[:]}) if err != nil { return nil, err } @@ -367,12 +248,7 @@ func (s *Store) BlockByHeight(ctx context.Context, height uint64) (*flowgo.Block return nil, &types.BlockNotFoundByHeightError{Height: height} } - var blockRes *access.BlockHeaderResponse - err = s.retryWithBackoff(ctx, "GetBlockHeaderByHeight", func() error { - var err error - blockRes, err = s.accessClient.GetBlockHeaderByHeight(ctx, &access.GetBlockHeaderByHeightRequest{Height: height}) - return err - }) + blockRes, err := s.accessClient.GetBlockHeaderByHeight(ctx, &access.GetBlockHeaderByHeightRequest{Height: height}) if err != nil { return nil, err } @@ -429,14 +305,9 @@ func (s *Store) LedgerByHeight( } registerID := convert.RegisterIDToMessage(flowgo.RegisterID{Key: id.Key, Owner: id.Owner}) - var response *executiondata.GetRegisterValuesResponse - - err = s.retryWithBackoff(ctx, "GetRegisterValues", func() error { - response, err = s.executionClient.GetRegisterValues(ctx, &executiondata.GetRegisterValuesRequest{ - BlockHeight: lookupHeight, - RegisterIds: []*entities.RegisterID{registerID}, - }) - return err + response, err := s.executionClient.GetRegisterValues(ctx, &executiondata.GetRegisterValuesRequest{ + BlockHeight: lookupHeight, + RegisterIds: []*entities.RegisterID{registerID}, }) if err != nil { diff --git a/utils/grpc.go b/utils/grpc.go new file mode 100644 index 00000000..3792ba55 --- /dev/null +++ b/utils/grpc.go @@ -0,0 +1,44 @@ +/* + * Flow Emulator + * + * Copyright Flow Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package utils + +// DefaultGRPCServiceConfig provides automatic retry configuration for transient gRPC errors. +// This config is applied to all remote gRPC connections to handle network flakiness in CI +// and other environments. +// +// Retries on: +// - UNAVAILABLE: Service temporarily unavailable (e.g., node restarting) +// - RESOURCE_EXHAUSTED: Rate limiting from remote node +// - UNKNOWN: Connection failures, DNS issues, and other network errors +// +// Note: We only retry on clearly transient network/availability errors. +// We do NOT retry on INTERNAL (programming errors), ABORTED (conflicts), +// or DEADLINE_EXCEEDED (to avoid cascading failures on slow services). +const DefaultGRPCServiceConfig = `{ + "methodConfig": [{ + "name": [{"service": ""}], + "retryPolicy": { + "maxAttempts": 5, + "initialBackoff": "0.1s", + "maxBackoff": "30s", + "backoffMultiplier": 2, + "retryableStatusCodes": ["UNAVAILABLE", "RESOURCE_EXHAUSTED", "UNKNOWN"] + } + }] +}`