Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ require (
github.com/onflow/cadence v1.8.3
github.com/onflow/crypto v0.25.3
github.com/onflow/flow-core-contracts/lib/go/templates v1.9.2
github.com/onflow/flow-go v0.44.0-experimental-cadence-v1.8.3.0.20251114171948-8b69d4ce50bb
github.com/onflow/flow-go v0.44.0-experimental-cadence-v1.8.3.0.20251117184523-ab0655178589
github.com/onflow/flow-go-sdk v1.9.2
github.com/onflow/flow-nft/lib/go/contracts v1.3.0
github.com/onflow/flow/protobuf/go/flow v0.4.18
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -768,8 +768,8 @@ github.com/onflow/flow-ft/lib/go/contracts v1.0.1 h1:Ts5ob+CoCY2EjEd0W6vdLJ7hLL3
github.com/onflow/flow-ft/lib/go/contracts v1.0.1/go.mod h1:PwsL8fC81cjnUnTfmyL/HOIyHnyaw/JA474Wfj2tl6A=
github.com/onflow/flow-ft/lib/go/templates v1.0.1 h1:FDYKAiGowABtoMNusLuRCILIZDtVqJ/5tYI4VkF5zfM=
github.com/onflow/flow-ft/lib/go/templates v1.0.1/go.mod h1:uQ8XFqmMK2jxyBSVrmyuwdWjTEb+6zGjRYotfDJ5pAE=
github.com/onflow/flow-go v0.44.0-experimental-cadence-v1.8.3.0.20251114171948-8b69d4ce50bb h1:pdoWUpQNdkdLUY8CzWdlV7zgEKX6c93i+TEQk0dLu7Y=
github.com/onflow/flow-go v0.44.0-experimental-cadence-v1.8.3.0.20251114171948-8b69d4ce50bb/go.mod h1:4A34mlMMd7usjw7e0r4VrltFpZRjhYkV+T+iecTbzO0=
github.com/onflow/flow-go v0.44.0-experimental-cadence-v1.8.3.0.20251117184523-ab0655178589 h1:EqsUfh8MW4vOdWfXc1D8kKr4VBhRKsqeNCBeEWbw5Ew=
github.com/onflow/flow-go v0.44.0-experimental-cadence-v1.8.3.0.20251117184523-ab0655178589/go.mod h1:4A34mlMMd7usjw7e0r4VrltFpZRjhYkV+T+iecTbzO0=
github.com/onflow/flow-go-sdk v1.9.2 h1:kMw3qShgLNIASHGMgoY+faTBQ+1MnzsNLAH+oxy9eiY=
github.com/onflow/flow-go-sdk v1.9.2/go.mod h1:qVuzMGXNJBMktKnIDKLjV0/k21P2XD39dOfMW+X5Bsc=
github.com/onflow/flow-nft/lib/go/contracts v1.3.0 h1:DmNop+O0EMyicZvhgdWboFG57xz5t9Qp81FKlfKyqJc=
Expand Down
19 changes: 15 additions & 4 deletions server/fork_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (

"github.com/onflow/cadence"
"github.com/onflow/flow-emulator/convert"
"github.com/onflow/flow-emulator/utils"
flowsdk "github.com/onflow/flow-go-sdk"
flowgo "github.com/onflow/flow-go/model/flow"
flowaccess "github.com/onflow/flow/protobuf/go/flow/access"
Expand All @@ -36,13 +37,18 @@ import (
func TestForkingAgainstTestnet(t *testing.T) {
logger := zerolog.Nop()

// Get remote latest sealed height to pin fork
conn, err := grpc.NewClient("access.testnet.nodes.onflow.org:9000", grpc.WithTransportCredentials(insecure.NewCredentials()))
// Get remote latest sealed height to pin fork with automatic retry
conn, err := grpc.NewClient(
"access.testnet.nodes.onflow.org:9000",
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithDefaultServiceConfig(utils.DefaultGRPCServiceConfig),
)
if err != nil {
t.Fatalf("dial remote: %v", err)
}
defer func() { _ = conn.Close() }()
remote := flowaccess.NewAccessAPIClient(conn)

rh, err := remote.GetLatestBlockHeader(context.Background(), &flowaccess.GetLatestBlockHeaderRequest{IsSealed: true})
if err != nil {
t.Fatalf("get remote header: %v", err)
Expand Down Expand Up @@ -167,13 +173,18 @@ func TestForkingAgainstTestnet(t *testing.T) {
func TestForkingAgainstMainnet(t *testing.T) {
logger := zerolog.Nop()

// Get remote latest sealed height to pin fork
conn, err := grpc.NewClient("access.mainnet.nodes.onflow.org:9000", grpc.WithTransportCredentials(insecure.NewCredentials()))
// Get remote latest sealed height to pin fork with automatic retry
conn, err := grpc.NewClient(
"access.mainnet.nodes.onflow.org:9000",
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithDefaultServiceConfig(utils.DefaultGRPCServiceConfig),
)
if err != nil {
t.Fatalf("dial remote: %v", err)
}
defer func() { _ = conn.Close() }()
remote := flowaccess.NewAccessAPIClient(conn)

rh, err := remote.GetLatestBlockHeader(context.Background(), &flowaccess.GetLatestBlockHeaderRequest{IsSealed: true})
if err != nil {
t.Fatalf("get remote header: %v", err)
Expand Down
20 changes: 13 additions & 7 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,13 @@ import (
"github.com/onflow/flow-emulator/emulator"
"github.com/onflow/flow-emulator/server/access"
"github.com/onflow/flow-emulator/server/debugger"
"github.com/onflow/flow-emulator/server/utils"
serverutils "github.com/onflow/flow-emulator/server/utils"
"github.com/onflow/flow-emulator/storage"
"github.com/onflow/flow-emulator/storage/checkpoint"
"github.com/onflow/flow-emulator/storage/remote"
"github.com/onflow/flow-emulator/storage/sqlite"
"github.com/onflow/flow-emulator/storage/util"
"github.com/onflow/flow-emulator/utils"
)

// EmulatorServer is a local server that runs a Flow Emulator instance.
Expand All @@ -65,7 +66,7 @@ type EmulatorServer struct {
storage graceland.Routine
grpc *access.GRPCServer
rest *access.RestServer
admin *utils.HTTPServer
admin *serverutils.HTTPServer
blocks graceland.Routine
debugger graceland.Routine
}
Expand All @@ -79,7 +80,7 @@ const (
defaultDBGCRatio = 0.5
)

var defaultHTTPHeaders = []utils.HTTPHeader{
var defaultHTTPHeaders = []serverutils.HTTPHeader{
{
Key: "Access-Control-Allow-Origin",
Value: "*",
Expand All @@ -102,7 +103,7 @@ type Config struct {
DebuggerPort int
RESTPort int
RESTDebug bool
HTTPHeaders []utils.HTTPHeader
HTTPHeaders []serverutils.HTTPHeader
BlockTime time.Duration
ServicePublicKey crypto.PublicKey
ServicePrivateKey crypto.PrivateKey
Expand Down Expand Up @@ -243,7 +244,7 @@ func NewEmulatorServer(logger *zerolog.Logger, conf *Config) *EmulatorServer {
}

accessAdapter := adapters.NewAccessAdapter(logger, emulatedBlockchain)
livenessTicker := utils.NewLivenessTicker(conf.LivenessCheckTolerance)
livenessTicker := serverutils.NewLivenessTicker(conf.LivenessCheckTolerance)
grpcServer := access.NewGRPCServer(logger, emulatedBlockchain, accessAdapter, chain, conf.Host, conf.GRPCPort, conf.GRPCDebug)
restServer, err := access.NewRestServer(logger, emulatedBlockchain, accessAdapter, chain, conf.Host, conf.RESTPort, conf.RESTDebug)
if err != nil {
Expand All @@ -264,7 +265,7 @@ func NewEmulatorServer(logger *zerolog.Logger, conf *Config) *EmulatorServer {
debugger: debugger.New(logger, emulatedBlockchain, conf.DebuggerPort),
}

server.admin = utils.NewAdminServer(logger, emulatedBlockchain, accessAdapter, grpcServer, livenessTicker, conf.Host, conf.AdminPort, conf.HTTPHeaders)
server.admin = serverutils.NewAdminServer(logger, emulatedBlockchain, accessAdapter, grpcServer, livenessTicker, conf.Host, conf.AdminPort, conf.HTTPHeaders)

// only create blocks ticker if block time > 0
if conf.BlockTime > 0 {
Expand All @@ -280,12 +281,17 @@ func NewEmulatorServer(logger *zerolog.Logger, conf *Config) *EmulatorServer {
// detectRemoteChainID connects to the remote access node and fetches network parameters to obtain the chain ID.
func DetectRemoteChainID(url string) (flowgo.ChainID, error) {
// Expect raw host:port
conn, err := grpc.NewClient(url, grpc.WithTransportCredentials(insecure.NewCredentials()))
conn, err := grpc.NewClient(
url,
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithDefaultServiceConfig(utils.DefaultGRPCServiceConfig),
)
if err != nil {
return "", err
}
defer func() { _ = conn.Close() }()
client := flowaccess.NewAccessAPIClient(conn)

resp, err := client.GetNetworkParameters(context.Background(), &flowaccess.GetNetworkParametersRequest{})
if err != nil {
return "", err
Expand Down
155 changes: 13 additions & 142 deletions storage/remote/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,6 @@ package remote
import (
"context"
"fmt"
"math"
"math/rand"
"time"

lru "github.com/hashicorp/golang-lru/v2"
"github.com/onflow/flow-go/engine/common/rpc/convert"
Expand All @@ -42,114 +39,14 @@ import (
"github.com/onflow/flow-emulator/storage"
"github.com/onflow/flow-emulator/storage/sqlite"
"github.com/onflow/flow-emulator/types"
"github.com/onflow/flow-emulator/utils"
)

// Retry and concurrency configuration
// Configuration
const (
maxRetries = 5
baseDelay = 100 * time.Millisecond
maxDelay = 30 * time.Second
jitterFactor = 0.1
maxConcurrentRequests = 10 // Maximum concurrent requests to remote node
blockBuffer = 10 // Buffer to allow for block propagation
blockBuffer = 10 // Buffer to allow for block propagation
)

// isRateLimitError checks if the error is a rate limiting error
func isRateLimitError(err error) bool {
if err == nil {
return false
}

st, ok := status.FromError(err)
if !ok {
return false
}

// ResourceExhausted is the standard gRPC code for rate limiting
return st.Code() == codes.ResourceExhausted
}

// exponentialBackoffWithJitter calculates delay with exponential backoff and jitter
func exponentialBackoffWithJitter(attempt int) time.Duration {
if attempt <= 0 {
return baseDelay
}

// Calculate exponential delay: baseDelay * 2^(attempt-1)
delay := float64(baseDelay) * math.Pow(2, float64(attempt-1))

// Cap at maxDelay
if delay > float64(maxDelay) {
delay = float64(maxDelay)
}

// Add jitter: ±10% random variation
jitter := delay * jitterFactor * (2*rand.Float64() - 1)
delay += jitter

// Ensure minimum delay
if delay < float64(baseDelay) {
delay = float64(baseDelay)
}

return time.Duration(delay)
}

// retryWithBackoff executes a function with exponential backoff retry on rate limit errors
func (s *Store) retryWithBackoff(ctx context.Context, operation string, fn func() error) error {
// Acquire semaphore to limit concurrent requests
select {
case s.concurrencySem <- struct{}{}:
defer func() { <-s.concurrencySem }()
case <-ctx.Done():
return ctx.Err()
}

var lastErr error

for attempt := 0; attempt <= maxRetries; attempt++ {
err := fn()
if err == nil {
return nil
}

lastErr = err

// Only retry on recognized, transient rate limit errors
if !isRateLimitError(err) {
return err
}

// Continue with retry logic for rate limits only
if attempt == maxRetries {
s.logger.Warn().
Str("operation", operation).
Int("attempt", attempt+1).
Err(err).
Msg("Request failed after max attempts")
return err
}

// Calculate delay and wait
delay := exponentialBackoffWithJitter(attempt)
s.logger.Debug().
Str("operation", operation).
Int("attempt", attempt+1).
Dur("delay", delay).
Err(err).
Msg("Rate limited, retrying with backoff")

select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(delay):
// Continue to next attempt
}
}

return lastErr
}

type Store struct {
*sqlite.Store
executionClient executiondata.ExecutionDataAPIClient
Expand All @@ -159,7 +56,6 @@ type Store struct {
chainID flowgo.ChainID
forkHeight uint64
logger *zerolog.Logger
concurrencySem chan struct{} // Semaphore to limit concurrent requests
}

type Option func(*Store)
Expand Down Expand Up @@ -207,9 +103,8 @@ func WithClient(

func New(provider *sqlite.Store, logger *zerolog.Logger, options ...Option) (*Store, error) {
store := &Store{
Store: provider,
logger: logger,
concurrencySem: make(chan struct{}, maxConcurrentRequests),
Store: provider,
logger: logger,
}

for _, opt := range options {
Expand All @@ -225,6 +120,7 @@ func New(provider *sqlite.Store, logger *zerolog.Logger, options ...Option) (*St
store.host,
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(1024*1024*1024)),
grpc.WithDefaultServiceConfig(utils.DefaultGRPCServiceConfig),
)
if err != nil {
return nil, fmt.Errorf("could not connect to rpc host: %w", err)
Expand All @@ -235,12 +131,7 @@ func New(provider *sqlite.Store, logger *zerolog.Logger, options ...Option) (*St
store.accessClient = access.NewAccessAPIClient(conn)
}

var params *access.GetNetworkParametersResponse
err := store.retryWithBackoff(context.Background(), "GetNetworkParameters", func() error {
var err error
params, err = store.accessClient.GetNetworkParameters(context.Background(), &access.GetNetworkParametersRequest{})
return err
})
params, err := store.accessClient.GetNetworkParameters(context.Background(), &access.GetNetworkParametersRequest{})
if err != nil {
return nil, fmt.Errorf("could not get network parameters: %w", err)
}
Expand Down Expand Up @@ -282,12 +173,7 @@ func (s *Store) initializeStartBlock(ctx context.Context) error {

// use the current latest block from the rpc host if no height was provided
if s.forkHeight == 0 {
var resp *access.BlockHeaderResponse
err := s.retryWithBackoff(ctx, "GetLatestBlockHeader", func() error {
var err error
resp, err = s.accessClient.GetLatestBlockHeader(ctx, &access.GetLatestBlockHeaderRequest{IsSealed: true})
return err
})
resp, err := s.accessClient.GetLatestBlockHeader(ctx, &access.GetLatestBlockHeaderRequest{IsSealed: true})
if err != nil {
return fmt.Errorf("could not get last block height: %w", err)
}
Expand Down Expand Up @@ -322,12 +208,7 @@ func (s *Store) BlockByID(ctx context.Context, blockID flowgo.Identifier) (*flow
if err == nil {
height = block.Height
} else if errors.Is(err, storage.ErrNotFound) {
var heightRes *access.BlockHeaderResponse
err := s.retryWithBackoff(ctx, "GetBlockHeaderByID", func() error {
var err error
heightRes, err = s.accessClient.GetBlockHeaderByID(ctx, &access.GetBlockHeaderByIDRequest{Id: blockID[:]})
return err
})
heightRes, err := s.accessClient.GetBlockHeaderByID(ctx, &access.GetBlockHeaderByIDRequest{Id: blockID[:]})
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -367,12 +248,7 @@ func (s *Store) BlockByHeight(ctx context.Context, height uint64) (*flowgo.Block
return nil, &types.BlockNotFoundByHeightError{Height: height}
}

var blockRes *access.BlockHeaderResponse
err = s.retryWithBackoff(ctx, "GetBlockHeaderByHeight", func() error {
var err error
blockRes, err = s.accessClient.GetBlockHeaderByHeight(ctx, &access.GetBlockHeaderByHeightRequest{Height: height})
return err
})
blockRes, err := s.accessClient.GetBlockHeaderByHeight(ctx, &access.GetBlockHeaderByHeightRequest{Height: height})
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -429,14 +305,9 @@ func (s *Store) LedgerByHeight(
}

registerID := convert.RegisterIDToMessage(flowgo.RegisterID{Key: id.Key, Owner: id.Owner})
var response *executiondata.GetRegisterValuesResponse

err = s.retryWithBackoff(ctx, "GetRegisterValues", func() error {
response, err = s.executionClient.GetRegisterValues(ctx, &executiondata.GetRegisterValuesRequest{
BlockHeight: lookupHeight,
RegisterIds: []*entities.RegisterID{registerID},
})
return err
response, err := s.executionClient.GetRegisterValues(ctx, &executiondata.GetRegisterValuesRequest{
BlockHeight: lookupHeight,
RegisterIds: []*entities.RegisterID{registerID},
})

if err != nil {
Expand Down
Loading
Loading