Skip to content

Commit 0372ebc

Browse files
[Review] Refactor ConfigContainer to use dask config (#392)
* Add dask config module to dask-sql * Update context to use dask-sql config instead of ConfigContainer * Remove distributed utils_test fixtures and add client fixture * Reduce connection timeout for non reachable test * Rerun tests * Mount tempfile directory in independent worker container * Skip test_fsql on external cluster * Relax external cluster's conda packages * Add fixme note to failing fugue test due to missing triad module * Update case sensitivity test * Update setup to include the config yaml file for dask-sql * Add sql schema yaml and update setup to include the schema * Remove explicit config dict from dask_sql.config * Update set_config docstring and prevent setting non sql configs * Remove configContainer in favor of dask-sql config module * Add config unit tests * Add dask sql configuration docs and include dask-sphinx-theme ext for rendering config yaml files * Fix dask-sphinx-theme version constraint for config extension * Update set_config docstring * Rerun tests * Remove context.set_config api in favor of directly using dask Co-authored-by: Charles Blackmon-Luca <20627856+charlesbluca@users.noreply.github.com>
1 parent cd38818 commit 0372ebc

16 files changed

Lines changed: 212 additions & 196 deletions

File tree

MANIFEST.in

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,4 @@
1+
recursive-include dask_sql *.yaml
2+
13
include versioneer.py
24
include dask_sql/_version.py

dask_sql/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
from . import config
12
from ._version import get_versions
23
from .cmd import cmd_loop
34
from .context import Context

dask_sql/config.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
import os
2+
3+
import dask
4+
import yaml
5+
6+
fn = os.path.join(os.path.dirname(__file__), "sql.yaml")
7+
8+
with open(fn) as f:
9+
defaults = yaml.safe_load(f)
10+
11+
dask.config.update_defaults(defaults)
12+
dask.config.ensure_file(source=fn, comment=True)

dask_sql/context.py

Lines changed: 27 additions & 91 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66

77
import dask.dataframe as dd
88
import pandas as pd
9+
from dask import config as dask_config
910
from dask.base import optimize
1011
from dask.distributed import Client
1112

@@ -421,6 +422,7 @@ def sql(
421422
return_futures: bool = True,
422423
dataframes: Dict[str, Union[dd.DataFrame, pd.DataFrame]] = None,
423424
gpu: bool = False,
425+
config_options: Dict[str, Any] = None,
424426
) -> Union[dd.DataFrame, pd.DataFrame]:
425427
"""
426428
Query the registered tables with the given SQL.
@@ -448,36 +450,39 @@ def sql(
448450
to register before executing this query
449451
gpu (:obj:`bool`): Whether or not to load the additional Dask or pandas dataframes (if any) on GPU;
450452
requires cuDF / dask-cuDF if enabled. Defaults to False.
453+
config_options (:obj:`Dict[str,Any]`): Specific configuration options to pass during
454+
query execution
451455
452456
Returns:
453457
:obj:`dask.dataframe.DataFrame`: the created data frame of this query.
454458
455459
"""
456-
if dataframes is not None:
457-
for df_name, df in dataframes.items():
458-
self.create_table(df_name, df, gpu=gpu)
460+
with dask_config.set(config_options):
461+
if dataframes is not None:
462+
for df_name, df in dataframes.items():
463+
self.create_table(df_name, df, gpu=gpu)
459464

460-
rel, select_names, _ = self._get_ral(sql)
465+
rel, select_names, _ = self._get_ral(sql)
461466

462-
dc = RelConverter.convert(rel, context=self)
467+
dc = RelConverter.convert(rel, context=self)
463468

464-
if dc is None:
465-
return
469+
if dc is None:
470+
return
466471

467-
if select_names:
468-
# Rename any columns named EXPR$* to a more human readable name
469-
cc = dc.column_container
470-
cc = cc.rename(
471-
{
472-
df_col: select_name
473-
for df_col, select_name in zip(cc.columns, select_names)
474-
}
475-
)
476-
dc = DataContainer(dc.df, cc)
472+
if select_names:
473+
# Rename any columns named EXPR$* to a more human readable name
474+
cc = dc.column_container
475+
cc = cc.rename(
476+
{
477+
df_col: select_name
478+
for df_col, select_name in zip(cc.columns, select_names)
479+
}
480+
)
481+
dc = DataContainer(dc.df, cc)
477482

478-
df = dc.assign()
479-
if not return_futures:
480-
df = df.compute()
483+
df = dc.assign()
484+
if not return_futures:
485+
df = df.compute()
481486

482487
return df
483488

@@ -588,71 +593,6 @@ def register_model(
588593
schema_name = schema_name or self.schema_name
589594
self.schema[schema_name].models[model_name.lower()] = (model, training_columns)
590595

591-
def set_config(
592-
self,
593-
config_options: Union[Tuple[str, Any], Dict[str, Any]],
594-
schema_name: str = None,
595-
):
596-
"""
597-
Add configuration options to a schema.
598-
A configuration option could be used to set the behavior of certain configurable operations.
599-
600-
Eg: `dask.groupby.agg.split_out` can be used to split the output of a groupby aggregation to multiple partitions.
601-
602-
Args:
603-
config_options (:obj:`Tuple[str,val]` or :obj:`Dict[str,val]`): config_option and value to set
604-
schema_name (:obj:`str`): Optionally select schema for setting configs
605-
606-
Example:
607-
.. code-block:: python
608-
609-
from dask_sql import Context
610-
611-
c = Context()
612-
c.set_config(("dask.groupby.aggregate.split_out", 1))
613-
c.set_config(
614-
{
615-
"dask.groupby.aggregate.split_out": 2,
616-
"dask.groupby.aggregate.split_every": 4,
617-
}
618-
)
619-
620-
"""
621-
schema_name = schema_name or self.schema_name
622-
self.schema[schema_name].config.set_config(config_options)
623-
624-
def drop_config(
625-
self, config_strs: Union[str, List[str]], schema_name: str = None,
626-
):
627-
"""
628-
Drop user set configuration options from schema
629-
630-
Args:
631-
config_strs (:obj:`str` or :obj:`List[str]`): config key or keys to drop
632-
schema_name (:obj:`str`): Optionally select schema for dropping configs
633-
634-
Example:
635-
.. code-block:: python
636-
637-
from dask_sql import Context
638-
639-
c = Context()
640-
c.set_config(
641-
{
642-
"dask.groupby.aggregate.split_out": 2,
643-
"dask.groupby.aggregate.split_every": 4,
644-
}
645-
)
646-
c.drop_config(
647-
[
648-
"dask.groupby.aggregate.split_out",
649-
"dask.groupby.aggregate.split_every",
650-
]
651-
)
652-
"""
653-
schema_name = schema_name or self.schema_name
654-
self.schema[schema_name].config.drop_config(config_strs)
655-
656596
def ipython_magic(self, auto_include=False): # pragma: no cover
657597
"""
658598
Register a new ipython/jupyter magic function "sql"
@@ -730,7 +670,7 @@ def run_server(
730670

731671
def stop_server(self): # pragma: no cover
732672
"""
733-
Stop a SQL server started by ``run_server`.
673+
Stop a SQL server started by ``run_server``.
734674
"""
735675
if self.sql_server is not None:
736676
loop = asyncio.get_event_loop()
@@ -848,11 +788,7 @@ def _get_ral(self, sql):
848788
)
849789

850790
# True if the SQL query should be case sensitive and False otherwise
851-
case_sensitive = (
852-
self.schema[self.schema_name]
853-
.config.get_config_by_prefix("dask.sql.identifier.case.sensitive")
854-
.get("dask.sql.identifier.case.sensitive", True)
855-
)
791+
case_sensitive = dask_config.get("sql.identifier.case_sensitive", default=True)
856792

857793
generator_builder = RelationalAlgebraGeneratorBuilder(
858794
self.schema_name, case_sensitive, java.util.ArrayList()

dask_sql/datacontainer.py

Lines changed: 0 additions & 85 deletions
Original file line numberDiff line numberDiff line change
@@ -255,88 +255,3 @@ def __init__(self, name: str):
255255
self.models: Dict[str, Tuple[Any, List[str]]] = {}
256256
self.functions: Dict[str, UDF] = {}
257257
self.function_lists: List[FunctionDescription] = []
258-
self.config: ConfigContainer = ConfigContainer()
259-
260-
261-
class ConfigContainer:
262-
"""
263-
Helper class that contains configuration options required for specific operations
264-
Configurations are stored in a dictionary where key strings are delimited by `.`
265-
for easier nested access of multiple configurations
266-
Example:
267-
Dask groupby aggregate operations can be configured with the `split_out` option
268-
to determine number of output partitions or the `split_every` option to determine
269-
the number of partitions used during the groupby tree reduction step.
270-
"""
271-
272-
def __init__(self):
273-
self.config_dict = {
274-
# Do not set defaults here unless needed
275-
# This maintains the list of configuration options supported that can be set
276-
# "dask.groupby.aggregate.split_out": 1,
277-
# "dask.groupby.aggregate.split_every": None,
278-
}
279-
280-
def set_config(self, config_options: Union[Tuple[str, Any], Dict[str, Any]]):
281-
"""
282-
Accepts either a tuple of (config, val) or a dictionary containing multiple
283-
{config1: val1, config2: val2} pairs and updates the schema config with these values
284-
"""
285-
if isinstance(config_options, tuple):
286-
config_options = [config_options]
287-
self.config_dict.update(config_options)
288-
289-
def drop_config(self, config_strs: Union[str, List[str]]):
290-
if isinstance(config_strs, str):
291-
config_strs = [config_strs]
292-
for config_key in config_strs:
293-
self.config_dict.pop(config_key)
294-
295-
def get_config_by_prefix(self, config_prefix: str):
296-
"""
297-
Returns all configuration options matching the prefix in `config_prefix`
298-
299-
Example:
300-
.. code-block:: python
301-
302-
from dask_sql.datacontainer import ConfigContainer
303-
304-
sql_config = ConfigContainer()
305-
sql_config.set_config(
306-
{
307-
"dask.groupby.aggregate.split_out":1,
308-
"dask.groupby.aggregate.split_every": 1,
309-
"dask.sort.persist": True,
310-
}
311-
)
312-
313-
sql_config.get_config_by_prefix("dask.groupby")
314-
# Returns {
315-
# "dask.groupby.aggregate.split_out": 1,
316-
# "dask.groupby.aggregate.split_every": 1
317-
# }
318-
319-
sql_config.get_config_by_prefix("dask")
320-
# Returns {
321-
# "dask.groupby.aggregate.split_out": 1,
322-
# "dask.groupby.aggregate.split_every": 1,
323-
# "dask.sort.persist": True
324-
# }
325-
326-
sql_config.get_config_by_prefix("dask.sort")
327-
# Returns {"dask.sort.persist": True}
328-
329-
sql_config.get_config_by_prefix("missing.key")
330-
sql_config.get_config_by_prefix(None)
331-
# Both return {}
332-
333-
"""
334-
return (
335-
{
336-
key: val
337-
for key, val in self.config_dict.items()
338-
if key.startswith(config_prefix)
339-
}
340-
if config_prefix
341-
else {}
342-
)

dask_sql/physical/rel/logical/aggregate.py

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66

77
import dask.dataframe as dd
88
import pandas as pd
9+
from dask import config as dask_config
910

1011
try:
1112
import dask_cudf
@@ -231,15 +232,8 @@ def _do_aggregations(
231232
for col in group_columns:
232233
collected_aggregations[None].append((col, col, "first"))
233234

234-
groupby_agg_options = context.schema[
235-
context.schema_name
236-
].config.get_config_by_prefix("dask.groupby.aggregate")
237-
# Update the config string to only include the actual param value
238-
# i.e. dask.groupby.aggregate.split_out -> split_out
239-
for config_key in list(groupby_agg_options.keys()):
240-
groupby_agg_options[
241-
config_key.rpartition(".")[2]
242-
] = groupby_agg_options.pop(config_key)
235+
groupby_agg_options = dask_config.get("sql.groupby")
236+
243237
# Now we can go ahead and use these grouped aggregations
244238
# to perform the actual aggregation
245239
# It is very important to start with the non-filtered entry.

dask_sql/sql-schema.yaml

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
properties:
2+
3+
sql:
4+
type: object
5+
properties:
6+
7+
groupby:
8+
type: object
9+
properties:
10+
11+
split_out:
12+
type: integer
13+
description: |
14+
Number of output partitions for a groupby operation
15+
16+
split_every:
17+
type: [integer, "null"]
18+
description: |
19+
Number of branches per reduction step for a groupby operation.
20+
21+
identifier:
22+
type: object
23+
properties:
24+
25+
case_sensitive:
26+
type: boolean
27+
description: |
28+
Whether sql identifiers are considered case sensitive while parsing.

dask_sql/sql.yaml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
sql:
2+
groupby:
3+
split_out: 1
4+
split_every: null
5+
6+
identifier:
7+
case_sensitive: True

docs/conf.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,11 @@
2929
# Add any Sphinx extension module names here, as strings. They can be
3030
# extensions coming with Sphinx (named 'sphinx.ext.*') or your custom
3131
# ones.
32-
extensions = ["sphinx.ext.autodoc", "sphinx.ext.napoleon"]
32+
extensions = [
33+
"sphinx.ext.autodoc",
34+
"sphinx.ext.napoleon",
35+
"dask_sphinx_theme.ext.dask_config_sphinx_ext",
36+
]
3337

3438
# Add any paths that contain templates here, relative to this directory.
3539
templates_path = ["_templates"]

docs/environment.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ dependencies:
2323
- cytoolz=0.11.0=py38h1e0a361_0
2424
- dask=2.28.0=py_0
2525
- dask-core=2.28.0=py_0
26-
- dask-sphinx-theme=1.3.2=pyh9f0ad1d_0
26+
- dask-sphinx-theme>=2.0.3
2727
- distributed=2.28.0=py38h32f6830_0
2828
- docutils=0.16=py38h32f6830_1
2929
- fontconfig=2.13.1=h1056068_1002

0 commit comments

Comments
 (0)