Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# CircleCI 2.0 configuration (replaces the deleted circle.yml).
# Builds inside the project's dask docker image, installs the package with
# its test dependencies, runs pytest with coverage, then uploads to codecov.
version: 2
jobs:
  build:
    working_directory: ~/sparsity
    docker:
      - image: drtools/dask:latest
    steps:
      - checkout
      # Install test deps plus the package itself (trailing `.`).
      - run: pip install pytest pytest-cov moto .
      - run: py.test --cov sparsity --cov-report xml sparsity
      # Upload the coverage report to codecov.
      - run: bash <(curl -s https://codecov.io/bash)
21 changes: 0 additions & 21 deletions circle.yml

This file was deleted.

2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
packages=packages,
cmdclass=versioneer.get_cmdclass(),
install_requires=[
'pandas>=0.19.0,<0.23.0',
'pandas>=0.19.0',
'scipy>=0.18.1',
'numpy>=1.12.0',
's3fs>=0.1.0'
Expand Down
11 changes: 10 additions & 1 deletion sparsity/indexing.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
from pandas.core.indexing import _LocIndexer, _iLocIndexer

def get_indexers_list():
    """Return the (attribute name, indexer class) pairs that are
    registered on ``SparseFrame`` as its ``iloc``/``loc`` accessors.
    """
    accessors = {
        'iloc': _CsrILocationIndexer,
        'loc': _CsrLocIndexer,
    }
    return list(accessors.items())


class _CsrLocIndexer(_LocIndexer):

def __getitem__(self, item):
Expand All @@ -10,6 +18,7 @@ def _slice(self, slice, axis=0, kind=None):
raise NotImplementedError()
return self.obj._slice(slice)


class _CsrILocationIndexer(_iLocIndexer):

def __getitem__(self, item):
Expand All @@ -18,4 +27,4 @@ def __getitem__(self, item):
def _slice(self, slice, axis=0, kind=None):
if axis != 0:
raise NotImplementedError()
return self.obj._slice(slice)
return self.obj._slice(slice)
78 changes: 66 additions & 12 deletions sparsity/sparse_frame.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
# coding=utf-8
import functools
import traceback
import uuid
import warnings
Expand All @@ -22,7 +23,8 @@
trail_db = True
except:
trail_db = False
from sparsity.indexing import _CsrILocationIndexer, _CsrLocIndexer
from sparsity.indexing import _CsrILocationIndexer, _CsrLocIndexer, \
get_indexers_list


def _is_empty(data):
Expand All @@ -45,9 +47,6 @@ class SparseFrame(object):
Simple sparse table based on scipy.sparse.csr_matrix
"""

__slots__ = ["_index", "_columns", "_data", "shape",
'ndim', 'iloc', 'loc', 'empty']

def __init__(self, data, index=None, columns=None, **kwargs):
if len(data.shape) > 2:
raise ValueError("Only two dimensional data supported")
Expand Down Expand Up @@ -86,8 +85,17 @@ def __init__(self, data, index=None, columns=None, **kwargs):

# register indexers
self.ndim = 2
self.iloc = _CsrILocationIndexer(self, 'iloc')
self.loc = _CsrLocIndexer(self, 'loc')

@classmethod
def _create_indexer(cls, name, indexer):
    """Create an indexer like _name in the class.

    Registers *indexer* on *cls* as a read-only property called *name*
    (e.g. ``loc`` / ``iloc``). Does nothing if the attribute already
    exists, so repeated registration is safe.
    """
    if getattr(cls, name, None) is not None:
        # Never overwrite an accessor that is already registered.
        return
    import re  # local import: keeps module-level imports unchanged
    # pandas >= 0.23 takes the accessor name as the first positional
    # argument of the indexer constructor; older versions expect the
    # `name` keyword. Extract only the numeric components of the
    # version so pre-release strings such as '0.24.0.dev0' do not
    # break int() parsing.
    version = tuple(int(p) for p in re.findall(r'\d+', pd.__version__)[:3])
    if version >= (0, 23, 0):
        accessor = functools.partial(indexer, name)
    else:
        accessor = functools.partial(indexer, name=name)
    setattr(cls, name, property(accessor, doc=indexer.__doc__))

def _init_values(self, data, kwargs):
if isinstance(data, pd.DataFrame):
Expand Down Expand Up @@ -219,10 +227,21 @@ def _take(self, *args, **kwargs):
"""
return self.take(*args, **kwargs)

def _xs(self, key, *args, axis=0, **kwargs):
    """Used for label based indexing.

    Select every row (``axis=0``) or column (``axis=1``) whose label
    equals *key*; duplicate labels therefore yield all matching
    entries, with the selected axis labelled ``[key] * n``.
    """
    if axis == 0:
        # get_loc may return a scalar, slice or mask for duplicate
        # labels; the resulting matrix keeps one row per match.
        loc = self.index.get_loc(key)
        new_data = self.data[loc]
        return SparseFrame(new_data,
                           index=[key] * new_data.shape[0],
                           columns=self.columns)
    else:
        loc = self.columns.get_loc(key)
        new_data = self.data[:, loc]
        return SparseFrame(new_data,
                           columns=[key] * new_data.shape[1],
                           index=self.index)


@property
def index(self):
Expand Down Expand Up @@ -558,7 +577,7 @@ def drop(self, labels, axis=0):
labels = [labels]
if axis == 1:
mask = np.logical_not(self.columns.isin(labels))
sf = self[self.columns[mask].tolist()]
sf = self.loc[:, self.columns[mask].tolist()]
else:
raise NotImplementedError
return sf
Expand All @@ -570,9 +589,17 @@ def drop_duplicate_idx(self, **kwargs):
columns=self.columns)

def __getitem__(self, item):
    """Column selection: ``sf['a']`` or ``sf[['a', 'b']]``.

    Raises
    ------
    ValueError
        If *item* is None (mirrors pandas' null-key behaviour).
    """
    if item is None:
        raise ValueError('cannot label index with a null key')
    if not isinstance(item, (tuple, list)):
        item = [item]
    if len(item) > 0:
        return self.reindex_axis(item, axis=1)
    else:
        # Empty selection: keep all rows but zero columns, preserving
        # the type of the columns index via an empty positional take.
        data = np.empty(shape=(self.shape[0], 0))
        return SparseFrame(data, index=self.index,
                           columns=self.columns[[]])


def dropna(self):
"""Drop nans from index."""
Expand Down Expand Up @@ -609,7 +636,7 @@ def set_index(self, column=None, idx=None, level=None, inplace=False):
isinstance(self._index, pd.MultiIndex):
new_idx = self.index.get_level_values(level)
elif column is not None:
new_idx = np.asarray(self[column].data.todense()).reshape(-1)
new_idx = np.asarray(self.loc[:, column].data.todense()).reshape(-1)

if inplace:
self._index = _ensure_index(new_idx)
Expand Down Expand Up @@ -647,6 +674,30 @@ def _get_axis_name(self, axis):
raise ValueError('No axis named {} for {}'
.format(axis, self.__class__))

def _reindex_with_indexers(self, reindexers, **kwargs):
    """allow_dups indicates an internal call here.

    Apply one positional reindex per axis; *reindexers* maps axis
    number to an ``(index, indexer)`` pair. Axes with ``index is None``
    are left untouched.
    """
    # reindex doing multiple operations on different axes if indicated
    new_data = self.copy()
    for axis in sorted(reindexers.keys()):
        index, indexer = reindexers[axis]

        if index is None:
            continue

        if axis == 0:
            new_mat = new_data.data[indexer, :]
            # Use new_data's axes, not self's: an earlier iteration may
            # already have replaced the other axis.
            new_data = SparseFrame(new_mat, index=index,
                                   columns=new_data.columns)
        elif axis == 1:
            new_mat = new_data.data[:, indexer]
            new_data = SparseFrame(new_mat, columns=index,
                                   index=new_data.index)
        else:
            raise ValueError('Only supported axes are 0 and 1.')

    return new_data

def reindex(self, labels=None, index=None, columns=None, axis=None,
*args, **kwargs):
"""Conform SparseFrame to new index.
Expand Down Expand Up @@ -923,3 +974,6 @@ def _check_categories_order(categories1, categories2, categorical_column_name,
mismatch_type=mismatch_type
)
)

# Attach the custom loc/iloc accessors to SparseFrame at import time.
for _indexer_name, _indexer_cls in get_indexers_list():
    SparseFrame._create_indexer(_indexer_name, _indexer_cls)
34 changes: 14 additions & 20 deletions sparsity/test/test_dask_sparse_frame.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import sparsity as sp
import sparsity.dask as dsp
from dask.local import get_sync
from sparsity import sparse_one_hot
from sparsity.dask.reshape import one_hot_encode
import pandas.util.testing as pdt

Expand Down Expand Up @@ -63,29 +62,23 @@ def test_loc(iindexer, correct_shape):
assert res.shape == correct_shape

def test_dask_loc(clickstream):
    # One-hot encode two categorical columns and index by (time, id).
    sf = one_hot_encode(dd.from_pandas(clickstream, npartitions=10),
                        categories={'page_id': list('ABCDE'),
                                    'other_categorical': list('FGHIJ')},
                        index_col=['index', 'id'])
    res = sf.loc['2016-01-15':'2016-02-15']
    res = res.compute()
    # Level 0 of the MultiIndex is the timestamp; the slice is inclusive.
    assert res.index.levels[0].max().date() == dt.date(2016, 2, 15)
    assert res.index.levels[0].min().date() == dt.date(2016, 1, 15)


def test_dask_multi_index_loc(clickstream):
    # Same encoding as test_dask_loc, but assert via get_level_values,
    # which reflects the actual row labels (levels may keep unused values).
    sf = one_hot_encode(dd.from_pandas(clickstream, npartitions=10),
                        categories={'page_id': list('ABCDE'),
                                    'other_categorical': list('FGHIJ')},
                        index_col=['index', 'id'])
    res = sf.loc['2016-01-15':'2016-02-15']
    res = res.compute()
    assert res.index.get_level_values(0).date.min() == dt.date(2016, 1, 15)
    assert res.index.get_level_values(0).date.max() == dt.date(2016, 2, 15)

Expand Down Expand Up @@ -234,7 +227,8 @@ def test_assign_column():
dsf = dsf.assign(new=ds)
assert dsf._meta.empty
sf = dsf.compute()
assert np.all(sf.todense() == f.assign(new=s))
assert np.all((sf.todense() == f.assign(new=s)).values)


@pytest.mark.parametrize('arg_dict', [
dict(divisions=[0, 30, 50, 70, 99]),
Expand Down
29 changes: 24 additions & 5 deletions sparsity/test/test_sparse_frame.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
import datetime as dt
import os

#import dask.dataframe as dd
from contextlib import contextmanager

import numpy as np
Expand Down Expand Up @@ -202,8 +201,8 @@ def test_loc_multi_index(sf_midx, sf_midx_int):
assert np.all(sf_midx.loc[dt_slice].data.todense() ==
np.identity(5)[:3])

assert np.all(sf_midx_int.loc[1].todense() == sf_midx.data[:4,:])
assert np.all(sf_midx_int.loc[0].todense() == sf_midx.data[4, :])
assert np.all(sf_midx_int.loc[1].todense().values == sf_midx.data[:4,:])
assert np.all(sf_midx_int.loc[0].todense().values == sf_midx.data[4, :])


def test_set_index(sf_midx):
Expand Down Expand Up @@ -619,13 +618,20 @@ def test_npz_io_s3(complex_example):


def test_getitem():
    id_ = np.identity(10)
    sf = SparseFrame(id_, columns=list('abcdefghij'))
    # Single-column selection keeps a one-column frame.
    assert sf['a'].data.todense()[0] == 1
    assert sf['j'].data.todense()[9] == 1
    # List selection preserves the requested column order.
    assert np.all(sf[['a', 'b']].data.todense() == np.asmatrix(id_[:, [0, 1]]))
    tmp = sf[['j', 'a']].data.todense()
    assert tmp[9, 0] == 1
    assert tmp[0, 1] == 1
    assert (sf[list('abcdefghij')].data.todense() == np.identity(10)).all()
    # Empty selection keeps the rows, drops all columns, and preserves
    # the columns index type.
    assert sf[[]].shape == (10, 0)
    assert len(sf[[]].columns) == 0
    assert isinstance(sf.columns, type(sf[[]].columns))
    with pytest.raises(ValueError):
        sf[None]


def test_vstack():
Expand Down Expand Up @@ -895,4 +901,17 @@ def test_empty_elemwise():
assert np.all(res == sf.data.todense())

with pytest.raises(ValueError):
res = sf.add(sf_empty, fill_value=None)
res = sf.add(sf_empty, fill_value=None)


def test_loc_duplicate_index():
    # Both axes carry duplicate labels: 'U' twice in columns,
    # 'A' three times and 'B' twice in the index.
    sf = SparseFrame(np.identity(5),
                     columns=list('UUXYZ'),
                     index=list('AAABB'))
    # Row selection returns every row matching the label.
    assert len(sf.loc['A'].index) == 3
    assert len(sf.loc['B'].index) == 2
    assert np.all(sf.loc['A'].todense().values == np.identity(5)[:3])
    assert np.all(sf.loc['B'].todense().values == np.identity(5)[3:])
    # Column selection likewise returns every matching column.
    assert len(sf.loc[:, 'U'].columns) == 2
    assert np.all(sf.loc[:, 'U'].todense().values == np.identity(5)[:, :2])