Skip to content

Conversation

@charlesbluca
Copy link
Collaborator

Now that Dask's sort_values has support for ascending/descending and null positioning, we can use it to do what _sort_first_column was once doing in a (hopefully) more performant way. There are still some failures here that are resolved with dask/dask#8225, so this should be safe to merge by the next Dask release.

@charlesbluca charlesbluca changed the title Use dask's sort_values for first column sorting in apply_sort Use Dask's sort_values for first column sorting in apply_sort Oct 13, 2021
@charlesbluca
Copy link
Collaborator Author

Thinking about this more, I don't actually think we need to sort the first column of the dataframe - just accomplishing rearrange_by_divisions for the first column should be sufficient so that the map_partitions call sorts the dataframe properly.

Wondering in that case if it makes sense to try and move this logic upstream - maybe with a kwarg for sort_values like sort_function that defaults to M.sort_values but can be replaced with custom functions like sort_partition_func to do sorting with enhanced functionality. One issue that I foresee there is that this would work with a multi-column divisions, as I don't think that would respect multiple ascending/na_position values? But maybe we could restrict to only using single-column divisions if a custom sorting function is passed?

cc @rjzamora if you have any thoughts on this

@rjzamora
Copy link
Contributor

cc @rjzamora if you have any thoughts on this

I agree that much of the dask_cudf sort_values logic should live in upstream dask.dataframe. However, I will need to think a bit more about the sort_function idea to provide any useful feedback.

Somewhat-related thoughts: One thing I like about the dask_cudf logic is that sort_values uses rearrange_by_column rather than rearrange_by_divisions. This makes the mutli-column sorting code path a bit simpler, because we can use multiple columns to define the new (temporary) "_partitions" column, but then the rest of the logic is the same as a single-column sort. I also like that dask_cudf's set_index is directly based on sort_values (which is not the case in dask.dataframe).

@codecov-commenter
Copy link

codecov-commenter commented Nov 1, 2021

Codecov Report

Merging #255 (eb37f15) into main (ebdf4d5) will decrease coverage by 0.00%.
The diff coverage is 100.00%.

Impacted file tree graph

@@            Coverage Diff             @@
##             main     #255      +/-   ##
==========================================
- Coverage   95.89%   95.89%   -0.01%     
==========================================
  Files          64       65       +1     
  Lines        2730     2800      +70     
  Branches      408      418      +10     
==========================================
+ Hits         2618     2685      +67     
- Misses         72       73       +1     
- Partials       40       42       +2     
Impacted Files Coverage Δ
dask_sql/physical/utils/sort.py 83.33% <100.00%> (+0.57%) ⬆️
dask_sql/datacontainer.py 94.31% <0.00%> (-5.69%) ⬇️
dask_sql/cmd.py 100.00% <0.00%> (ø)
dask_sql/physical/rel/convert.py 87.50% <0.00%> (ø)
dask_sql/physical/utils/groupby.py 100.00% <0.00%> (ø)
dask_sql/physical/rel/logical/window.py 98.81% <0.00%> (ø)
dask_sql/physical/rel/custom/__init__.py 100.00% <0.00%> (ø)
dask_sql/physical/rel/custom/distributeby.py 86.36% <0.00%> (ø)
dask_sql/context.py 99.09% <0.00%> (+0.01%) ⬆️

Continue to review full report at Codecov.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update ebdf4d5...eb37f15. Read the comment docs.

@charlesbluca
Copy link
Collaborator Author

charlesbluca commented Nov 4, 2021

Looks like the original fix I had in mind for dask/dask#8255 is actually needed here if we intend to use upstream Dask sorting functions (rearrange_by_divisions, set_partitions_pre) directly on a dask-cudf dataframe. However, I'm more interested in the sort_function approach described above - will explore that.

In any case, I'm pretty sure using sort_values instead of _sort_first_column for the initial sort is still beneficial performance-wise, so we can merge this now and make a follow up depending on what direction we decide to move in.

ascending=sort_ascending,
na_position="first" if sort_null_first[0] else "last",
)
).persist()
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Noting that I added this persist call after sorting to match up with the other sort paths, which persist after sorting.

):
try:
return df.sort_values(sort_columns, ignore_index=True)
return df.sort_values(sort_columns, ignore_index=True).persist()
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above

@charlesbluca charlesbluca marked this pull request as ready for review November 4, 2021 14:46
@charlesbluca charlesbluca merged commit 5b8f8a9 into dask-contrib:main Nov 4, 2021
@charlesbluca charlesbluca deleted the dask-sort-values branch January 19, 2022 21:23
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants