feat(reconciler): persistent worker pools + bulk operations for decoupled ingestion processing (#526)
Open
mfiedorowicz wants to merge 23 commits into
Conversation
Strip handleStreamMessage to decode → CreateIngestionLogs → XACK → XDEL. Remove channel/worker/allDone machinery that spawned unbounded goroutine pools per message. Consume loop now returns immediately after Postgres bulk insert, draining Redis at ~17K entities/sec (vs ~62/sec before). Changeset generation and application will move to a separate inbox processor that polls ingestion_logs independently.
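A minimal sketch of the thinned consume path described above, assuming hypothetical helper names and type shapes (the real handleStreamMessage lives in the reconciler's ingestion processor and differs in detail):

```go
package ingestion

import (
	"context"

	"github.com/redis/go-redis/v9"
)

// IngestionLog and bulkStore are illustrative stand-ins for the real types.
type IngestionLog struct{}

type bulkStore interface {
	CreateIngestionLogs(ctx context.Context, logs []IngestionLog) error
}

// handleStreamMessage, post-change: decode -> bulk insert -> XACK -> XDEL.
// No worker channels, no NetBox calls, no waiting on plan or apply.
func handleStreamMessage(ctx context.Context, rdb *redis.Client, store bulkStore,
	stream, group string, msg redis.XMessage) error {
	logs, err := decodeIngestionLogs(msg)
	if err != nil {
		return err
	}
	// One Postgres bulk insert per stream message; this is the only I/O
	// that can block the consume loop now.
	if err := store.CreateIngestionLogs(ctx, logs); err != nil {
		return err
	}
	// Acknowledge and trim the stream only after the durable insert.
	if err := rdb.XAck(ctx, stream, group, msg.ID).Err(); err != nil {
		return err
	}
	return rdb.XDel(ctx, stream, msg.ID).Err()
}

// decodeIngestionLogs is a placeholder for the real payload decoder.
func decodeIngestionLogs(msg redis.XMessage) ([]IngestionLog, error) {
	_ = msg.Values
	return nil, nil
}
```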
Polls ingestion_logs for QUEUED rows using FOR UPDATE SKIP LOCKED, dispatches to a persistent worker pool via buffered channel. Workers call GenerateChangeSet and ApplyChangeSet at their own pace, decoupled from the Redis consume loop. Also fixes MaxIdleConnsPerHost (was Go default of 2) to match MaxIdleConns for HTTP connection reuse.
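The MaxIdleConnsPerHost fix matters because the reconciler talks to a single NetBox host: Go's DefaultTransport caps idle connections per host at 2, so any higher worker concurrency churns through fresh TCP/TLS handshakes. A sketch of the intended tuning, with illustrative values and surrounding client construction:

```go
package main

import (
	"net/http"
	"time"
)

// newNetBoxHTTPClient sizes the idle-connection pool for a single upstream
// host. With the Go default MaxIdleConnsPerHost of 2, a large worker pool
// would open and tear down connections on nearly every request.
func newNetBoxHTTPClient(maxIdle int) *http.Client {
	t := &http.Transport{
		MaxIdleConns:        maxIdle,
		MaxIdleConnsPerHost: maxIdle, // match MaxIdleConns: all traffic hits one host
		IdleConnTimeout:     90 * time.Second,
	}
	return &http.Client{Transport: t, Timeout: 30 * time.Second}
}
```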
… processor Adds BulkPlan HTTP client for the plugin's /bulk-plan endpoint, replacing N per-entity /generate-diff calls with one batched call per claimed batch. The ingestion log processor now calls BulkGenerateChangeSets which builds the bulk request, processes per-entity results, and persists change sets individually. Extracted handleGenerateChangeSetFailure and persistChangeSet helpers from GenerateChangeSet for reuse by the bulk path. Moved BulkGenerateChangeSetResult to reconciler/ops to avoid import cycle with reconciler/mocks.
Add INGESTION_LOG_PROCESSOR_CONCURRENCY config to run multiple poll workers within the ingestion log processor. Each worker independently claims and processes batches via FOR UPDATE SKIP LOCKED, enabling parallel processing without contention.
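The claim pattern each poll worker runs, sketched as SQL embedded in Go. The QUEUED=1 state value comes from the commit text; the claimed-state transition, column list, and ordering are illustrative, and the repo's actual query is sqlc-generated:

```go
package ops

// claimQueuedSQL: N workers can run this concurrently without contention.
// FOR UPDATE SKIP LOCKED makes each worker skip rows a sibling already
// holds instead of blocking on them.
const claimQueuedSQL = `
UPDATE ingestion_logs
SET    state = 2 -- illustrative "claimed" state value
WHERE  id IN (
    SELECT id
    FROM   ingestion_logs
    WHERE  state = 1 -- QUEUED
    ORDER  BY id
    LIMIT  $1
    FOR UPDATE SKIP LOCKED
)
RETURNING id;
`
```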
- Add BulkPersistChangeSets: single-transaction COPY for change_sets and changes rows with pre-allocated sequence IDs, replacing per-item persistChangeSet calls in BulkGenerateChangeSets
- Add BulkUpdateIngestionLogStates: unnest-based batch state update (see the sketch after this list)
- Add BulkCreateChangeSets and BulkTruncateChangeSets queries
- Add partial index on ingestion_logs(id) WHERE state = 1 for ClaimQueuedIngestionLogs, drop redundant entity_hash and request_id indexes
- Add stripNoopOnlyChanges to clear noop-only changesets before persist
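The unnest-based batch update referenced above, sketched with illustrative column names; the real query's shape may differ:

```go
package ops

// bulkUpdateStatesSQL zips two parallel arrays into (id, state) rows and
// applies them in one statement, replacing a round-trip per ingestion log.
const bulkUpdateStatesSQL = `
UPDATE ingestion_logs AS il
SET    state = u.state
FROM   unnest($1::bigint[], $2::smallint[]) AS u(id, state)
WHERE  il.id = u.id;
`
```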
…ades Default to per-item differ.Diff calls (BULK_OPERATIONS_ENABLED=false) so existing deployments with older NetBox plugin versions continue working after reconciler upgrade. When enabled, uses BulkPlan HTTP endpoint for batched diffing. Both paths share the same bulk-persist DB layer.
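A sketch of the compatibility switch, under assumptions: BULK_OPERATIONS_ENABLED, BulkPlan, and per-item differ.Diff are named in the commit; the surrounding types and plumbing here are illustrative:

```go
package ops

import "context"

// Item and DiffResult stand in for the real differ/ops types.
type Item struct{}
type DiffResult struct{}

type planClient interface {
	BulkPlan(ctx context.Context, items []Item) ([]DiffResult, error) // batched /bulk-plan
}

// generateChangeSets: both arms produce the same result shape and feed the
// same bulk-persist DB layer afterwards.
func generateChangeSets(ctx context.Context, bulkEnabled bool, c planClient,
	diffOne func(context.Context, Item) DiffResult, batch []Item) ([]DiffResult, error) {
	if bulkEnabled {
		// One HTTP round-trip per claimed batch.
		return c.BulkPlan(ctx, batch)
	}
	// Legacy per-item path keeps older NetBox plugin versions working.
	out := make([]DiffResult, 0, len(batch))
	for _, it := range batch {
		out = append(out, diffOne(ctx, it))
	}
	return out, nil
}
```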
Add ClaimOpenIngestionLogs (state OPEN→APPLYING with FOR UPDATE SKIP LOCKED), ResetApplyingIngestionLogs for crash recovery, and loadLatestChangeSets that loads changesets with NOOP changes filtered out at the SQL level. New types: OpenIngestionLog, BulkApplyItem, BulkApplyResult. New config: ApplyProcessorBatchSize, ApplyProcessorConcurrency.
Add NetBox plugin client method for POST /bulk-apply/ endpoint and BulkApplyChangeSets applier that sends multiple changesets in a single HTTP call. Refactor toApplyRequest to share serialization between singular and bulk paths.
ApplyProcessor polls OPEN ingestion logs (claimed via state 8), loads their changesets from the DB, and applies them to NetBox via BulkApplyChangeSets. Runs as a separate component alongside the plan processor with configurable concurrency. Also adds BulkApplyChangeSets to Ops and the IngestionProcessorOps interface, and registers the apply processor in cmd/reconciler/main.go with crash recovery (ResetApplyingIngestionLogs on startup).
Remove ApplyChangeSet calls and AutoApplyChangesets branching from the plan processor — applying is now handled by the separate ApplyProcessor. Remove obsolete tests (ApplyChangeSetError, SkipsApplyWhenDisabled) and clean up ProcessesItems test expectations.
Add State_APPLYING = 8 used by the ApplyProcessor to claim OPEN rows via FOR UPDATE SKIP LOCKED before applying them to NetBox.
Both IngestionLogProcessor.pollWorker and ApplyProcessor.pollWorker have return signature error, but every path returns nil — errors are logged and the loop continues. The single-worker dispatch in pollLoop returned the always-nil error; the multi-worker dispatch silently discarded it with _ = p.pollWorker(ctx). The asymmetry was misleading. Drop the error return entirely. Start/pollLoop still return error for the Component interface, but the internal worker no longer pretends to have a failure path it doesn't use.
The IngestionProcessor.GenerateChangeSet/ApplyChangeSet worker pool methods are no longer wired into handleStreamMessage after the consume-loop decoupling. With them gone, GENERATE_CHANGESET_CONCURRENCY and APPLY_CHANGESET_CONCURRENCY no longer control anything — remove them along with IngestionLogToProcess and the test that exercised the dead worker pool. The IngestionProcessorOps.GenerateChangeSet/ApplyChangeSet interface methods stay because the gRPC reconciler service in diode-pro still calls them for the singular RPC handlers.
Adds the Go-side client for the new diode-netbox-plugin endpoint that combines plan + apply in a single HTTP round-trip. The endpoint returns a per-entity result with the generated change_set (always populated when plan succeeded) plus split plan/apply error fields, so the reconciler can persist the change_set for audit/retry regardless of apply outcome. HTTP 200 or 207 are both successful HTTP responses here — the caller inspects per-result Errors to attribute plan vs apply failures.
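A sketch of the per-entity result shape and status handling this describes; the field names are inferred from the prose, not copied from the repo:

```go
package netbox

import (
	"fmt"
	"net/http"
)

// bulkPlanApplyResult mirrors the described contract: change_set is populated
// whenever planning succeeded, and plan/apply errors are split so failures
// can be attributed to the right phase. Field names are illustrative.
type bulkPlanApplyResult struct {
	ChangeSet  map[string]any `json:"change_set"`
	PlanError  string         `json:"plan_error,omitempty"`
	ApplyError string         `json:"apply_error,omitempty"`
}

// checkBulkPlanApplyStatus treats 200 (all succeeded) and 207 (mixed
// per-entity outcomes) as successful HTTP responses; anything else is a
// whole-batch failure. Per-entity errors are inspected by the caller.
func checkBulkPlanApplyStatus(resp *http.Response) error {
	switch resp.StatusCode {
	case http.StatusOK, http.StatusMultiStatus:
		return nil // inspect per-result plan/apply errors downstream
	default:
		return fmt.Errorf("bulk-plan-apply: unexpected status %d", resp.StatusCode)
	}
}
```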
…an-apply
Replaces the two-processor design (IngestionLogProcessor + ApplyProcessor)
where both consumed NetBox capacity in parallel and competed for the
shared rate limiter, with two mutually-exclusive processors per tenant
routed by AUTO_APPLY_CHANGESETS:
AUTO_APPLY_CHANGESETS=false -> IngestionLogProcessor only (plan-only, leaves rows in OPEN for manual review)
AUTO_APPLY_CHANGESETS=true  -> AutoApplyProcessor only (combined plan + apply in one /bulk-plan-apply HTTP call; rows go QUEUED -> APPLYING -> APPLIED)
Eliminates the plan-vs-apply contention on the rate limiter: only one
processor consumes NetBox capacity per tenant. Halves HTTP overhead per
ingested entity (one round-trip instead of two) and avoids the
NetBox-side double-validation of the split flow.
Schema changes:
- New SQL ClaimQueuedForAutoApply transitions QUEUED (1) -> APPLYING (8) atomically under FOR UPDATE SKIP LOCKED.
- ResetApplyingIngestionLogs now resets APPLYING -> QUEUED (it previously reset APPLYING -> OPEN to feed the removed ApplyProcessor).
- Drops the no-longer-used ClaimOpenIngestionLogs query and the idx_ingestion_logs_claim_apply partial index for state=2.
Go changes:
- New AutoApplyProcessor with the same poll-loop / worker-pool shape as IngestionLogProcessor (configurable AUTO_APPLY_PROCESSOR_* env vars).
- New Ops.BulkPlanApply orchestrates the call, persists returned change_sets in bulk, and writes the per-entity terminal state.
- New differ.ConvertBulkPlanApplyResult splits the plan/apply error fields from the new endpoint's response shape.
- cmd/reconciler/main.go starts exactly one of the two processors based on AUTO_APPLY_CHANGESETS; ResetApplyingIngestionLogs runs at startup only on the auto-apply path (see the sketch after this list).
- Removed ApplyProcessor, ops.BulkApplyChangeSets, ops.BulkApplyItem, ops.BulkApplyResult, ops.OpenIngestionLog, and the dead loadLatestChangeSets / loadedChangeSet helpers.
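The mutually-exclusive startup, sketched; constructor names and the small interfaces are placeholders for the real ones in cmd/reconciler/main.go:

```go
package main

import (
	"context"
	"fmt"
)

// Component and recoverer are illustrative slices of the real interfaces.
type Component interface{ Start(ctx context.Context) error }

type recoverer interface {
	ResetApplyingIngestionLogs(ctx context.Context) error
}

// pickProcessor starts exactly one of the two processors per tenant, so only
// one of them ever consumes NetBox capacity.
func pickProcessor(ctx context.Context, autoApply bool, ops recoverer,
	newPlanOnly, newAutoApply func() Component) (Component, error) {
	if !autoApply {
		// Plan-only: rows end in OPEN for manual review; nothing to recover.
		return newPlanOnly(), nil
	}
	// Only the auto-apply path parks rows in APPLYING (8); reset any rows
	// stranded there by a previous crash back to QUEUED (1) before claiming.
	if err := ops.ResetApplyingIngestionLogs(ctx); err != nil {
		return nil, fmt.Errorf("crash recovery: %w", err)
	}
	return newAutoApply(), nil
}
```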
…anApply BulkGenerateChangeSets has long persisted a failure-placeholder change_set when generate_changeset raises — so audit, retry, and (in diode-pro) deviation-type association have a change_set row to attach to. The new BulkPlanApply path was missing this: plan failures just flipped the ingestion_log to FAILED without producing a placeholder change_set, which in diode-pro silently skipped the deviation-type association step. Add persistPlanApplyFailurePlaceholder (mirroring handleGenerateChangeSetFailure): on plan error, record the FAILED state with the error message AND insert a FailedDiffChangeSet so the caller's BulkPlanApplyResult has a ChangeSetID populated. Used for both the whole-batch HTTP error path and the per-entity missing-result path. Removes the now-redundant handlePlanApplyBatchFailure helper.
Without backpressure, AutoApplyProcessor immediately starts claiming QUEUED rows during a burst ingest — competing with the Redis consume loop for Postgres connections and CPU while also consuming NetBox capacity that's already the slowest stage. Net effect under load: both stages run slower than if AutoApply yielded to the consume loop first. Add the same BackpressureFunc parameter IngestionLogProcessor already takes. In the poll loop, before claiming a batch, check the function and if it returns true, sleep one idle interval and retry. The diode-server main.go wires it to redisStreamClient.XLen() > INGESTION_LOG_PROCESSOR_BACKPRESSURE_THRESHOLD — same threshold as the plan-only path uses, since the two processors are mutually exclusive and the variable describes "log processing should defer to the consume loop".
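The backpressure hook, sketched with go-redis; BackpressureFunc and the XLen-vs-threshold wiring come from the commit text, while the surrounding shape is illustrative:

```go
package main

import (
	"context"

	"github.com/redis/go-redis/v9"
)

// BackpressureFunc reports whether log processing should yield to the Redis
// consume loop. Checked before each claim; when true, the poll worker sleeps
// one idle interval and re-checks instead of claiming a batch.
type BackpressureFunc func(ctx context.Context) bool

// streamBackpressure returns true while the stream backlog exceeds the
// threshold, i.e. while draining Redis should take priority over planning.
func streamBackpressure(rdb *redis.Client, stream string, threshold int64) BackpressureFunc {
	return func(ctx context.Context) bool {
		n, err := rdb.XLen(ctx, stream).Result()
		if err != nil {
			return false // on Redis errors, don't stall the processor
		}
		return n > threshold
	}
}
```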
CreateIngestionLogs was calling RefreshDefaultBranch on every Redis stream batch, which evicts the 60s LRU cache and forces a fresh NetBox HTTP call. Under burst ingest the consume loop fired tens of default-branch requests per second, saturating NetBox plugin workers and starving /bulk-plan-apply. Switch to the cached DefaultBranch lookup. A short staleness window on a value that changes rarely is acceptable, and matches what other ingestion-log paths already do.
Background goroutine (Ops.Start) owns NetBox HTTP for the default-branch lookup; hot-path callers read from cache only. Consume loop keeps draining Redis when NetBox/hydra is unreachable. Failed refresh retains the last-known value. Cold cache returns (nil, nil) and callers degrade to no-branch-filter lookups.
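A minimal sketch of that cache contract, assuming hypothetical types: a cold cache yields (nil, nil), a failed refresh keeps the last-known value, and only the background goroutine ever touches NetBox HTTP:

```go
package ops

import (
	"context"
	"sync"
	"time"
)

type Branch struct{ Name string }

// branchCache: hot paths read memory only; the refresher owns the HTTP call.
type branchCache struct {
	mu    sync.RWMutex
	value *Branch // nil until the first successful refresh (cold cache)
	fetch func(ctx context.Context) (*Branch, error)
}

// DefaultBranch returns (nil, nil) on a cold cache; callers degrade to
// no-branch-filter lookups rather than blocking on NetBox.
func (c *branchCache) DefaultBranch() (*Branch, error) {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return c.value, nil
}

// run refreshes on a fixed interval until ctx is cancelled.
func (c *branchCache) run(ctx context.Context, interval time.Duration) {
	t := time.NewTicker(interval)
	defer t.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-t.C:
			if b, err := c.fetch(ctx); err == nil {
				c.mu.Lock()
				c.value = b
				c.mu.Unlock()
			} // on error: retain the last-known value, try again next tick
		}
	}
}
```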
Doc claimed "subsequent calls are no-ops" but the implementation launched a new goroutine on every call. With current callers this never happens in practice, but guard defensively against future misuse.
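The fix, sketched with sync.Once; the field and method shapes here are illustrative:

```go
package ops

import (
	"context"
	"sync"
	"time"
)

type Ops struct {
	startOnce       sync.Once
	refreshInterval time.Duration
	refresh         func(ctx context.Context) // background refresher loop
}

// Start is idempotent: the first call launches the refresher goroutine,
// every later call is a genuine no-op, matching what the doc promised.
func (o *Ops) Start(ctx context.Context) {
	o.startOnce.Do(func() {
		go o.refresh(ctx)
	})
}
```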
Vulnerability Scan: Passed — diode-reconciler image: No vulnerabilities found. Commit: 16a5ec8
Vulnerability Scan: Passed — diode-auth image. Commit: 16a5ec8
Vulnerability Scan: Passed — diode-ingester image: No vulnerabilities found. Commit: 16a5ec8
Go test coverage
Total coverage: 50.1%
jajeffries approved these changes on May 13, 2026
samiura approved these changes on May 13, 2026
MicahParks approved these changes on May 13, 2026
…rker Both AutoApplyProcessor.pollWorker and IngestionLogProcessor.pollWorker opened each loop iteration with a non-blocking select on ctx.Done() to exit on cancellation. The check is defensive but redundant: every operation that follows in the body (backpressure(ctx), claim queries, HTTP calls, the time.After fallback) is itself ctx-aware and will surface a cancelled context within a single tick. Dropping the early selects shortens the hot loop without changing cancellation semantics. Per PR #526 review feedback from @MicahParks.
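The before/after shape, as a sketch only (not the repo's actual loop bodies):

```go
package main

import "context"

// pollWorkerBefore shows the removed pattern: a non-blocking ctx probe at
// the top of every iteration, ahead of a body that is already ctx-aware.
func pollWorkerBefore(ctx context.Context, body func(context.Context)) {
	for {
		select {
		case <-ctx.Done():
			return
		default: // falls through immediately; purely defensive
		}
		body(ctx)
	}
}

// pollWorkerAfter drops the probe: the body's claim queries, HTTP calls,
// and idle sleeps all take ctx, so cancellation still surfaces within a
// single tick.
func pollWorkerAfter(ctx context.Context, body func(context.Context)) {
	for ctx.Err() == nil {
		body(ctx)
	}
}
```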
Summary
Rework the reconciler's ingestion path from per-message channel/worker dispatch into a Postgres-backed inbox model with persistent worker pools. The Redis consume loop now drains at Postgres COPY speed and no longer blocks on NetBox HTTP; plan and apply phases run in their own decoupled processors with independent concurrency.
What changes
Consume loop is now thin. `handleStreamMessage` decodes the batch, bulk-COPYs new `ingestion_logs` rows (state=QUEUED), XACK/XDELs, and returns. No worker channels, no NetBox calls, no waiting on apply.
Two new long-running processors:
- `IngestionLogProcessor` — polls QUEUED rows via `FOR UPDATE SKIP LOCKED`, dispatches to a persistent worker pool. Calls `GenerateChangeSet` (plan) at its own pace. Configurable concurrency, batch size, and Redis-stream-length backpressure threshold.
- `ApplyProcessor` / `AutoApplyProcessor` — apply phase split out from generation for independent tuning. `AutoApplyProcessor` uses a new combined `/bulk-plan-apply` HTTP call when `AUTO_APPLY_CHANGESETS=true` to halve NetBox round-trips per entity.
Bulk HTTP clients to the NetBox plugin: `BulkPlan`, `BulkApply`, `BulkPlanApply` (gated by `BULK_OPERATIONS_ENABLED`, default false for compatibility with older plugin versions).
Bulk DB writes: `BulkCreateIngestionLogs` (sqlc `:copyfrom`) and `BulkPersistChangeSets` replace per-row INSERT loops in the hot paths.
Background default-branch refresher owns the NetBox HTTP call that previously sat in the consume hot path. Hot-path callers read a cache; a goroutine refreshes on a fixed interval and retains the last-known value on transient failure, so consume keeps draining if NetBox/auth is briefly unreachable. `Ops.Start(ctx)` is idempotent via `sync.Once`.
Proto / state machine: new `APPLYING` state for the apply-in-flight window.
Cleanup: drops the dead `GenerateChangeSetConcurrency` / `ApplyChangeSetConcurrency` config (left over from the removed in-line worker dispatch).
Notes for review
Behavior is unchanged at the defaults: `BULK_OPERATIONS_ENABLED=false` and `AUTO_APPLY_CHANGESETS=false`.
New tests: `ingestion_log_processor_test.go`, expanded `ops_test.go`. Existing `ingestion_processor` tests trimmed to reflect the slimmer consume loop.
Test plan
- `make lint` clean
- `make test` passes (race detector on)
- Exercised both `BULK_OPERATIONS_ENABLED=true` and the legacy path