Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
6097526
Add dask config module to dask-sql
ayushdg Feb 4, 2022
38d2f29
Update context to use dask-sql config instead of ConfigContainer
ayushdg Feb 4, 2022
e842343
Merge branch 'main' of github.com:dask-contrib/dask-sql into enh-dask…
ayushdg Feb 11, 2022
7f530bb
Remove distributed utils_test fixtures and add client fixture
ayushdg Feb 12, 2022
bc03e35
Reduce connection timeout for non reachable test
ayushdg Feb 12, 2022
b86940d
Merge branch 'enh-pytest-client' into enh-dask-sql-config
ayushdg Feb 12, 2022
a818de8
Rerun tests
ayushdg Feb 14, 2022
00edaf4
Merge remote-tracking branch 'upstream/main' into enh-pytest-client
charlesbluca Feb 16, 2022
81563ea
Mount tempfile directory in independent worker container
charlesbluca Feb 16, 2022
a0e479b
Skip test_fsql on external cluster
charlesbluca Feb 16, 2022
2404115
Relax external cluster's conda packages
charlesbluca Feb 16, 2022
d77f061
Add fixme note to failing fugue test due to missing triad module
ayushdg Feb 22, 2022
60185a6
Merge branch 'enh-pytest-client' into enh-dask-sql-config
ayushdg Feb 23, 2022
d11f508
Update case sensitivity test
ayushdg Feb 23, 2022
b8fcbc0
Update setup to include the config yaml file for dask-sql
ayushdg Feb 23, 2022
edb2f17
Merge branch 'main' of github.com:dask-contrib/dask-sql into enh-dask…
ayushdg Feb 23, 2022
42b1e34
Merge remote-tracking branch 'upstream/main' into enh-dask-sql-config
charlesbluca Feb 23, 2022
f05bb3c
Add sql schema yaml and update setup to include the schema
ayushdg Feb 28, 2022
0f2eb02
Remove explicit config dict from dask_sql.config
ayushdg Feb 28, 2022
a370dcd
Update set_config docstring and prevent setting non sql configs
ayushdg Feb 28, 2022
791e6d4
Remove configContainer in favor of dask-sql config module
ayushdg Feb 28, 2022
fed3d9b
Add config unit tests
ayushdg Feb 28, 2022
ae91f7f
Add dask sql configuration docs and include dask-sphinx-theme ext for…
ayushdg Feb 28, 2022
9f2fed3
Merge branch 'enh-dask-sql-config' of github.com:ayushdg/dask-sql int…
ayushdg Feb 28, 2022
da0a8d4
Fix dask-sphinx-theme version constraint for config extension
ayushdg Feb 28, 2022
e5bb08b
Update set_config docstring
ayushdg Feb 28, 2022
f68bdcf
Rerun tests
ayushdg Feb 28, 2022
0322f62
Merge remote-tracking branch 'upstream/main' into enh-dask-sql-config
charlesbluca Mar 1, 2022
c980c3e
Remove context.set_config api in favor of directly using dask
ayushdg Mar 7, 2022
58eada8
Merge branch 'enh-dask-sql-config' of github.com:ayushdg/dask-sql int…
ayushdg Mar 7, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions MANIFEST.in
Original file line number Diff line number Diff line change
@@ -1,2 +1,4 @@
recursive-include dask_sql *.yaml

include versioneer.py
include dask_sql/_version.py
1 change: 1 addition & 0 deletions dask_sql/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from . import config
from ._version import get_versions
from .cmd import cmd_loop
from .context import Context
Expand Down
12 changes: 12 additions & 0 deletions dask_sql/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import os

import dask
import yaml

# Path to the default dask-sql configuration file shipped inside the package.
fn = os.path.join(os.path.dirname(__file__), "sql.yaml")

# Load the packaged defaults (plain YAML -> nested dict).
with open(fn) as f:
    defaults = yaml.safe_load(f)

# Runs at import time: merge dask-sql's defaults into dask's global config,
# then copy sql.yaml (commented out) into the user's dask config directory so
# the available options are discoverable and editable there.
dask.config.update_defaults(defaults)
dask.config.ensure_file(source=fn, comment=True)
118 changes: 27 additions & 91 deletions dask_sql/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import dask.dataframe as dd
import pandas as pd
from dask import config as dask_config
from dask.base import optimize
from dask.distributed import Client

Expand Down Expand Up @@ -419,6 +420,7 @@ def sql(
return_futures: bool = True,
dataframes: Dict[str, Union[dd.DataFrame, pd.DataFrame]] = None,
gpu: bool = False,
config_options: Dict[str, Any] = None,
) -> Union[dd.DataFrame, pd.DataFrame]:
"""
Query the registered tables with the given SQL.
Expand Down Expand Up @@ -446,36 +448,39 @@ def sql(
to register before executing this query
gpu (:obj:`bool`): Whether or not to load the additional Dask or pandas dataframes (if any) on GPU;
requires cuDF / dask-cuDF if enabled. Defaults to False.
config_options (:obj:`Dict[str,Any]`): Specific configuration options to pass during
query execution

Returns:
:obj:`dask.dataframe.DataFrame`: the created data frame of this query.

"""
if dataframes is not None:
for df_name, df in dataframes.items():
self.create_table(df_name, df, gpu=gpu)
with dask_config.set(config_options):
if dataframes is not None:
for df_name, df in dataframes.items():
self.create_table(df_name, df, gpu=gpu)

rel, select_names, _ = self._get_ral(sql)
rel, select_names, _ = self._get_ral(sql)

dc = RelConverter.convert(rel, context=self)
dc = RelConverter.convert(rel, context=self)

if dc is None:
return
if dc is None:
return

if select_names:
# Rename any columns named EXPR$* to a more human readable name
cc = dc.column_container
cc = cc.rename(
{
df_col: select_name
for df_col, select_name in zip(cc.columns, select_names)
}
)
dc = DataContainer(dc.df, cc)
if select_names:
# Rename any columns named EXPR$* to a more human readable name
cc = dc.column_container
cc = cc.rename(
{
df_col: select_name
for df_col, select_name in zip(cc.columns, select_names)
}
)
dc = DataContainer(dc.df, cc)

df = dc.assign()
if not return_futures:
df = df.compute()
df = dc.assign()
if not return_futures:
df = df.compute()

return df

Expand Down Expand Up @@ -586,71 +591,6 @@ def register_model(
schema_name = schema_name or self.schema_name
self.schema[schema_name].models[model_name.lower()] = (model, training_columns)

def set_config(
    self,
    config_options: Union[Tuple[str, Any], Dict[str, Any]],
    schema_name: str = None,
):
    """
    Add configuration options to a schema.
    A configuration option can be used to set the behavior of certain configurable operations.

    Eg: `dask.groupby.agg.split_out` can be used to split the output of a groupby aggregation to multiple partitions.

    Args:
        config_options (:obj:`Tuple[str,val]` or :obj:`Dict[str,val]`): config_option and value to set
        schema_name (:obj:`str`): Optionally select schema for setting configs

    Example:
        .. code-block:: python

            from dask_sql import Context

            c = Context()
            c.set_config(("dask.groupby.aggregate.split_out", 1))
            c.set_config(
                {
                    "dask.groupby.aggregate.split_out": 2,
                    "dask.groupby.aggregate.split_every": 4,
                }
            )

    """
    # Fall back to the context's currently-selected schema when none is given.
    schema_name = schema_name or self.schema_name
    # Delegates storage to the schema's ConfigContainer.
    self.schema[schema_name].config.set_config(config_options)

def drop_config(
    self, config_strs: Union[str, List[str]], schema_name: str = None,
):
    """
    Drop user-set configuration options from a schema.

    Args:
        config_strs (:obj:`str` or :obj:`List[str]`): config key or keys to drop
        schema_name (:obj:`str`): Optionally select schema for dropping configs

    Example:
        .. code-block:: python

            from dask_sql import Context

            c = Context()
            c.set_config(
                {
                    "dask.groupby.aggregate.split_out": 2,
                    "dask.groupby.aggregate.split_every": 4,
                }
            )
            c.drop_config(
                [
                    "dask.groupby.aggregate.split_out",
                    "dask.groupby.aggregate.split_every",
                ]
            )
    """
    # Fall back to the context's currently-selected schema when none is given.
    schema_name = schema_name or self.schema_name
    # Delegates removal to the schema's ConfigContainer.
    self.schema[schema_name].config.drop_config(config_strs)

def ipython_magic(self, auto_include=False): # pragma: no cover
"""
Register a new ipython/jupyter magic function "sql"
Expand Down Expand Up @@ -728,7 +668,7 @@ def run_server(

def stop_server(self): # pragma: no cover
"""
Stop a SQL server started by ``run_server`.
Stop a SQL server started by ``run_server``.
"""
if self.sql_server is not None:
loop = asyncio.get_event_loop()
Expand Down Expand Up @@ -846,11 +786,7 @@ def _get_ral(self, sql):
)

# True if the SQL query should be case sensitive and False otherwise
case_sensitive = (
self.schema[self.schema_name]
.config.get_config_by_prefix("dask.sql.identifier.case.sensitive")
.get("dask.sql.identifier.case.sensitive", True)
)
case_sensitive = dask_config.get("sql.identifier.case_sensitive", default=True)

generator_builder = RelationalAlgebraGeneratorBuilder(
self.schema_name, case_sensitive, java.util.ArrayList()
Expand Down
85 changes: 0 additions & 85 deletions dask_sql/datacontainer.py
Original file line number Diff line number Diff line change
Expand Up @@ -251,88 +251,3 @@ def __init__(self, name: str):
self.models: Dict[str, Tuple[Any, List[str]]] = {}
self.functions: Dict[str, UDF] = {}
self.function_lists: List[FunctionDescription] = []
self.config: ConfigContainer = ConfigContainer()


class ConfigContainer:
    """
    Helper class holding configuration options required by specific operations.

    Options live in a single flat dictionary whose string keys are delimited
    by ``.``, so nested groups of settings can be addressed easily by prefix.

    Example:
        Dask groupby aggregate operations can be configured with the
        ``split_out`` option (number of output partitions) or the
        ``split_every`` option (number of partitions combined per step of the
        groupby tree reduction).
    """

    def __init__(self):
        # Intentionally left empty: defaults are not registered here unless
        # needed. The commented keys document the supported, settable options:
        #   "dask.groupby.aggregate.split_out": 1,
        #   "dask.groupby.aggregate.split_every": None,
        self.config_dict = {}

    def set_config(self, config_options: Union[Tuple[str, Any], Dict[str, Any]]):
        """
        Store either a single ``(config, value)`` tuple or a dictionary of
        multiple ``{config1: val1, config2: val2}`` pairs in this container.
        """
        if isinstance(config_options, tuple):
            key, value = config_options
            self.config_dict[key] = value
        else:
            self.config_dict.update(config_options)

    def drop_config(self, config_strs: Union[str, List[str]]):
        # Accept a single key or a list of keys; dropping a key that was never
        # set raises KeyError, matching dict.pop without a default.
        keys = [config_strs] if isinstance(config_strs, str) else config_strs
        for key in keys:
            self.config_dict.pop(key)

    def get_config_by_prefix(self, config_prefix: str):
        """
        Return all stored configuration options whose key starts with
        ``config_prefix``. A falsy prefix (``None`` or ``""``) yields ``{}``.

        Example:
        .. code-block:: python

            from dask_sql.datacontainer import ConfigContainer

            sql_config = ConfigContainer()
            sql_config.set_config(
                {
                    "dask.groupby.aggregate.split_out": 1,
                    "dask.groupby.aggregate.split_every": 1,
                    "dask.sort.persist": True,
                }
            )

            sql_config.get_config_by_prefix("dask.groupby")
            # Returns the two dask.groupby.aggregate.* entries

            sql_config.get_config_by_prefix("dask")
            # Returns all three entries

            sql_config.get_config_by_prefix("dask.sort")
            # Returns {"dask.sort.persist": True}

            sql_config.get_config_by_prefix("missing.key")
            sql_config.get_config_by_prefix(None)
            # Both return {}

        """
        if not config_prefix:
            return {}
        matches = {}
        for key, value in self.config_dict.items():
            if key.startswith(config_prefix):
                matches[key] = value
        return matches
12 changes: 3 additions & 9 deletions dask_sql/physical/rel/logical/aggregate.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import dask.dataframe as dd
import pandas as pd
from dask import config as dask_config

try:
import dask_cudf
Expand Down Expand Up @@ -231,15 +232,8 @@ def _do_aggregations(
for col in group_columns:
collected_aggregations[None].append((col, col, "first"))

groupby_agg_options = context.schema[
context.schema_name
].config.get_config_by_prefix("dask.groupby.aggregate")
# Update the config string to only include the actual param value
# i.e. dask.groupby.aggregate.split_out -> split_out
for config_key in list(groupby_agg_options.keys()):
groupby_agg_options[
config_key.rpartition(".")[2]
] = groupby_agg_options.pop(config_key)
groupby_agg_options = dask_config.get("sql.groupby")

# Now we can go ahead and use these grouped aggregations
# to perform the actual aggregation
# It is very important to start with the non-filtered entry.
Expand Down
28 changes: 28 additions & 0 deletions dask_sql/sql-schema.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
properties:

sql:
type: object
properties:

groupby:
type: object
properties:

split_out:
type: integer
description: |
Number of output partitions for a groupby operation

split_every:
type: [integer, "null"]
description: |
Number of branches per reduction step for a groupby operation.

identifier:
type: object
properties:

case_sensitive:
type: boolean
description: |
Whether SQL identifiers are considered case-sensitive while parsing.
7 changes: 7 additions & 0 deletions dask_sql/sql.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
sql:
groupby:
split_out: 1
split_every: null

identifier:
case_sensitive: True
6 changes: 5 additions & 1 deletion docs/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,11 @@
# Add any Sphinx extension module names here, as strings. They can be
# extensions coming with Sphinx (named 'sphinx.ext.*') or your custom
# ones.
extensions = ["sphinx.ext.autodoc", "sphinx.ext.napoleon"]
extensions = [
"sphinx.ext.autodoc",
"sphinx.ext.napoleon",
"dask_sphinx_theme.ext.dask_config_sphinx_ext",
]

# Add any paths that contain templates here, relative to this directory.
templates_path = ["_templates"]
Expand Down
2 changes: 1 addition & 1 deletion docs/environment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ dependencies:
- cytoolz=0.11.0=py38h1e0a361_0
- dask=2.28.0=py_0
- dask-core=2.28.0=py_0
- dask-sphinx-theme=1.3.2=pyh9f0ad1d_0
- dask-sphinx-theme>=2.0.3
- distributed=2.28.0=py38h32f6830_0
- docutils=0.16=py38h32f6830_1
- fontconfig=2.13.1=h1056068_1002
Expand Down
1 change: 1 addition & 0 deletions docs/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ Any pandas or dask dataframe can be used as input and ``dask-sql`` understands a
pages/server
pages/cmd
pages/how_does_it_work
pages/configuration


.. note::
Expand Down
18 changes: 18 additions & 0 deletions docs/pages/configuration.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
.. _configuration:

Configuration in Dask-SQL
==========================

``dask-sql`` supports a list of configuration options to configure behavior of certain operations.
``dask-sql`` uses `Dask's config <https://docs.dask.org/en/stable/configuration.html>`_
module and configuration options can be specified with YAML files, via environment variables,
or directly, either through the `dask.config.set <https://docs.dask.org/en/stable/configuration.html#dask.config.set>`_ method
or the ``config_options`` argument in the :func:`dask_sql.Context.sql` method.

Configuration Reference
-----------------------

.. dask-config-block::
:location: sql
:config: https://gist.githubusercontent.com/ayushdg/1b0f7cacd0e9db20175669a17386a58d/raw/6ddb78a3b3c4ac5051aa17105e576211d0e32f6b/sql.yaml
:schema: https://gist.githubusercontent.com/ayushdg/1b0f7cacd0e9db20175669a17386a58d/raw/2d37f64c48c2b6ebdca6634b4c5e3c22a59e1cdf/sql-schema.yaml
Loading