Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
7c66b6f
create optimizer_config
sarahyurick Jul 7, 2023
6f460fa
add join_reorder configs
sarahyurick Jul 10, 2023
ccb8d42
add verbose_optimizer
sarahyurick Jul 10, 2023
16b7a8f
add test_dynamic_partition_pruning
sarahyurick Jul 10, 2023
28a5d7f
skip 3.8 tests
sarahyurick Jul 10, 2023
f3e9f52
remove dpp default
sarahyurick Jul 11, 2023
fa4976b
style
sarahyurick Jul 11, 2023
c9c5a10
temporarily remove rust changes
sarahyurick Aug 17, 2023
e527d83
temp remove sql.rs changes
sarahyurick Aug 17, 2023
3fcf740
Merge branch 'main' into optimizer_improvements
sarahyurick Aug 17, 2023
35c363b
readd rust changes
sarahyurick Aug 17, 2023
c1a19e2
check optimized_plan is Ok
sarahyurick Aug 17, 2023
74a222e
Merge branch 'main' into optimizer_improvements
sarahyurick Aug 30, 2023
4a06278
Merge branch 'main' into optimizer_improvements
sarahyurick Nov 7, 2023
f2da571
Merge branch 'main' into optimizer_improvements
sarahyurick Nov 13, 2023
74f04b5
Merge branch 'main' into optimizer_improvements
sarahyurick Nov 13, 2023
bdb287c
Merge branch 'main' into optimizer_improvements
sarahyurick Nov 14, 2023
67d425d
sql.dynamic_partition_pruning.verbose
sarahyurick Nov 15, 2023
4e8bb93
edit yaml style
sarahyurick Nov 15, 2023
33b36c6
edit
sarahyurick Nov 15, 2023
53dea1b
sql.optimizer.verbose
sarahyurick Nov 15, 2023
d3826d9
lowercase
sarahyurick Nov 15, 2023
ff62770
set object type
sarahyurick Nov 15, 2023
24c4c8f
Merge branch 'main' into optimizer_improvements
sarahyurick Nov 28, 2023
2caaa9b
Merge branch 'main' into optimizer_improvements
sarahyurick Dec 15, 2023
3f3d401
Merge branch 'main' into optimizer_improvements
sarahyurick Jan 9, 2024
371103d
Update test_config.py
sarahyurick Jan 10, 2024
c3942ad
update version
sarahyurick Jan 11, 2024
6c995c2
Merge branch 'main' into optimizer_improvements
sarahyurick Jan 23, 2024
4ed7416
Merge remote-tracking branch 'upstream/main' into pr/sarahyurick/1199
charlesbluca Jan 31, 2024
9d0a613
Unpin dask/distributed for development
charlesbluca Jan 31, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion continuous_integration/docker/conda.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
python>=3.9
dask>=2022.3.0,<=2024.1.1
dask>=2022.3.0
pandas>=1.4.0
jpype1>=1.0.2
openjdk>=8
Expand Down
2 changes: 1 addition & 1 deletion continuous_integration/docker/main.dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ RUN mamba install -y \
# build requirements
"maturin>=1.3,<1.4" \
# core dependencies
"dask>=2022.3.0,<=2024.1.1" \
"dask>=2022.3.0" \
"pandas>=1.4.0" \
"fastapi>=0.92.0" \
"httpx>=0.24.1" \
Expand Down
2 changes: 1 addition & 1 deletion continuous_integration/environment-3.10.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ channels:
- conda-forge
dependencies:
- c-compiler
- dask>=2022.3.0,<=2024.1.1
- dask>=2022.3.0
- fastapi>=0.92.0
- fugue>=0.7.3
- httpx>=0.24.1
Expand Down
2 changes: 1 addition & 1 deletion continuous_integration/environment-3.11.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ channels:
- conda-forge
dependencies:
- c-compiler
- dask>=2022.3.0,<=2024.1.1
- dask>=2022.3.0
- fastapi>=0.92.0
- fugue>=0.7.3
- httpx>=0.24.1
Expand Down
2 changes: 1 addition & 1 deletion continuous_integration/environment-3.12.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ channels:
- conda-forge
dependencies:
- c-compiler
- dask>=2022.3.0,<=2024.1.1
- dask>=2022.3.0
- fastapi>=0.92.0
- fugue>=0.7.3
- httpx>=0.24.1
Expand Down
2 changes: 1 addition & 1 deletion continuous_integration/gpuci/environment-3.10.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ channels:
dependencies:
- c-compiler
- zlib
- dask>=2022.3.0,<=2024.1.1
- dask>=2022.3.0
- fastapi>=0.92.0
- fugue>=0.7.3
- httpx>=0.24.1
Expand Down
2 changes: 1 addition & 1 deletion continuous_integration/gpuci/environment-3.9.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ channels:
dependencies:
- c-compiler
- zlib
- dask>=2022.3.0,<=2024.1.1
- dask>=2022.3.0
- fastapi>=0.92.0
- fugue>=0.7.3
- httpx>=0.24.1
Expand Down
2 changes: 1 addition & 1 deletion continuous_integration/recipe/meta.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ requirements:
- xz # [linux64]
run:
- python
- dask >=2022.3.0,<=2024.1.1
- dask >=2022.3.0
- pandas >=1.4.0
- fastapi >=0.92.0
- httpx >=0.24.1
Expand Down
33 changes: 26 additions & 7 deletions dask_sql/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from dask_sql._datafusion_lib import (
DaskSchema,
DaskSQLContext,
DaskSQLOptimizerConfig,
DaskTable,
DFOptimizationException,
DFParsingException,
Expand Down Expand Up @@ -98,13 +99,20 @@ def __init__(self, logging_level=logging.INFO):
# A started SQL server (useful for jupyter notebooks)
self.sql_server = None

# Create the `DaskSQLContext` Rust context
self.context = DaskSQLContext(self.catalog_name, self.schema_name)
self.context.register_schema(self.schema_name, DaskSchema(self.schema_name))
# Create the `DaskSQLOptimizerConfig` Rust context
optimizer_config = DaskSQLOptimizerConfig(
dask_config.get("sql.dynamic_partition_pruning"),
dask_config.get("sql.fact_dimension_ratio"),
dask_config.get("sql.max_fact_tables"),
dask_config.get("sql.preserve_user_order"),
dask_config.get("sql.filter_selectivity"),
)

self.context.apply_dynamic_partition_pruning(
dask_config.get("sql.dynamic_partition_pruning")
# Create the `DaskSQLContext` Rust context
self.context = DaskSQLContext(
self.catalog_name, self.schema_name, optimizer_config
)
self.context.register_schema(self.schema_name, DaskSchema(self.schema_name))

# # Register any default plugins, if nothing was registered before.
RelConverter.add_plugin_class(logical.DaskAggregatePlugin, replace=False)
Expand Down Expand Up @@ -542,11 +550,16 @@ def explain(
:obj:`str`: a description of the created relational algebra.

"""
dynamic_partition_pruning = dask_config.get("sql.dynamic_partition_pruning")
if not dask_config.get("sql.optimizer.verbose"):
dask_config.set({"sql.dynamic_partition_pruning": False})

if dataframes is not None:
for df_name, df in dataframes.items():
self.create_table(df_name, df, gpu=gpu)

_, rel_string = self._get_ral(sql)
dask_config.set({"sql.dynamic_partition_pruning": dynamic_partition_pruning})
return rel_string

def visualize(self, sql: str, filename="mydask.png") -> None: # pragma: no cover
Expand Down Expand Up @@ -799,9 +812,15 @@ def _get_ral(self, sql):
"""Helper function to turn the sql query into a relational algebra and resulting column names"""

logger.debug(f"Entering _get_ral('{sql}')")
self.context.apply_dynamic_partition_pruning(
dask_config.get("sql.dynamic_partition_pruning")

optimizer_config = DaskSQLOptimizerConfig(
dask_config.get("sql.dynamic_partition_pruning"),
dask_config.get("sql.fact_dimension_ratio"),
dask_config.get("sql.max_fact_tables"),
dask_config.get("sql.preserve_user_order"),
dask_config.get("sql.filter_selectivity"),
)
self.context.set_optimizer_config(optimizer_config)

# get the schema of what we currently have registered
schemas = self._prepare_schemas()
Expand Down
36 changes: 36 additions & 0 deletions dask_sql/sql-schema.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,42 @@ properties:
description: |
Whether to apply the dynamic partition pruning optimizer rule.

optimizer:
type: object
properties:

verbose:
type: boolean
description: |
The dynamic partition pruning optimizer rule can sometimes result in extremely long
c.explain() outputs which are not helpful to the user. Setting this option to true
allows the user to see the entire output, while setting it to false truncates the
output. Default is false.

fact_dimension_ratio:
type: [number, "null"]
description: |
Ratio of the size of the dimension tables to fact tables. Parameter for dynamic partition
pruning and join reorder optimizer rules.

max_fact_tables:
type: [integer, "null"]
description: |
Maximum number of fact tables to allow in a join. Parameter for join reorder optimizer
rule.

preserve_user_order:
type: [boolean, "null"]
description: |
Whether to preserve user-defined order of unfiltered dimensions. Parameter for join
reorder optimizer rule.

filter_selectivity:
type: [number, "null"]
description: |
Constant to use when determining the number of rows produced by a filtered relation.
Parameter for join reorder optimizer rule.

sort:
type: object
properties:
Expand Down
11 changes: 11 additions & 0 deletions dask_sql/sql.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,17 @@ sql:

dynamic_partition_pruning: True

optimizer:
verbose: False

fact_dimension_ratio: null

max_fact_tables: null

preserve_user_order: null

filter_selectivity: null

sort:
topk-nelem-limit: 1000000

Expand Down
2 changes: 1 addition & 1 deletion docs/environment.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ dependencies:
- sphinx>=4.0.0
- sphinx-tabs
- dask-sphinx-theme>=2.0.3
- dask>=2022.3.0,<=2024.1.1
- dask>=2022.3.0
- pandas>=1.4.0
- fugue>=0.7.3
# FIXME: https://github.com/fugue-project/fugue/issues/526
Expand Down
2 changes: 1 addition & 1 deletion docs/requirements-docs.txt
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
sphinx>=4.0.0
sphinx-tabs
dask-sphinx-theme>=3.0.0
dask>=2022.3.0,<=2024.1.1
dask>=2022.3.0
pandas>=1.4.0
fugue>=0.7.3
# FIXME: https://github.com/fugue-project/fugue/issues/526
Expand Down
4 changes: 2 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ classifiers = [
readme = "README.md"
requires-python = ">=3.9"
dependencies = [
"dask[dataframe]>=2022.3.0,<=2024.1.1",
"distributed>=2022.3.0,<=2024.1.1",
"dask[dataframe]>=2022.3.0",
"distributed>=2022.3.0",
"pandas>=1.4.0",
"fastapi>=0.92.0",
"httpx>=0.24.1",
Expand Down
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ fn _datafusion_lib(py: Python, m: &PyModule) -> PyResult<()> {
m.add_class::<sql::function::DaskFunction>()?;
m.add_class::<sql::table::DaskStatistics>()?;
m.add_class::<sql::logical::PyLogicalPlan>()?;
m.add_class::<sql::DaskSQLOptimizerConfig>()?;

// Exceptions
m.add(
Expand Down
80 changes: 60 additions & 20 deletions src/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,37 @@ pub struct DaskSQLContext {
current_schema: String,
schemas: HashMap<String, schema::DaskSchema>,
options: ConfigOptions,
optimizer_config: DaskSQLOptimizerConfig,
}

#[pyclass(name = "DaskSQLOptimizerConfig", module = "dask_sql", subclass)]
#[derive(Debug, Clone)]
pub struct DaskSQLOptimizerConfig {
dynamic_partition_pruning: bool,
fact_dimension_ratio: Option<f64>,
max_fact_tables: Option<usize>,
preserve_user_order: Option<bool>,
filter_selectivity: Option<f64>,
}

#[pymethods]
impl DaskSQLOptimizerConfig {
#[new]
pub fn new(
dynamic_partition_pruning: bool,
fact_dimension_ratio: Option<f64>,
max_fact_tables: Option<usize>,
preserve_user_order: Option<bool>,
filter_selectivity: Option<f64>,
) -> Self {
Self {
dynamic_partition_pruning,
fact_dimension_ratio,
max_fact_tables,
preserve_user_order,
filter_selectivity,
}
}
}

impl ContextProvider for DaskSQLContext {
Expand Down Expand Up @@ -478,18 +508,22 @@ impl ContextProvider for DaskSQLContext {
#[pymethods]
impl DaskSQLContext {
#[new]
pub fn new(default_catalog_name: &str, default_schema_name: &str) -> Self {
pub fn new(
default_catalog_name: &str,
default_schema_name: &str,
optimizer_config: DaskSQLOptimizerConfig,
) -> Self {
Self {
current_catalog: default_catalog_name.to_owned(),
current_schema: default_schema_name.to_owned(),
schemas: HashMap::new(),
options: ConfigOptions::new(),
dynamic_partition_pruning: false,
optimizer_config,
}
}

pub fn apply_dynamic_partition_pruning(&mut self, config: bool) -> PyResult<()> {
self.dynamic_partition_pruning = config;
pub fn set_optimizer_config(&mut self, config: DaskSQLOptimizerConfig) -> PyResult<()> {
self.optimizer_config = config;
Ok(())
}

Expand Down Expand Up @@ -591,23 +625,29 @@ impl DaskSQLContext {
Ok(existing_plan)
}
_ => {
let optimized_plan = optimizer::DaskSqlOptimizer::new()
.optimize(existing_plan.original_plan)
.map(|k| PyLogicalPlan {
original_plan: k,
current_node: None,
})
.map_err(py_optimization_exp);

let optimized_plan = optimizer::DaskSqlOptimizer::new(
self.optimizer_config.fact_dimension_ratio,
self.optimizer_config.max_fact_tables,
self.optimizer_config.preserve_user_order,
self.optimizer_config.filter_selectivity,
)
.optimize(existing_plan.original_plan)
.map(|k| PyLogicalPlan {
original_plan: k,
current_node: None,
})
.map_err(py_optimization_exp);
if let Ok(optimized_plan) = optimized_plan {
if self.dynamic_partition_pruning {
optimizer::DaskSqlOptimizer::dynamic_partition_pruner()
.optimize_once(optimized_plan.original_plan)
.map(|k| PyLogicalPlan {
original_plan: k,
current_node: None,
})
.map_err(py_optimization_exp)
if self.optimizer_config.dynamic_partition_pruning {
optimizer::DaskSqlOptimizer::dynamic_partition_pruner(
self.optimizer_config.fact_dimension_ratio,
)
.optimize_once(optimized_plan.original_plan)
.map(|k| PyLogicalPlan {
original_plan: k,
current_node: None,
})
.map_err(py_optimization_exp)
} else {
Ok(optimized_plan)
}
Expand Down
23 changes: 17 additions & 6 deletions src/sql/optimizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,12 @@ pub struct DaskSqlOptimizer {
impl DaskSqlOptimizer {
/// Creates a new instance of the DaskSqlOptimizer with all the DataFusion desired
/// optimizers as well as any custom `OptimizerRule` trait impls that might be desired.
pub fn new() -> Self {
pub fn new(
fact_dimension_ratio: Option<f64>,
max_fact_tables: Option<usize>,
preserve_user_order: Option<bool>,
filter_selectivity: Option<f64>,
) -> Self {
debug!("Creating new instance of DaskSqlOptimizer");

let rules: Vec<Arc<dyn OptimizerRule + Sync + Send>> = vec![
Expand Down Expand Up @@ -75,7 +80,12 @@ impl DaskSqlOptimizer {
Arc::new(PushDownFilter::new()),
// Arc::new(SingleDistinctToGroupBy::new()),
// Dask-SQL specific optimizations
Arc::new(JoinReorder::default()),
Arc::new(JoinReorder::new(
fact_dimension_ratio,
max_fact_tables,
preserve_user_order,
filter_selectivity,
)),
// The previous optimizations added expressions and projections,
// that might benefit from the following rules
Arc::new(SimplifyExpressions::new()),
Expand All @@ -94,9 +104,10 @@ impl DaskSqlOptimizer {

// Create a separate instance of this optimization rule, since we want to ensure that it only
// runs one time
pub fn dynamic_partition_pruner() -> Self {
let rule: Vec<Arc<dyn OptimizerRule + Sync + Send>> =
vec![Arc::new(DynamicPartitionPruning::new())];
pub fn dynamic_partition_pruner(fact_dimension_ratio: Option<f64>) -> Self {
let rule: Vec<Arc<dyn OptimizerRule + Sync + Send>> = vec![Arc::new(
DynamicPartitionPruning::new(fact_dimension_ratio.unwrap_or(0.3)),
)];

Self {
optimizer: Optimizer::with_rules(rule),
Expand Down Expand Up @@ -170,7 +181,7 @@ mod tests {
let plan = sql_to_rel.sql_statement_to_plan(statement.clone()).unwrap();

// optimize the logical plan
let optimizer = DaskSqlOptimizer::new();
let optimizer = DaskSqlOptimizer::new(None, None, None, None);
optimizer.optimize(plan)
}

Expand Down
Loading