Skip to content

Sort index#37

Merged
kayibal merged 15 commits intomasterfrom
sort_index
Apr 19, 2018
Merged

Sort index#37
kayibal merged 15 commits intomasterfrom
sort_index

Conversation

@kayibal
Copy link
Copy Markdown

@kayibal kayibal commented Apr 1, 2018

This implements sort_index and will sort the distributed dataframe and set divisions.

@kayibal kayibal requested review from michcio1234 and vitords April 1, 2018 23:17
@codecov
Copy link
Copy Markdown

codecov bot commented Apr 1, 2018

Codecov Report

Merging #37 into master will increase coverage by 2.57%.
The diff coverage is 94.4%.

Impacted file tree graph

@@            Coverage Diff             @@
##           master      #37      +/-   ##
==========================================
+ Coverage   83.07%   85.64%   +2.57%     
==========================================
  Files           6        7       +1     
  Lines         957     1094     +137     
==========================================
+ Hits          795      937     +142     
+ Misses        162      157       -5
Impacted Files Coverage Δ
sparsity/sparse_frame.py 87.87% <100%> (+0.47%) ⬆️
sparsity/dask/core.py 82.56% <90.47%> (+3.58%) ⬆️
sparsity/dask/shuffle.py 95% <95%> (ø)
sparsity/dask/io.py 65.51% <0%> (-1.73%) ⬇️

Continue to review full report at Codecov.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update 87f9928...0675bfa. Read the comment docs.

Copy link
Copy Markdown

@michcio1234 michcio1234 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cool, but I have small remarks.


def sort_index(df, npartitions=None, shuffle='tasks',
drop=True, upsample=1.0, divisions=None,
partition_size=128e6, **kwargs):
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see this is copied from dask almost 1:1, so I'll assume it's good.


def rearrange_by_index(df, npartitions=None, max_branch=None,
shuffle='task'):
if shuffle == 'tasks':
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Default is task but only supported method is tasks.


for stage in range(1, stages + 1):
group = dict((('shuffle-group-' + token, stage, inp),
(shuffle_index, ('shuffle-join-' + token, stage - 1, inp),
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is one of few differences from dask version. You changed shuffle_group function to shuffle_index. Maybe key should also be changed?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm I'll leave this for now as I think the main thing what this function does is grouping the partitions into new groups.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay.

res2.sort_index(inplace=True)

pdt.assert_frame_equal(res, correct)
pdt.assert_frame_equal(res1, correct)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing assert for res2?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm I think you're right :) it was not from this PR but I actually had missed it.

dsf = dsf.sort_index(npartitions='auto', partition_size=80000)

assert dsf.known_divisions
assert dsf.npartitions == 16
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know why this should be desired result. partition_size argument is not documented (I couldn't find documentation even in dask repo) - could you please describe this test a bit and maybe add at least partition_size description to sort_index's docstring?

parts = [dsf.get_partition(i).compute().todense()
for i in range(dsf.npartitions)]
res = pd.concat(parts, axis=0)
pdt.assert_frame_equal(res, correct) No newline at end of file
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we're already making changes... Could you please add a newline here? :P

@kayibal kayibal merged commit 4d85502 into master Apr 19, 2018
@kayibal kayibal deleted the sort_index branch April 19, 2018 15:45
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants