-
Notifications
You must be signed in to change notification settings - Fork 70
Expand file tree
/
Copy pathsort.py
More file actions
233 lines (196 loc) · 9.1 KB
/
sort.py
File metadata and controls
233 lines (196 loc) · 9.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
from dask_sql.utils import new_temporary_column
from typing import List
import dask
import dask.dataframe as dd
from dask_sql.physical.rex import RexConverter
from dask_sql.physical.rel.base import BaseRelPlugin
from dask_sql.datacontainer import DataContainer
from dask_sql.java import org
class LogicalSortPlugin(BaseRelPlugin):
    """
    LogicalSort is used to sort by columns (ORDER BY)
    as well as to only get a certain part of the dataframe
    (LIMIT / OFFSET).
    """

    class_name = "org.apache.calcite.rel.logical.LogicalSort"

    def convert(
        self, rel: "org.apache.calcite.rel.RelNode", context: "dask_sql.Context"
    ) -> DataContainer:
        """
        Apply the ORDER BY collation and the OFFSET/FETCH window of ``rel``
        to its single input and return the resulting container.
        """
        (dc,) = self.assert_inputs(rel, 1, context)
        df = dc.df
        cc = dc.column_container

        sort_collation = rel.getCollation().getFieldCollations()
        sort_columns = [
            cc.get_backend_by_frontend_index(int(x.getFieldIndex()))
            for x in sort_collation
        ]

        if sort_columns:
            ASCENDING = org.apache.calcite.rel.RelFieldCollation.Direction.ASCENDING
            FIRST = org.apache.calcite.rel.RelFieldCollation.NullDirection.FIRST
            sort_ascending = [x.getDirection() == ASCENDING for x in sort_collation]
            sort_null_first = [x.nullDirection == FIRST for x in sort_collation]

            df = self._apply_sort(df, sort_columns, sort_ascending, sort_null_first)

        offset = rel.offset
        if offset:
            offset = RexConverter.convert(offset, df, context=context)

        end = rel.fetch
        if end:
            end = RexConverter.convert(end, df, context=context)

            # FETCH counts rows after the OFFSET, so the absolute end
            # position of the window is offset + fetch.
            if offset:
                end += offset

        if offset is not None or end is not None:
            df = self._apply_offset(df, offset, end)

        cc = self.fix_column_to_row_type(cc, rel.getRowType())
        # No column type has changed, so no need to cast again
        return DataContainer(df, cc)

    def _apply_sort(
        self,
        df: dd.DataFrame,
        sort_columns: List[str],
        sort_ascending: List[bool],
        sort_null_first: List[bool],
    ) -> dd.DataFrame:
        """
        Sort ``df`` by ``sort_columns``.

        The first column is sorted globally (via ``set_index``); any further
        columns are only used to order rows inside each resulting partition.
        The result is persisted.
        """
        # Split the first column. We need to handle this one with set_index
        first_sort_column = sort_columns[0]
        first_sort_ascending = sort_ascending[0]
        first_null_first = sort_null_first[0]

        # Only sort by first column first
        df = self._sort_first_column(
            df, first_sort_column, first_sort_ascending, first_null_first
        )

        # sort the remaining columns if given
        if len(sort_columns) > 1:
            df = self._sort_secondary_columns(
                df, sort_columns, sort_ascending, sort_null_first
            )

        return df.persist()

    def _sort_secondary_columns(
        self, df, sort_columns, sort_ascending, sort_null_first
    ):
        """
        Sort each partition independently by all ``sort_columns``.

        This relies on the partitions already being globally ordered by the
        first sort column (see ``_sort_first_column``).
        """

        def sort_partition_func(partition):
            # We are going to add additional columns here.
            # That is not something we would like to do on the
            # original data, so work on a copy.
            partition = partition.copy()

            # pandas does not allow to sort by NaN first/last
            # differently for different columns. Therefore, for every
            # column, we first sort by an "is null" indicator (whose
            # direction encodes NULLS FIRST/LAST) and only then by the
            # value itself. The indicator already separates the NULL rows,
            # so the value column can be used as-is; filling the NULLs with
            # a placeholder would break non-numeric dtypes (datetime,
            # categorical, strings).
            original_columns = partition.columns
            tmp_columns = []
            tmp_ascending = []
            for col, asc, null_first in zip(
                sort_columns, sort_ascending, sort_null_first
            ):
                is_null_col = new_temporary_column(partition)
                partition[is_null_col] = partition[col].isna()
                tmp_columns.append(is_null_col)
                # True sorts after False, so descending puts NULLs first
                tmp_ascending.append(not null_first)

                tmp_columns.append(col)
                tmp_ascending.append(asc)

            partition = partition.sort_values(tmp_columns, ascending=tmp_ascending)
            return partition[original_columns]

        return df.map_partitions(sort_partition_func, meta=df._meta)

    def _sort_first_column(
        self, df, first_sort_column, first_sort_ascending, first_null_first
    ):
        """
        Globally sort ``df`` by a single column, placing NULLs first or last.
        """
        # Dask can only sort if there are no NaNs in the first column.
        # Therefore we need to do a single pass over the dataframe
        # to check if we have NaNs in the first column
        # If this is the case, we concat the NaN values to the front
        # That might be a very complex operation and should
        # in general be avoided
        col = df[first_sort_column]
        is_na = col.isna().persist()
        if is_na.any().compute():
            df_is_na = df[is_na].reset_index(drop=True).repartition(1)
            df_not_is_na = (
                df[~is_na]
                .set_index(first_sort_column, drop=False)
                .reset_index(drop=True)
            )
        else:
            df_is_na = None
            df_not_is_na = df.set_index(first_sort_column, drop=False).reset_index(
                drop=True
            )

        if not first_sort_ascending:
            # As set_index().reset_index() always sorts ascending, we need to reverse
            # the order inside all partitions and the order of the partitions itself
            # We do not need to do this for the nan-partitions
            df_not_is_na = df_not_is_na.map_partitions(
                lambda partition: partition[::-1], meta=df_not_is_na._meta
            )
            df_not_is_na = df_not_is_na.partitions[::-1]

        if df_is_na is not None:
            if first_null_first:
                df = dd.concat([df_is_na, df_not_is_na])
            else:
                df = dd.concat([df_not_is_na, df_is_na])
        else:
            df = df_not_is_na

        return df

    def _apply_offset(self, df: dd.DataFrame, offset: int, end: int) -> dd.DataFrame:
        """
        Limit the dataframe to the row window [offset, end) (end exclusive).

        ``offset`` and ``end`` may each be None/0, meaning "no lower bound"
        and "no upper bound" respectively.

        That is unfortunately, not so simple as we do not know how many
        items we have in each partition. We have therefore no other way than to
        calculate (!!!) the sizes of each partition.

        After that, we can create a new dataframe from the old
        dataframe by calculating for each partition if and how much
        it should be used.
        We do this via generating our own dask computation graph as
        we need to pass the partition number to the selection
        function, which is not possible with normal "map_partitions".
        """
        if not offset:
            if end is None:
                # e.g. "OFFSET 0" without a FETCH: the window covers every
                # row, so there is nothing to cut away (and comparing
                # against None below would raise a TypeError).
                return df

            # We do a (hopefully) very quick check: if the first partition
            # is already enough, we will just use this
            first_partition_length = len(df.partitions[0])
            if first_partition_length >= end:
                return df.head(end, compute=False)

        # First, we need to find out which partitions we want to use.
        # Therefore we count the total number of entries
        partition_borders = df.map_partitions(lambda x: len(x))

        # Now we let each of the partitions figure out, how much it needs to return
        # using these partition borders
        # For this, we generate out own dask computation graph (as it does not really
        # fit well with one of the already present methods).

        # (a) we define a method to be calculated on each partition
        # This method returns the part of the partition, which falls between [offset, end)
        # Please note that the dask object "partition_borders", will be turned into
        # its pandas representation at this point and we can calculate the cumsum
        # (which is not possible on the dask object). Recalculating it should not cost
        # us much, as we assume the number of partitions is rather small.
        @dask.delayed
        def select_from_to(partition, partition_index, partition_borders):
            partition_borders = partition_borders.cumsum().to_dict()
            this_partition_border_left = (
                partition_borders[partition_index - 1] if partition_index > 0 else 0
            )
            this_partition_border_right = partition_borders[partition_index]

            if (end and end < this_partition_border_left) or (
                offset and offset >= this_partition_border_right
            ):
                return partition.iloc[0:0]

            from_index = max(offset - this_partition_border_left, 0) if offset else 0
            to_index = (
                min(end, this_partition_border_right)
                if end
                else this_partition_border_right
            ) - this_partition_border_left

            return partition.iloc[from_index:to_index]

        # (b) Now we just need to apply the function on every partition
        # We do this via the delayed interface, which seems the easiest one.
        # Passing meta avoids dask eagerly computing the first partition
        # just to infer the output schema (the columns do not change).
        return dd.from_delayed(
            [
                select_from_to(partition, partition_number, partition_borders)
                for partition_number, partition in enumerate(df.partitions)
            ],
            meta=df._meta,
        )