diff --git a/CHANGELOG.md b/CHANGELOG.md index d259cb8a115..a2037ac26cb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -100,6 +100,7 @@ * [BUGFIX] Distributor: Report the correct size in the `err-mimir-distributor-max-write-message-size` error. #12799 * [BUGFIX] Query-frontend: Fix issue where expressions containing unary negation could be sharded incorrectly in some cases. #12911 * [BUGFIX] Query-frontend: Fix issue where shardable expressions containing aggregations with a shardable parameter (eg. `sum(foo)` in `topk(scalar(sum(foo)), sum by (pod) (bar))`) would not have the parameter sharded. #12958 +* [BUGFIX] Ingester: Fix `max_inflight_push_requests` metric and internal counter not decremented under pressure, possibly causing the rejection of all push requests. #12975 ### Mixin diff --git a/go.mod b/go.mod index f6a4f7ce0fe..e3867a726b6 100644 --- a/go.mod +++ b/go.mod @@ -17,7 +17,7 @@ require ( github.com/golang/snappy v1.0.0 github.com/google/gopacket v1.1.19 github.com/gorilla/mux v1.8.1 - github.com/grafana/dskit v0.0.0-20251009074119-2b7008cbb887 + github.com/grafana/dskit v0.0.0-20251010193112-965b207c61b8 github.com/grafana/e2e v0.1.2-0.20250825134630-3cea6f657739 github.com/hashicorp/golang-lru v1.0.2 // indirect github.com/influxdata/influxdb/v2 v2.7.12 diff --git a/go.sum b/go.sum index cf0e4bae31f..8b79affc27c 100644 --- a/go.sum +++ b/go.sum @@ -561,8 +561,8 @@ github.com/grafana-tools/sdk v0.0.0-20220919052116-6562121319fc h1:PXZQA2WCxe85T github.com/grafana-tools/sdk v0.0.0-20220919052116-6562121319fc/go.mod h1:AHHlOEv1+GGQ3ktHMlhuTUwo3zljV3QJbC0+8o2kn+4= github.com/grafana/alerting v0.0.0-20251002141545-d513d62d3210 h1:R4+ks/StOXvv+j8U7J+WQXqpa0e5bLZKFac9y20xeck= github.com/grafana/alerting v0.0.0-20251002141545-d513d62d3210/go.mod 
h1:VGjS5gDwWEADPP6pF/drqLxEImgeuHlEW5u8E5EfIrM= -github.com/grafana/dskit v0.0.0-20251009074119-2b7008cbb887 h1:LCC+ecGr8/aRfJWoOM1Vvv/ZSLIbpgCmW/uvQEEm1UI= -github.com/grafana/dskit v0.0.0-20251009074119-2b7008cbb887/go.mod h1:ysMKGt75FgpBQ5l1yPoVUjo21WMfPYjgJDybbKdpox0= +github.com/grafana/dskit v0.0.0-20251010193112-965b207c61b8 h1:60HJvGdtHhS+o1RCaYuLNANkATpkLccUI252L930zyg= +github.com/grafana/dskit v0.0.0-20251010193112-965b207c61b8/go.mod h1:8AGVTqx49L8iYNAW6ls2n3+li4v70nenYr74sBsLKMk= github.com/grafana/e2e v0.1.2-0.20250825134630-3cea6f657739 h1:74hHXvOG42Y87T7jO7naPB5sZpwO3TDGTFiUL48s2Yc= github.com/grafana/e2e v0.1.2-0.20250825134630-3cea6f657739/go.mod h1:9bciABa7gW4B3t12C9vpgk54NeeoOZ+1DLsUnOAf+8Y= github.com/grafana/goautoneg v0.0.0-20240607115440-f335c04c58ce h1:WI1olbgS+sEl77qxEYbmt9TgRUz7iLqmjh8lYPpGlKQ= diff --git a/vendor/github.com/grafana/dskit/server/limits.go b/vendor/github.com/grafana/dskit/server/limits.go index b9c9f3b3117..7096d25fc24 100644 --- a/vendor/github.com/grafana/dskit/server/limits.go +++ b/vendor/github.com/grafana/dskit/server/limits.go @@ -3,13 +3,21 @@ package server import ( "context" "strings" + "sync" + "time" + "github.com/go-kit/log" + "github.com/go-kit/log/level" "google.golang.org/grpc" "google.golang.org/grpc/metadata" "google.golang.org/grpc/stats" "google.golang.org/grpc/tap" ) +// unprocessedRequestCheckTimeout is large enough for a normal request to start processing, +// and small enough to cleanup quickly if the request was cancelled and early aborted. +const unprocessedRequestCheckTimeout = 10 * time.Second + type GrpcInflightMethodLimiter interface { // RPCCallStarting is called before request has been read into memory. 
// All that's known about the request at this point is grpc method name. @@ -26,12 +34,14 @@ type GrpcInflightMethodLimiter interface { RPCCallProcessing(ctx context.Context, methodName string) (func(error), error) // RPCCallFinished is called when an RPC call is finished being handled. + // Under certain very rare race conditions it might be called earlier than the actual request processing is finished. RPCCallFinished(ctx context.Context) } -func newGrpcInflightLimitCheck(methodLimiter GrpcInflightMethodLimiter) *grpcInflightLimitCheck { +func newGrpcInflightLimitCheck(methodLimiter GrpcInflightMethodLimiter, logger log.Logger) *grpcInflightLimitCheck { return &grpcInflightLimitCheck{ methodLimiter: methodLimiter, + logger: logger, } } @@ -39,11 +49,17 @@ func newGrpcInflightLimitCheck(methodLimiter GrpcInflightMethodLimiter) *grpcInf // grpcInflightLimitCheck can track inflight requests, and reject requests before even reading them into memory. type grpcInflightLimitCheck struct { methodLimiter GrpcInflightMethodLimiter + + logger log.Logger + + // Used to mock time.AfterFunc in tests. + timeAfterFuncMock func(d time.Duration, f func()) testableTimer } // TapHandle is called after receiving grpc request and headers, but before reading any request data yet. // If we reject request here (by returning non-nil error), it won't be counted towards any metrics (eg. in middleware.grpcStatsHandler). -// If we accept request (no error), eventually HandleRPC with stats.End notification will be called. +// If we accept request (no error), the request should be processed and eventually HandleRPC with stats.End notification will be called, +// unless the context is cancelled before we start processing the request. func (g *grpcInflightLimitCheck) TapHandle(ctx context.Context, info *tap.Info) (context.Context, error) { if !isMethodNameValid(info.FullMethodName) { // If method name is not valid, we let the request continue, but not call method limiter. 
@@ -51,7 +67,61 @@ func (g *grpcInflightLimitCheck) TapHandle(ctx context.Context, info *tap.Info) return ctx, nil } - return g.methodLimiter.RPCCallStarting(ctx, info.FullMethodName, info.Header) + ctx, err := g.methodLimiter.RPCCallStarting(ctx, info.FullMethodName, info.Header) + if err != nil { + return ctx, err + } + + // We called RPCCallStarting, so we need to ensure RPCCallFinished is called once the request is done. + // Because of a shortcut introduced in https://github.com/grpc/grpc-go/pull/8439 this may not happen. + // We could create a goroutine that would watch ctx.Done() and call RPCCallFinished if the context is done and we have not started processing the headers yet. + // However, that would mean paying the cost of an extra goroutine for every single gRPC request, just in case the request's context is cancelled before we start processing it. + // Instead of that we schedule a cheaper timer that we will cancel in the happy case, which will run after 10s and perform the cleanup only when needed. + state := &gprcInflightLimitCheckerState{ + fullMethod: info.FullMethodName, + timestamp: time.Now(), + headersProcessed: make(chan struct{}), + } + state.nonProcessedRequestTimer = g.timeAfterFunc(unprocessedRequestCheckTimeout, g.checkProbablyEarlyAbortedRequest(ctx, state)) + + return context.WithValue(ctx, gprcInflightLimitCheckerStateKey{}, state), nil +} + +func (g *grpcInflightLimitCheck) checkProbablyEarlyAbortedRequest(ctx context.Context, state *gprcInflightLimitCheckerState) func() { + return func() { + // If this function is running, we're in a corner case. Be very verbose in logging to help with debugging. 
+ logger := state.logger(g.logger) + + level.Warn(g.logger).Log("msg", "gRPC request processing didn't start within 10s of receiving, checking the context state") + select { + case <-ctx.Done(): + level.Info(logger).Log("msg", "gRPC request context is done, assuming the request was cancelled before processing started, will call RPCCallFinished") + case <-state.headersProcessed: + level.Info(logger).Log("msg", "gRPC request processing has started, no need to call RPCCallFinished", "time_to_start_processing", time.Since(state.timestamp).String()) + return + default: + level.Info(logger).Log("msg", "gRPC request context is not done and processing hasn't started, will wait until context is done or processing starts") + + select { + case <-ctx.Done(): + level.Info(logger).Log("msg", "gRPC request context is finally done, assuming the request was cancelled before processing started, will call RPCCallFinished") + case <-state.headersProcessed: + level.Info(logger).Log("msg", "gRPC request processing has finally started, no need to call RPCCallFinished", "time_to_start_processing", time.Since(state.timestamp).String()) + return + } + } + + called := false + state.rpcCallFinishedOnce.Do(func() { + called = true + g.methodLimiter.RPCCallFinished(ctx) + }) + if called { + level.Info(logger).Log("msg", "called RPCCallFinished for gRPC request that never started processing") + } else { + level.Info(logger).Log("msg", "RPCCallFinished was already called for this gRPC request, no need to call it again") + } + } } func (g *grpcInflightLimitCheck) UnaryServerInterceptor(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) { @@ -78,18 +148,40 @@ func (g *grpcInflightLimitCheck) StreamServerInterceptor(srv interface{}, ss grp } return err } - func (g *grpcInflightLimitCheck) TagRPC(ctx context.Context, _ *stats.RPCTagInfo) context.Context { return ctx } func (g *grpcInflightLimitCheck) HandleRPC(ctx context.Context, rpcStats
stats.RPCStats) { - // when request ends, and we started "inflight" request tracking for it, finish it. - if _, ok := rpcStats.(*stats.End); !ok { - return + switch rpcStats.(type) { + case *stats.InHeader: + if state, ok := ctx.Value(gprcInflightLimitCheckerStateKey{}).(*gprcInflightLimitCheckerState); ok { + // We're processing this request, stop the timer. + if !state.nonProcessedRequestTimer.Stop() { + level.Warn(state.logger(g.logger)).Log("msg", "gRPC request processing has started, but the non-processing timer already fired, need to signal that we're processing it now") + // The timer has already expired, so the function is either executing or has executed. + // + // This (stats.InHeader) should be called once and only once, but gRPC is known for changing contracts + // and we don't want this to start panicking trying to close the channel multiple times, + // so a sync.Once doesn't hurt here. + state.headersProcessedOnce.Do(func() { close(state.headersProcessed) }) + } + } + + case *stats.End: + if state, ok := ctx.Value(gprcInflightLimitCheckerStateKey{}).(*gprcInflightLimitCheckerState); ok { + // We're done processing the request, but there's a scenario under which we may have already called RPCCallFinished, + // from the goroutine watching the context in TapHandle, so we need to ensure it's called only once. + called := false + state.rpcCallFinishedOnce.Do(func() { + g.methodLimiter.RPCCallFinished(ctx) + called = true + }) + if !called { + level.Warn(state.logger(g.logger)).Log("msg", "RPCCallFinished was already called for this gRPC request before the request actually finished processing") + } + } } - - g.methodLimiter.RPCCallFinished(ctx) } func (g *grpcInflightLimitCheck) TagConn(ctx context.Context, _ *stats.ConnTagInfo) context.Context { @@ -100,6 +192,13 @@ func (g *grpcInflightLimitCheck) HandleConn(_ context.Context, _ stats.ConnStats // Not interested. 
} +func (g *grpcInflightLimitCheck) timeAfterFunc(d time.Duration, f func()) testableTimer { + if g.timeAfterFuncMock != nil { + return g.timeAfterFuncMock(d, f) + } + return testableTimer{timer: time.AfterFunc(d, f)} +} + // This function mimics the check in grpc library, server.go, handleStream method. handleStream method can stop processing early, // without calling stat handler if the method name is invalid. func isMethodNameValid(method string) bool { @@ -109,3 +208,36 @@ func isMethodNameValid(method string) bool { pos := strings.LastIndex(method, "/") return pos >= 0 } + +type gprcInflightLimitCheckerStateKey struct{} + +type gprcInflightLimitCheckerState struct { + fullMethod string + timestamp time.Time + + nonProcessedRequestTimer testableTimer + headersProcessedOnce sync.Once + headersProcessed chan struct{} + + rpcCallFinishedOnce sync.Once +} + +func (state *gprcInflightLimitCheckerState) logger(baseLogger log.Logger) log.Logger { + return log.With(baseLogger, "method", state.fullMethod, "req_timestamp", state.timestamp.Format(time.RFC3339Nano)) +} + +type testableTimer struct { + // timer is what we use in production code. + timer *time.Timer + + // stop is used in tests to mock timer stopping behavior. 
+ stop func() bool +} + +func (t testableTimer) Stop() bool { + if t.timer != nil { + return t.timer.Stop() + } + + return t.stop() +} diff --git a/vendor/github.com/grafana/dskit/server/server.go b/vendor/github.com/grafana/dskit/server/server.go index f8132e11bf8..473ab2a0025 100644 --- a/vendor/github.com/grafana/dskit/server/server.go +++ b/vendor/github.com/grafana/dskit/server/server.go @@ -471,7 +471,7 @@ func newServer(cfg Config, metrics *Metrics) (*Server, error) { var grpcServerLimit *grpcInflightLimitCheck if cfg.GrpcMethodLimiter != nil { - grpcServerLimit = newGrpcInflightLimitCheck(cfg.GrpcMethodLimiter) + grpcServerLimit = newGrpcInflightLimitCheck(cfg.GrpcMethodLimiter, logger) grpcMiddleware = append(grpcMiddleware, grpcServerLimit.UnaryServerInterceptor) grpcStreamMiddleware = append(grpcStreamMiddleware, grpcServerLimit.StreamServerInterceptor) } diff --git a/vendor/modules.txt b/vendor/modules.txt index 7bc11d60540..03cc397b448 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -764,7 +764,7 @@ github.com/grafana/alerting/receivers/wecom/v1 github.com/grafana/alerting/templates github.com/grafana/alerting/templates/gomplate github.com/grafana/alerting/templates/mimir -# github.com/grafana/dskit v0.0.0-20251009074119-2b7008cbb887 +# github.com/grafana/dskit v0.0.0-20251010193112-965b207c61b8 ## explicit; go 1.23.0 github.com/grafana/dskit/backoff github.com/grafana/dskit/ballast