From cc6b89e5ce5039560e59c3d6e1b248e014f67625 Mon Sep 17 00:00:00 2001 From: michcio1234 Date: Thu, 6 Jun 2019 11:51:37 +0200 Subject: [PATCH 1/7] Check for type of meta in `apply_and_enforce` It was possible that although computed type is SparseFrame, other type is returned (if meta was not a SparseFrame). Imports are not changed, just reorganized. --- sparsity/dask/core.py | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/sparsity/dask/core.py b/sparsity/dask/core.py index 85703eb..185a2ba 100644 --- a/sparsity/dask/core.py +++ b/sparsity/dask/core.py @@ -8,17 +8,18 @@ from dask import threaded from dask.base import normalize_token, tokenize from dask.dataframe import methods -from dask.dataframe.core import (Scalar, Series, _emulate, _extract_meta, - _Frame, _maybe_from_pandas, apply, funcname, - no_default, partial, partial_by_order, - split_evenly, check_divisions, hash_shard, - split_out_on_index, Index, get_parallel_type) +from dask.dataframe.core import (Index, Scalar, Series, _Frame, _emulate, + _extract_meta, _maybe_from_pandas, apply, + check_divisions, funcname, get_parallel_type, + hash_shard, no_default, partial, + partial_by_order, split_evenly, + split_out_on_index) from dask.dataframe.utils import _nonempty_index, make_meta, meta_nonempty from dask.delayed import Delayed from dask.optimization import cull from dask.utils import derived_from from scipy import sparse -from toolz import merge, remove, partition_all +from toolz import merge, partition_all, remove import sparsity as sp from sparsity.dask.indexing import _LocIndexer @@ -657,7 +658,11 @@ def apply_and_enforce(func, arg, kwargs, meta): columns = meta.columns if isinstance(sf, sp.SparseFrame): if len(sf.data.data) == 0: - assert meta.empty, "Received non empty meta" + assert meta.empty, \ + "Computed empty result but received non-empty meta" + assert isinstance(meta, sp.SparseFrame), \ + "Computed a SparseFrame but meta is of type {}"\ + .format(type(meta)) return meta if (len(columns) == len(sf.columns) and type(columns) is type(sf.columns) and From c50fac041d91aebba4e3308bf9209d1ff581f50e Mon Sep 17 00:00:00 2001 From: michcio1234 Date: Thu, 6 Jun 2019 14:34:20 +0200 Subject: [PATCH 2/7] Simple __getindex__ for dask SparseFrames. Support for dsp[index] syntax. Doesn't aim to work the same as in pandas, just maps __getitem__ onto partitions. --- sparsity/dask/core.py | 6 +++++- sparsity/test/test_dask_sparse_frame.py | 26 ++++++++++++++++++++++--- 2 files changed, 28 insertions(+), 4 deletions(-) diff --git a/sparsity/dask/core.py b/sparsity/dask/core.py index 185a2ba..184bff1 100644 --- a/sparsity/dask/core.py +++ b/sparsity/dask/core.py @@ -1,4 +1,4 @@ -from operator import getitem +from operator import getitem, itemgetter from pprint import pformat import dask @@ -77,6 +77,10 @@ def __init__(self, dsk, name, meta, divisions=None): self.loc = _LocIndexer(self) + def __getitem__(self, item): + return self.map_partitions(itemgetter(item), self._meta[item], + name='__getitem__') + def __dask_graph__(self): return self.dask diff --git a/sparsity/test/test_dask_sparse_frame.py b/sparsity/test/test_dask_sparse_frame.py index ddf1944..7b7725e 100644 --- a/sparsity/test/test_dask_sparse_frame.py +++ b/sparsity/test/test_dask_sparse_frame.py @@ -1,18 +1,18 @@ import datetime as dt import os -from distributed import Client from uuid import uuid4 import dask import dask.dataframe as dd import numpy as np import pandas as pd +import pandas.util.testing as pdt import pytest +from distributed import Client + import sparsity as sp import sparsity.dask as dsp from sparsity.dask.reshape import one_hot_encode -import pandas.util.testing as pdt - from .conftest import tmpdir dask.config.set(scheduler=dask.local.get_sync) @@ -45,6 +45,26 @@ def test_map_partitions(): assert res.shape == (10, 2) +@pytest.mark.parametrize('item', [ + 'X', + ['X', 'Y'], +]) +def test_getitem(item): + df = pd.DataFrame(np.random.rand(10, 3), columns=list('XYZ'), + index=list('ABCDEFGHIJ')) + dsf = dsp.from_pandas(df, npartitions=2) + + correct_cols = item if isinstance(item, list) else [item] + res = dsf[item] + assert res.columns.tolist() == correct_cols + res_computed = res.compute() + assert res_computed.columns.tolist() == correct_cols + if not isinstance(item, list): + pdt.assert_series_equal(df[item], res_computed.todense()) + else: + pdt.assert_frame_equal(df[item], res_computed.todense()) + + @pytest.mark.parametrize('iindexer, correct_shape', [ (slice('A', 'B'), (2, 2)), (slice('C', None), (8, 2)), From d0a4097e0604890585cff1fd92b3c19476a58e53 Mon Sep 17 00:00:00 2001 From: michcio1234 Date: Thu, 6 Jun 2019 16:32:27 +0200 Subject: [PATCH 3/7] Add getitem test with empty frame --- sparsity/test/test_dask_sparse_frame.py | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/sparsity/test/test_dask_sparse_frame.py b/sparsity/test/test_dask_sparse_frame.py index 7b7725e..ec601d3 100644 --- a/sparsity/test/test_dask_sparse_frame.py +++ b/sparsity/test/test_dask_sparse_frame.py @@ -63,6 +63,25 @@ def test_getitem(item): pdt.assert_series_equal(df[item], res_computed.todense()) else: pdt.assert_frame_equal(df[item], res_computed.todense()) + + +@pytest.mark.parametrize('item', [ + 'X', + ['X', 'Y'], +]) +def test_getitem_empty(item): + df = pd.DataFrame([], columns=list('XYZ'), dtype=int) + dsf = dsp.from_ddf(dd.from_pandas(df, npartitions=1)) + + correct_cols = item if isinstance(item, list) else [item] + res = dsf[item] + assert res.columns.tolist() == correct_cols + res_computed = res.compute() + assert res_computed.columns.tolist() == correct_cols + if not isinstance(item, list): + pdt.assert_series_equal(df[item], res_computed.todense()) + else: + pdt.assert_frame_equal(df[item], res_computed.todense()) @pytest.mark.parametrize('iindexer, correct_shape', [ @@ -84,6 +103,7 @@ def test_loc(iindexer, correct_shape): assert isinstance(res, sp.SparseFrame) assert res.shape == correct_shape + def test_dask_loc(clickstream): sf = one_hot_encode(dd.from_pandas(clickstream, npartitions=10), categories={'page_id': list('ABCDE'), @@ -105,6 +125,7 @@ def test_dask_multi_index_loc(clickstream): assert res.index.get_level_values(0).date.min() == dt.date(2016, 1, 15) assert res.index.get_level_values(0).date.max() == dt.date(2016, 2, 15) + def test_repr(): dsf = dsp.from_pandas(pd.DataFrame(np.random.rand(10, 2)), npartitions=3) From f0790b56ab50a3c0efa42a3cd1ae5afda9773ac0 Mon Sep 17 00:00:00 2001 From: michcio1234 Date: Fri, 7 Jun 2019 09:03:03 +0200 Subject: [PATCH 4/7] todense() returns Series when there is one empty column Previously it returned DataFrame, even though in case of 1-column non-empty SparseFrame, it returned Series. Imports are only re-organized. --- sparsity/sparse_frame.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/sparsity/sparse_frame.py b/sparsity/sparse_frame.py index c3c588e..b421dfe 100644 --- a/sparsity/sparse_frame.py +++ b/sparsity/sparse_frame.py @@ -1,12 +1,12 @@ # coding=utf-8 -import functools import traceback import warnings from collections import OrderedDict -from functools import partial, reduce +import functools import numpy as np import pandas as pd +from functools import partial, reduce from pandas.api import types from pandas.core.common import _default_index @@ -179,15 +179,17 @@ def todense(self, pandas=True): dense = pd.DataFrame(np.empty(shape=self.shape), columns=self.columns, index=self._index[:0]) + if self.data.shape[1] == 1: # 1 empty column => empty Series + dense = dense.iloc[:, 0] elif len(dense.shape) == 1 and \ - self.data.shape[1] == 1: + self.data.shape[1] == 1: # 1 column => Series dense = pd.Series(dense, index=self.index, name=self.columns[0]) elif len(dense.shape) == 1 and \ - self.data.shape[1] > 1: + self.data.shape[1] > 1: # 1 row => DataFrame dense = pd.DataFrame(dense.reshape(1, -1), index=self.index, columns=self.columns) - else: + else: # 2+ cols and 2+ rows # need to copy, as broadcast_to returns read_only array idx = np.broadcast_to(self.index, dense.shape[0])\ .copy() From ca4a60a24532d4e74d9d035f1397fb84c7eaa13b Mon Sep 17 00:00:00 2001 From: michcio1234 Date: Fri, 7 Jun 2019 12:40:41 +0200 Subject: [PATCH 5/7] Add .todense() method to Dask SparseFrame It works by mapping SparseFrame.todense onto partitions. It as necessary to allow `map_partitions` to return other types then SparseFrame, so kwarg `cls` was added. It implies that one cannot use `cls` kwarg as an argument to mapped function (because it will be consumed by `map_partitions` and not passed to a mapped function). --- sparsity/dask/core.py | 22 +++++++++++++++++++--- sparsity/test/test_dask_sparse_frame.py | 22 ++++++++++++++++++++-- 2 files changed, 39 insertions(+), 5 deletions(-) diff --git a/sparsity/dask/core.py b/sparsity/dask/core.py index 184bff1..2096549 100644 --- a/sparsity/dask/core.py +++ b/sparsity/dask/core.py @@ -135,6 +135,22 @@ def index(self): def map_partitions(self, func, meta, *args, **kwargs): return map_partitions(func, self, meta, *args, **kwargs) + # noinspection PyTypeChecker + def todense(self): + """Convert into Dask DataFrame or Series + + Returns + ------- + res: dd.DataFrame | dd.Series + """ + meta = dd.from_pandas(self._meta.todense(), npartitions=1) + res = self.map_partitions( + sp.SparseFrame.todense, + meta, + cls=dd.DataFrame if len(self.columns) > 1 else dd.Series, + ) + return res + def to_delayed(self): return [Delayed(k, self.dask) for k in self.__dask_keys__()] @@ -644,7 +660,7 @@ def elemwise(op, *args, **kwargs): return SparseFrame(dsk, _name, meta, divisions) -def map_partitions(func, ddf, meta, name=None, **kwargs): +def map_partitions(func, ddf, meta, name=None, cls=SparseFrame, **kwargs): dsk = {} name = name or func.__name__ token = tokenize(func, meta, **kwargs) @@ -654,12 +670,11 @@ def map_partitions(func, ddf, meta, name=None, **kwargs): value = (ddf._name, i) dsk[(name, i)] = (apply_and_enforce, func, value, kwargs, meta) - return SparseFrame(merge(dsk, ddf.dask), name, meta, ddf.divisions) + return cls(merge(dsk, ddf.dask), name, meta, ddf.divisions) def apply_and_enforce(func, arg, kwargs, meta): sf = func(arg, **kwargs) - columns = meta.columns if isinstance(sf, sp.SparseFrame): if len(sf.data.data) == 0: assert meta.empty, \ @@ -668,6 +683,7 @@ def apply_and_enforce(func, arg, kwargs, meta): "Computed a SparseFrame but meta is of type {}"\ .format(type(meta)) return meta + columns = meta.columns if (len(columns) == len(sf.columns) and type(columns) is type(sf.columns) and columns.equals(sf.columns)): diff --git a/sparsity/test/test_dask_sparse_frame.py b/sparsity/test/test_dask_sparse_frame.py index ec601d3..a20580d 100644 --- a/sparsity/test/test_dask_sparse_frame.py +++ b/sparsity/test/test_dask_sparse_frame.py @@ -1,14 +1,14 @@ -import datetime as dt import os -from uuid import uuid4 import dask import dask.dataframe as dd +import datetime as dt import numpy as np import pandas as pd import pandas.util.testing as pdt import pytest from distributed import Client +from uuid import uuid4 import sparsity as sp import sparsity.dask as dsp @@ -45,6 +45,24 @@ def test_map_partitions(): assert res.shape == (10, 2) +def test_todense(): + data = pd.DataFrame(np.random.rand(10, 2)) + dsf = dsp.from_pandas(data, npartitions=3) + res = dsf.todense() + assert isinstance(res, dd.DataFrame) + computed = res.compute() + pdt.assert_frame_equal(computed, data, check_dtype=False) + + +def test_todense_series(): + data = pd.DataFrame(np.random.rand(10, 2)) + dsf = dsp.from_pandas(data, npartitions=3)[0] + res = dsf.todense() + assert isinstance(res, dd.Series) + computed = res.compute() + pdt.assert_series_equal(computed, data[0], check_dtype=False) + + @pytest.mark.parametrize('item', [ 'X', ['X', 'Y'], From b07d147e9238cdba231a0024942894fd2ca74f88 Mon Sep 17 00:00:00 2001 From: michcio1234 Date: Fri, 7 Jun 2019 15:03:13 +0200 Subject: [PATCH 6/7] Support reindex in case of empty frame --- sparsity/sparse_frame.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/sparsity/sparse_frame.py b/sparsity/sparse_frame.py index b421dfe..3d8b4ac 100644 --- a/sparsity/sparse_frame.py +++ b/sparsity/sparse_frame.py @@ -1012,8 +1012,10 @@ def reindex_axis(self, labels, axis=0, method=None, new_index, idx = self.columns.reindex(labels) if idx is None: return self.copy() - # we have a hidden zero column to replace missing indices (-1) - new_data = self._data.T[idx].T[:-1] + new_data = self._data.T[idx].T + if not self.empty: + # we have a hidden zero column to replace missing indices (-1) + new_data = new_data[:-1] else: raise ValueError("Only two dimensional data supported.") From 4c63a0c04324bf12cd8e2bdb323bf48602fc146d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alan=20H=C3=B6ng?= Date: Tue, 11 Jun 2019 14:08:34 +0200 Subject: [PATCH 7/7] More elegant way to implement todense function. (#80) This leverages the dask.delayed object api to achieve the same result which was previously a hack between map_partitions and initializing dd.DataFrame directy. --- sparsity/dask/core.py | 21 +++++++++++---------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/sparsity/dask/core.py b/sparsity/dask/core.py index 2096549..292b6c1 100644 --- a/sparsity/dask/core.py +++ b/sparsity/dask/core.py @@ -136,20 +136,21 @@ def map_partitions(self, func, meta, *args, **kwargs): return map_partitions(func, self, meta, *args, **kwargs) # noinspection PyTypeChecker - def todense(self): + def todense(self, pandas=True): """Convert into Dask DataFrame or Series Returns ------- res: dd.DataFrame | dd.Series """ - meta = dd.from_pandas(self._meta.todense(), npartitions=1) - res = self.map_partitions( - sp.SparseFrame.todense, - meta, - cls=dd.DataFrame if len(self.columns) > 1 else dd.Series, - ) - return res + if not pandas: + raise NotImplementedError('Conversion to dask.array is ' + 'currently not supported!') + meta = self._meta.todense() + + dfs = [obj.todense(pandas=pandas) for obj in self.to_delayed()] + + return dd.from_delayed(dfs, meta=meta) def to_delayed(self): return [Delayed(k, self.dask) for k in self.__dask_keys__()] @@ -660,7 +661,7 @@ def elemwise(op, *args, **kwargs): return SparseFrame(dsk, _name, meta, divisions) -def map_partitions(func, ddf, meta, name=None, cls=SparseFrame, **kwargs): +def map_partitions(func, ddf, meta, name=None, **kwargs): dsk = {} name = name or func.__name__ token = tokenize(func, meta, **kwargs) @@ -670,7 +671,7 @@ def map_partitions(func, ddf, meta, name=None, cls=SparseFrame, **kwargs): value = (ddf._name, i) dsk[(name, i)] = (apply_and_enforce, func, value, kwargs, meta) - return cls(merge(dsk, ddf.dask), name, meta, ddf.divisions) + return SparseFrame(merge(dsk, ddf.dask), name, meta, ddf.divisions) def apply_and_enforce(func, arg, kwargs, meta):