forked from dask-contrib/dask-sql
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdatacontainer.py
More file actions
190 lines (160 loc) · 6.29 KB
/
datacontainer.py
File metadata and controls
190 lines (160 loc) · 6.29 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
from typing import List, Dict, Tuple, Union
import dask.dataframe as dd
# Type of a backend (dask) column label as stored in ColumnContainer's
# frontend -> backend mapping: either a string or an integer label.
# Frontend (SQL) column names themselves are always strings.
ColumnType = Union[str, int]
class ColumnContainer:
    """
    Placeholder binding for the name ``ColumnContainer``.

    The real class is defined right after this one; this stub only exists so
    that return annotations such as ``-> ColumnContainer`` inside the real
    class body can be evaluated while that body is still executing.
    """
class ColumnContainer:
    """
    Helper class to store a list of columns,
    which are not necessarily the ones of the dask dataframe.
    Instead, the container also stores a mapping from "frontend"
    columns (columns with the names and order expected by SQL)
    to "backend" columns (the real column names used by dask)
    to prevent unnecessary renames.

    None of the public methods modify the container they are called on:
    ``limit_to``, ``rename``, ``add`` and ``make_unique`` return a
    container (normally a fresh copy) instead.
    """

    def __init__(
        self,
        frontend_columns: List[str],
        frontend_backend_mapping: Union[Dict[str, "ColumnType"], None] = None,
    ):
        """
        :param frontend_columns: the SQL-visible column names, in SQL order.
            Any iterable of strings is accepted; it is materialized once.
        :param frontend_backend_mapping: optional mapping from frontend name
            to backend (dask) column. Defaults to the identity mapping over
            ``frontend_columns``. The dict is copied, so later changes made
            by the caller do not leak into this container.
        :raises AssertionError: if any frontend column is not a string.
        """
        # Materialize first: validating and then copying a one-shot iterator
        # (e.g. a generator) would otherwise leave the container empty.
        self._frontend_columns = list(frontend_columns)
        assert all(
            isinstance(col, str) for col in self._frontend_columns
        ), "All frontend columns need to be of string type"

        if frontend_backend_mapping is None:
            # Every frontend column is backed by the column of the same name.
            self._frontend_backend_mapping = {
                col: col for col in self._frontend_columns
            }
        else:
            self._frontend_backend_mapping = dict(frontend_backend_mapping)

    def _copy(self) -> "ColumnContainer":
        """
        Internal function to copy this container.

        ``__init__`` copies both the column list and the mapping, so the
        result shares no mutable state with ``self``.
        """
        return ColumnContainer(
            self._frontend_columns, self._frontend_backend_mapping
        )

    def limit_to(self, fields: List[str]) -> "ColumnContainer":
        """
        Create a new ColumnContainer, which has frontend columns
        limited to only the ones given as parameter.
        Also uses the order of these as the new column order.

        Field names are converted with ``str`` before they are checked
        against the mapping, consistent with ``rename`` and ``add``.
        An empty ``fields`` returns ``self`` unchanged.

        :raises AssertionError: if a field is not a known frontend column.
        """
        # Materialize once so one-shot iterators are not consumed by the
        # emptiness check or the membership assertion below.
        fields = [str(field) for field in fields]
        if not fields:
            return self  # pragma: no cover
        assert all(field in self._frontend_backend_mapping for field in fields)

        cc = self._copy()
        cc._frontend_columns = fields
        return cc

    def rename(self, columns: Dict[str, str]) -> "ColumnContainer":
        """
        Return a new ColumnContainer where the frontend columns
        are renamed according to the given mapping.
        Columns not present in the mapping are not touched,
        the order is preserved.

        Keys and values of ``columns`` are converted with ``str``, so both
        the mapping and the column list agree on which columns are renamed.
        The old frontend names stay in the mapping but are no longer listed
        in ``columns``.

        :raises KeyError: if a key of ``columns`` is not a known frontend column.
        """
        renames = {str(src): str(dst) for src, dst in columns.items()}

        cc = self._copy()
        for column_from, column_to in renames.items():
            # Read from ``self`` (not ``cc``) so that swaps such as
            # {"a": "b", "b": "a"} see the pre-rename backend columns.
            cc._frontend_backend_mapping[column_to] = self._frontend_backend_mapping[
                column_from
            ]

        cc._frontend_columns = [
            renames.get(col, col) for col in self._frontend_columns
        ]
        return cc

    def mapping(self) -> List[Tuple[str, "ColumnType"]]:
        """
        The mapping from frontend columns to backend columns,
        as a list of ``(frontend, backend)`` pairs.
        """
        return list(self._frontend_backend_mapping.items())

    @property
    def columns(self) -> List[str]:
        """
        The stored frontend columns in the correct order
        (a copy, so callers may modify it freely).
        """
        return self._frontend_columns.copy()

    def add(
        self, frontend_column: str, backend_column: Union[str, None] = None
    ) -> "ColumnContainer":
        """
        Return a new ColumnContainer with the
        given column added.
        The column is added at the last position in the column list,
        unless a frontend column of that name already exists, in which
        case only its backend mapping is replaced.

        :param frontend_column: the SQL-visible name (converted with ``str``).
        :param backend_column: the dask column it refers to (converted with
            ``str``); ``None`` means "same name as the frontend column".
        """
        cc = self._copy()

        frontend_column = str(frontend_column)
        # Explicit None check: a falsy-but-valid name such as "" must not be
        # replaced by the frontend name.
        if backend_column is None:
            backend_column = frontend_column
        cc._frontend_backend_mapping[frontend_column] = str(backend_column)

        if frontend_column not in cc._frontend_columns:
            cc._frontend_columns.append(frontend_column)

        return cc

    def get_backend_by_frontend_index(self, index: int) -> "ColumnType":
        """
        Get back the dask column, which is referenced by the
        frontend (SQL) column with the given index.

        :raises IndexError: if ``index`` is out of range.
        """
        frontend_column = self._frontend_columns[index]
        return self._frontend_backend_mapping[frontend_column]

    def get_backend_by_frontend_name(self, column: str) -> "ColumnType":
        """
        Get back the dask column, which is referenced by the
        frontend (SQL) column with the given name.

        :raises KeyError: if ``column`` is not a known frontend column.
        """
        return self._frontend_backend_mapping[column]

    def make_unique(self, prefix="col") -> "ColumnContainer":
        """
        Make sure we have unique column names by calling each column
        <prefix>_<number>
        where <number> is the column index.
        """
        return self.rename(
            columns={
                str(col): f"{prefix}_{i}" for i, col in enumerate(self.columns)
            }
        )
class DataContainer:
    """
    In SQL, every column operation or reference is done via
    the column index. Some dask operations, such as grouping,
    joining or concatenating preserve the columns in a different
    order than SQL would expect.
    However, we do not want to change the column data itself
    all the time (because this would lead to computational overhead),
    but still would like to keep the columns accessible by name and index.
    For this, we add an additional `ColumnContainer` to each dataframe,
    which does all the column mapping between "frontend"
    (what SQL expects, also in the correct order)
    and "backend" (what dask has).
    """

    def __init__(self, df: "dd.DataFrame", column_container: "ColumnContainer"):
        self.df = df
        self.column_container = column_container

    def assign(self) -> "dd.DataFrame":
        """
        Combine the column mapping with the actual data and return
        a dataframe which has the columns specified in the
        stored ColumnContainer, in that order.

        Backend columns are renamed to their frontend names where that is
        safe; every other listed frontend column is added via ``assign``,
        taking its data from the original (un-renamed) dataframe.
        """
        # We rename as many cols as possible because renaming is much more
        # efficient than assigning.
        # Hoist loop invariants: ``columns`` returns a fresh list copy on
        # every access, and a set gives O(1) membership tests.
        frontend_columns = self.column_container.columns
        wanted = set(frontend_columns)
        existing = self.df.columns
        n_existing = len(existing)

        renames = {}
        assigns = {}
        for frontend, backend in self.column_container.mapping():
            if frontend not in wanted:
                # Mapping entries not listed in the frontend columns
                # (e.g. old names kept after a rename) are ignored.
                continue
            can_rename = (
                len(renames) < n_existing
                # Each backend column can be renamed only once.
                and backend not in renames
                # Never rename onto a name the dataframe already has,
                # unless it is the very same column.
                and (frontend not in existing or frontend == backend)
            )
            if can_rename:
                renames[backend] = frontend
            else:
                # Read from self.df, not the renamed frame, so the backend
                # column is found even if a rename above consumed its name.
                assigns[frontend] = self.df[backend]

        df = self.df.rename(columns=renames)
        if assigns:
            df = df.assign(**assigns)
        return df[frontend_columns]