-
Notifications
You must be signed in to change notification settings - Fork 1.8k
Support Schema Field Metadata in User-Defined Aggregate Functions (UDAFs) #17085
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
- Improve documentation for AccumulatorArgs.schema and exprs: - Add example showing how to retrieve field metadata and return field. - Explain synthesized schema behavior for literal-only inputs. - Clarify precedence when inputs are mixed (physical schema metadata wins; synthesized metadata used only when physical schema is empty). - Update AggregateFunctionExpr::args_schema docs: - Explain field order guarantees, synthesized schema usage, and that std::borrow::Cow is used to avoid allocations when possible. - Add a TODO to factor AccumulatorArgs construction into a private helper. Documentation-only changes; no behavioral changes.
… in AggregateFunctionExpr
fb6c9a8 to
2991acc
Compare
- Updated the `DummyUdf::new` function to accept a `Signature` parameter for initialization. - Modified test cases to reflect the new initialization method for `DummyUdf`. - Improved the `args_schema` method to better manage when to borrow the existing schema or create a new one, ensuring correct correspondence between expressions and schema fields. - Added comments and examples for improved clarity on the schema handling behavior.
- Introduced a `build_acc_args` method to encapsulate the logic for building `AccumulatorArgs` and executing a closure with them. - Updated the `create_accumulator`, `create_sliding_accumulator`, `groups_accumulator_supported`, and `create_groups_accumulator` methods to utilize the new method for better readability and maintainability.
Jefffrey
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I must admit I am quite confused at the fix proposed by this PR; it seems there are cases where for literal only inputs to aggregates, schema is empty and this PR fixes it by having a fallback behaviour in the AggregateFunctionExpr to "synthesize" the schema from the input arguments?
If my understanding is correct, my questions/thoughts are:
- Is this a band-aid fix? Is there a root cause we should be looking for instead?
- There's a heavy emphasis on the word "synthesize" throughout this PR but I don't know what it means to "synthesize" a schema from literal expressions 🤔
datafusion/expr/src/udaf.rs
Outdated
| /// Example: retrieving metadata and return field for input `i`: | ||
| /// ```ignore | ||
| /// let metadata = acc_args.schema.field(i).metadata(); | ||
| /// let field = acc_args.exprs[i].return_field(&acc_args.schema)?; | ||
| /// ``` |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm having some trouble understanding this example; I can understand the part for getting the metadata of a field given the context of the PR, but why do we also include an example for getting the return field?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The snippet is meant to illustrate the sentence immediately above it: you pair acc_args.exprs with acc_args.schema to recover the full FieldRef for argument i.
Pulling the metadata out of schema.field(i) is one common use case, and the follow-up line shows how you would then obtain the complete FieldRef (name, type, metadata) via:
exprs[i].return_field(&acc_args.schema)...using the same pairing.
I'll tweak the wording.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The snippet is meant to illustrate the sentence immediately above it: you pair
acc_args.exprswithacc_args.schemato recover the fullFieldReffor argument i.
This may be a silly question, but what's the difference between acc_args.exprs[i].return_field(&acc_args.schema) and acc_args.schema.field(i)?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not at all 😄
-
acc_args.schema.field(i)— returns the raw ArrowFieldfrom the (physical) input schema at positioni(name, type, nullability, metadata exactly as in that schema). -
acc_args.exprs[i].return_field(&acc_args.schema)?— asks the expression for the effectiveFieldReffor argumentigiven the full schema. It incorporates expression semantics (casts, literals, computed types, extension metadata, nullability changes, etc.) and returns an owned/ArcFieldRef(and can fail), not just a borrowed&Field.
AggregateExprBuilder already captures a FieldRef for every argument (including literals) by calling each physical expression’s return_field during construction, so we retain the full Arrow metadata for those inputs in input_fields. The new args_schema helper detects when the physical input schema is empty—something that legitimately happens when an aggregate is invoked with literals only because the child plan has no columns—and in that case reconstitutes a Schema from the stored input_fields so the accumulator can still see that metadata. We then hand that schema to every AccumulatorArgs we build, so UDAFs observe the same field information whether their inputs were columns or literals. In other words, “synthesize” means “wrap the already-computed argument fields in a temporary Schema when the physical schema is empty”; there isn’t another layer hiding the real root cause. |
1583191 to
07f6fab
Compare
Thank you for the explanations, I'm still trying to wrap my head around all the parts involved here 😅 So I think my main confusion lies around the difference between the physical input schema, and the effective If we look at scalar & window functions, I don't see them having equivalent logic around providing direct access to the physical schema, instead they provide methods to directly access
I'm trying to understand why |
Why
|
| Aspect | Scalar / Window | Accumulator (Before) | Accumulator (After) |
|---|---|---|---|
| Fields | Pre-computed at planning | Computed on-demand | Computed on-demand |
| Schema | Hidden (not needed) | Exposed (confusing) | Exposed (documented) |
| Field access | Direct: args.arg_fields[i] |
Manual: exprs[i].return_field(schema) ? |
Helper: input_field(i) ? |
| Why different? | Simple 1:1 args | Expressions need schema for column resolution | Same (clarified) |
This PR clarifies intent and align ergonomics where possible, while preserving the functionality that makes aggregates work correctly.
|
Sorry this doesn't clear it up for me; the example of |
Expand documentation to explain the relationship between schemas, unevaluated argument expressions, and their differences from scalar and window function argument handling. Address the specific case of SUM(a + b) vs SIN(a + b) for better understanding.
|
Closing this. |
…ields (#18100) ## Which issue does this PR close? <!-- We generally require a GitHub issue to be filed for all bug fixes and enhancements and this helps us generate change logs for our releases. You can link an issue to this PR using the GitHub syntax. For example `Closes #123` indicates that this PR will close issue #123. --> - Closes #16997 - Part of #11725 - Supersedes #17085 ## Rationale for this change <!-- Why are you proposing this change? If this is already explained clearly in the issue then this section is not needed. Explaining clearly why changes are proposed helps reviewers understand your changes and offer better suggestions for fixes. --> When reviewing #17085 I was very confused by the fix suggested, and tried to understand why `AccumulatorArgs` didn't have easy access to `Field`s of its input expressions, as compared to scalar/window functions which do. Introducing this new field should make it easier for users to grab datatype, metadata, nullability of their input expressions for aggregate functions. ## What changes are included in this PR? <!-- There is no need to duplicate the description in the issue here but it is sometimes worth providing a summary of the individual changes in this PR. --> Add a slice of `FieldRef` to `AccumulatorArgs` so users don't need to compute the input expression fields themselves via using schema. This addresses #16997 as it was confusing to have only the schema available as there are valid (?) cases where the schema is empty (such as literal only input). This fix differs from #17085 in that it doesn't special case for when there is literal only input; it leaves the physical `schema` provided to `AccumulatorArgs` untouched but provides a more ergonomic (and less confusing) API for users to retrieve `Field`s of their input arguments. - I'm still not sure if the schema being empty for literal only inputs is correct or not, so this might be considered a side step. If we could remove `schema` entirely from `AccumulatorArgs` maybe we wouldn't need to worry about this, but see my comment for why that wasn't done in this PR ## Are these changes tested? <!-- We typically require tests for all PRs in order to: 1. Prevent the code from being accidentally broken by subsequent changes 2. Serve as another way to document the expected behavior of the code If tests are not included in your PR, please explain why (for example, are they covered by existing tests)? --> Existing unit tests. ## Are there any user-facing changes? <!-- If there are user-facing changes then we may require documentation to be updated before approving the PR. --> Yes, new field to `AccumulatorArgs` which is publicly exposed (with all it's fields). <!-- If there are any breaking changes to public APIs, please add the `api change` label. -->
…ields (apache#18100) ## Which issue does this PR close? <!-- We generally require a GitHub issue to be filed for all bug fixes and enhancements and this helps us generate change logs for our releases. You can link an issue to this PR using the GitHub syntax. For example `Closes apache#123` indicates that this PR will close issue apache#123. --> - Closes apache#16997 - Part of apache#11725 - Supersedes apache#17085 ## Rationale for this change <!-- Why are you proposing this change? If this is already explained clearly in the issue then this section is not needed. Explaining clearly why changes are proposed helps reviewers understand your changes and offer better suggestions for fixes. --> When reviewing apache#17085 I was very confused by the fix suggested, and tried to understand why `AccumulatorArgs` didn't have easy access to `Field`s of its input expressions, as compared to scalar/window functions which do. Introducing this new field should make it easier for users to grab datatype, metadata, nullability of their input expressions for aggregate functions. ## What changes are included in this PR? <!-- There is no need to duplicate the description in the issue here but it is sometimes worth providing a summary of the individual changes in this PR. --> Add a slice of `FieldRef` to `AccumulatorArgs` so users don't need to compute the input expression fields themselves via using schema. This addresses apache#16997 as it was confusing to have only the schema available as there are valid (?) cases where the schema is empty (such as literal only input). This fix differs from apache#17085 in that it doesn't special case for when there is literal only input; it leaves the physical `schema` provided to `AccumulatorArgs` untouched but provides a more ergonomic (and less confusing) API for users to retrieve `Field`s of their input arguments. - I'm still not sure if the schema being empty for literal only inputs is correct or not, so this might be considered a side step. If we could remove `schema` entirely from `AccumulatorArgs` maybe we wouldn't need to worry about this, but see my comment for why that wasn't done in this PR ## Are these changes tested? <!-- We typically require tests for all PRs in order to: 1. Prevent the code from being accidentally broken by subsequent changes 2. Serve as another way to document the expected behavior of the code If tests are not included in your PR, please explain why (for example, are they covered by existing tests)? --> Existing unit tests. ## Are there any user-facing changes? <!-- If there are user-facing changes then we may require documentation to be updated before approving the PR. --> Yes, new field to `AccumulatorArgs` which is publicly exposed (with all it's fields). <!-- If there are any breaking changes to public APIs, please add the `api change` label. -->
…ields (apache#18100) ## Which issue does this PR close? <!-- We generally require a GitHub issue to be filed for all bug fixes and enhancements and this helps us generate change logs for our releases. You can link an issue to this PR using the GitHub syntax. For example `Closes apache#123` indicates that this PR will close issue apache#123. --> - Closes apache#16997 - Part of apache#11725 - Supersedes apache#17085 ## Rationale for this change <!-- Why are you proposing this change? If this is already explained clearly in the issue then this section is not needed. Explaining clearly why changes are proposed helps reviewers understand your changes and offer better suggestions for fixes. --> When reviewing apache#17085 I was very confused by the fix suggested, and tried to understand why `AccumulatorArgs` didn't have easy access to `Field`s of its input expressions, as compared to scalar/window functions which do. Introducing this new field should make it easier for users to grab datatype, metadata, nullability of their input expressions for aggregate functions. ## What changes are included in this PR? <!-- There is no need to duplicate the description in the issue here but it is sometimes worth providing a summary of the individual changes in this PR. --> Add a slice of `FieldRef` to `AccumulatorArgs` so users don't need to compute the input expression fields themselves via using schema. This addresses apache#16997 as it was confusing to have only the schema available as there are valid (?) cases where the schema is empty (such as literal only input). This fix differs from apache#17085 in that it doesn't special case for when there is literal only input; it leaves the physical `schema` provided to `AccumulatorArgs` untouched but provides a more ergonomic (and less confusing) API for users to retrieve `Field`s of their input arguments. - I'm still not sure if the schema being empty for literal only inputs is correct or not, so this might be considered a side step. If we could remove `schema` entirely from `AccumulatorArgs` maybe we wouldn't need to worry about this, but see my comment for why that wasn't done in this PR ## Are these changes tested? <!-- We typically require tests for all PRs in order to: 1. Prevent the code from being accidentally broken by subsequent changes 2. Serve as another way to document the expected behavior of the code If tests are not included in your PR, please explain why (for example, are they covered by existing tests)? --> Existing unit tests. ## Are there any user-facing changes? <!-- If there are user-facing changes then we may require documentation to be updated before approving the PR. --> Yes, new field to `AccumulatorArgs` which is publicly exposed (with all it's fields). <!-- If there are any breaking changes to public APIs, please add the `api change` label. -->
Which issue does this PR close?
AccumulatorArgs.schemais empty when passing in scalar input #16997Rationale for this change
Previously, aggregate UDFs could not easily access field-level metadata (e.g., Arrow extension types) when invoked with literal arguments only, because the
AccumulatorArgs.schemawas always derived from the physical schema — which is empty for literal-only inputs.This change ensures that in such cases, a schema is synthesized from the literal expressions, preserving metadata and enabling richer accumulator behavior. It also clarifies API documentation for
AccumulatorArgsandAggregateUDFImpl.What changes are included in this PR?
args_schema()helper toAggregateFunctionExprto return either the physical input schema or a synthesized schema from literals when the physical schema is empty.create_accumulator,create_sliding_accumulator,groups_accumulator_supported, andcreate_groups_accumulatorto use the new schema logic viamake_acc_args().AccumulatorArgsandAggregateUDFImpldocumentation to explain how to access input field metadata and when synthesized schemas are used.SchemaBasedAggregateUdftest inuser_defined_aggregates.rsto validate metadata handling in literal-only aggregates.aggregate.rsto verify correct schema behavior for both literal-only and physical-schema-present cases.Are these changes tested?
Yes.
test_schema_based_aggregate_udf_metadataensures that metadata from literals is accessible in the accumulator.aggregate.rsvalidates thatargs_schema()returns an owned schema for literal-only inputs and a borrowed schema for non-empty physical schemas.Are there any user-facing changes?
Yes:
AccumulatorArgs.schemafor literal-only inputs.