diff --git a/bootstrap/bootstrap.go b/bootstrap/bootstrap.go
index 0499bad8b..c31f8409d 100644
--- a/bootstrap/bootstrap.go
+++ b/bootstrap/bootstrap.go
@@ -8,7 +8,6 @@ import (
 	"time"
 
 	pebbleDB "github.com/cockroachdb/pebble"
-	"github.com/onflow/flow-evm-gateway/metrics"
 	"github.com/onflow/flow-go-sdk/access"
 	"github.com/onflow/flow-go-sdk/access/grpc"
 	"github.com/onflow/flow-go/fvm/environment"
@@ -21,9 +20,12 @@ import (
 	"github.com/rs/zerolog"
 	"github.com/sethvargo/go-limiter/memorystore"
 	grpcOpts "google.golang.org/grpc"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
 
 	"github.com/onflow/flow-evm-gateway/api"
 	"github.com/onflow/flow-evm-gateway/config"
+	"github.com/onflow/flow-evm-gateway/metrics"
 	"github.com/onflow/flow-evm-gateway/models"
 	errs "github.com/onflow/flow-evm-gateway/models/errors"
 	"github.com/onflow/flow-evm-gateway/services/ingestion"
@@ -33,6 +35,19 @@ import (
 	"github.com/onflow/flow-evm-gateway/storage/pebble"
 )
 
+const (
+	// DefaultMaxMessageSize is the default maximum message size for gRPC responses
+	DefaultMaxMessageSize = 1024 * 1024 * 1024
+
+	// DefaultResourceExhaustedRetryDelay is the default delay between retries when the server returns
+	// a ResourceExhausted error.
+	DefaultResourceExhaustedRetryDelay = 100 * time.Millisecond
+
+	// DefaultResourceExhaustedMaxRetryDelay is the default max request duration when retrying server
+	// ResourceExhausted errors.
+	DefaultResourceExhaustedMaxRetryDelay = 30 * time.Second
+)
+
 type Storages struct {
 	Storage   *pebble.Storage
 	Registers *pebble.RegisterStorage
@@ -452,7 +467,13 @@ func setupCrossSporkClient(config config.Config, logger zerolog.Logger) (*reques
 	// create access client with cross-spork capabilities
 	currentSporkClient, err := grpc.NewClient(
 		config.AccessNodeHost,
-		grpc.WithGRPCDialOptions(grpcOpts.WithDefaultCallOptions(grpcOpts.MaxCallRecvMsgSize(1024*1024*1024))),
+		grpc.WithGRPCDialOptions(
+			grpcOpts.WithDefaultCallOptions(grpcOpts.MaxCallRecvMsgSize(DefaultMaxMessageSize)),
+			grpcOpts.WithUnaryInterceptor(retryInterceptor(
+				DefaultResourceExhaustedMaxRetryDelay,
+				DefaultResourceExhaustedRetryDelay,
+			)),
+		),
 	)
 	if err != nil {
 		return nil, fmt.Errorf(
@@ -487,6 +508,44 @@ func setupCrossSporkClient(config config.Config, logger zerolog.Logger) (*reques
 	return client, nil
 }
 
+// retryInterceptor is a gRPC client interceptor that retries the request when the server returns
+// a ResourceExhausted error
+func retryInterceptor(maxDuration, pauseDuration time.Duration) grpcOpts.UnaryClientInterceptor {
+	return func(
+		ctx context.Context,
+		method string,
+		req, reply interface{},
+		cc *grpcOpts.ClientConn,
+		invoker grpcOpts.UnaryInvoker,
+		opts ...grpcOpts.CallOption,
+	) error {
+		start := time.Now()
+		attempts := 0
+		for {
+			err := invoker(ctx, method, req, reply, cc, opts...)
+			if err == nil {
+				return nil
+			}
+
+			if status.Code(err) != codes.ResourceExhausted {
+				return err
+			}
+
+			attempts++
+			duration := time.Since(start)
+			if duration >= maxDuration {
+				return fmt.Errorf("request failed (attempts: %d, duration: %v): %w", attempts, duration, err)
+			}
+
+			select {
+			case <-ctx.Done():
+				return ctx.Err()
+			case <-time.After(pauseDuration):
+			}
+		}
+	}
+}
+
 // setupStorage creates storage and initializes it with configured starting cadence height
 // in case such a height doesn't already exist in the database.
 func setupStorage(
@@ -570,9 +629,9 @@ func setupStorage(
 			Stringer("fvm_address_for_evm_storage_account", storageAddress).
Msgf("database initialized with cadence height: %d", cadenceHeight) } - //else { + // else { // // TODO(JanezP): verify storage account owner is correct - //} + // } return db, &Storages{ Storage: store, diff --git a/bootstrap/bootstrap_test.go b/bootstrap/bootstrap_test.go new file mode 100644 index 000000000..73be33a37 --- /dev/null +++ b/bootstrap/bootstrap_test.go @@ -0,0 +1,87 @@ +package bootstrap + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +func TestRetryInterceptor(t *testing.T) { + expecterErr := status.Error(codes.ResourceExhausted, "resource exhausted") + interceptor := retryInterceptor(100*time.Millisecond, 10*time.Millisecond) + + testCases := []struct { + name string + invoker func(callCount int) error + maxRequestTime time.Duration + callCount int // expect exact count + minCallCount int // min, for when using a timeout + expectedErr error + }{ + { + name: "no error", + invoker: func(callCount int) error { + return nil + }, + maxRequestTime: 10 * time.Millisecond, + callCount: 1, + expectedErr: nil, + }, + { + name: "succeeds on 3rd attempt", + invoker: func(callCount int) error { + if callCount >= 3 { + return nil + } + return expecterErr + }, + maxRequestTime: 40 * time.Millisecond, + callCount: 3, + expectedErr: nil, + }, + { + name: "fails after timeout", + invoker: func(callCount int) error { + return expecterErr + }, + maxRequestTime: 150 * time.Millisecond, // add a buffer for test slowness + minCallCount: 10, + expectedErr: expecterErr, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + + callCount := 0 + invoker := func(context.Context, string, any, any, *grpc.ClientConn, ...grpc.CallOption) error { + callCount++ + return tc.invoker(callCount) + } + + start := time.Now() + err := interceptor( + context.Background(), "", nil, nil, nil, + invoker, + ) + if tc.expectedErr != nil { + assert.ErrorIs(t, err, tc.expectedErr) + } else { + assert.NoError(t, err) + } + + if tc.minCallCount > 0 { + assert.GreaterOrEqual(t, callCount, tc.minCallCount) + } else { + assert.Equal(t, callCount, tc.callCount) + } + assert.LessOrEqual(t, time.Since(start), tc.maxRequestTime) + }) + } +} diff --git a/services/ingestion/event_subscriber.go b/services/ingestion/event_subscriber.go index 5413b2a86..bc9d37084 100644 --- a/services/ingestion/event_subscriber.go +++ b/services/ingestion/event_subscriber.go @@ -5,12 +5,9 @@ import ( "errors" "fmt" "sort" - "time" "github.com/onflow/cadence/common" "github.com/onflow/flow-go/fvm/evm/events" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" "github.com/onflow/flow-evm-gateway/models" errs "github.com/onflow/flow-evm-gateway/models/errors" @@ -281,21 +278,11 @@ func (r *RPCEventSubscriber) backfillSporkFromHeight(ctx context.Context, fromCa blocks, err := r.client.GetEventsForHeightRange(ctx, blockExecutedEvent, startHeight, endHeight) if err != nil { - // if we are rate limited by the AN, wait a bit and try again - if status.Code(err) == codes.ResourceExhausted { - time.Sleep(100 * time.Millisecond) - continue - } return 0, fmt.Errorf("failed to get block events: %w", err) } transactions, err := r.client.GetEventsForHeightRange(ctx, transactionExecutedEvent, startHeight, endHeight) if err != nil { - // if we are rate limited by the AN, wait a bit and try again - if status.Code(err) == codes.ResourceExhausted { - time.Sleep(100 * 
-				continue
-			}
 			return 0, fmt.Errorf("failed to get block events: %w", err)
 		}
 
@@ -366,11 +353,6 @@ func (r *RPCEventSubscriber) accumulateBlockEvents(
 			currentHeight+maxRangeForGetEvents,
 		)
 		if err != nil {
-			// if we are rate limited by the AN, wait a bit and try again
-			if status.Code(err) == codes.ResourceExhausted {
-				time.Sleep(100 * time.Millisecond)
-				continue
-			}
 			return models.BlockEvents{}, fmt.Errorf("failed to get block events: %w", err)
 		}
 
@@ -381,11 +363,6 @@ func (r *RPCEventSubscriber) accumulateBlockEvents(
 			currentHeight+maxRangeForGetEvents,
 		)
 		if err != nil {
-			// if we are rate limited by the AN, wait a bit and try again
-			if status.Code(err) == codes.ResourceExhausted {
-				time.Sleep(100 * time.Millisecond)
-				continue
-			}
 			return models.BlockEvents{}, fmt.Errorf("failed to get block events: %w", err)
 		}
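
Not part of the diff above: a minimal sketch of how the new retryInterceptor behaves when exercised against a stub invoker, with no network involved. Everything in it is an illustrative assumption, not repository code: the helper name demoRetryInterceptor, the stub invoker, the placeholder method string, and the 1s/10ms durations. It is written as if it sat next to bootstrap_test.go in package bootstrap, so it can reach the unexported retryInterceptor.

package bootstrap

import (
	"context"
	"time"

	grpcOpts "google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// demoRetryInterceptor is an illustrative helper (not part of the PR). The stub
// invoker reports ResourceExhausted on its first two attempts, so the
// interceptor pauses 10ms after each failure and the third attempt succeeds
// well within the 1s budget.
func demoRetryInterceptor() (attempts int, err error) {
	interceptor := retryInterceptor(time.Second, 10*time.Millisecond)

	stubInvoker := func(
		ctx context.Context,
		method string,
		req, reply interface{},
		cc *grpcOpts.ClientConn,
		opts ...grpcOpts.CallOption,
	) error {
		attempts++
		if attempts <= 2 {
			return status.Error(codes.ResourceExhausted, "rate limited")
		}
		return nil
	}

	// The method name is a placeholder; no connection is dialed.
	err = interceptor(context.Background(), "/flow.access.AccessAPI/Ping", nil, nil, nil, stubInvoker)
	return attempts, err // attempts == 3, err == nil
}

With the defaults wired into setupCrossSporkClient (100ms pause, 30s cap), a call that stays rate limited keeps retrying at the client layer and eventually returns a single wrapped error, instead of each ingestion call site sleeping and retrying on its own as the removed event_subscriber.go code did.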