Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 36 additions & 11 deletions sparsity/dask/core.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from operator import getitem
from operator import getitem, itemgetter
from pprint import pformat

import dask
Expand All @@ -8,17 +8,18 @@
from dask import threaded
from dask.base import normalize_token, tokenize
from dask.dataframe import methods
from dask.dataframe.core import (Scalar, Series, _emulate, _extract_meta,
_Frame, _maybe_from_pandas, apply, funcname,
no_default, partial, partial_by_order,
split_evenly, check_divisions, hash_shard,
split_out_on_index, Index, get_parallel_type)
from dask.dataframe.core import (Index, Scalar, Series, _Frame, _emulate,
_extract_meta, _maybe_from_pandas, apply,
check_divisions, funcname, get_parallel_type,
hash_shard, no_default, partial,
partial_by_order, split_evenly,
split_out_on_index)
from dask.dataframe.utils import _nonempty_index, make_meta, meta_nonempty
from dask.delayed import Delayed
from dask.optimization import cull
from dask.utils import derived_from
from scipy import sparse
from toolz import merge, remove, partition_all
from toolz import merge, partition_all, remove

import sparsity as sp
from sparsity.dask.indexing import _LocIndexer
Expand Down Expand Up @@ -76,6 +77,10 @@ def __init__(self, dsk, name, meta, divisions=None):

self.loc = _LocIndexer(self)

def __getitem__(self, item):
return self.map_partitions(itemgetter(item), self._meta[item],
name='__getitem__')

def __dask_graph__(self):
return self.dask

Expand Down Expand Up @@ -130,6 +135,22 @@ def index(self):
def map_partitions(self, func, meta, *args, **kwargs):
return map_partitions(func, self, meta, *args, **kwargs)

# noinspection PyTypeChecker
def todense(self):
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd rather use `to_delayed` here, then apply `todense` on each delayed object and use `dd.from_delayed` to construct the dense dask collection.

"""Convert into Dask DataFrame or Series

Returns
-------
res: dd.DataFrame | dd.Series
"""
meta = dd.from_pandas(self._meta.todense(), npartitions=1)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Meta could be just self._meta.todense()

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not really, we need a dask object here.

res = self.map_partitions(
sp.SparseFrame.todense,
meta,
cls=dd.DataFrame if len(self.columns) > 1 else dd.Series,
)
return res

def to_delayed(self):
return [Delayed(k, self.dask) for k in self.__dask_keys__()]

Expand Down Expand Up @@ -639,7 +660,7 @@ def elemwise(op, *args, **kwargs):
return SparseFrame(dsk, _name, meta, divisions)


def map_partitions(func, ddf, meta, name=None, **kwargs):
def map_partitions(func, ddf, meta, name=None, cls=SparseFrame, **kwargs):
dsk = {}
name = name or func.__name__
token = tokenize(func, meta, **kwargs)
Expand All @@ -649,16 +670,20 @@ def map_partitions(func, ddf, meta, name=None, **kwargs):
value = (ddf._name, i)
dsk[(name, i)] = (apply_and_enforce, func, value, kwargs, meta)

return SparseFrame(merge(dsk, ddf.dask), name, meta, ddf.divisions)
return cls(merge(dsk, ddf.dask), name, meta, ddf.divisions)


def apply_and_enforce(func, arg, kwargs, meta):
sf = func(arg, **kwargs)
columns = meta.columns
if isinstance(sf, sp.SparseFrame):
if len(sf.data.data) == 0:
assert meta.empty, "Received non empty meta"
assert meta.empty, \
"Computed empty result but received non-empty meta"
assert isinstance(meta, sp.SparseFrame), \
"Computed a SparseFrame but meta is of type {}"\
.format(type(meta))
return meta
columns = meta.columns
if (len(columns) == len(sf.columns) and
type(columns) is type(sf.columns) and
columns.equals(sf.columns)):
Expand Down
12 changes: 7 additions & 5 deletions sparsity/sparse_frame.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
# coding=utf-8
import functools
import traceback
import warnings
from collections import OrderedDict
from functools import partial, reduce

import functools
import numpy as np
import pandas as pd
from functools import partial, reduce
from pandas.api import types
from pandas.core.common import _default_index

Expand Down Expand Up @@ -179,15 +179,17 @@ def todense(self, pandas=True):
dense = pd.DataFrame(np.empty(shape=self.shape),
columns=self.columns,
index=self._index[:0])
if self.data.shape[1] == 1: # 1 empty column => empty Series
dense = dense.iloc[:, 0]
elif len(dense.shape) == 1 and \
self.data.shape[1] == 1:
self.data.shape[1] == 1: # 1 column => Series
dense = pd.Series(dense, index=self.index,
name=self.columns[0])
elif len(dense.shape) == 1 and \
self.data.shape[1] > 1:
self.data.shape[1] > 1: # 1 row => DataFrame
dense = pd.DataFrame(dense.reshape(1, -1), index=self.index,
columns=self.columns)
else:
else: # 2+ cols and 2+ rows
# need to copy, as broadcast_to returns read_only array
idx = np.broadcast_to(self.index, dense.shape[0])\
.copy()
Expand Down
69 changes: 64 additions & 5 deletions sparsity/test/test_dask_sparse_frame.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
import datetime as dt
import os
from distributed import Client
from uuid import uuid4

import dask
import dask.dataframe as dd
import datetime as dt
import numpy as np
import pandas as pd
import pandas.util.testing as pdt
import pytest
from distributed import Client
from uuid import uuid4

import sparsity as sp
import sparsity.dask as dsp
from sparsity.dask.reshape import one_hot_encode
import pandas.util.testing as pdt

from .conftest import tmpdir

dask.config.set(scheduler=dask.local.get_sync)
Expand Down Expand Up @@ -45,6 +45,63 @@ def test_map_partitions():
assert res.shape == (10, 2)


def test_todense():
    """A multi-column sparse frame densifies to a dask DataFrame."""
    frame = pd.DataFrame(np.random.rand(10, 2))
    sparse = dsp.from_pandas(frame, npartitions=3)
    dense = sparse.todense()
    assert isinstance(dense, dd.DataFrame)
    pdt.assert_frame_equal(dense.compute(), frame, check_dtype=False)


def test_todense_series():
    """Selecting one column and densifying yields a dask Series."""
    frame = pd.DataFrame(np.random.rand(10, 2))
    single_column = dsp.from_pandas(frame, npartitions=3)[0]
    dense = single_column.todense()
    assert isinstance(dense, dd.Series)
    pdt.assert_series_equal(dense.compute(), frame[0], check_dtype=False)


@pytest.mark.parametrize('item', [
    'X',
    ['X', 'Y'],
])
def test_getitem(item):
    """__getitem__ on a populated frame selects the requested columns."""
    df = pd.DataFrame(np.random.rand(10, 3), columns=list('XYZ'),
                      index=list('ABCDEFGHIJ'))
    dsf = dsp.from_pandas(df, npartitions=2)
    _check_getitem(dsf, df, item)


@pytest.mark.parametrize('item', [
    'X',
    ['X', 'Y'],
])
def test_getitem_empty(item):
    """__getitem__ on an empty frame still selects the requested columns."""
    df = pd.DataFrame([], columns=list('XYZ'), dtype=int)
    dsf = dsp.from_ddf(dd.from_pandas(df, npartitions=1))
    _check_getitem(dsf, df, item)


def _check_getitem(dsf, df, item):
    # Shared verification for __getitem__: column selection must hold both
    # lazily (on the dask collection) and after compute, and the densified
    # result must match plain pandas indexing on the source frame.
    correct_cols = item if isinstance(item, list) else [item]
    res = dsf[item]
    assert res.columns.tolist() == correct_cols
    res_computed = res.compute()
    assert res_computed.columns.tolist() == correct_cols
    if not isinstance(item, list):
        pdt.assert_series_equal(df[item], res_computed.todense())
    else:
        pdt.assert_frame_equal(df[item], res_computed.todense())


@pytest.mark.parametrize('iindexer, correct_shape', [
(slice('A', 'B'), (2, 2)),
(slice('C', None), (8, 2)),
Expand All @@ -64,6 +121,7 @@ def test_loc(iindexer, correct_shape):
assert isinstance(res, sp.SparseFrame)
assert res.shape == correct_shape


def test_dask_loc(clickstream):
sf = one_hot_encode(dd.from_pandas(clickstream, npartitions=10),
categories={'page_id': list('ABCDE'),
Expand All @@ -85,6 +143,7 @@ def test_dask_multi_index_loc(clickstream):
assert res.index.get_level_values(0).date.min() == dt.date(2016, 1, 15)
assert res.index.get_level_values(0).date.max() == dt.date(2016, 2, 15)


def test_repr():
dsf = dsp.from_pandas(pd.DataFrame(np.random.rand(10, 2)),
npartitions=3)
Expand Down