7 changes: 5 additions & 2 deletions dask_sql/physical/rel/logical/aggregate.py
@@ -345,15 +345,18 @@ def _perform_aggregation(
         grouped_df = tmp_df.groupby(by=group_columns_and_nulls)

         # Convert into the correct format for dask
-        aggregations_dict = defaultdict(dict)
+        aggregations_dict = defaultdict(list)
+        input_output_cols = []
         for aggregation in aggregations:
             input_col, output_col, aggregation_f = aggregation

-            aggregations_dict[input_col][output_col] = aggregation_f
+            aggregations_dict[input_col].append(aggregation_f)
+            input_output_cols.append((input_col, output_col))

         # Now apply the aggregation
         logger.debug(f"Performing aggregation {dict(aggregations_dict)}")
         agg_result = grouped_df.agg(aggregations_dict)
+        agg_result.columns = input_output_cols
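For context on why the PR moves away from the dict-of-dicts form: in recent pandas versions (1.0 and later), passing a nested dict to `groupby(...).agg` is rejected as a "nested renamer". A minimal sketch of the failure mode, with hypothetical sample data:

```python
import pandas as pd

df = pd.DataFrame({"A": [1, 2, 3, 4], "id": [1, 1, 2, 2]})

# The old dict-of-dicts ("nested renamer") form was removed in pandas 1.0:
try:
    df.groupby("id").agg({"A": {"my_output": "sum"}})
except pd.errors.SpecificationError as exc:
    print(exc)  # nested renamer is not supported
```

As the thread below notes, Dask itself still accepts this form; it is pandas (and, per the PR author, cuDF / Dask cuDF) where it fails.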
Comment on lines +348 to +359

Collaborator:
This will lead to problems with the ordering if you have multiple input columns with multiple aggregations each.
Imagine you have a list of aggregations like this:

aggregations = [
    ("A", "output_1", "sum"),
    ("B", "output_2", "sum"),
    ("A", "output_3", "mean"),
    ("B", "output_4", "mean"),
]

After your code, the dict will contain {'A': ['sum', 'mean'], 'B': ['sum', 'mean']} (which is expected) and the list of columns will be [('A', 'output_1'), ('B', 'output_2'), ('A', 'output_3'), ('B', 'output_4')] - as the order will be exactly as in the aggregations list.
The problem, however, is that the dict does not preserve this order — the grouped result is ordered by input column instead:

import pandas as pd

aggregations_dict = {'A': ['sum', 'mean'], 'B': ['sum', 'mean']}
df = pd.DataFrame({'A': [1, 2, 3, 4], 'B': [1, 2, 3, 4], "id": [1, 1, 2, 2]})
result = df.groupby("id").agg(aggregations_dict)
result.columns.to_list()

gives [('A', 'sum'), ('A', 'mean'), ('B', 'sum'), ('B', 'mean')] (for dask it will be similar). If you now apply the column names from your input_output_cols, we will map the wrong columns (and this is also why the test in the CI is failing).

I can totally understand why you implemented the change, and I definitely do not want to use a deprecated function either, but solving this differently might be hard. We would probably need to use OrderedDicts (to make sure we know the order of the input columns) and then build a new list of output names from the input columns and their aggregation lists ourselves.
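One way to sketch this (an illustration, not dask-sql's actual implementation): since the mapping from (input column, aggregation) to output name is known up front, the rename can follow the column order that `.agg` actually produces instead of the input order. The name `output_by_key` and the sample data are hypothetical; this assumes each (input column, aggregation) pair appears at most once:

```python
from collections import defaultdict
import pandas as pd

# Aggregations in the (input, output, function) shape used above
aggregations = [
    ("A", "output_1", "sum"),
    ("B", "output_2", "sum"),
    ("A", "output_3", "mean"),
    ("B", "output_4", "mean"),
]

aggregations_dict = defaultdict(list)
output_by_key = {}  # (input_col, agg_name) -> output column name
for input_col, output_col, agg_f in aggregations:
    aggregations_dict[input_col].append(agg_f)
    output_by_key[(input_col, agg_f)] = output_col

df = pd.DataFrame({"A": [1, 2, 3, 4], "B": [1, 2, 3, 4], "id": [1, 1, 2, 2]})
agg_result = df.groupby("id").agg(dict(aggregations_dict))

# Rename using the column order .agg actually produced, not the input order
agg_result.columns = [
    output_by_key[(input_col, agg_f)] for input_col, agg_f in agg_result.columns
]
print(agg_result.columns.to_list())
# ['output_1', 'output_3', 'output_2', 'output_4']
```

Whether the same column order holds for Dask's lazy `.agg` would still need checking, which may be why the thread treats this as harder than it looks.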

One question though: is it also deprecated for dask or only for pandas? Because I still see it mentioned in the dask function which handles this without any warning.

Collaborator Author:
Ah okay thanks, I see the issue! Using a dict-of-dicts is still supported in Dask, although it's not supported in cuDF or Dask cuDF, which is what motivated the update. If it's alright with you, I'll look into the ordereddicts approach and get back to you?

Collaborator Author:

I think you're right - the OrderedDict approach is turning out to be more complicated than desired. We're currently looking into dict-of-dicts support on the Dask cuDF side. If it's okay, I'm going to keep this PR open for now as we're running additional tests with it, but I think eventually I'll remove the changes from aggregate.py.

Collaborator:

Sure, fine with me. Take your time!


         # ... fix the column names to a single level ...
         agg_result.columns = agg_result.columns.get_level_values(-1)