@anirudh83

What changes were proposed in this pull request?

Sort the output array in CollectList.eval() and CollectSet.eval() using
PhysicalDataType.ordering(child.dataType) to ensure deterministic element ordering. This
follows the same pattern used by CollectTopK.eval().

The change affects two methods in collect.scala:

  • CollectList.eval(): sorts the buffer array before wrapping in GenericArrayData
  • CollectSet.eval(): sorts the result array before wrapping in GenericArrayData
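
The effect of sorting in eval() can be sketched outside Spark. The block below is a plain-Python model, not the code in collect.scala: the function names and the list/set buffers are hypothetical stand-ins for the aggregation buffers, and Python's built-in ordering stands in for PhysicalDataType.ordering.

```python
from typing import List, Set


def collect_list_eval(buffer: List[int]) -> List[int]:
    """Model of a collect_list result: sort a copy of the buffer before returning it."""
    return sorted(buffer)


def collect_set_eval(buffer: Set[int]) -> List[int]:
    """Model of a collect_set result: materialize the set as a sorted array."""
    return sorted(buffer)


def merge(left: List[int], right: List[int]) -> List[int]:
    """Model of merging two partial list buffers: plain concatenation."""
    return left + right


# Two partial buffers that may be merged in either order.
partial_a = [1, 2]
partial_b = [1]

# Without sorting, the merge order leaks into the result.
merged_ab = merge(partial_a, partial_b)  # [1, 2, 1]
merged_ba = merge(partial_b, partial_a)  # [1, 1, 2]

# With sorting, both merge orders give the same array.
list_ab = collect_list_eval(merged_ab)
list_ba = collect_list_eval(merged_ba)
set_result = collect_set_eval(set(merged_ab))
```

In this model, merged_ab and merged_ba differ, while list_ab and list_ba are both [1, 1, 2], which matches the "After" example in this description.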

Golden SQL test result files (group-by.sql.out, scalar-subquery-select.sql.out) are
updated to reflect the now-sorted output.

Why are the changes needed?

When AQE is enabled with spark.sql.objectHashAggregate.sortBased.fallbackThreshold=1,
array_agg(DISTINCT) can produce wrong results in correlated subqueries. The root cause:

  1. array_agg(DISTINCT x) resolves to CollectList, with DISTINCT handled by adding
    distinct columns to grouping keys in AggUtils.planAggregateWithOneDistinct.
  2. CollectList.eval() returns elements in buffer insertion order.
  3. The sort-based aggregation fallback in ObjectAggregationIterator merges partial
    buffers, causing non-deterministic element order across runs.
  4. Decorrelation rewrites correlated subqueries into joins (DecorrelateInnerQuery) that
    compare array results for equality.
  5. GenericArrayData.equals()/hashCode() are order-sensitive, so arrays with the same
    elements in different orders are treated as unequal, causing the join to produce zero
    matches instead of the correct result.
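
Steps 4 and 5 can be illustrated with a small model. This is a plain-Python sketch, not Spark's join implementation: hash_join_on_array and the row tuples are hypothetical, and converting each array to a tuple stands in for the order-sensitive equals()/hashCode() of GenericArrayData.

```python
from typing import Dict, List, Tuple

Row = Tuple[List[int], str]  # (array-valued join key, payload)


def hash_join_on_array(left: List[Row], right: List[Row]) -> List[Tuple[str, str]]:
    """Equi-join on an array key, comparing arrays element by element in order."""
    index: Dict[Tuple[int, ...], List[str]] = {}
    for key, payload in right:
        # tuple(key) is order-sensitive, like the array equality described above.
        index.setdefault(tuple(key), []).append(payload)
    return [(lp, rp) for key, lp in left for rp in index.get(tuple(key), [])]


def with_sorted_keys(rows: List[Row]) -> List[Row]:
    """Normalize each array key by sorting it, as the sorted eval() output would."""
    return [(sorted(key), payload) for key, payload in rows]


# Same elements on both sides, but produced in different orders.
outer = [([1, 2, 3], "outer_row")]
inner = [([3, 1, 2], "subquery_row")]

unsorted_matches = hash_join_on_array(outer, inner)
sorted_matches = hash_join_on_array(with_sorted_keys(outer), with_sorted_keys(inner))
```

Here unsorted_matches is empty because the two keys differ only in element order, while sorted_matches contains the single expected pair.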

Both collect_list and collect_set explicitly document that their output order is
non-deterministic, so sorting does not violate their contract.

Does this PR introduce any user-facing change?

Yes. collect_list and collect_set now return elements in sorted order instead of
insertion order. Both functions already document that their output order is
non-deterministic and should not be relied upon, so the change stays within the
documented API contract: it is a bug fix, not a contract change.

Before: SELECT collect_list(col) FROM VALUES (1), (2), (1) AS tab(col) → [1,2,1]
After: SELECT collect_list(col) FROM VALUES (1), (2), (1) AS tab(col) → [1,1,2]

How was this patch tested?

  • Added a regression test in SubquerySuite that verifies array_agg(DISTINCT) produces
    consistent equality results with AQE enabled and sortBased.fallbackThreshold=1.
  • Ran DataFrameAggregateSuite: all 127 tests pass.
  • Ran the group-by.sql and scalar-subquery-select.sql golden SQL tests: all pass with
    the updated expected output.

Was this patch authored or co-authored using generative AI tooling?

No

… and sort-based aggregation

Sort the output array in CollectList.eval() and CollectSet.eval() to ensure
deterministic element ordering. This fixes incorrect results when sort-based
aggregation fallback merges partial buffers, producing arrays with the same
elements in different orders that then fail equality comparisons in joins
used for decorrelation.

Both functions document that their output order is non-deterministic, so
sorting does not violate their contract.
@github-actions

JIRA Issue Information

=== Bug SPARK-55038 ===
Summary: AQE + sortBased aggregation produces wrong results for array_agg(DISTINCT) in correlated subqueries
Assignee: None
Status: Open
Affected: ["3.5.0","4.0.0"]


This comment was automatically generated by GitHub Actions

@github-actions github-actions bot added the SQL label Jan 28, 2026
…t_list

Update expected test outputs in CliSuite to reflect the new sorted
ordering of collect_list results.