Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 9 additions & 9 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,10 @@ require (
go.uber.org/atomic v1.10.0
go.uber.org/goleak v1.3.0
golang.org/x/exp v0.0.0-20230321023759-10a507213a29
golang.org/x/net v0.40.0
golang.org/x/sync v0.14.0
golang.org/x/net v0.41.0
golang.org/x/sync v0.15.0
golang.org/x/time v0.1.0
google.golang.org/grpc v1.72.1
google.golang.org/grpc v1.75.0
gopkg.in/yaml.v2 v2.4.0
gopkg.in/yaml.v3 v3.0.1
)
Expand Down Expand Up @@ -123,14 +123,14 @@ require (
go.opentelemetry.io/proto/otlp v1.6.0 // indirect
go.uber.org/multierr v1.6.0 // indirect
go.uber.org/zap v1.17.0 // indirect
golang.org/x/crypto v0.38.0 // indirect
golang.org/x/mod v0.18.0 // indirect
golang.org/x/crypto v0.39.0 // indirect
golang.org/x/mod v0.25.0 // indirect
golang.org/x/oauth2 v0.30.0 // indirect
golang.org/x/sys v0.35.0 // indirect
golang.org/x/text v0.25.0 // indirect
golang.org/x/tools v0.22.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20250519155744-55703ea1f237 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20250519155744-55703ea1f237 // indirect
golang.org/x/text v0.26.0 // indirect
golang.org/x/tools v0.33.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20250707201910-8d1bb00bc6a7 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20250707201910-8d1bb00bc6a7 // indirect
google.golang.org/protobuf v1.36.6 // indirect
)

Expand Down
38 changes: 20 additions & 18 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -446,8 +446,8 @@ golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPh
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.13.0/go.mod h1:y6Z2r+Rw4iayiXXAIxJIDAJ1zMW4yaTpebo8fPOliYc=
golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf4=
golang.org/x/crypto v0.38.0 h1:jt+WWG8IZlBnVbomuhg2Mdq0+BBQaHbtqHEFEigjUV8=
golang.org/x/crypto v0.38.0/go.mod h1:MvrbAqul58NNYPKnOra203SB9vpuZW0e+RRZV+Ggqjw=
golang.org/x/crypto v0.39.0 h1:SHs+kF4LP+f+p14esP5jAoDpHU8Gu/v9lFRK6IT5imM=
golang.org/x/crypto v0.39.0/go.mod h1:L+Xg3Wf6HoL4Bn4238Z6ft6KfEpN0tJGo53AAPC632U=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20230321023759-10a507213a29 h1:ooxPy7fPvB4kwsA2h+iBNHkAbp/4JxTSwCmvdjEYmug=
golang.org/x/exp v0.0.0-20230321023759-10a507213a29/go.mod h1:CxIveKay+FTh1D0yPZemJVgC/95VzuuOLq5Qi4xnoYc=
Expand All @@ -464,8 +464,8 @@ golang.org/x/mod v0.7.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
golang.org/x/mod v0.12.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
golang.org/x/mod v0.13.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c=
golang.org/x/mod v0.18.0 h1:5+9lSbEzPSdWkH32vYPBwEpX8KwDbM52Ud9xBUvNlb0=
golang.org/x/mod v0.18.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c=
golang.org/x/mod v0.25.0 h1:n7a+ZbQKQA/Ysbyb0/6IbB1H/X41mKgbhfv7AfG/44w=
golang.org/x/mod v0.25.0/go.mod h1:IXM97Txy2VM4PJ3gI61r1YEk/gAj6zAHN3AdZt6S9Ww=
golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
Expand All @@ -492,8 +492,8 @@ golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg=
golang.org/x/net v0.15.0/go.mod h1:idbUs1IY1+zTqbi8yxTbhexhEEk5ur9LInksu6HrEpk=
golang.org/x/net v0.16.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE=
golang.org/x/net v0.40.0 h1:79Xs7wF06Gbdcg4kdCCIQArK11Z1hr5POQ6+fIYHNuY=
golang.org/x/net v0.40.0/go.mod h1:y0hY0exeL2Pku80/zKK7tpntoX23cqL3Oa6njdgRtds=
golang.org/x/net v0.41.0 h1:vBTly1HeNPEn3wtREYfy4GZ/NECgw2Cnl+nK6Nz3uvw=
golang.org/x/net v0.41.0/go.mod h1:B/K4NNqkfmg07DQYrbwvSluqCJOOXwUjeb/5lOisjbA=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
Expand All @@ -512,8 +512,8 @@ golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y=
golang.org/x/sync v0.4.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y=
golang.org/x/sync v0.14.0 h1:woo0S4Yywslg6hp4eUFjTVOyKt0RookbpAHG4c1HmhQ=
golang.org/x/sync v0.14.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA=
golang.org/x/sync v0.15.0 h1:KWH3jNZsfyT6xfAfKiz6MRNmd46ByHDYaZ7KSkCtdW8=
golang.org/x/sync v0.15.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA=
golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
Expand Down Expand Up @@ -574,8 +574,8 @@ golang.org/x/text v0.5.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8=
golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
golang.org/x/text v0.25.0 h1:qVyWApTSYLk/drJRO5mDlNYskwQznZmkpV2c8q9zls4=
golang.org/x/text v0.25.0/go.mod h1:WEdwpYrmk1qmdHvhkSTNPm3app7v4rsT8F2UD6+VHIA=
golang.org/x/text v0.26.0 h1:P42AVeLghgTYr4+xUnTRKDMqpar+PtX7KWuNQL21L8M=
golang.org/x/text v0.26.0/go.mod h1:QK15LZJUUQVJxhz7wXgxSy/CJaTFjd0G+YLonydOVQA=
golang.org/x/time v0.1.0 h1:xYY+Bajn2a7VBmTM5GikTmnK8ZuX8YgnQCqZpbBNtmA=
golang.org/x/time v0.1.0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
Expand All @@ -594,12 +594,14 @@ golang.org/x/tools v0.4.0/go.mod h1:UE5sM2OK9E/d67R0ANs2xJizIymRP5gJU295PvKXxjQ=
golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU=
golang.org/x/tools v0.13.0/go.mod h1:HvlwmtVNQAhOuCjW7xxvovg8wbNq7LwfXh/k7wXUl58=
golang.org/x/tools v0.14.0/go.mod h1:uYBEerGOWcJyEORxN+Ek8+TT266gXkNlHdJBwexUsBg=
golang.org/x/tools v0.22.0 h1:gqSGLZqv+AI9lIQzniJ0nZDRG5GBPsSi+DRNHWNz6yA=
golang.org/x/tools v0.22.0/go.mod h1:aCwcsjqvq7Yqt6TNyX7QMU2enbQ/Gt0bo6krSeEri+c=
golang.org/x/tools v0.33.0 h1:4qz2S3zmRxbGIhDIAgjxvFutSvH5EfnsYrRBj0UI0bc=
golang.org/x/tools v0.33.0/go.mod h1:CIJMaWEY88juyUfo7UbgPqbC8rU2OqfAV1h2Qp0oMYI=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
gonum.org/v1/gonum v0.16.0 h1:5+ul4Swaf3ESvrOnidPp4GZbzf0mxVQpDCYUQE7OJfk=
gonum.org/v1/gonum v0.16.0/go.mod h1:fef3am4MQ93R2HHpKnLk4/Tbh/s0+wqD5nfa6Pnwy4E=
google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=
google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
google.golang.org/appengine v1.6.7/go.mod h1:8WjMMxjGQR8xUklV/ARdw2HLXBOI7O7uCIDZVag1xfc=
Expand All @@ -609,10 +611,10 @@ google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98
google.golang.org/genproto v0.0.0-20200513103714-09dca8ec2884/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c=
google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo=
google.golang.org/genproto v0.0.0-20210602131652-f16073e35f0c/go.mod h1:UODoCrxHCcBojKKwX1terBiRUaqAsFqJiF615XL43r0=
google.golang.org/genproto/googleapis/api v0.0.0-20250519155744-55703ea1f237 h1:Kog3KlB4xevJlAcbbbzPfRG0+X9fdoGM+UBRKVz6Wr0=
google.golang.org/genproto/googleapis/api v0.0.0-20250519155744-55703ea1f237/go.mod h1:ezi0AVyMKDWy5xAncvjLWH7UcLBB5n7y2fQ8MzjJcto=
google.golang.org/genproto/googleapis/rpc v0.0.0-20250519155744-55703ea1f237 h1:cJfm9zPbe1e873mHJzmQ1nwVEeRDU/T1wXDK2kUSU34=
google.golang.org/genproto/googleapis/rpc v0.0.0-20250519155744-55703ea1f237/go.mod h1:qQ0YXyHHx3XkvlzUtpXDkS29lDSafHMZBAZDc03LQ3A=
google.golang.org/genproto/googleapis/api v0.0.0-20250707201910-8d1bb00bc6a7 h1:FiusG7LWj+4byqhbvmB+Q93B/mOxJLN2DTozDuZm4EU=
google.golang.org/genproto/googleapis/api v0.0.0-20250707201910-8d1bb00bc6a7/go.mod h1:kXqgZtrWaf6qS3jZOCnCH7WYfrvFjkC51bM8fz3RsCA=
google.golang.org/genproto/googleapis/rpc v0.0.0-20250707201910-8d1bb00bc6a7 h1:pFyd6EwwL2TqFf8emdthzeX+gZE1ElRq3iM8pui4KBY=
google.golang.org/genproto/googleapis/rpc v0.0.0-20250707201910-8d1bb00bc6a7/go.mod h1:qQ0YXyHHx3XkvlzUtpXDkS29lDSafHMZBAZDc03LQ3A=
google.golang.org/grpc v1.12.0/go.mod h1:yo6s7OP7yaDglbqo1J04qKzAhqBH6lvTonzMVmEdcZw=
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg=
Expand All @@ -621,8 +623,8 @@ google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQ
google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk=
google.golang.org/grpc v1.33.1/go.mod h1:fr5YgcSWrqhRRxogOsw7RzIpsmvOZ6IcH4kBYTpR3n0=
google.golang.org/grpc v1.38.0/go.mod h1:NREThFqKR1f3iQ6oBuvc5LadQuXVGo9rkm5ZGrQdJfM=
google.golang.org/grpc v1.72.1 h1:HR03wO6eyZ7lknl75XlxABNVLLFc2PAb6mHlYh756mA=
google.golang.org/grpc v1.72.1/go.mod h1:wH5Aktxcg25y1I3w7H69nHfXdOG3UiadoBtjh3izSDM=
google.golang.org/grpc v1.75.0 h1:+TW+dqTd2Biwe6KKfhE5JpiYIBWq865PhKGSXiivqt4=
google.golang.org/grpc v1.75.0/go.mod h1:JtPAzKiq4v1xcAB2hydNlWI2RnF85XXcV0mhKXr2ecQ=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=
Expand Down
148 changes: 139 additions & 9 deletions server/limits.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,19 @@ package server
import (
"context"
"strings"
"sync"
"time"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/stats"
"google.golang.org/grpc/tap"
)

const unprocessedRequestCheckTimeout = 10 * time.Second

type GrpcInflightMethodLimiter interface {
// RPCCallStarting is called before request has been read into memory.
// All that's known about the request at this point is grpc method name.
Expand All @@ -26,32 +32,94 @@ type GrpcInflightMethodLimiter interface {
RPCCallProcessing(ctx context.Context, methodName string) (func(error), error)

// RPCCallFinished is called when an RPC call is finished being handled.
// Under certain very rare race conditions it might be called earlier than the actual request processing is finished.
RPCCallFinished(ctx context.Context)
}

func newGrpcInflightLimitCheck(methodLimiter GrpcInflightMethodLimiter) *grpcInflightLimitCheck {
func newGrpcInflightLimitCheck(methodLimiter GrpcInflightMethodLimiter, logger log.Logger) *grpcInflightLimitCheck {
return &grpcInflightLimitCheck{
methodLimiter: methodLimiter,
logger: logger,
}
}

// grpcInflightLimitCheck implements gRPC TapHandle and gRPC stats.Handler.
// grpcInflightLimitCheck can track inflight requests, and reject requests before even reading them into memory.
type grpcInflightLimitCheck struct {
methodLimiter GrpcInflightMethodLimiter

logger log.Logger

// Used to mock time.AfterFunc in tests.
timeAfterFuncMock func(d time.Duration, f func()) testableTimer
}

// TapHandle is called after receiving grpc request and headers, but before reading any request data yet.
// If we reject request here (by returning non-nil error), it won't be counted towards any metrics (eg. in middleware.grpcStatsHandler).
// If we accept request (no error), eventually HandleRPC with stats.End notification will be called.
// If we accept request (no error), the request should be processed and eventually HandleRPC with stats.End notification will be called,
// unless the context is cancelled before we start processing the request.
func (g *grpcInflightLimitCheck) TapHandle(ctx context.Context, info *tap.Info) (context.Context, error) {
if !isMethodNameValid(info.FullMethodName) {
// If method name is not valid, we let the request continue, but not call method limiter.
// Otherwise, we would not be able to call method limiter again when the call finishes, because in this case grpc server will not call stat handler.
return ctx, nil
}

return g.methodLimiter.RPCCallStarting(ctx, info.FullMethodName, info.Header)
ctx, err := g.methodLimiter.RPCCallStarting(ctx, info.FullMethodName, info.Header)
if err != nil {
return ctx, err
}

// We called RPCCallStarting, so we need to ensure RPCCallFinished is called once the request is done.
// Because of a shortcut introduced in https://github.com/grpc/grpc-go/pull/8439 this may not happen.
// We could create a goroutine that would watch ctx.Done() and call RPCCallFinished if the context is done and we have not started processing the headers yet.
// However, that would mean paying the cost of an extra goroutine for every single gRPC request, just in case the request's context is cancelled before we start processing it.
// Instead of that we schedule a cheaper timer that we will cancel in the happy case, which will run after 10s and perform the cleanup only when
state := &gprcInflightLimitCheckerState{
fullMethod: info.FullMethodName,
timestamp: time.Now(),
headersProcessed: make(chan struct{}),
}
state.nonProcessedRequestTimer = g.timeAfterFunc(unprocessedRequestCheckTimeout, g.checkProbablyEarlyAbortedRequest(ctx, state))

return context.WithValue(ctx, gprcInflightLimitCheckerStateKey{}, state), nil
}

func (g *grpcInflightLimitCheck) checkProbablyEarlyAbortedRequest(ctx context.Context, state *gprcInflightLimitCheckerState) func() {
return func() {
// If this function is running, we're in a corner case. Be very verbose in logging to help with debugging.
logger := state.logger(g.logger)

level.Warn(g.logger).Log("msg", "gRPC request processing didn't start within 10s of receiving, checking the context state")
select {
case <-ctx.Done():
level.Info(logger).Log("msg", "gRPC request context is done, assuming the request was cancelled before processing started, will call RPCCallFinished")
case <-state.headersProcessed:
level.Info(logger).Log("msg", "gRPC request processing has started, no need to call RPCCallFinished", "time_to_start_processing", time.Since(state.timestamp).String())
return
default:
level.Info(logger).Log("msg", "gRPC request context is not done and processing hasn't started, will wait until context is done or processing starts")

select {
case <-ctx.Done():
level.Info(logger).Log("msg", "gRPC request context is finally done, assuming the request was cancelled before processing started, will call RPCCallFinished")
case <-state.headersProcessed:
level.Info(logger).Log("msg", "gRPC request processing has finally started, no need to call RPCCallFinished", "time_to_start_processing", time.Since(state.timestamp).String())
return
}
}

called := false
state.rpcCallFinishedOnce.Do(func() {
called = true
g.methodLimiter.RPCCallFinished(ctx)
})
if called {
level.Info(logger).Log("msg", "called RPCCallFinished for gRPC request that never started processing")
} else {
level.Info(logger).Log("msg", "RPCCallFinishes was already called for this gRPC request, no need to call it again")
}
}
}

func (g *grpcInflightLimitCheck) UnaryServerInterceptor(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) {
Expand All @@ -78,18 +146,40 @@ func (g *grpcInflightLimitCheck) StreamServerInterceptor(srv interface{}, ss grp
}
return err
}

func (g *grpcInflightLimitCheck) TagRPC(ctx context.Context, _ *stats.RPCTagInfo) context.Context {
return ctx
}

func (g *grpcInflightLimitCheck) HandleRPC(ctx context.Context, rpcStats stats.RPCStats) {
// when request ends, and we started "inflight" request tracking for it, finish it.
if _, ok := rpcStats.(*stats.End); !ok {
return
switch rpcStats.(type) {
case *stats.InHeader:
if state, ok := ctx.Value(gprcInflightLimitCheckerStateKey{}).(*gprcInflightLimitCheckerState); ok {
// We're processing this request, stop the timer.
if !state.nonProcessedRequestTimer.Stop() {
level.Warn(state.logger(g.logger)).Log("msg", "gRPC request processing has started, but the non-processing timer already fired, need to signal that we're processing it now")
// The timer has already expired, so the function is either executing or has executed.
//
// This (stats.InHeader) should be called once and only once, but gRPC is known for changing contracts
// and we don't want this to start panicking trying to close the channel multiple times,
// so a sync.Once doesn't hurt here.
state.headersProcessedOnce.Do(func() { close(state.headersProcessed) })
}
}

case *stats.End:
if state, ok := ctx.Value(gprcInflightLimitCheckerStateKey{}).(*gprcInflightLimitCheckerState); ok {
// We're done processing the request, but there's a scenario under which we may have already called RPCCallFinished,
// from the goroutine watching the context in TapHandle, so we need to ensure it's called only once.
called := false
state.rpcCallFinishedOnce.Do(func() {
g.methodLimiter.RPCCallFinished(ctx)
called = true
})
if !called {
level.Warn(state.logger(g.logger)).Log("msg", "RPCCallFinished was already called for this gRPC request before the request actually finished processing")
}
}
}

g.methodLimiter.RPCCallFinished(ctx)
}

func (g *grpcInflightLimitCheck) TagConn(ctx context.Context, _ *stats.ConnTagInfo) context.Context {
Expand All @@ -100,6 +190,13 @@ func (g *grpcInflightLimitCheck) HandleConn(_ context.Context, _ stats.ConnStats
// Not interested.
}

func (g *grpcInflightLimitCheck) timeAfterFunc(d time.Duration, f func()) testableTimer {
if g.timeAfterFuncMock != nil {
return g.timeAfterFuncMock(d, f)
}
return testableTimer{timer: time.AfterFunc(d, f)}
}

// This function mimics the check in grpc library, server.go, handleStream method. handleStream method can stop processing early,
// without calling stat handler if the method name is invalid.
func isMethodNameValid(method string) bool {
Expand All @@ -109,3 +206,36 @@ func isMethodNameValid(method string) bool {
pos := strings.LastIndex(method, "/")
return pos >= 0
}

type gprcInflightLimitCheckerStateKey struct{}

type gprcInflightLimitCheckerState struct {
fullMethod string
timestamp time.Time

nonProcessedRequestTimer testableTimer
headersProcessedOnce sync.Once
headersProcessed chan struct{}

rpcCallFinishedOnce sync.Once
}

func (state *gprcInflightLimitCheckerState) logger(baseLogger log.Logger) log.Logger {
return log.With(baseLogger, "method", state.fullMethod, "req_timestamp", state.timestamp.Format(time.RFC3339Nano))
}

type testableTimer struct {
// timer is what we use in production code.
timer *time.Timer

// stop is used in tests to mock timer stopping behavior.
stop func() bool
}

func (t testableTimer) Stop() bool {
if t.timer != nil {
return t.timer.Stop()
}

return t.stop()
}
Loading
Loading