diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 811667f8..368c8faa 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -23,8 +23,6 @@ jobs: test: name: "Test" runs-on: ubuntu-latest - env: - GRPC_GO_RETRY: "on" steps: - uses: actions/checkout@v5 with: diff --git a/server/fork_integration_test.go b/server/fork_integration_test.go index 9ce86b0a..aa185197 100644 --- a/server/fork_integration_test.go +++ b/server/fork_integration_test.go @@ -41,7 +41,7 @@ func TestForkingAgainstTestnet(t *testing.T) { conn, err := grpc.NewClient( "access.testnet.nodes.onflow.org:9000", grpc.WithTransportCredentials(insecure.NewCredentials()), - grpc.WithDefaultServiceConfig(utils.DefaultGRPCServiceConfig), + utils.DefaultGRPCRetryInterceptor(), ) if err != nil { t.Fatalf("dial remote: %v", err) @@ -53,7 +53,7 @@ func TestForkingAgainstTestnet(t *testing.T) { if err != nil { t.Fatalf("get remote header: %v", err) } - remoteHeight := rh.Block.Height + remoteHeight := rh.Block.Height - 10 // Use a buffer to avoid edge cases cfg := &Config{ // Do not start listeners; NewEmulatorServer only configures components. @@ -177,7 +177,7 @@ func TestForkingAgainstMainnet(t *testing.T) { conn, err := grpc.NewClient( "access.mainnet.nodes.onflow.org:9000", grpc.WithTransportCredentials(insecure.NewCredentials()), - grpc.WithDefaultServiceConfig(utils.DefaultGRPCServiceConfig), + utils.DefaultGRPCRetryInterceptor(), ) if err != nil { t.Fatalf("dial remote: %v", err) @@ -189,7 +189,7 @@ func TestForkingAgainstMainnet(t *testing.T) { if err != nil { t.Fatalf("get remote header: %v", err) } - remoteHeight := rh.Block.Height + remoteHeight := rh.Block.Height - 10 // Use a buffer to avoid edge cases cfg := &Config{ // Do not start listeners; NewEmulatorServer only configures components. diff --git a/server/server.go b/server/server.go index 10d33e27..240e1f87 100644 --- a/server/server.go +++ b/server/server.go @@ -284,10 +284,10 @@ func DetectRemoteChainID(url string) (flowgo.ChainID, error) { conn, err := grpc.NewClient( url, grpc.WithTransportCredentials(insecure.NewCredentials()), - grpc.WithDefaultServiceConfig(utils.DefaultGRPCServiceConfig), + utils.DefaultGRPCRetryInterceptor(), ) if err != nil { - return "",fmt.Errorf("could not connect to remote access node: %w", err) + return "", err } defer func() { _ = conn.Close() }() client := flowaccess.NewAccessAPIClient(conn) diff --git a/storage/remote/store.go b/storage/remote/store.go index 275334f9..c2c26b2b 100644 --- a/storage/remote/store.go +++ b/storage/remote/store.go @@ -120,7 +120,7 @@ func New(provider *sqlite.Store, logger *zerolog.Logger, options ...Option) (*St store.host, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(1024*1024*1024)), - grpc.WithDefaultServiceConfig(utils.DefaultGRPCServiceConfig), + utils.DefaultGRPCRetryInterceptor(), ) if err != nil { return nil, fmt.Errorf("could not connect to rpc host: %w", err) diff --git a/utils/grpc.go b/utils/grpc.go index 1a4675e5..53329dd9 100644 --- a/utils/grpc.go +++ b/utils/grpc.go @@ -18,18 +18,77 @@ package utils -// DefaultGRPCServiceConfig configures native gRPC retries for transient failures. -// The empty object wildcard [{}] matches all services and methods. -const DefaultGRPCServiceConfig = `{ - "methodConfig": [{ - "name": [{}], - "retryPolicy": { - "maxAttempts": 8, - "initialBackoff": "1s", - "maxBackoff": "30s", - "backoffMultiplier": 2, - "retryableStatusCodes": ["UNAVAILABLE", "RESOURCE_EXHAUSTED", "UNKNOWN"] +import ( + "context" + "time" + + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +const ( + defaultMaxAttempts = 10 + defaultInitialBackoff = 1 * time.Second + defaultMaxBackoff = 30 * time.Second + defaultBackoffFactor = 2.0 +) + +// DefaultGRPCRetryInterceptor returns a unary client interceptor that retries +// transient failures with exponential backoff. Unlike native gRPC retries, this +// ignores server pushback headers (grpc-retry-pushback-ms) so rate-limited calls +// will retry client-side rather than fail immediately. +func DefaultGRPCRetryInterceptor() grpc.DialOption { + return grpc.WithChainUnaryInterceptor(retryInterceptor) +} + +func retryInterceptor( + ctx context.Context, + method string, + req, reply any, + cc *grpc.ClientConn, + invoker grpc.UnaryInvoker, + opts ...grpc.CallOption, +) error { + var lastErr error + backoff := defaultInitialBackoff + + for attempt := 0; attempt < defaultMaxAttempts; attempt++ { + if attempt > 0 { + // Wait before retry + select { + case <-time.After(backoff): + case <-ctx.Done(): + return ctx.Err() + } + // Exponential backoff with cap + backoff = time.Duration(float64(backoff) * defaultBackoffFactor) + if backoff > defaultMaxBackoff { + backoff = defaultMaxBackoff + } } - }] -}` + + lastErr = invoker(ctx, method, req, reply, cc, opts...) + if lastErr == nil { + return nil + } + + // Check if error is retryable + code := status.Code(lastErr) + if !isRetryableCode(code) { + return lastErr + } + } + + return lastErr +} + +func isRetryableCode(code codes.Code) bool { + switch code { + case codes.Unavailable, codes.ResourceExhausted, codes.Unknown: + return true + default: + return false + } +}