feat(logmq): parallelize alert evaluation and operational event delivery#987
Open
alexluong wants to merge 6 commits into
Open
feat(logmq): parallelize alert evaluation and operational event delivery#987alexluong wants to merge 6 commits into
alexluong wants to merge 6 commits into
Conversation
…vior Pin TODAY's serial post-persist pipeline (alert eval + opevent emit) so the Model C rearchitecture flips a couple of expectations instead of rewriting the tests. Wires the real BatchProcessor + AlertMonitor + Emitter + memlogstore + miniredis; doubles only at the external boundary (recordingSink, recordingDisabler, countingMessage, failingLogStore). Assertions touch three observable oracles only — sink records, per-message ack/nack counters, and ListAttempt — and order is always per-destination, never global, so the parallel refactor can pass unchanged. Split by concern, all files prefixed characterization_: - harness: shared setup, doubles, helpers; harnessConfig nested batcher/alert/doubles - ordering: thresholds/disable, success-resets-count (keystone), interleaved destinations, attempt-order, cross-batch count - idempotency: replay same attempt, exhausted retries - acknowledgement: mixed-batch exactly-once, below-threshold no-alert - validation: in-batch duplicate, insert-error nacks all Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Add Processed/MarkProcessed alongside Exec: a split-phase gate for callers that check before doing the work and mark only after it fully lands (no in-flight conflict detection — concurrent duplicates both run, tolerated by at-least-once consumers). Also use errors.Is for the redis.Nil comparison in the new check path. Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
…tor topics Emit now takes an opevents.Event value (topic, tenant, data) instead of three positional args, and payloads.go becomes the wire-contract catalog: the alert payload shapes plus typed constructors, and apirouter's tenant.subscription.updated payload moves there behind one too, replacing the raw topic string + local struct. The alert monitor's consumer interface and call sites are adapted mechanically. Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
dd6b484 to
4940928
Compare
… effects The alert package shrinks to a tracker: Monitor and Notifier are deleted; Evaluator evaluates one attempt against the destination's failure history and returns the verdict as data (Evaluation). NewEvaluator takes an AlertStore; deployment scoping lives in NewRedisAlertStore. The logmq batch loop now owns acting on the verdict, per entry and still serial: auto-disable, operator-event construction and delivery, and replay dedup. The per-attempt replay gate (split-phase idempotence) replaces the bespoke cfeval "evaluated" set — a replay of a fully processed failed attempt is skipped instead of re-counting or re-alerting, and the gate survives a success reset. The exhausted-retries suppression window moves behind the SuppressionWindow interface, wired per event at plan time. Processing order and observable behavior are unchanged; the characterization suite's ordered assertions run as-is. New tests pin the gate semantics (stale replay after reset) and the suppression window. Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
… entry After the batched insert, each entry's pipeline now runs on its own goroutine — dispatch-and-move-on — and an attempt's events are sent concurrently under a 5s emit timeout. A slow eval or sink send no longer blocks persisting the next batch, and one destination's slow sink no longer serializes everyone else's sends. Entries process in no particular order, including within a destination (discussed on #980): the consecutive-failure counter is a set of attempt IDs, so counting is idempotent and commutative, and per-process ordering was cosmetic anyway with multiple logmq replicas. No pools, queues, or sizing knobs: in-flight work is naturally bounded by arrival rate × the emit timeout, and Shutdown drains the in-flight goroutines (bounded by the same timeout). Characterization tests updated for the ordering relaxation: assertions pin topic multisets and disable calls, never arrival order or attempt identity; tests that need a deterministic sequence (success-reset keystone, suppression window) pace messages one at a time. New tests pin the decoupling (slow send/eval doesn't block persistence), same-destination independence, and shutdown draining. Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
4940928 to
7ead66e
Compare
Three paths that must all end in nack-with-gate-unmarked so redelivery re-runs the attempt in full: - emit timeout: a sink send slower than the per-send deadline fails the delivery. BatchProcessorConfig gains a test-only EmitTimeout override (zero = the 5s production default) and the harness sink honors ctx like a real sink call, so the test trips a 50ms deadline instead of sleeping 5s. - MarkProcessed failure: sends succeed, marking the replay gate fails — the delivered event may go out again on redelivery (tolerated duplicate). - partial multi-event failure: one of an attempt's two concurrent sends fails; the whole attempt nacks, duplicates over losses. Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Decouples alert processing from log persistence in logmq. Previously the batch loop ran eval, disable, and event emission inline and serially, so one slow Redis call or webhook send stalled persistence of every subsequent batch. Now the loop persists the batch, spawns one goroutine per entry, and moves on; each goroutine evaluates, disables at threshold, sends its operator events concurrently (5s timeout each), marks the replay gate, and acks — any failure nacks for redelivery. Closes #980. There are no pools, queues, or sizing knobs: in-flight work is naturally bounded at arrival rate × ~5s, and a semaphore cap is a small retrofit if it's ever needed.
Two behavior changes to know about: ordering within a destination is relaxed — failure counts stay exact (the counter is a set of attempt IDs), but which attempt's payload carries an alert is nondeterministic under concurrency; and the 5s per-send emit timeout is new — a send slower than that now nacks and redelivers instead of blocking indefinitely.