Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@ jobs:
test:
name: "Test"
runs-on: ubuntu-latest
env:
GRPC_GO_RETRY: "on"
steps:
- uses: actions/checkout@v5
with:
Expand Down
4 changes: 2 additions & 2 deletions server/fork_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func TestForkingAgainstTestnet(t *testing.T) {
conn, err := grpc.NewClient(
"access.testnet.nodes.onflow.org:9000",
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithDefaultServiceConfig(utils.DefaultGRPCServiceConfig),
utils.DefaultGRPCRetryInterceptor(),
)
if err != nil {
t.Fatalf("dial remote: %v", err)
Expand Down Expand Up @@ -177,7 +177,7 @@ func TestForkingAgainstMainnet(t *testing.T) {
conn, err := grpc.NewClient(
"access.mainnet.nodes.onflow.org:9000",
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithDefaultServiceConfig(utils.DefaultGRPCServiceConfig),
utils.DefaultGRPCRetryInterceptor(),
)
if err != nil {
t.Fatalf("dial remote: %v", err)
Expand Down
4 changes: 2 additions & 2 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,10 +284,10 @@ func DetectRemoteChainID(url string) (flowgo.ChainID, error) {
conn, err := grpc.NewClient(
url,
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithDefaultServiceConfig(utils.DefaultGRPCServiceConfig),
utils.DefaultGRPCRetryInterceptor(),
)
if err != nil {
return "",fmt.Errorf("could not connect to remote access node: %w", err)
return "", err
}
defer func() { _ = conn.Close() }()
client := flowaccess.NewAccessAPIClient(conn)
Expand Down
2 changes: 1 addition & 1 deletion storage/remote/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func New(provider *sqlite.Store, logger *zerolog.Logger, options ...Option) (*St
store.host,
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(1024*1024*1024)),
grpc.WithDefaultServiceConfig(utils.DefaultGRPCServiceConfig),
utils.DefaultGRPCRetryInterceptor(),
)
if err != nil {
return nil, fmt.Errorf("could not connect to rpc host: %w", err)
Expand Down
85 changes: 72 additions & 13 deletions utils/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,77 @@

package utils

// DefaultGRPCServiceConfig configures native gRPC retries for transient failures.
// The empty object wildcard [{}] matches all services and methods.
const DefaultGRPCServiceConfig = `{
"methodConfig": [{
"name": [{}],
"retryPolicy": {
"maxAttempts": 8,
"initialBackoff": "1s",
"maxBackoff": "30s",
"backoffMultiplier": 2,
"retryableStatusCodes": ["UNAVAILABLE", "RESOURCE_EXHAUSTED", "UNKNOWN"]
import (
"context"
"time"

"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

const (
defaultMaxAttempts = 10
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should these be configurable for any reason?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It could be fine tuned, but imo this is a good enough general number for now.

defaultInitialBackoff = 1 * time.Second
defaultMaxBackoff = 30 * time.Second
defaultBackoffFactor = 2.0
)

// DefaultGRPCRetryInterceptor returns a unary client interceptor that retries
// transient failures with exponential backoff. Unlike native gRPC retries, this
// ignores server pushback headers (grpc-retry-pushback-ms) so rate-limited calls
// will retry client-side rather than fail immediately.
func DefaultGRPCRetryInterceptor() grpc.DialOption {
return grpc.WithChainUnaryInterceptor(retryInterceptor)
}

func retryInterceptor(
ctx context.Context,
method string,
req, reply any,
cc *grpc.ClientConn,
invoker grpc.UnaryInvoker,
opts ...grpc.CallOption,
) error {
var lastErr error
backoff := defaultInitialBackoff

for attempt := 0; attempt < defaultMaxAttempts; attempt++ {
if attempt > 0 {
// Wait before retry
select {
case <-time.After(backoff):
case <-ctx.Done():
return ctx.Err()
}
// Exponential backoff with cap
backoff = time.Duration(float64(backoff) * defaultBackoffFactor)
if backoff > defaultMaxBackoff {
backoff = defaultMaxBackoff
}
}
}]
}`

lastErr = invoker(ctx, method, req, reply, cc, opts...)
if lastErr == nil {
return nil
}

// Check if error is retryable
code := status.Code(lastErr)
if !isRetryableCode(code) {
return lastErr
}
}

return lastErr
}

func isRetryableCode(code codes.Code) bool {
switch code {
case codes.Unavailable, codes.ResourceExhausted, codes.Unknown:
return true
default:
return false
}
}

Loading