Skip to content
Merged
Show file tree
Hide file tree
Changes from 32 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
58da78f
Implement distributed groupby sum and apply_concat_apply function for…
kayibal Mar 7, 2018
51ae0a2
Merge branch 'master' into distributed-groupby-sum
kayibal Mar 7, 2018
3f3be49
update comments
kayibal Mar 9, 2018
63d4dc8
add test for different index datatypes
kayibal Mar 9, 2018
df110c9
implement sort_index
kayibal Mar 31, 2018
49cb182
implement __len__
kayibal Apr 1, 2018
fd10c00
implement rename, optimize groupby_sum and join
kayibal Apr 1, 2018
b89f7d4
implement distributed set_index
kayibal Apr 1, 2018
bb818ff
Merge branch 'optimization/distributed' into develop
kayibal Apr 1, 2018
f942a91
Merge branch 'master' into refactor/binning
kayibal Oct 8, 2018
d2b68e3
Formatting
kayibal Oct 8, 2018
2d2ab88
fix dask version until we have an update
kayibal Oct 8, 2018
8fc32b8
Number of lines output in __repr__ changed.
kayibal Oct 8, 2018
47782ed
Stop checking for the number of lines, as it now adjusts to the terminal size.
kayibal Oct 8, 2018
9c352f9
Create folders when writing to local filesystem
kayibal Oct 9, 2018
cd2ce7b
Merge branch 'master' into refactor/binning
kayibal Oct 17, 2018
fcdd167
Fix empty dtype
kayibal Oct 10, 2018
18469ff
Implement distributed drop.
kayibal Oct 10, 2018
4876190
Always add npz extension when writing SparseFrame to npz format
kayibal Oct 10, 2018
4c18873
Fix metadata handling on set_index method
kayibal Oct 18, 2018
80aef8e
Correct dask requirement
kayibal Oct 22, 2018
414c69a
Add method for dask SparseFrame and tuple divisions type
kayibal Oct 29, 2018
f0aa2d8
Support empty divisions
kayibal Oct 30, 2018
3329492
Pass on divisions on sort_index
kayibal Oct 31, 2018
3223850
More restrictive pandas version as .drop method fails with pandas==0.…
kayibal Oct 31, 2018
a603d3d
Fix bug where empty dataframe would create wrongly sized shuffle array
kayibal Nov 1, 2018
92a8f5d
Fix bug where join with in memory sparse frame would return rows from…
kayibal Nov 15, 2018
c6fa6d0
update dask version
kayibal Dec 4, 2018
d60b758
Update dask version in setup.py
kayibal Dec 4, 2018
031d965
Update deprecated set_options call
kayibal Dec 4, 2018
734ff26
some fixes to tests
kayibal Dec 4, 2018
fbf0b17
Fix moto and boto versions
kayibal Dec 4, 2018
b836024
Update test dependencies
kayibal Dec 4, 2018
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ jobs:
- image: drtools/dask:latest
steps:
- checkout
- run: pip install pytest pytest-cov moto .
- run: pip install boto3==1.7.84 botocore==1.10.84 moto==1.3.6
- run: pip install pytest pytest-cov dask==1.0.0 .
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't this be consistent with setup.py? Or shouldn't it be done by pip install .[test]?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thx it is resolved

- run: py.test --cov sparsity --cov-report xml sparsity
- run: bash <(curl -s https://codecov.io/bash)
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@ build/
traildb_sparse.c
__pycache__
*.egg-info
*.npz
3 changes: 2 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,11 @@
packages=packages,
cmdclass=versioneer.get_cmdclass(),
install_requires=[
'pandas>=0.19.0,<=0.23.4',
'pandas>=0.21.0,<=0.23.4',
'scipy>=0.18.1',
'numpy>=1.12.0',
's3fs>=0.1.0',
'dask>0.20.0'
],
test_requires=[
'moto',
Expand Down
37 changes: 32 additions & 5 deletions sparsity/dask/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ def make_meta_sparsity(inp):
raise NotImplementedError("Can't make meta for type: {}"
.format(str(type(inp))))


@meta_nonempty.register(sp.SparseFrame)
def meta_nonempty_sparsity(x):
idx = _nonempty_index(x.index)
Expand All @@ -53,6 +52,8 @@ def optimize(dsk, keys, **kwargs):


def finalize(results):
    """Combine per-partition results into a single SparseFrame.

    If every partition is empty, the first (empty) result is returned
    unchanged; otherwise empty partitions are dropped and the remaining
    frames are stacked vertically.
    """
    non_empty = [part for part in results if not part.empty]
    if not non_empty:
        # All partitions were empty -- keep one as a representative result.
        return results[0]
    return sp.SparseFrame.vstack(non_empty)

Expand Down Expand Up @@ -137,6 +138,10 @@ def assign(self, **kwargs):
df2 = self._meta.assign(**_extract_meta(kwargs))
return elemwise(methods.assign, self, *pairs, meta=df2)

@derived_from(sp.SparseFrame)
def add(self, other, how='outer', fill_value=0,):
    """Element-wise addition of this dask SparseFrame with ``other``.

    NOTE(review): ``how`` and ``fill_value`` are accepted here but are
    not forwarded to ``sp.SparseFrame.add`` below, so they currently
    have no effect -- confirm whether they should be passed through.
    """
    return elemwise(sp.SparseFrame.add, self, other, meta=self._meta)

def __dask_keys__(self):
return [(self._name, i) for i in range(self.npartitions)]

Expand Down Expand Up @@ -186,10 +191,12 @@ def join(self, other, on=None, how='left', lsuffix='',
rsuffix='', npartitions=None):
from .multi import join_indexed_sparseframes

if isinstance(other, sp.SparseFrame) and how in ['left', 'inner']:
if isinstance(other, sp.SparseFrame):
meta = sp.SparseFrame.join(self._meta_nonempty,
other,
how=how)
# make empty meta
meta = meta.loc[[False] * meta.shape[0], :]
join_func = partial(sp.SparseFrame.join, other=other,
how=how)
return self.map_partitions(join_func, meta=meta, name='simplejoin')
Expand Down Expand Up @@ -260,7 +267,7 @@ def sort_index(self, npartitions=None, divisions=None, **kwargs):
"""
from .shuffle import sort_index
return sort_index(self, npartitions=npartitions,
divisions=None, **kwargs)
divisions=divisions, **kwargs)

@derived_from(sp.SparseFrame)
def set_index(self, column=None, idx=None, level=None):
Expand All @@ -269,12 +276,31 @@ def set_index(self, column=None, idx=None, level=None):
if idx is not None:
raise NotImplementedError('Only column or level supported')
new_name = self._meta.index.names[level] if level else column
meta = self._meta.set_index(pd.Index([], name=new_name))

if level is not None:
new_idx = self._meta.index.get_level_values(level)
else:
new_idx = pd.Index(np.empty((0,0), dtype=self._meta.values.dtype))
new_idx.name = new_name

meta = self._meta.set_index(idx=new_idx)
res = self.map_partitions(sp.SparseFrame.set_index, meta=meta,
column=column, idx=idx, level=level)
res.divisions = [None] * ( self.npartitions + 1)
res.divisions = tuple([None] * ( self.npartitions + 1))
return res

def rename(self, columns):
    """Rename columns on every partition.

    ``columns`` is forwarded to ``sp.SparseFrame.rename``; the metadata
    frame is renamed first so the collection carries the new labels.
    """
    renamed_meta = self._meta.rename(columns=columns)
    return self.map_partitions(
        sp.SparseFrame.rename,
        meta=renamed_meta,
        columns=columns,
    )

def drop(self, labels, axis=1):
    """Drop the given column ``labels`` from every partition.

    Only ``axis=1`` (columns) is supported; any other axis raises
    ``NotImplementedError``.
    """
    if axis != 1:
        raise NotImplementedError('Axis != 1 is currently not supported.')
    dropped_meta = self._meta.drop(labels=labels)
    return self.map_partitions(
        sp.SparseFrame.drop,
        meta=dropped_meta,
        labels=labels,
    )

def __repr__(self):
return \
"""
Expand Down Expand Up @@ -623,6 +649,7 @@ def apply_and_enforce(func, arg, kwargs, meta):
columns = meta.columns
if isinstance(sf, sp.SparseFrame):
if len(sf.data.data) == 0:
assert meta.empty, "Received non empty meta"
return meta
if (len(columns) == len(sf.columns) and
type(columns) is type(sf.columns) and
Expand Down
3 changes: 3 additions & 0 deletions sparsity/dask/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,9 @@ def to_npz(sf: SparseFrame, path: str, block_size=None,
if '*' not in path:
raise ValueError('Path needs to contain "*" wildcard.')

if '.npz' not in path:
path += '.npz'

tmpl_func = path.replace('*', '{0:06d}').format
metadata_fn = path.split('*')[0] + 'metadata.npz'
paths = list(map(tmpl_func, range(sf.npartitions)))
Expand Down
2 changes: 1 addition & 1 deletion sparsity/dask/multi.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,4 +108,4 @@ def _maybe_align_partitions(args):
dfs2 = iter(align_partitions(*dfs)[0])
return [a if not isinstance(a, (_Frame, SparseFrame))
else next(dfs2) for a in args]
return args
return args
2 changes: 1 addition & 1 deletion sparsity/dask/shuffle.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ def rearrange_by_index_tasks(df, max_branch=32, npartitions=None):

def shuffle_index(sf: sp.SparseFrame, stage, k, npartitions):
ind = sf['_partitions'].todense().astype(np.int)
c = ind._values
c = ind._values.reshape(-1)
typ = np.min_scalar_type(npartitions * 2)
c = c.astype(typ)

Expand Down
3 changes: 2 additions & 1 deletion sparsity/io.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from io import BytesIO
from pathlib import PurePath
from pathlib import PurePath, Path
from urllib.parse import urlparse

import numpy as np
Expand Down Expand Up @@ -67,6 +67,7 @@ def _write_dict_npz(data, filename, block_size, storage_options):
filename = path2str(filename)
protocol = urlparse(filename).scheme or 'file'
if protocol == 'file':
Path(filename).parent.mkdir(parents=True, exist_ok=True)
with open(filename, 'wb') as fp:
np.savez(fp, **data)
else:
Expand Down
3 changes: 2 additions & 1 deletion sparsity/sparse_frame.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,8 @@ def todense(self, pandas=True):

if pandas:
if self.empty:
dense = pd.DataFrame([], columns=self.columns,
dense = pd.DataFrame(np.empty(shape=self.shape),
columns=self.columns,
index=self._index[:0])
elif len(dense.shape) == 1 and \
self.data.shape[1] == 1:
Expand Down
32 changes: 30 additions & 2 deletions sparsity/test/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ def gendata(n, categorical=False):
@pytest.fixture()
def sample_frame_labels():
return SparseFrame(np.identity(5),
columns = list('ABCDE'),
index = list('VWXYZ'))
columns=list('ABCDE'),
index=list('VWXYZ'))

@pytest.fixture()
def weekdays():
Expand Down Expand Up @@ -96,6 +96,34 @@ def clickstream():
return df


@pytest.fixture()
def complex_example():
    """Three SparseFrames with partially overlapping integer indices.

    ``first`` covers index 0..9, ``second`` 2..5 and ``third`` 6..9;
    the rows of each frame are returned in a random order.
    """
    def _shuffled(values, index):
        # Apply one random permutation to both the rows and the index.
        perm = np.arange(len(index))
        np.random.shuffle(perm)
        return SparseFrame(values[perm], index=index[perm])

    dense_first = np.identity(10)
    dense_second = np.zeros((4, 10))
    dense_third = np.zeros((4, 10))
    dense_second[[0, 1, 2, 3], [2, 3, 4, 5]] = 10
    dense_third[[0, 1, 2, 3], [6, 7, 8, 9]] = 20

    # Shuffle calls happen in the same order/sizes as before (10, 4, 4),
    # preserving results under a fixed RNG seed.
    first = _shuffled(dense_first, np.arange(10))
    second = _shuffled(dense_second, np.arange(2, 6))
    third = _shuffled(dense_third, np.arange(6, 10))
    return first, second, third


@contextmanager
def tmpdir(dir=None):
dirname = tempfile.mkdtemp(dir=dir)
Expand Down
20 changes: 18 additions & 2 deletions sparsity/test/test_dask_sparse_frame.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,12 @@
import pytest
import sparsity as sp
import sparsity.dask as dsp
from dask.local import get_sync
from sparsity.dask.reshape import one_hot_encode
import pandas.util.testing as pdt

from .conftest import tmpdir

dask.context.set_options(get=dask.local.get_sync)
dask.config.set(scheduler=dask.local.get_sync)


@pytest.fixture
Expand Down Expand Up @@ -289,6 +288,23 @@ def test_distributed_join(how):

pdt.assert_frame_equal(correct, res)


def test_add():
    """dask-level add must match the in-memory SparseFrame result."""
    left = pd.DataFrame(np.identity(12))
    right = left.copy()
    right.index += 1

    expected = sp.SparseFrame(left).add(sp.SparseFrame(right)).todense()

    dleft = dsp.from_pandas(left, npartitions=4)
    dright = dsp.from_pandas(right, npartitions=4)
    actual = dleft.add(dright).compute().todense()

    pdt.assert_frame_equal(actual, expected)


@pytest.mark.parametrize('idx', [
np.random.choice([uuid4() for i in range(1000)], size=10000),
np.random.randint(0, 10000, 10000),
Expand Down
43 changes: 12 additions & 31 deletions sparsity/test/test_sparse_frame.py
Original file line number Diff line number Diff line change
Expand Up @@ -295,34 +295,6 @@ def test_existing_column_assign_number():
assert np.all(correct == sf.data.todense())


@pytest.fixture()
def complex_example():
first = np.identity(10)
second = np.zeros((4, 10))
third = np.zeros((4, 10))
second[[0, 1, 2, 3], [2, 3, 4, 5]] = 10
third[[0, 1, 2, 3], [6, 7, 8, 9]] = 20

shuffle_idx = np.arange(10)
np.random.shuffle(shuffle_idx)

first = SparseFrame(first[shuffle_idx],
index=np.arange(10)[shuffle_idx])

shuffle_idx = np.arange(4)
np.random.shuffle(shuffle_idx)

second = SparseFrame(second[shuffle_idx],
index=np.arange(2, 6)[shuffle_idx])

shuffle_idx = np.arange(4)
np.random.shuffle(shuffle_idx)

third = SparseFrame(third[shuffle_idx],
index=np.arange(6, 10)[shuffle_idx])
return first, second, third


def test_add_total_overlap(complex_example):
first, second, third = complex_example
correct = first.sort_index().data.todense()
Expand Down Expand Up @@ -705,14 +677,17 @@ def test_repr():
assert isinstance(res, str)
assert '10x10000' in res
assert '0 stored' in res
assert len(res.splitlines()) == 1 + 5 + 2
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why did the lines number change? I can't see any change that you made that should affect it...

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's related to pandas version...

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, I hope our __repr__ still makes sense then...

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think just the number of lines changes depending on your terminal size? Not sure...


sf = SparseFrame(sparse.csr_matrix((10000, 10000)))
res = sf.__repr__()
assert isinstance(res, str)
assert len(res.splitlines()) == 1 + 5 + 2

sf = SparseFrame(np.array([]), index=[], columns=['A', 'B'])
sf = SparseFrame(np.empty(shape=(0, 2)), index=[], columns=['A', 'B'])
res = sf.__repr__()
assert isinstance(res, str)

sf = SparseFrame(np.empty(shape=(0, 200)), index=[],
columns=np.arange(200))
res = sf.__repr__()
assert isinstance(res, str)

Expand Down Expand Up @@ -861,6 +836,12 @@ def test_drop_single_label():
np.testing.assert_array_equal(sf.data.todense(), correct)


def test_drop_non_existing_label():
    """Dropping a label that is not present must be a no-op, not an error.

    The original test only checked that no exception is raised; it made
    no assertion at all. Assert the frame is actually unchanged.
    """
    old_names = list('ABCDE')
    sf = SparseFrame(np.identity(5), columns=old_names)
    sf = sf.drop('Z', axis=1)
    # Columns and data must be untouched by the no-op drop.
    np.testing.assert_array_equal(sf.columns, old_names)
    np.testing.assert_array_equal(sf.data.todense(), np.identity(5))


def test_drop_multiple_labels():
old_names = list('ABCDE')
sf = SparseFrame(np.identity(5), columns=old_names)
Expand Down