diff --git a/sparsity/dask/core.py b/sparsity/dask/core.py index 85703eb..292b6c1 100644 --- a/sparsity/dask/core.py +++ b/sparsity/dask/core.py @@ -1,4 +1,4 @@ -from operator import getitem +from operator import getitem, itemgetter from pprint import pformat import dask @@ -8,17 +8,18 @@ from dask import threaded from dask.base import normalize_token, tokenize from dask.dataframe import methods -from dask.dataframe.core import (Scalar, Series, _emulate, _extract_meta, - _Frame, _maybe_from_pandas, apply, funcname, - no_default, partial, partial_by_order, - split_evenly, check_divisions, hash_shard, - split_out_on_index, Index, get_parallel_type) +from dask.dataframe.core import (Index, Scalar, Series, _Frame, _emulate, + _extract_meta, _maybe_from_pandas, apply, + check_divisions, funcname, get_parallel_type, + hash_shard, no_default, partial, + partial_by_order, split_evenly, + split_out_on_index) from dask.dataframe.utils import _nonempty_index, make_meta, meta_nonempty from dask.delayed import Delayed from dask.optimization import cull from dask.utils import derived_from from scipy import sparse -from toolz import merge, remove, partition_all +from toolz import merge, partition_all, remove import sparsity as sp from sparsity.dask.indexing import _LocIndexer @@ -76,6 +77,10 @@ def __init__(self, dsk, name, meta, divisions=None): self.loc = _LocIndexer(self) + def __getitem__(self, item): + return self.map_partitions(itemgetter(item), self._meta[item], + name='__getitem__') + def __dask_graph__(self): return self.dask @@ -130,6 +135,23 @@ def index(self): def map_partitions(self, func, meta, *args, **kwargs): return map_partitions(func, self, meta, *args, **kwargs) + # noinspection PyTypeChecker + def todense(self, pandas=True): + """Convert into Dask DataFrame or Series + + Returns + ------- + res: dd.DataFrame | dd.Series + """ + if not pandas: + raise NotImplementedError('Conversion to dask.array is ' + 'currently not supported!') + meta = self._meta.todense() + + dfs = [obj.todense(pandas=pandas) for obj in self.to_delayed()] + + return dd.from_delayed(dfs, meta=meta) + def to_delayed(self): return [Delayed(k, self.dask) for k in self.__dask_keys__()] @@ -654,11 +676,15 @@ def map_partitions(func, ddf, meta, name=None, **kwargs): def apply_and_enforce(func, arg, kwargs, meta): sf = func(arg, **kwargs) - columns = meta.columns if isinstance(sf, sp.SparseFrame): if len(sf.data.data) == 0: - assert meta.empty, "Received non empty meta" + assert meta.empty, \ + "Computed empty result but received non-empty meta" + assert isinstance(meta, sp.SparseFrame), \ + "Computed a SparseFrame but meta is of type {}"\ + .format(type(meta)) return meta + columns = meta.columns if (len(columns) == len(sf.columns) and type(columns) is type(sf.columns) and columns.equals(sf.columns)): diff --git a/sparsity/sparse_frame.py b/sparsity/sparse_frame.py index c3c588e..3d8b4ac 100644 --- a/sparsity/sparse_frame.py +++ b/sparsity/sparse_frame.py @@ -1,12 +1,12 @@ # coding=utf-8 -import functools import traceback import warnings from collections import OrderedDict -from functools import partial, reduce +import functools import numpy as np import pandas as pd +from functools import partial, reduce from pandas.api import types from pandas.core.common import _default_index @@ -179,15 +179,17 @@ def todense(self, pandas=True): dense = pd.DataFrame(np.empty(shape=self.shape), columns=self.columns, index=self._index[:0]) + if self.data.shape[1] == 1: # 1 empty column => empty Series + dense = dense.iloc[:, 0] elif len(dense.shape) == 1 and \ - self.data.shape[1] == 1: + self.data.shape[1] == 1: # 1 column => Series dense = pd.Series(dense, index=self.index, name=self.columns[0]) elif len(dense.shape) == 1 and \ - self.data.shape[1] > 1: + self.data.shape[1] > 1: # 1 row => DataFrame dense = pd.DataFrame(dense.reshape(1, -1), index=self.index, columns=self.columns) - else: + else: # 2+ cols and 2+ rows # need to copy, as broadcast_to returns read_only array idx = np.broadcast_to(self.index, dense.shape[0])\ .copy() @@ -1010,8 +1012,10 @@ def reindex_axis(self, labels, axis=0, method=None, new_index, idx = self.columns.reindex(labels) if idx is None: return self.copy() - # we have a hidden zero column to replace missing indices (-1) - new_data = self._data.T[idx].T[:-1] + new_data = self._data.T[idx].T + if not self.empty: + # we have a hidden zero column to replace missing indices (-1) + new_data = new_data[:-1] else: raise ValueError("Only two dimensional data supported.") diff --git a/sparsity/test/test_dask_sparse_frame.py b/sparsity/test/test_dask_sparse_frame.py index ddf1944..a20580d 100644 --- a/sparsity/test/test_dask_sparse_frame.py +++ b/sparsity/test/test_dask_sparse_frame.py @@ -1,18 +1,18 @@ -import datetime as dt import os -from distributed import Client -from uuid import uuid4 import dask import dask.dataframe as dd +import datetime as dt import numpy as np import pandas as pd +import pandas.util.testing as pdt import pytest +from distributed import Client +from uuid import uuid4 + import sparsity as sp import sparsity.dask as dsp from sparsity.dask.reshape import one_hot_encode -import pandas.util.testing as pdt - from .conftest import tmpdir dask.config.set(scheduler=dask.local.get_sync) @@ -45,6 +45,63 @@ def test_map_partitions(): assert res.shape == (10, 2) +def test_todense(): + data = pd.DataFrame(np.random.rand(10, 2)) + dsf = dsp.from_pandas(data, npartitions=3) + res = dsf.todense() + assert isinstance(res, dd.DataFrame) + computed = res.compute() + pdt.assert_frame_equal(computed, data, check_dtype=False) + + +def test_todense_series(): + data = pd.DataFrame(np.random.rand(10, 2)) + dsf = dsp.from_pandas(data, npartitions=3)[0] + res = dsf.todense() + assert isinstance(res, dd.Series) + computed = res.compute() + pdt.assert_series_equal(computed, data[0], check_dtype=False) + + +@pytest.mark.parametrize('item', [ + 'X', + ['X', 'Y'], +]) +def test_getitem(item): + df = pd.DataFrame(np.random.rand(10, 3), columns=list('XYZ'), + index=list('ABCDEFGHIJ')) + dsf = dsp.from_pandas(df, npartitions=2) + + correct_cols = item if isinstance(item, list) else [item] + res = dsf[item] + assert res.columns.tolist() == correct_cols + res_computed = res.compute() + assert res_computed.columns.tolist() == correct_cols + if not isinstance(item, list): + pdt.assert_series_equal(df[item], res_computed.todense()) + else: + pdt.assert_frame_equal(df[item], res_computed.todense()) + + +@pytest.mark.parametrize('item', [ + 'X', + ['X', 'Y'], +]) +def test_getitem_empty(item): + df = pd.DataFrame([], columns=list('XYZ'), dtype=int) + dsf = dsp.from_ddf(dd.from_pandas(df, npartitions=1)) + + correct_cols = item if isinstance(item, list) else [item] + res = dsf[item] + assert res.columns.tolist() == correct_cols + res_computed = res.compute() + assert res_computed.columns.tolist() == correct_cols + if not isinstance(item, list): + pdt.assert_series_equal(df[item], res_computed.todense()) + else: + pdt.assert_frame_equal(df[item], res_computed.todense()) + + @pytest.mark.parametrize('iindexer, correct_shape', [ (slice('A', 'B'), (2, 2)), (slice('C', None), (8, 2)), @@ -64,6 +121,7 @@ def test_loc(iindexer, correct_shape): assert isinstance(res, sp.SparseFrame) assert res.shape == correct_shape + def test_dask_loc(clickstream): sf = one_hot_encode(dd.from_pandas(clickstream, npartitions=10), categories={'page_id': list('ABCDE'), @@ -85,6 +143,7 @@ def test_dask_multi_index_loc(clickstream): assert res.index.get_level_values(0).date.min() == dt.date(2016, 1, 15) assert res.index.get_level_values(0).date.max() == dt.date(2016, 2, 15) + def test_repr(): dsf = dsp.from_pandas(pd.DataFrame(np.random.rand(10, 2)), npartitions=3)