-
-
Notifications
You must be signed in to change notification settings - Fork 26
Expand file tree
/
Copy path_backends.py
More file actions
126 lines (82 loc) · 3.02 KB
/
_backends.py
File metadata and controls
126 lines (82 loc) · 3.02 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
from __future__ import annotations
import numpy as np
import pandas as pd
from dask.backends import CreationDispatch
from dask.dataframe.backends import DataFrameBackendEntrypoint
from dask_expr._dispatch import get_collection_type
# Optional dependencies: pydata/sparse and scipy sparse containers are only
# handled when the corresponding package can be imported.
try:
    import sparse
except ImportError:
    sparse_installed = False
else:
    sparse_installed = True

try:
    import scipy.sparse as sp
except ImportError:
    scipy_installed = False
else:
    scipy_installed = True
# Dispatcher that routes DataFrame-creation functions (``from_pandas``,
# ``read_parquet``, ...) to a registered backend entrypoint; "pandas" is
# the default backend (registered below).
dataframe_creation_dispatch = CreationDispatch(
    module_name="dataframe",
    default="pandas",
    entrypoint_root="dask_expr",
    entrypoint_class=DataFrameBackendEntrypoint,
    name="dataframe_creation_dispatch",
)
class PandasBackendEntrypoint(DataFrameBackendEntrypoint):
    """Pandas-backend entrypoint class for dask-expressions.

    Note that all DataFrame-creation functions are defined and
    registered 'in-place'.
    """

    @classmethod
    def to_backend_dispatch(cls):
        # Reuse dask's dispatch object for converting arbitrary
        # partition types to their pandas equivalents.
        from dask.dataframe.dispatch import to_pandas_dispatch

        return to_pandas_dispatch

    @classmethod
    def to_backend(cls, data, **kwargs):
        pandas_meta_types = (pd.DataFrame, pd.Series, pd.Index)
        if isinstance(data._meta, pandas_meta_types):
            # Nothing to convert: collection is already pandas-backed.
            return data
        # Convert partition-wise using the registered dispatch function.
        return data.map_partitions(cls.to_backend_dispatch(), **kwargs)
# Make the pandas backend available on (and the default of) the dispatcher.
dataframe_creation_dispatch.register_backend("pandas", PandasBackendEntrypoint())
@get_collection_type.register(pd.Series)
def get_collection_type_series(_):
    """Return the dask-expr collection class for a ``pd.Series`` meta."""
    from dask_expr import _collection

    return _collection.Series
@get_collection_type.register(pd.DataFrame)
def get_collection_type_dataframe(_):
    """Return the dask-expr collection class for a ``pd.DataFrame`` meta."""
    from dask_expr import _collection

    return _collection.DataFrame
@get_collection_type.register(pd.Index)
def get_collection_type_index(_):
    """Return the dask-expr collection class for a ``pd.Index`` meta."""
    from dask_expr import _collection

    return _collection.Index
def create_array_collection(expr):
    """Materialize *expr* as a legacy dask collection.

    This is hacky and an abstraction leak, but utilizing
    get_collection_type to infer that we want to create an array is the
    only way that is guaranteed to be a general solution. We can get rid
    of this when we have an Array expression.
    """
    from dask.dataframe.core import new_dd_object

    optimized = expr.optimize()
    return new_dd_object(
        optimized.__dask_graph__(),
        optimized._name,
        optimized._meta,
        optimized.divisions,
    )
@get_collection_type.register(np.ndarray)
def get_collection_type_array(_):
    """NumPy-array metas are materialized via ``create_array_collection``."""
    return create_array_collection
if sparse_installed:

    # NOTE: this handler previously re-used the name
    # ``get_collection_type_array``, shadowing the np.ndarray handler
    # defined earlier in the module (flake8 F811). The dispatch
    # registration itself is captured by the decorator, so renaming is
    # behavior-preserving while keeping module introspection sane.
    @get_collection_type.register(sparse.COO)
    def get_collection_type_sparse(_):
        """pydata/sparse ``COO`` metas are materialized as arrays."""
        return create_array_collection
if scipy_installed:

    # NOTE: this handler previously re-used the name
    # ``get_collection_type_array``, shadowing the np.ndarray handler
    # defined earlier in the module (flake8 F811). The dispatch
    # registration itself is captured by the decorator, so renaming is
    # behavior-preserving while keeping module introspection sane.
    @get_collection_type.register(sp.csr_matrix)
    def get_collection_type_csr_matrix(_):
        """scipy.sparse ``csr_matrix`` metas are materialized as arrays."""
        return create_array_collection
@get_collection_type.register(object)
def get_collection_type_object(_):
    """Fallback: any otherwise-unregistered meta becomes a Scalar."""
    from dask_expr import _collection

    return _collection.Scalar
######################################
# cuDF: Pandas Dataframes on the GPU #
######################################
@get_collection_type.register_lazy("cudf")
def _register_cudf():
    # Importing dask_cudf registers the cudf-specific dispatch handlers
    # as a side effect; the import itself is the whole job here.
    import dask_cudf  # noqa: F401