Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 35 additions & 9 deletions sparsity/dask/core.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from operator import getitem
from operator import getitem, itemgetter
from pprint import pformat

import dask
Expand All @@ -8,17 +8,18 @@
from dask import threaded
from dask.base import normalize_token, tokenize
from dask.dataframe import methods
from dask.dataframe.core import (Scalar, Series, _emulate, _extract_meta,
_Frame, _maybe_from_pandas, apply, funcname,
no_default, partial, partial_by_order,
split_evenly, check_divisions, hash_shard,
split_out_on_index, Index, get_parallel_type)
from dask.dataframe.core import (Index, Scalar, Series, _Frame, _emulate,
_extract_meta, _maybe_from_pandas, apply,
check_divisions, funcname, get_parallel_type,
hash_shard, no_default, partial,
partial_by_order, split_evenly,
split_out_on_index)
from dask.dataframe.utils import _nonempty_index, make_meta, meta_nonempty
from dask.delayed import Delayed
from dask.optimization import cull
from dask.utils import derived_from
from scipy import sparse
from toolz import merge, remove, partition_all
from toolz import merge, partition_all, remove

import sparsity as sp
from sparsity.dask.indexing import _LocIndexer
Expand Down Expand Up @@ -76,6 +77,10 @@ def __init__(self, dsk, name, meta, divisions=None):

self.loc = _LocIndexer(self)

    def __getitem__(self, item):
        """Select columns by label.

        Applies ``partition[item]`` to every partition via
        ``map_partitions``, using ``self._meta[item]`` as the new meta.

        Parameters
        ----------
        item: str | list
            A single column label or a list of labels
            (anything ``SparseFrame.__getitem__`` accepts).

        Returns
        -------
        The lazily selected frame as returned by ``map_partitions``.
        """
        return self.map_partitions(itemgetter(item), self._meta[item],
                                   name='__getitem__')

def __dask_graph__(self):
return self.dask

Expand Down Expand Up @@ -130,6 +135,23 @@ def index(self):
def map_partitions(self, func, meta, *args, **kwargs):
return map_partitions(func, self, meta, *args, **kwargs)

# noinspection PyTypeChecker
def todense(self, pandas=True):
"""Convert into Dask DataFrame or Series

Returns
-------
res: dd.DataFrame | dd.Series
"""
if not pandas:
raise NotImplementedError('Conversion to dask.array is '
'currently not supported!')
meta = self._meta.todense()

dfs = [obj.todense(pandas=pandas) for obj in self.to_delayed()]

return dd.from_delayed(dfs, meta=meta)

def to_delayed(self):
return [Delayed(k, self.dask) for k in self.__dask_keys__()]

Expand Down Expand Up @@ -654,11 +676,15 @@ def map_partitions(func, ddf, meta, name=None, **kwargs):

def apply_and_enforce(func, arg, kwargs, meta):
sf = func(arg, **kwargs)
columns = meta.columns
if isinstance(sf, sp.SparseFrame):
if len(sf.data.data) == 0:
assert meta.empty, "Received non empty meta"
assert meta.empty, \
"Computed empty result but received non-empty meta"
assert isinstance(meta, sp.SparseFrame), \
"Computed a SparseFrame but meta is of type {}"\
.format(type(meta))
return meta
columns = meta.columns
if (len(columns) == len(sf.columns) and
type(columns) is type(sf.columns) and
columns.equals(sf.columns)):
Expand Down
18 changes: 11 additions & 7 deletions sparsity/sparse_frame.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
# coding=utf-8
import functools
import traceback
import warnings
from collections import OrderedDict
from functools import partial, reduce

import functools
import numpy as np
import pandas as pd
from functools import partial, reduce
from pandas.api import types
from pandas.core.common import _default_index

Expand Down Expand Up @@ -179,15 +179,17 @@ def todense(self, pandas=True):
dense = pd.DataFrame(np.empty(shape=self.shape),
columns=self.columns,
index=self._index[:0])
if self.data.shape[1] == 1: # 1 empty column => empty Series
dense = dense.iloc[:, 0]
elif len(dense.shape) == 1 and \
self.data.shape[1] == 1:
self.data.shape[1] == 1: # 1 column => Series
dense = pd.Series(dense, index=self.index,
name=self.columns[0])
elif len(dense.shape) == 1 and \
self.data.shape[1] > 1:
self.data.shape[1] > 1: # 1 row => DataFrame
dense = pd.DataFrame(dense.reshape(1, -1), index=self.index,
columns=self.columns)
else:
else: # 2+ cols and 2+ rows
# need to copy, as broadcast_to returns read_only array
idx = np.broadcast_to(self.index, dense.shape[0])\
.copy()
Expand Down Expand Up @@ -1010,8 +1012,10 @@ def reindex_axis(self, labels, axis=0, method=None,
new_index, idx = self.columns.reindex(labels)
if idx is None:
return self.copy()
# we have a hidden zero column to replace missing indices (-1)
new_data = self._data.T[idx].T[:-1]
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should work on empty frames if scipy>=1.0.0

new_data = self._data.T[idx].T
if not self.empty:
# we have a hidden zero column to replace missing indices (-1)
new_data = new_data[:-1]
else:
raise ValueError("Only two dimensional data supported.")

Expand Down
69 changes: 64 additions & 5 deletions sparsity/test/test_dask_sparse_frame.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
import datetime as dt
import os
from distributed import Client
from uuid import uuid4

import dask
import dask.dataframe as dd
import datetime as dt
import numpy as np
import pandas as pd
import pandas.util.testing as pdt
import pytest
from distributed import Client
from uuid import uuid4

import sparsity as sp
import sparsity.dask as dsp
from sparsity.dask.reshape import one_hot_encode
import pandas.util.testing as pdt

from .conftest import tmpdir

dask.config.set(scheduler=dask.local.get_sync)
Expand Down Expand Up @@ -45,6 +45,63 @@ def test_map_partitions():
assert res.shape == (10, 2)


def test_todense():
    # A densified sparse dask frame should round-trip back to the
    # original pandas data.
    frame = pd.DataFrame(np.random.rand(10, 2))
    sparse_ddf = dsp.from_pandas(frame, npartitions=3)

    dense = sparse_ddf.todense()

    assert isinstance(dense, dd.DataFrame)
    pdt.assert_frame_equal(dense.compute(), frame, check_dtype=False)


def test_todense_series():
    # Selecting a single column first should densify to a dask Series,
    # not a one-column DataFrame.
    frame = pd.DataFrame(np.random.rand(10, 2))
    single_col = dsp.from_pandas(frame, npartitions=3)[0]

    dense = single_col.todense()

    assert isinstance(dense, dd.Series)
    pdt.assert_series_equal(dense.compute(), frame[0], check_dtype=False)


@pytest.mark.parametrize('item', [
    'X',
    ['X', 'Y'],
])
def test_getitem(item):
    # Column selection must keep the right labels both lazily and
    # after computing, and must match plain pandas selection.
    source = pd.DataFrame(np.random.rand(10, 3), columns=list('XYZ'),
                          index=list('ABCDEFGHIJ'))
    dsf = dsp.from_pandas(source, npartitions=2)

    expected_cols = item if isinstance(item, list) else [item]
    selected = dsf[item]
    assert selected.columns.tolist() == expected_cols

    materialized = selected.compute()
    assert materialized.columns.tolist() == expected_cols
    if isinstance(item, list):
        pdt.assert_frame_equal(source[item], materialized.todense())
    else:
        pdt.assert_series_equal(source[item], materialized.todense())


@pytest.mark.parametrize('item', [
    'X',
    ['X', 'Y'],
])
def test_getitem_empty(item):
    # Column selection must also work on a frame with zero rows.
    empty = pd.DataFrame([], columns=list('XYZ'), dtype=int)
    dsf = dsp.from_ddf(dd.from_pandas(empty, npartitions=1))

    expected_cols = item if isinstance(item, list) else [item]
    selected = dsf[item]
    assert selected.columns.tolist() == expected_cols

    materialized = selected.compute()
    assert materialized.columns.tolist() == expected_cols
    if isinstance(item, list):
        pdt.assert_frame_equal(empty[item], materialized.todense())
    else:
        pdt.assert_series_equal(empty[item], materialized.todense())


@pytest.mark.parametrize('iindexer, correct_shape', [
(slice('A', 'B'), (2, 2)),
(slice('C', None), (8, 2)),
Expand All @@ -64,6 +121,7 @@ def test_loc(iindexer, correct_shape):
assert isinstance(res, sp.SparseFrame)
assert res.shape == correct_shape


def test_dask_loc(clickstream):
sf = one_hot_encode(dd.from_pandas(clickstream, npartitions=10),
categories={'page_id': list('ABCDE'),
Expand All @@ -85,6 +143,7 @@ def test_dask_multi_index_loc(clickstream):
assert res.index.get_level_values(0).date.min() == dt.date(2016, 1, 15)
assert res.index.get_level_values(0).date.max() == dt.date(2016, 2, 15)


def test_repr():
dsf = dsp.from_pandas(pd.DataFrame(np.random.rand(10, 2)),
npartitions=3)
Expand Down