From 3ecd710d1b504b833edc5f05c5487cc2258c41d4 Mon Sep 17 00:00:00 2001 From: Charles Blackmon-Luca <20627856+charlesbluca@users.noreply.github.com> Date: Wed, 13 Oct 2021 07:46:15 -0700 Subject: [PATCH 1/4] Use dask's sort_values for first column sort --- dask_sql/physical/utils/sort.py | 56 +++++---------------------------- 1 file changed, 7 insertions(+), 49 deletions(-) diff --git a/dask_sql/physical/utils/sort.py b/dask_sql/physical/utils/sort.py index 5857a3321..c6d760c7b 100644 --- a/dask_sql/physical/utils/sort.py +++ b/dask_sql/physical/utils/sort.py @@ -58,16 +58,13 @@ def apply_sort( except ValueError: pass - # Split the first column. We need to handle this one with set_index - first_sort_column = sort_columns[0] - first_sort_ascending = sort_ascending[0] - first_null_first = sort_null_first[0] - - # Only sort by first column first - # As sorting is rather expensive, we bether persist here - df = df.persist() - df = _sort_first_column( - df, first_sort_column, first_sort_ascending, first_null_first + # Dask doesn't natively support multi-column sorting; + # we work around this by initially sorting by the first + # column then handling the rest with `map_partitions` + df = df.sort_values( + by=sort_columns[0], + ascending=sort_ascending[0], + na_position="first" if sort_null_first[0] else "last", ) # sort the remaining columns if given @@ -111,42 +108,3 @@ def sort_partition_func( ) return partition - - -def _sort_first_column(df, first_sort_column, first_sort_ascending, first_null_first): - # Dask can only sort if there are no NaNs in the first column. - # Therefore we need to do a single pass over the dataframe - # to check if we have NaNs in the first column - # If this is the case, we concat the NaN values to the front - # That might be a very complex operation and should - # in general be avoided - col = df[first_sort_column] - is_na = col.isna().persist() - if is_na.any().compute(): - df_is_na = df[is_na].reset_index(drop=True).repartition(1) - df_not_is_na = ( - df[~is_na].set_index(first_sort_column, drop=False).reset_index(drop=True) - ) - else: - df_is_na = None - df_not_is_na = df.set_index(first_sort_column, drop=False).reset_index( - drop=True - ) - if not first_sort_ascending: - # As set_index().reset_index() always sorts ascending, we need to reverse - # the order inside all partitions and the order of the partitions itself - # We do not need to do this for the nan-partitions - df_not_is_na = df_not_is_na.map_partitions( - lambda partition: partition[::-1], meta=df - ) - df_not_is_na = df_not_is_na.partitions[::-1] - - if df_is_na is not None: - if first_null_first: - df = dd.concat([df_is_na, df_not_is_na]) - else: - df = dd.concat([df_not_is_na, df_is_na]) - else: - df = df_not_is_na - - return df From b4df399c881a3c32673f1a27665c723199115cd0 Mon Sep 17 00:00:00 2001 From: Charles Blackmon-Luca <20627856+charlesbluca@users.noreply.github.com> Date: Mon, 1 Nov 2021 08:32:02 -0700 Subject: [PATCH 2/4] Trigger CI From eb37f15e212577a93e1bea52dc68d9800627e943 Mon Sep 17 00:00:00 2001 From: Charles Blackmon-Luca <20627856+charlesbluca@users.noreply.github.com> Date: Thu, 4 Nov 2021 07:20:25 -0700 Subject: [PATCH 3/4] Make sure to persist after sorting in the GPU case --- dask_sql/physical/utils/sort.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/dask_sql/physical/utils/sort.py b/dask_sql/physical/utils/sort.py index c6d760c7b..c2a4237bb 100644 --- a/dask_sql/physical/utils/sort.py +++ b/dask_sql/physical/utils/sort.py @@ -54,7 +54,7 @@ def apply_sort( and not any(sort_null_first) ): try: - return df.sort_values(sort_columns, ignore_index=True) + return df.sort_values(sort_columns, ignore_index=True).persist() except ValueError: pass @@ -65,20 +65,19 @@ def apply_sort( by=sort_columns[0], ascending=sort_ascending[0], na_position="first" if sort_null_first[0] else "last", - ) + ).persist() # sort the remaining columns if given if len(sort_columns) > 1: - df = df.persist() df = df.map_partitions( make_pickable_without_dask_sql(sort_partition_func), meta=df, sort_columns=sort_columns, sort_ascending=sort_ascending, sort_null_first=sort_null_first, - ) + ).persist() - return df.persist() + return df def sort_partition_func( From e4850ab6703f4acbbcd0bdc357ec02503760a637 Mon Sep 17 00:00:00 2001 From: Charles Blackmon-Luca <20627856+charlesbluca@users.noreply.github.com> Date: Thu, 4 Nov 2021 07:45:17 -0700 Subject: [PATCH 4/4] Add another missing persist call --- dask_sql/physical/utils/sort.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dask_sql/physical/utils/sort.py b/dask_sql/physical/utils/sort.py index 4248ca2e1..46b72b6c8 100644 --- a/dask_sql/physical/utils/sort.py +++ b/dask_sql/physical/utils/sort.py @@ -25,7 +25,7 @@ def apply_sort( by=sort_columns, ascending=sort_ascending, na_position="first" if sort_null_first[0] else "last", - ) + ).persist() # dask-cudf only supports ascending sort / nulls last: # https://github.com/rapidsai/cudf/pull/9250