65 changes: 65 additions & 0 deletions dask_sql/context.py
@@ -531,6 +531,71 @@ def register_model(
schema_name = schema_name or self.schema_name
self.schema[schema_name].models[model_name.lower()] = (model, training_columns)

def set_config(
self,
config_options: Union[Tuple[str, Any], Dict[str, Any]],
schema_name: str = None,
):
"""
Add configuration options to a schema.
A configuration option can be used to control the behavior of certain configurable operations.

E.g. `dask.groupby.aggregate.split_out` can be used to split the output of a groupby aggregation into multiple partitions.

Args:
config_options (:obj:`Tuple[str, Any]` or :obj:`Dict[str, Any]`): a single (config option, value) pair or a dict of such pairs to set
schema_name (:obj:`str`): Optionally select schema for setting configs

Example:
.. code-block:: python

from dask_sql import Context

c = Context()
c.set_config(("dask.groupby.aggregate.split_out", 1))
c.set_config(
{
"dask.groupby.aggregate.split_out": 2,
"dask.groupby.aggregate.split_every": 4,
}
)

"""
schema_name = schema_name or self.schema_name
self.schema[schema_name].config.set_config(config_options)

def drop_config(
self, config_strs: Union[str, List[str]], schema_name: str = None,
):
"""
Drop user-set configuration options from a schema

Args:
config_strs (:obj:`str` or :obj:`List[str]`): config key or keys to drop
schema_name (:obj:`str`): Optionally select schema for dropping configs

Example:
.. code-block:: python

from dask_sql import Context

c = Context()
c.set_config(
{
"dask.groupby.aggregate.split_out": 2,
"dask.groupby.aggregate.split_every": 4,
}
)
c.drop_config(
[
"dask.groupby.aggregate.split_out",
"dask.groupby.aggregate.split_every",
]
)
"""
schema_name = schema_name or self.schema_name
self.schema[schema_name].config.drop_config(config_strs)

def ipython_magic(self, auto_include=False): # pragma: no cover
"""
Register a new ipython/jupyter magic function "sql"
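To make the `set_config` / `drop_config` API added above concrete, here is a minimal end-to-end sketch (illustration only, not part of the diff). The table name `my_table` and the sample data are made up, and the expected partition count follows the behaviour exercised in `test_groupby_split_out` further down.

```python
import pandas as pd
from dask import dataframe as dd
from dask_sql import Context

c = Context()
df = dd.from_pandas(
    pd.DataFrame({"user_id": [1, 2, 1, 2], "b": [10, 20, 30, 40]}), npartitions=2
)
c.create_table("my_table", df)

# Split the groupby output into 2 partitions and use a tree reduction of width 4
c.set_config(
    {
        "dask.groupby.aggregate.split_out": 2,
        "dask.groupby.aggregate.split_every": 4,
    }
)
result = c.sql('SELECT user_id, SUM(b) AS "S" FROM my_table GROUP BY user_id')
print(result.npartitions)  # expected to be 2 (split_out=2), per test_groupby_split_out

# Remove the options again so later queries fall back to the defaults
c.drop_config(
    ["dask.groupby.aggregate.split_out", "dask.groupby.aggregate.split_every"]
)
```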
85 changes: 85 additions & 0 deletions dask_sql/datacontainer.py
@@ -230,3 +230,88 @@ def __init__(self, name: str):
self.models: Dict[str, Tuple[Any, List[str]]] = {}
self.functions: Dict[str, UDF] = {}
self.function_lists: List[FunctionDescription] = []
self.config: ConfigContainer = ConfigContainer()


class ConfigContainer:
"""
Helper class that contains configuration options required for specific operations
Configurations are stored in a dictionary whose key strings are delimited by `.`
for easier nested access to multiple configurations.
Example:
Dask groupby aggregate operations can be configured with the `split_out` option
to determine the number of output partitions, or the `split_every` option to determine
the number of partitions used during the groupby tree-reduction step.
"""

def __init__(self):
self.config_dict = {
# Do not set defaults here unless needed
# This maintains the list of supported configuration options that can be set
# "dask.groupby.aggregate.split_out": 1,
# "dask.groupby.aggregate.split_every": None,
}

def set_config(self, config_options: Union[Tuple[str, Any], Dict[str, Any]]):
"""
Accepts either a tuple of (config, val) or a dictionary containing multiple
{config1: val1, config2: val2} pairs and updates the schema config with these values
"""
if isinstance(config_options, tuple):
config_options = [config_options]
self.config_dict.update(config_options)

def drop_config(self, config_strs: Union[str, List[str]]):
if isinstance(config_strs, str):
config_strs = [config_strs]
for config_key in config_strs:
self.config_dict.pop(config_key)

def get_config_by_prefix(self, config_prefix: str):
"""
Returns all configuration options matching the prefix in `config_prefix`

Example:
.. code-block:: python

from dask_sql.datacontainer import ConfigContainer

sql_config = ConfigContainer()
sql_config.set_config(
{
"dask.groupby.aggregate.split_out":1,
"dask.groupby.aggregate.split_every": 1,
"dask.sort.persist": True,
}
)

sql_config.get_config_by_prefix("dask.groupby")
# Returns {
# "dask.groupby.aggregate.split_out": 1,
# "dask.groupby.aggregate.split_every": 1
# }

sql_config.get_config_by_prefix("dask")
# Returns {
# "dask.groupby.aggregate.split_out": 1,
# "dask.groupby.aggregate.split_every": 1,
# "dask.sort.persist": True
# }

sql_config.get_config_by_prefix("dask.sort")
# Returns {"dask.sort.persist": True}

sql_config.get_config_by_prefix("missing.key")
sql_config.get_config_by_prefix(None)
# Both return {}

"""
return (
{
key: val
for key, val in self.config_dict.items()
if key.startswith(config_prefix)
Comment on lines +311 to +313 — Collaborator Author:
This may not be the most efficient way to do this, but should be okay for small dictionaries.
}
if config_prefix
else {}
)
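As a quick sanity check of the ConfigContainer semantics above (a sketch, not part of the diff): both the tuple form and the dict form of `set_config` end up in the same flat `config_dict`, and `drop_config` accepts either a single key or a list of keys.

```python
from dask_sql.datacontainer import ConfigContainer

sql_config = ConfigContainer()
sql_config.set_config(("dask.groupby.aggregate.split_out", 2))    # single (key, value) pair
sql_config.set_config({"dask.groupby.aggregate.split_every": 4})  # dict of pairs
print(sql_config.config_dict)
# {'dask.groupby.aggregate.split_out': 2, 'dask.groupby.aggregate.split_every': 4}

sql_config.drop_config("dask.groupby.aggregate.split_out")        # single key
sql_config.drop_config(["dask.groupby.aggregate.split_every"])    # or a list of keys
print(sql_config.config_dict)  # {}
```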
26 changes: 23 additions & 3 deletions dask_sql/physical/rel/logical/aggregate.py
@@ -231,6 +231,15 @@ def _do_aggregations(
for col in group_columns:
collected_aggregations[None].append((col, col, "first"))

groupby_agg_options = context.schema[
context.schema_name
Comment on lines +234 to +235 — @ayushdg (Collaborator Author), Nov 5, 2021:
I was giving this a bit more thought, and it seems like context.schema_name always returns the default schema, and the context.fqn method can help return the specific schema being used in that query. I'm not completely sure if the schema for a groupby is associated with the groupby call itself or with the specific aggregations, based on the code here:

schema_name, aggregation_name = context.fqn(
expr.getAggregation().getNameAsId()
)

Perhaps @rajagurunath or someone more familiar with this part of the codebase can provide insight here.
].config.get_config_by_prefix("dask.groupby.aggregate")
# Update the config string to only include the actual param value
# i.e. dask.groupby.aggregate.split_out -> split_out
for config_key in list(groupby_agg_options.keys()):
groupby_agg_options[
config_key.rpartition(".")[2]
] = groupby_agg_options.pop(config_key)
Comment on lines +239 to +242 — Collaborator Author:
I was initially planning on having this logic in ConfigContainer, returning the param name from the config string (i.e. dask.groupby.aggregate.split_out -> split_out), but things were getting a bit messy because multiple config options down the line could share the same param name, which would cause conflicts; e.g. dask.dataframe.drop_duplicates.split_out would also return split_out.

Collaborator:
Thinking about this more, would it make sense to consolidate this functionality in get_config_by_prefix? We would filter the config options by prefix and then return those options with the prefix stripped. I can leave suggestions for what I imagine this would look like:

Suggested change
for config_key in list(groupby_agg_options.keys()):
groupby_agg_options[
config_key.rpartition(".")[2]
] = groupby_agg_options.pop(config_key)

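For illustration only (not part of the PR), here is a minimal sketch of the consolidation the reviewer describes, written as a free function over a plain dict; the `strip_prefix` flag and the function signature are assumptions, not existing dask-sql API.

```python
from typing import Any, Dict


def get_config_by_prefix(
    config_dict: Dict[str, Any], config_prefix: str, strip_prefix: bool = False
) -> Dict[str, Any]:
    """Return all options matching config_prefix, optionally with the prefix stripped."""
    if not config_prefix:
        return {}
    matched = {
        key: val for key, val in config_dict.items() if key.startswith(config_prefix)
    }
    if strip_prefix:
        # e.g. "dask.groupby.aggregate.split_out" -> "split_out"
        matched = {key.rpartition(".")[2]: val for key, val in matched.items()}
    return matched


options = {
    "dask.groupby.aggregate.split_out": 2,
    "dask.groupby.aggregate.split_every": 4,
    "dask.sort.persist": True,
}
print(get_config_by_prefix(options, "dask.groupby.aggregate", strip_prefix=True))
# {'split_out': 2, 'split_every': 4}
```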
# Now we can go ahead and use these grouped aggregations
# to perform the actual aggregation
# It is very important to start with the non-filtered entry.
@@ -240,13 +249,23 @@ def _do_aggregations(
if key in collected_aggregations:
aggregations = collected_aggregations.pop(key)
df_result = self._perform_aggregation(
df, None, aggregations, additional_column_name, group_columns,
df,
None,
aggregations,
additional_column_name,
group_columns,
groupby_agg_options,
)

# Now we can also do the rest
for filter_column, aggregations in collected_aggregations.items():
agg_result = self._perform_aggregation(
df, filter_column, aggregations, additional_column_name, group_columns,
df,
filter_column,
aggregations,
additional_column_name,
group_columns,
groupby_agg_options,
)

# ... and finally concat the new data with the already present columns
@@ -358,6 +377,7 @@ def _perform_aggregation(
aggregations: List[Tuple[str, str, Any]],
additional_column_name: str,
group_columns: List[str],
groupby_agg_options: Dict[str, Any] = {},
):
tmp_df = df

Expand All @@ -382,7 +402,7 @@ def _perform_aggregation(

# Now apply the aggregation
logger.debug(f"Performing aggregation {dict(aggregations_dict)}")
agg_result = grouped_df.agg(aggregations_dict)
agg_result = grouped_df.agg(aggregations_dict, **groupby_agg_options)

# ... fix the column names to a single level ...
agg_result.columns = agg_result.columns.get_level_values(-1)
3 changes: 3 additions & 0 deletions dask_sql/physical/utils/groupby.py
@@ -23,6 +23,9 @@ def get_groupby_with_nulls_cols(
is_null_column = ~(group_column.isnull())
non_nan_group_column = group_column.fillna(0)

# split_out doesn't work if both columns have the same name
is_null_column.name = f"{is_null_column.name}_{new_temporary_column(df)}"

group_columns_and_nulls += [is_null_column, non_nan_group_column]

if not group_columns_and_nulls:
59 changes: 59 additions & 0 deletions tests/integration/test_groupby.py
@@ -1,5 +1,7 @@
import numpy as np
import pandas as pd
import pytest
from dask import dataframe as dd
from pandas.testing import assert_frame_equal, assert_series_equal


@@ -345,3 +347,60 @@ def test_stats_aggregation(c, timeseries_df):
check_dtype=False,
check_names=False,
)


@pytest.mark.parametrize(
"input_table",
["user_table_1", pytest.param("gpu_user_table_1", marks=pytest.mark.gpu),],
)
@pytest.mark.parametrize("split_out", [None, 2, 4])
def test_groupby_split_out(c, input_table, split_out, request):
user_table = request.getfixturevalue(input_table)
c.set_config(("dask.groupby.aggregate.split_out", split_out))
df = c.sql(
f"""
SELECT
user_id, SUM(b) AS "S"
FROM {input_table}
GROUP BY user_id
"""
)
expected_df = (
user_table.groupby(by="user_id").agg({"b": "sum"}).reset_index(drop=False)
)
expected_df = expected_df.rename(columns={"b": "S"})
expected_df = expected_df.sort_values("user_id")
assert df.npartitions == (split_out if split_out else 1)
dd.assert_eq(df.compute().sort_values("user_id"), expected_df, check_index=False)
c.drop_config("dask.groupby.aggregate.split_out")


@pytest.mark.parametrize("gpu", [False, pytest.param(True, marks=pytest.mark.gpu)])
@pytest.mark.parametrize("split_every,expected_keys", [(2, 154), (3, 148), (4, 144)])
def test_groupby_split_every(c, gpu, split_every, expected_keys):
xd = pytest.importorskip("cudf") if gpu else pd
input_ddf = dd.from_pandas(
xd.DataFrame({"user_id": [1, 2, 3, 4] * 16, "b": [5, 6, 7, 8] * 16}),
npartitions=16,
) # Need an input with multiple partitions to demonstrate split_every
c.create_table("split_every_input", input_ddf)
c.set_config(("dask.groupby.aggregate.split_every", split_every))
df = c.sql(
"""
SELECT
user_id, SUM(b) AS "S"
FROM split_every_input
GROUP BY user_id
"""
)
expected_df = (
input_ddf.groupby(by="user_id")
.agg({"b": "sum"}, split_every=split_every)
.reset_index(drop=False)
)
expected_df = expected_df.rename(columns={"b": "S"})
expected_df = expected_df.sort_values("user_id")
assert len(df.dask.keys()) == expected_keys
dd.assert_eq(df.compute().sort_values("user_id"), expected_df, check_index=False)
c.drop_config("dask.groupby.aggregate.split_every")
c.drop_table("split_every_input")