Skip to content

Conversation

@sarahyurick
Copy link
Collaborator

Two files have been modified here:

  1. In aggregate.py, I changed _perform_aggregation() to construct a dict-of-lists, rather than a dict-of-dicts. Although Dask still supports the ability to pass a dict-of-dicts into groupby().agg(), Pandas previously deprecated and has now removed this functionality (see here: https://pandas.pydata.org/pandas-docs/stable/whatsnew/v0.20.0.html#deprecate-groupby-agg-with-a-dictionary-when-renaming).
  2. In join.py, there were a couple function calls that were Pandas-specific, so I changed them to call to dask.dataframe instead.

Let me know what you think!

lhs_partition = lhs_partition.assign(common=1)
rhs_partition = rhs_partition.assign(common=1)
merged_data = pd.merge(lhs_partition, rhs_partition, on=["common"])
merged_data = dd.multi.merge(lhs_partition, rhs_partition, on=["common"])
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That makes sense! I had a look into the dask-dataframe functions and they have a fallback to the pandas functionality (if it is actually a pandas object) - so there is no difference for dask-pandas (but for other dask-like libraries, there might be). Cool!

Comment on lines +365 to +376
aggregations_dict = defaultdict(list)
input_output_cols = []
for aggregation in aggregations:
input_col, output_col, aggregation_f = aggregation

aggregations_dict[input_col][output_col] = aggregation_f
aggregations_dict[input_col].append(aggregation_f)
input_output_cols.append((input_col, output_col))

# Now apply the aggregation
logger.debug(f"Performing aggregation {dict(aggregations_dict)}")
agg_result = grouped_df.agg(aggregations_dict)
agg_result.columns = input_output_cols
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will lead to problems with the order if you have multiple input columns with multiple aggregations each.
Imagine you have an list of aggregations something like this:

aggregations = [
    ("A", "output_1", "sum"),
    ("B", "output_2", "sum"),
    ("A", "output_3", "mean"),
    ("B", "output_4", "mean"),
]

After your code, the dict will contain {'A': ['sum', 'mean'], 'B': ['sum', 'mean']} (which is expected) and the list of columns will be [('A', 'output_1'), ('B', 'output_2'), ('A', 'output_3'), ('B', 'output_4')] - as the order will be exactly as in the aggregations list.
The problem however is, that the dict does not have this order and the grouping will be ordered by input column:

df = pd.DataFrame({'A': [1, 2, 3, 4], 'B': [1, 2, 3, 4], "id": [1, 1, 2, 2]})
result = df.groupby("id").agg(aggregations_dict)
result.columns.to_list()

gives [('A', 'sum'), ('A', 'mean'), ('B', 'sum'), ('B', 'mean')] (for dask it will be similar). If you now apply the column names from your input_output_cols, we will map the wrong columns (and this is also why the test in the CI is failing).

I can totally understand why you implemented the change and I definitely also do not want to use a deprecated function, but solving this differently might be hard. We would probably need to use ordereddicts (to make sure we understand the order of the input columns) and then build a new list based on input column and aggregation list by ourselves.

One question though: is it also deprecated for dask or only for pandas? Because I still see it mentioned in the dask function which handles this without any warning.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah okay thanks, I see the issue! Using a dict-of-dicts is still supported in Dask, although it's not supported in cuDF or Dask cuDF, which is what motivated the update. If it's alright with you, I'll look into the ordereddicts approach and get back to you?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you're right - the OrderedDict approach is turning out to be more complicated than desired. We're currently looking into supporting dict-of-dicts support on the Dask cuDF side now. If it's okay, I'm going to keep this PR open for now as we're running additional tests with it, but I think eventually I'll remove the changes from aggregate.py.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, fine with me. Take your time!

@nils-braun
Copy link
Collaborator

Thanks @sarahyurick for the change and for providing the code for your issues right away!

@sarahyurick sarahyurick mentioned this pull request Aug 13, 2021
@sarahyurick
Copy link
Collaborator Author

UPDATE: Moved join.py changes to #211, so hopefully that can be merged easily.

@sarahyurick sarahyurick changed the title Updates to aggregate.py and join.py Update aggregate.py Aug 13, 2021
@nils-braun nils-braun mentioned this pull request Aug 14, 2021
@sarahyurick
Copy link
Collaborator Author

hi @randerzander, I'm closing this PR now that rapidsai/cudf#9054 has merged and should fix the dict-of-dicts issue. groupby aggregations should be fully functional now!

And thanks Nils for your inputs on this.

@nils-braun
Copy link
Collaborator

That is great to hear!

@sarahyurick sarahyurick deleted the update_aggregate branch September 21, 2022 23:45
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants