Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 19 additions & 3 deletions sparsity/dask/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,22 @@ def index(self):
def map_partitions(self, func, meta, *args, **kwargs):
    """Apply ``func`` to every partition of this collection.

    Thin wrapper that delegates to the module-level ``map_partitions``
    with ``self`` as the target collection; ``meta`` describes the
    result's schema for dask.
    """
    return map_partitions(func, self, meta, *args, **kwargs)

# noinspection PyTypeChecker
def todense(self):
    """Materialize this sparse collection as a dense dask object.

    Returns
    -------
    res: dd.DataFrame | dd.Series
        A ``dd.DataFrame`` when there is more than one column,
        otherwise a ``dd.Series``.
    """
    # meta must itself be a dask collection here (a plain pandas
    # object is not enough for the downstream machinery).
    dense_meta = dd.from_pandas(self._meta.todense(), npartitions=1)
    target_cls = dd.Series if len(self.columns) <= 1 else dd.DataFrame
    return self.map_partitions(
        sp.SparseFrame.todense,
        dense_meta,
        cls=target_cls,
    )

def to_delayed(self):
    """Decompose the collection into ``Delayed`` objects, one per
    partition key, each sharing this collection's task graph."""
    return [Delayed(k, self.dask) for k in self.__dask_keys__()]

Expand Down Expand Up @@ -644,7 +660,7 @@ def elemwise(op, *args, **kwargs):
return SparseFrame(dsk, _name, meta, divisions)


def map_partitions(func, ddf, meta, name=None, **kwargs):
def map_partitions(func, ddf, meta, name=None, cls=SparseFrame, **kwargs):
dsk = {}
name = name or func.__name__
token = tokenize(func, meta, **kwargs)
Expand All @@ -654,12 +670,11 @@ def map_partitions(func, ddf, meta, name=None, **kwargs):
value = (ddf._name, i)
dsk[(name, i)] = (apply_and_enforce, func, value, kwargs, meta)

return SparseFrame(merge(dsk, ddf.dask), name, meta, ddf.divisions)
return cls(merge(dsk, ddf.dask), name, meta, ddf.divisions)


def apply_and_enforce(func, arg, kwargs, meta):
sf = func(arg, **kwargs)
columns = meta.columns
if isinstance(sf, sp.SparseFrame):
if len(sf.data.data) == 0:
assert meta.empty, \
Expand All @@ -668,6 +683,7 @@ def apply_and_enforce(func, arg, kwargs, meta):
"Computed a SparseFrame but meta is of type {}"\
.format(type(meta))
return meta
columns = meta.columns
if (len(columns) == len(sf.columns) and
type(columns) is type(sf.columns) and
columns.equals(sf.columns)):
Expand Down
22 changes: 20 additions & 2 deletions sparsity/test/test_dask_sparse_frame.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
import datetime as dt
import os
from uuid import uuid4

import dask
import dask.dataframe as dd
import datetime as dt
import numpy as np
import pandas as pd
import pandas.util.testing as pdt
import pytest
from distributed import Client
from uuid import uuid4

import sparsity as sp
import sparsity.dask as dsp
Expand Down Expand Up @@ -45,6 +45,24 @@ def test_map_partitions():
assert res.shape == (10, 2)


def test_todense():
    """Round-trip: dense pandas -> sparse dask -> dense recovers the data."""
    frame = pd.DataFrame(np.random.rand(10, 2))
    sparse = dsp.from_pandas(frame, npartitions=3)
    dense = sparse.todense()
    assert isinstance(dense, dd.DataFrame)
    pdt.assert_frame_equal(dense.compute(), frame, check_dtype=False)


def test_todense_series():
    """A single selected column densifies into a dask Series."""
    frame = pd.DataFrame(np.random.rand(10, 2))
    sparse_col = dsp.from_pandas(frame, npartitions=3)[0]
    dense = sparse_col.todense()
    assert isinstance(dense, dd.Series)
    pdt.assert_series_equal(dense.compute(), frame[0], check_dtype=False)


@pytest.mark.parametrize('item', [
'X',
['X', 'Y'],
Expand Down