Make FilterExec cooperative for all-rows-rejected loops #419
mpurins-coralogix wants to merge 3 commits into v50.3
When a `FilterExec` predicate rejects every row in every batch, the inner `loop` in `FilterExecStream::poll_next` would spin without yielding to Tokio: each iteration consumed an input batch, produced an empty filtered batch, hit `continue`, and never exited the loop. On unbounded inputs this monopolised the worker and made the task uncancellable.

Insert a cooperative yield point at the top of each loop iteration and account for predicate work after each batch. Three variants gated on `cfg(datafusion_coop)` to mirror `crate::coop::CooperativeStream`:

- `tokio` (default): `poll_proceed(cx)` + `made_progress()` per batch.
- `tokio_fallback`: `has_budget_remaining()` check + drive `consume_budget()` once with the current cx after each batch.
- `per_stream`: local `u8` budget counter on `FilterExecStream`, reset to `FILTER_YIELD_FREQUENCY = 128` (matches Tokio's task budget) after each yield.

Mark `FilterExec` as `SchedulingType::Cooperative` so the `EnsureCooperative` optimizer rule does not wrap it.

Add a discriminating integration test `filter_reject_all_batches_yields_without_cooperative_input` backed by a self-contained `NonCooperativeInfiniteExec` test fixture and a `query_yields_no_ensure_cooperative` helper that bypasses the optimizer rule. The test fails (10s timeout) without this change and passes with it, isolating the cooperative behavior to FilterExec itself.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
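
For readers who want to see the shape of the `per_stream` variant, here is a minimal std-only sketch of the idea described above. It is not the code in this PR: `ToyFilterStream`, `NoopWake`, `drive_to_completion` and `YIELD_FREQUENCY` are hypothetical names, batches are plain `Vec<i32>`, and the input is an in-memory queue rather than a polled child stream. It only illustrates a budget counter checked at the top of the loop, decremented per batch, and reset after each yield.

```rust
use std::collections::VecDeque;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical stand-in for the PR's `FILTER_YIELD_FREQUENCY`.
const YIELD_FREQUENCY: u8 = 128;

/// Toy filter stream (assumption: batches are `Vec<i32>`, input is a queue).
struct ToyFilterStream<P: Fn(&i32) -> bool> {
    input: VecDeque<Vec<i32>>,
    predicate: P,
    budget: u8,
}

impl<P: Fn(&i32) -> bool> ToyFilterStream<P> {
    fn new(input: Vec<Vec<i32>>, predicate: P) -> Self {
        Self { input: input.into(), predicate, budget: YIELD_FREQUENCY }
    }

    fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Vec<i32>>> {
        loop {
            // Yield point at the top of every iteration: once the budget is
            // spent, reset it, ask to be polled again, and hand control back.
            if self.budget == 0 {
                self.budget = YIELD_FREQUENCY;
                cx.waker().wake_by_ref();
                return Poll::Pending;
            }
            let batch = match self.input.pop_front() {
                Some(batch) => batch,
                None => return Poll::Ready(None),
            };
            // Account for the predicate work done on this batch.
            self.budget -= 1;
            let kept: Vec<i32> =
                batch.into_iter().filter(|v| (self.predicate)(v)).collect();
            if !kept.is_empty() {
                return Poll::Ready(Some(kept));
            }
            // Every row was rejected: loop again, but the budget is now lower.
        }
    }
}

/// Waker that does nothing; a real executor would reschedule the task here.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Polls the stream until it ends; returns (number of `Pending`s, batches).
fn drive_to_completion<P: Fn(&i32) -> bool>(
    stream: &mut ToyFilterStream<P>,
) -> (usize, Vec<Vec<i32>>) {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut pending = 0;
    let mut out = Vec::new();
    loop {
        match stream.poll_next(&mut cx) {
            Poll::Pending => pending += 1,
            Poll::Ready(Some(batch)) => out.push(batch),
            Poll::Ready(None) => return (pending, out),
        }
    }
}
```

In this sketch, driving a stream of 300 batches through a predicate that rejects everything returns `Pending` twice (after batches 128 and 256) before the input runs out, instead of running all 300 iterations inside a single `poll_next` call. On an unbounded input, those `Pending` returns are the points where a runtime can schedule other tasks or observe cancellation.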

Standard questions: does this duplicate functionality that was solved upstream at some point ahead of 50? If so, which PR? If not, can we PR it upstream?

No, this is new and has not been solved upstream. I have been following the coop work in DataFusion with one eye, and unless I missed something, it was never recognised that FilterExec needs a fix as well.

Would you mind filing an issue for it? It would be best if upstream could extend their solution to cover FilterExec, and then we can stop rebasing this commit during future upgrades.

Also, let's not merge this -- it's more of a PoC/draft right now.

Which issue does this PR close?
Closes #.
Rationale for this change
What changes are included in this PR?
Are there any user-facing changes?