forked from dask-contrib/dask-sql
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathsort.py
More file actions
152 lines (133 loc) · 5.01 KB
/
sort.py
File metadata and controls
152 lines (133 loc) · 5.01 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
from typing import List
import dask.dataframe as dd
import pandas as pd
from dask.utils import M
from dask_sql.utils import make_pickable_without_dask_sql
# dask_cudf is an optional (GPU) dependency: when it cannot be imported the
# module-level name stays ``None`` and the cudf-specific branches below, which
# all check ``dask_cudf is not None``, are skipped.
dask_cudf = None
try:
    import dask_cudf
except ImportError:
    # Keep the ``None`` placeholder assigned above.
    pass
def apply_sort(
    df: dd.DataFrame,
    sort_columns: List[str],
    sort_ascending: List[bool],
    sort_null_first: List[bool],
) -> dd.DataFrame:
    """Sort a dask dataframe by several columns with per-column orderings.

    Parameters
    ----------
    df:
        The dataframe to sort.
    sort_columns:
        Names of the columns to sort by, most significant first.
    sort_ascending:
        For each entry of ``sort_columns``, whether that column is sorted
        ascending.
    sort_null_first:
        For each entry of ``sort_columns``, whether nulls are placed before
        (``True``) or after (``False``) the non-null values.

    Returns
    -------
    The sorted dataframe. The general (multi-step) path returns a persisted
    dataframe; the single-partition and dask-cudf fast paths return the
    lazily sorted dataframe.
    """
    # pandas' and cudf's ``sort_values`` accept one ``na_position`` that is
    # applied to *every* ``by`` column, so the per-partition fast paths below
    # are only correct when all sort columns agree on where nulls go.
    uniform_null_first = all(sort_null_first) or not any(sort_null_first)

    # if we have a single partition, we can sometimes sort with map_partitions
    if df.npartitions == 1:
        if dask_cudf is not None and isinstance(df, dask_cudf.DataFrame):
            # cudf only supports null positioning if `ascending` is a single boolean:
            # https://github.com/rapidsai/cudf/issues/9400
            if (
                all(sort_ascending) or not any(sort_ascending)
            ) and uniform_null_first:
                return df.map_partitions(
                    M.sort_values,
                    by=sort_columns,
                    ascending=all(sort_ascending),
                    na_position="first" if sort_null_first[0] else "last",
                )
            # With a per-column `ascending` list no null position can be
            # passed; this branch relies on cudf's default placing nulls last.
            if not any(sort_null_first):
                return df.map_partitions(
                    M.sort_values, by=sort_columns, ascending=sort_ascending
                )
        elif uniform_null_first:
            return df.map_partitions(
                M.sort_values,
                by=sort_columns,
                ascending=sort_ascending,
                na_position="first" if sort_null_first[0] else "last",
            )

    # dask-cudf only supports ascending sort / nulls last:
    # https://github.com/rapidsai/cudf/pull/9250
    # https://github.com/rapidsai/cudf/pull/9264
    if (
        dask_cudf is not None
        and isinstance(df, dask_cudf.DataFrame)
        and all(sort_ascending)
        and not any(sort_null_first)
    ):
        try:
            return df.sort_values(sort_columns, ignore_index=True)
        except ValueError:
            # Deliberate fallback: use the generic algorithm below when
            # dask-cudf rejects the request.
            pass

    # Split the first column. We need to handle this one with set_index
    first_sort_column = sort_columns[0]
    first_sort_ascending = sort_ascending[0]
    first_null_first = sort_null_first[0]

    # Only sort by first column first
    # As sorting is rather expensive, we better persist here
    df = df.persist()
    df = _sort_first_column(
        df, first_sort_column, first_sort_ascending, first_null_first
    )

    # sort the remaining columns if given
    if len(sort_columns) > 1:
        df = df.persist()
        df = df.map_partitions(
            make_pickable_without_dask_sql(sort_partition_func),
            meta=df,
            sort_columns=sort_columns,
            sort_ascending=sort_ascending,
            sort_null_first=sort_null_first,
        )

    return df.persist()
def sort_partition_func(
    partition: pd.DataFrame,
    sort_columns: List[str],
    sort_ascending: List[bool],
    sort_null_first: List[bool],
):
    """Sort one pandas partition by several columns with per-column orderings.

    The partition is sorted once per column, starting with the least
    significant column and moving towards the most significant one. Each pass
    uses a stable sort (``kind="mergesort"``), so earlier passes act as
    tie-breakers for later ones. This lets every column carry its own
    ``ascending`` and null-position setting
    (see https://github.com/pandas-dev/pandas/issues/17111).

    Empty partitions are returned unchanged.
    """
    if partition.empty:
        return partition

    column_specs = list(zip(sort_columns, sort_ascending, sort_null_first))
    result = partition
    # least significant column first; stability preserves the earlier orders
    for column, ascending, nulls_first in column_specs[::-1]:
        result = result.sort_values(
            by=[column],
            ascending=ascending,
            na_position="first" if nulls_first else "last",
            kind="mergesort",
        )
    return result
def _sort_first_column(df, first_sort_column, first_sort_ascending, first_null_first):
    """Globally sort ``df`` by a single column using ``set_index``.

    Parameters
    ----------
    df:
        The dask dataframe to sort.
    first_sort_column:
        Name of the column to sort by.
    first_sort_ascending:
        Whether to sort that column ascending.
    first_null_first:
        Whether rows with a null in that column go before (``True``) or
        after (``False``) all other rows.

    Returns
    -------
    A dask dataframe sorted by ``first_sort_column``, with a reset index.
    """
    # Dask can only sort if there are no NaNs in the first column.
    # Therefore we need to do a single pass over the dataframe
    # to check if we have NaNs in the first column.
    # If this is the case, the NaN rows are split off, the remaining rows are
    # sorted, and the NaN rows are concatenated to the front or the back.
    # That might be a very complex operation and should
    # in general be avoided.
    is_na = df[first_sort_column].isna().persist()
    if is_na.any().compute():
        # Gather all null rows into one partition. Pass `npartitions` by
        # keyword: the first positional parameter of `repartition` is
        # `divisions`.
        df_is_na = df[is_na].reset_index(drop=True).repartition(npartitions=1)
        df_to_sort = df[~is_na]
    else:
        df_is_na = None
        df_to_sort = df

    # set_index sorts the rows across partitions; drop=False keeps the column
    # itself and reset_index(drop=True) discards the temporary index again.
    df_not_is_na = df_to_sort.set_index(first_sort_column, drop=False).reset_index(
        drop=True
    )

    if not first_sort_ascending:
        # As set_index().reset_index() always sorts ascending, we need to reverse
        # the order inside all partitions and the order of the partitions itself.
        # We do not need to do this for the NaN partition.
        df_not_is_na = df_not_is_na.map_partitions(
            lambda partition: partition[::-1], meta=df
        )
        df_not_is_na = df_not_is_na.partitions[::-1]

    if df_is_na is None:
        return df_not_is_na
    if first_null_first:
        return dd.concat([df_is_na, df_not_is_na])
    return dd.concat([df_not_is_na, df_is_na])