fix(loki.process): Make limit stage shutdown cancelable #6215
|
Hey, thanks for the PR. Ideally you would create an issue before writing a PR, so please keep that in mind in the future. Yes, this is a bug, but there is a much easier solution than creating an additional optional interface for pipeline stages. The Stage interface includes a Cleanup function that is called whenever a pipeline is stopped, and we should use that instead. |
|
Thanks. I will open issues first in the future; I appreciate the steering. I considered the Should I reorder shutdown to call Either option shifts the |
|
I took another look and it's not actually safe to use Cleanup: that would break the metrics stage, because its actions need to happen after we stop getting entries. Then I would prefer to have an optional interface for calling |
rateLimiter.Wait in shouldThrottle blocks until the next token, and the Run goroutine's range loop only observes close(in) between iterations. EntryHandler.Stop hangs for the full token interval at low rates. Add an optional Stopper interface, called by Pipeline.Stop before wg.Wait. limitStage cancels its internal context to unblock Wait; Pipeline implements Stopper so matchers' inner pipelines forward.
e94765d to
3bc7aca
|
Reworked per your |
Pull Request Details
When loki.process has stage.limit { drop = false }, the stage's shouldThrottle calls rateLimiter.Wait, which blocks until the next token. The Run goroutine only exits its range loop between iterations, so a goroutine inside Wait doesn't observe close(in). With a low rate, EntryHandler.Stop hangs for the full token interval.

This adds an optional Stopper interface (Stop()) called by Pipeline.Stop before wg.Wait. limitStage cancels its internal context to unblock Wait. Pipeline itself implements Stopper so nested inner pipelines (via matcherStage) forward recursively.

Scope
The by-label code path (stage.limit { by_label_name = "..." }) is not affected; validateLimitConfig requires drop = true with by_label_name, so that path uses rl.Allow() and never reaches a Wait call.
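
The shutdown mechanism described in the details above can be sketched in isolation. This is a minimal illustration, not the code from this PR: the stage and pipeline types are simplified, entries are plain strings, and waitToken is a hypothetical stand-in for the rate limiter's Wait that blocks for one token interval or until its context is cancelled. Only the names Stopper, Stop, limitStage, and the "call Stop before wg.Wait" ordering come from the description.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// Stopper is the optional interface: stages that may block implement it.
type Stopper interface {
	Stop()
}

// stage is a simplified, hypothetical stand-in for the real Stage interface.
type stage interface {
	Run(in <-chan string, out chan<- string)
}

// waitToken is an assumed stand-in for the limiter's Wait: it blocks for
// one token interval or until ctx is cancelled.
func waitToken(ctx context.Context, interval time.Duration) error {
	t := time.NewTimer(interval)
	defer t.Stop()
	select {
	case <-t.C:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

type limitStage struct {
	interval time.Duration
	ctx      context.Context
	cancel   context.CancelFunc
}

func newLimitStage(interval time.Duration) *limitStage {
	ctx, cancel := context.WithCancel(context.Background())
	return &limitStage{interval: interval, ctx: ctx, cancel: cancel}
}

// Run forwards entries until in is closed or the stage is stopped.
func (s *limitStage) Run(in <-chan string, out chan<- string) {
	for e := range in {
		if err := waitToken(s.ctx, s.interval); err != nil {
			return // Stop was called while waiting for a token
		}
		select {
		case out <- e:
		case <-s.ctx.Done():
			return
		}
	}
}

// Stop cancels the internal context, unblocking a pending waitToken.
func (s *limitStage) Stop() { s.cancel() }

type pipeline struct {
	stages []stage
	wg     sync.WaitGroup
}

// The pipeline itself satisfies Stopper, so a nested pipeline can be stopped
// by its parent in the same way.
var _ Stopper = (*pipeline)(nil)

// Stop forwards to every stage that opts in to Stopper.
func (p *pipeline) Stop() {
	for _, s := range p.stages {
		if st, ok := s.(Stopper); ok {
			st.Stop()
		}
	}
}

// stopsPromptly blocks a stage on a one-hour token interval, then shuts
// down: close the input, call Stop, and only then wait for the goroutine.
func stopsPromptly() bool {
	st := newLimitStage(time.Hour)
	p := &pipeline{stages: []stage{st}}
	in := make(chan string)
	out := make(chan string, 1)
	p.wg.Add(1)
	go func() {
		defer p.wg.Done()
		st.Run(in, out)
	}()
	in <- "entry" // received by Run, which then blocks in waitToken
	close(in)
	start := time.Now()
	p.Stop()
	p.wg.Wait()
	return time.Since(start) < 2*time.Second
}

func main() {
	fmt.Println("stopped promptly:", stopsPromptly())
}
```

If the p.Stop() call were removed from stopsPromptly, wg.Wait would block for the full interval, which is the hang the PR describes.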
Issue(s) fixed by this Pull Request
None. Goroutine leak triggered only on shutdown with non-trivial rate limits.
Notes to the Reviewer
Repro: stage.limit { rate = 1, burst = 1, drop = false }, sustained input, SIGTERM. Without the fix, Alloy hangs.
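
A hang like this can be checked with a small deadline helper. The sketch below is illustrative only and is not the helper from this PR: stopsWithin, promptStopDetected, and hangDetected are hypothetical names, and the stop functions are placeholders for a real shutdown call.

```go
package main

import (
	"fmt"
	"time"
)

// stopsWithin runs stop in its own goroutine and reports whether it
// returned before the deadline elapsed.
func stopsWithin(stop func(), deadline time.Duration) bool {
	done := make(chan struct{})
	go func() {
		defer close(done)
		stop()
	}()
	select {
	case <-done:
		return true
	case <-time.After(deadline):
		return false
	}
}

// promptStopDetected uses a stop function that returns immediately.
func promptStopDetected() bool {
	return stopsWithin(func() {}, 2*time.Second)
}

// hangDetected uses a stop function that blocks until released, standing
// in for a shutdown stuck behind a blocked wait.
func hangDetected() bool {
	block := make(chan struct{})
	defer close(block) // release the blocked goroutine on return
	return !stopsWithin(func() { <-block }, 50*time.Millisecond)
}

func main() {
	fmt.Println("prompt stop detected:", promptStopDetected())
	fmt.Println("hang detected:", hangDetected())
}
```

Running the stop call in a separate goroutine keeps the check itself from hanging when the shutdown never returns.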

Regression tests in limit_test.go and match_test.go share an assertPipelineStopsPromptly helper using channel-based synchronization: send one entry, receive it (signaling the first iteration completed), send a second entry that blocks Wait, assert EntryHandler.Stop returns within 2s. The match_test.go case exercises matcherStage.Stop forwarding through its inner Pipeline.

Verified with go test -race -count=1 ./internal/component/loki/process/... and make lint.

PR Checklist