1313 no_default , partial , partial_by_order ,
1414 split_evenly , check_divisions , hash_shard ,
1515 split_out_on_index , Index )
16- from dask .dataframe .groupby import _apply_chunk
17- from dask .dataframe .utils import _nonempty_index , make_meta
16+ from dask .dataframe .utils import _nonempty_index
1817from dask .dataframe .utils import make_meta as dd_make_meta
1918from dask .delayed import Delayed
20- from dask .optimize import cull
19+ from dask .optimization import cull
2120from dask .utils import derived_from
2221from scipy import sparse
2322from toolz import merge , remove , partition_all
@@ -65,9 +64,6 @@ def __init__(self, dsk, name, meta, divisions=None):
6564 def __dask_graph__ (self ):
6665 return self .dask
6766
68- def __dask_keys__ (self ):
69- return self ._keys ()
70-
7167 __dask_scheduler__ = staticmethod (dask .threaded .get )
7268
7369 @staticmethod
@@ -112,7 +108,7 @@ def map_partitions(self, func, meta, *args, **kwargs):
112108 return map_partitions (func , self , meta , * args , ** kwargs )
113109
114110 def to_delayed (self ):
115- return [Delayed (k , self .dask ) for k in self ._keys ()]
111+ return [Delayed (k , self .dask ) for k in self .__dask_keys__ ()]
116112
117113 def assign (self , ** kwargs ):
118114 for k , v in kwargs .items ():
@@ -126,7 +122,7 @@ def assign(self, **kwargs):
126122 df2 = self ._meta .assign (** _extract_meta (kwargs ))
127123 return elemwise (methods .assign , self , * pairs , meta = df2 )
128124
129- def _keys (self ):
125+ def __dask_keys__ (self ):
130126 return [(self ._name , i ) for i in range (self .npartitions )]
131127
132128 @property
@@ -572,8 +568,8 @@ def elemwise(op, *args, **kwargs):
572568 if not isinstance (arg , (_Frame , Scalar , SparseFrame ))]
573569
574570 # Get dsks graph tuple keys and adjust the key length of Scalar
575- keys = [d ._keys () * n if isinstance (d , Scalar ) or _is_broadcastable (d )
576- else d ._keys () for d in dasks ]
571+ keys = [d .__dask_keys__ () * n if isinstance (d , Scalar ) or _is_broadcastable (d )
572+ else d .__dask_keys__ () for d in dasks ]
577573
578574 if other :
579575 dsk = {(_name , i ):
0 commit comments