Skip to content

Conversation

@brandon-b-miller
Copy link
Contributor

Row UDFs in dask-sql are analogous to the functions expected by pandas.DataFrame.apply an expect a row of data containing all the scalars in a single row. The UDF accesses these scalars from the row via the corresponding column labels within the function:

def f(row):
    x = row['a'] * 2
    y = row['b']
    return x - y

This works fine if the dataframe that calls the apply from dask in the end actually contains columns named a and b. This is however likely only the case for simple queries and fails if any columns are aliased during more complex operations, such as joins.

This PR proposes to solve the problem by retaining the names of the original variables the function was registered with and reapplying them to the data later, just before calling apply.

@codecov-commenter
Copy link

codecov-commenter commented Feb 28, 2022

Codecov Report

Merging #409 (f2ce42f) into main (ef0fa16) will increase coverage by 0.21%.
The diff coverage is 100.00%.

Impacted file tree graph

@@            Coverage Diff             @@
##             main     #409      +/-   ##
==========================================
+ Coverage   88.94%   89.15%   +0.21%     
==========================================
  Files          68       68              
  Lines        3337     3338       +1     
  Branches      657      658       +1     
==========================================
+ Hits         2968     2976       +8     
+ Misses        297      287      -10     
- Partials       72       75       +3     
Impacted Files Coverage Δ
dask_sql/context.py 100.00% <100.00%> (ø)
dask_sql/datacontainer.py 95.61% <100.00%> (+1.80%) ⬆️
dask_sql/_version.py 34.00% <0.00%> (+1.44%) ⬆️

Continue to review full report at Codecov.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update ef0fa16...f2ce42f. Read the comment docs.

Copy link
Collaborator

@charlesbluca charlesbluca left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good 🙂 just a couple suggestions:

Comment on lines 218 to 219
df = column_args[0].to_frame()
for col in column_args[1:]:
df[col.name] = col
for name, col in zip(self.names, column_args):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we pass the first parameter column name to to_frame, we lose a layer off the resulting HLG and don't have to deal with a superfluous column:

            df = column_args[0].to_frame(self.names[0])
            for name, col in zip(self.names[1:], column_args[1:]):

Copy link
Collaborator

@charlesbluca charlesbluca left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some suggestions for my first review:

Copy link
Collaborator

@charlesbluca charlesbluca left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM!

As we discussed internally, it would probably be nice to follow this PR up with some work to make function registration more robust so we don't have to assume argument order - I can open up an issue to track that

@charlesbluca charlesbluca merged commit cd38818 into dask-contrib:main Mar 1, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants