From 7c66b6ff729da6957b862d7bce8dd83f40c6c178 Mon Sep 17 00:00:00 2001 From: Sarah Yurick Date: Fri, 7 Jul 2023 15:31:27 -0700 Subject: [PATCH 01/20] create optimizer_config --- dask_planner/src/sql.rs | 37 ++++++++++++++++--- dask_planner/src/sql/optimizer.rs | 18 +++++++-- .../optimizer/dynamic_partition_pruning.rs | 26 +++++++++---- .../src/sql/optimizer/join_reorder.rs | 16 ++++++++ dask_sql/context.py | 20 ++++++---- dask_sql/sql-schema.yaml | 5 +++ dask_sql/sql.yaml | 2 + 7 files changed, 99 insertions(+), 25 deletions(-) diff --git a/dask_planner/src/sql.rs b/dask_planner/src/sql.rs index a0e238727..3af613e00 100644 --- a/dask_planner/src/sql.rs +++ b/dask_planner/src/sql.rs @@ -99,7 +99,28 @@ pub struct DaskSQLContext { current_schema: String, schemas: HashMap, options: ConfigOptions, + optimizer_config: DaskSQLOptimizerConfig, +} + +#[pyclass(name = "DaskSQLOptimizerConfig", module = "dask_planner", subclass)] +#[derive(Debug, Clone)] +pub struct DaskSQLOptimizerConfig { dynamic_partition_pruning: bool, + fact_dimension_ratio: f64, +} + +#[pymethods] +impl DaskSQLOptimizerConfig { + #[new] + pub fn new( + dynamic_partition_pruning: bool, + fact_dimension_ratio: f64, + ) -> Self { + Self { + dynamic_partition_pruning, + fact_dimension_ratio, + } + } } impl ContextProvider for DaskSQLContext { @@ -483,18 +504,22 @@ impl ContextProvider for DaskSQLContext { #[pymethods] impl DaskSQLContext { #[new] - pub fn new(default_catalog_name: &str, default_schema_name: &str) -> Self { + pub fn new( + default_catalog_name: &str, + default_schema_name: &str, + optimizer_config: DaskSQLOptimizerConfig, + ) -> Self { Self { current_catalog: default_catalog_name.to_owned(), current_schema: default_schema_name.to_owned(), schemas: HashMap::new(), options: ConfigOptions::new(), - dynamic_partition_pruning: false, + optimizer_config: optimizer_config, } } - pub fn apply_dynamic_partition_pruning(&mut self, config: bool) -> PyResult<()> { - self.dynamic_partition_pruning = config; + pub fn set_optimizer_config(&mut self, config: DaskSQLOptimizerConfig) -> PyResult<()> { + self.optimizer_config = config; Ok(()) } @@ -592,8 +617,8 @@ impl DaskSQLContext { current_node: None, }) .map_err(py_optimization_exp); - if self.dynamic_partition_pruning { - optimizer::DaskSqlOptimizer::dynamic_partition_pruner() + if self.optimizer_config.dynamic_partition_pruning { + optimizer::DaskSqlOptimizer::dynamic_partition_pruner(Some(self.optimizer_config.fact_dimension_ratio)) .optimize_once(optimized_plan.unwrap().original_plan) .map(|k| PyLogicalPlan { original_plan: k, diff --git a/dask_planner/src/sql/optimizer.rs b/dask_planner/src/sql/optimizer.rs index bdaa30ea7..32cd3cd50 100644 --- a/dask_planner/src/sql/optimizer.rs +++ b/dask_planner/src/sql/optimizer.rs @@ -42,7 +42,7 @@ impl DaskSqlOptimizer { pub fn new() -> Self { debug!("Creating new instance of DaskSqlOptimizer"); - let rules: Vec> = vec![ + let mut rules: Vec> = vec![ Arc::new(SimplifyExpressions::new()), Arc::new(UnwrapCastInComparison::new()), // Arc::new(ReplaceDistinctWithAggregate::new()), @@ -84,6 +84,12 @@ impl DaskSqlOptimizer { Arc::new(PushDownLimit::new()), ]; + let join_reorder_index = 13; + if let Some(rule) = rules.get_mut(join_reorder_index) { + // TODO: Replace Arc::new(JoinReorder::default()), with user specifications + *rule = Arc::new(JoinReorder::default()); + } + Self { optimizer: Optimizer::with_rules(rules), } @@ -91,9 +97,13 @@ impl DaskSqlOptimizer { // Create a separate instance of this optimization rule, since we want to ensure that it only // runs one time - pub fn dynamic_partition_pruner() -> Self { - let rule: Vec> = - vec![Arc::new(DynamicPartitionPruning::new())]; + pub fn dynamic_partition_pruner(fact_dimension_ratio: Option) -> Self { + let rule: Vec>; + if let Some(f) = fact_dimension_ratio { + rule = vec![Arc::new(DynamicPartitionPruning::new(f))]; + } else { + rule = vec![Arc::new(DynamicPartitionPruning::default())]; + } Self { optimizer: Optimizer::with_rules(rule), diff --git a/dask_planner/src/sql/optimizer/dynamic_partition_pruning.rs b/dask_planner/src/sql/optimizer/dynamic_partition_pruning.rs index 0ff48a682..52b1a0acb 100644 --- a/dask_planner/src/sql/optimizer/dynamic_partition_pruning.rs +++ b/dask_planner/src/sql/optimizer/dynamic_partition_pruning.rs @@ -36,11 +36,24 @@ use log::warn; use crate::sql::table::DaskTableSource; // Optimizer rule for dynamic partition pruning -pub struct DynamicPartitionPruning {} +pub struct DynamicPartitionPruning { + /// Ratio of the size of the dimension tables to fact tables + fact_dimension_ratio: f64, +} impl DynamicPartitionPruning { - pub fn new() -> Self { - Self {} + pub fn new(fact_dimension_ratio: f64) -> Self { + Self { + fact_dimension_ratio, + } + } +} + +impl Default for DynamicPartitionPruning { + fn default() -> Self { + Self { + fact_dimension_ratio: 0.3, + } } } @@ -106,9 +119,6 @@ impl OptimizerRule for DynamicPartitionPruning { (left_table.unwrap(), right_table.unwrap()); let (left_field, right_field) = (left_field.unwrap(), right_field.unwrap()); - // TODO: Consider allowing the fact_dimension_ratio to be configured by the - // user. See issue: https://github.com/dask-contrib/dask-sql/issues/1121 - let fact_dimension_ratio = 0.3; let (mut left_filtered_table, mut right_filtered_table) = (None, None); // Check if join uses an alias instead of the table name itself. Need to use @@ -136,7 +146,7 @@ impl OptimizerRule for DynamicPartitionPruning { .size .unwrap_or(largest_size as usize) as f64 / largest_size - < fact_dimension_ratio + < self.fact_dimension_ratio { left_filtered_table = read_table(left_table.clone(), left_field.clone(), tables.clone()); @@ -149,7 +159,7 @@ impl OptimizerRule for DynamicPartitionPruning { .size .unwrap_or(largest_size as usize) as f64 / largest_size - < fact_dimension_ratio + < self.fact_dimension_ratio { right_filtered_table = read_table(right_table.clone(), right_field.clone(), tables.clone()); diff --git a/dask_planner/src/sql/optimizer/join_reorder.rs b/dask_planner/src/sql/optimizer/join_reorder.rs index 8997f9d96..928f975c4 100644 --- a/dask_planner/src/sql/optimizer/join_reorder.rs +++ b/dask_planner/src/sql/optimizer/join_reorder.rs @@ -24,6 +24,22 @@ pub struct JoinReorder { filter_selectivity: f64, } +impl JoinReorder { + pub fn new( + max_fact_tables: usize, + fact_dimension_ratio: f64, + preserve_user_order: bool, + filter_selectivity: f64, + ) -> Self { + Self { + max_fact_tables, + fact_dimension_ratio, + preserve_user_order, + filter_selectivity, + } + } +} + impl Default for JoinReorder { fn default() -> Self { Self { diff --git a/dask_sql/context.py b/dask_sql/context.py index 17c6d0055..6c443523a 100644 --- a/dask_sql/context.py +++ b/dask_sql/context.py @@ -13,6 +13,7 @@ from dask_planner.rust import ( DaskSchema, DaskSQLContext, + DaskSQLOptimizerConfig, DaskTable, DFOptimizationException, DFParsingException, @@ -98,14 +99,16 @@ def __init__(self, logging_level=logging.INFO): # A started SQL server (useful for jupyter notebooks) self.sql_server = None + # Create the `DaskSQLOptimizerConfig` Rust context + optimizer_config = DaskSQLOptimizerConfig( + dask_config.get("sql.dynamic_partition_pruning"), + dask_config.get("sql.fact_dimension_ratio"), + ) + # Create the `DaskSQLContext` Rust context - self.context = DaskSQLContext(self.catalog_name, self.schema_name) + self.context = DaskSQLContext(self.catalog_name, self.schema_name, optimizer_config) self.context.register_schema(self.schema_name, DaskSchema(self.schema_name)) - self.context.apply_dynamic_partition_pruning( - dask_config.get("sql.dynamic_partition_pruning") - ) - # # Register any default plugins, if nothing was registered before. RelConverter.add_plugin_class(logical.DaskAggregatePlugin, replace=False) RelConverter.add_plugin_class(logical.DaskCrossJoinPlugin, replace=False) @@ -799,9 +802,12 @@ def _get_ral(self, sql): """Helper function to turn the sql query into a relational algebra and resulting column names""" logger.debug(f"Entering _get_ral('{sql}')") - self.context.apply_dynamic_partition_pruning( - dask_config.get("sql.dynamic_partition_pruning") + + optimizer_config = DaskSQLOptimizerConfig( + dask_config.get("sql.dynamic_partition_pruning"), + dask_config.get("sql.fact_dimension_ratio"), ) + self.context.update_optimizer_config(optimizer_config) # get the schema of what we currently have registered schemas = self._prepare_schemas() diff --git a/dask_sql/sql-schema.yaml b/dask_sql/sql-schema.yaml index eaab6936a..b4546a9c3 100644 --- a/dask_sql/sql-schema.yaml +++ b/dask_sql/sql-schema.yaml @@ -69,6 +69,11 @@ properties: description: | Whether to apply the dynamic partition pruning optimizer rule. + fact_dimension_ratio: + type: [number, "null"] + description: | + Ratio of the size of the dimension tables to fact tables. + sort: type: object properties: diff --git a/dask_sql/sql.yaml b/dask_sql/sql.yaml index 42434d20d..d7f7389e5 100644 --- a/dask_sql/sql.yaml +++ b/dask_sql/sql.yaml @@ -18,6 +18,8 @@ sql: dynamic_partition_pruning: True + fact_dimension_ratio: null + sort: topk-nelem-limit: 1000000 From 6f460fa8efbd492fb1169d11c1c086efd73ce647 Mon Sep 17 00:00:00 2001 From: Sarah Yurick Date: Mon, 10 Jul 2023 12:16:01 -0700 Subject: [PATCH 02/20] add join_reorder configs --- dask_planner/src/lib.rs | 1 + dask_planner/src/sql.rs | 22 ++++++++++--- dask_planner/src/sql/optimizer.rs | 24 ++++++++------ .../src/sql/optimizer/join_reorder.rs | 33 +++++++------------ dask_sql/context.py | 8 ++++- dask_sql/sql-schema.yaml | 21 +++++++++++- dask_sql/sql.yaml | 6 ++++ 7 files changed, 77 insertions(+), 38 deletions(-) diff --git a/dask_planner/src/lib.rs b/dask_planner/src/lib.rs index f5305d900..9263863c7 100644 --- a/dask_planner/src/lib.rs +++ b/dask_planner/src/lib.rs @@ -30,6 +30,7 @@ fn rust(py: Python, m: &PyModule) -> PyResult<()> { m.add_class::()?; m.add_class::()?; m.add_class::()?; + m.add_class::()?; // Exceptions m.add( diff --git a/dask_planner/src/sql.rs b/dask_planner/src/sql.rs index 3af613e00..8a26f3c01 100644 --- a/dask_planner/src/sql.rs +++ b/dask_planner/src/sql.rs @@ -106,7 +106,10 @@ pub struct DaskSQLContext { #[derive(Debug, Clone)] pub struct DaskSQLOptimizerConfig { dynamic_partition_pruning: bool, - fact_dimension_ratio: f64, + fact_dimension_ratio: Option, + max_fact_tables: Option, + preserve_user_order: Option, + filter_selectivity: Option, } #[pymethods] @@ -114,11 +117,17 @@ impl DaskSQLOptimizerConfig { #[new] pub fn new( dynamic_partition_pruning: bool, - fact_dimension_ratio: f64, + fact_dimension_ratio: Option, + max_fact_tables: Option, + preserve_user_order: Option, + filter_selectivity: Option, ) -> Self { Self { dynamic_partition_pruning, fact_dimension_ratio, + max_fact_tables, + preserve_user_order, + filter_selectivity, } } } @@ -610,7 +619,12 @@ impl DaskSQLContext { Ok(existing_plan) } _ => { - let optimized_plan = optimizer::DaskSqlOptimizer::new() + let optimized_plan = optimizer::DaskSqlOptimizer::new( + self.optimizer_config.fact_dimension_ratio, + self.optimizer_config.max_fact_tables, + self.optimizer_config.preserve_user_order, + self.optimizer_config.filter_selectivity, + ) .optimize(existing_plan.original_plan) .map(|k| PyLogicalPlan { original_plan: k, @@ -618,7 +632,7 @@ impl DaskSQLContext { }) .map_err(py_optimization_exp); if self.optimizer_config.dynamic_partition_pruning { - optimizer::DaskSqlOptimizer::dynamic_partition_pruner(Some(self.optimizer_config.fact_dimension_ratio)) + optimizer::DaskSqlOptimizer::dynamic_partition_pruner(self.optimizer_config.fact_dimension_ratio) .optimize_once(optimized_plan.unwrap().original_plan) .map(|k| PyLogicalPlan { original_plan: k, diff --git a/dask_planner/src/sql/optimizer.rs b/dask_planner/src/sql/optimizer.rs index 32cd3cd50..7dc8d7ddd 100644 --- a/dask_planner/src/sql/optimizer.rs +++ b/dask_planner/src/sql/optimizer.rs @@ -39,10 +39,15 @@ pub struct DaskSqlOptimizer { impl DaskSqlOptimizer { /// Creates a new instance of the DaskSqlOptimizer with all the DataFusion desired /// optimizers as well as any custom `OptimizerRule` trait impls that might be desired. - pub fn new() -> Self { + pub fn new( + fact_dimension_ratio: Option, + max_fact_tables: Option, + preserve_user_order: Option, + filter_selectivity: Option, + ) -> Self { debug!("Creating new instance of DaskSqlOptimizer"); - let mut rules: Vec> = vec![ + let rules: Vec> = vec![ Arc::new(SimplifyExpressions::new()), Arc::new(UnwrapCastInComparison::new()), // Arc::new(ReplaceDistinctWithAggregate::new()), @@ -72,7 +77,12 @@ impl DaskSqlOptimizer { Arc::new(PushDownFilter::new()), // Arc::new(SingleDistinctToGroupBy::new()), // Dask-SQL specific optimizations - Arc::new(JoinReorder::default()), + Arc::new(JoinReorder::new( + fact_dimension_ratio, + max_fact_tables, + preserve_user_order, + filter_selectivity, + )), // The previous optimizations added expressions and projections, // that might benefit from the following rules Arc::new(SimplifyExpressions::new()), @@ -84,12 +94,6 @@ impl DaskSqlOptimizer { Arc::new(PushDownLimit::new()), ]; - let join_reorder_index = 13; - if let Some(rule) = rules.get_mut(join_reorder_index) { - // TODO: Replace Arc::new(JoinReorder::default()), with user specifications - *rule = Arc::new(JoinReorder::default()); - } - Self { optimizer: Optimizer::with_rules(rules), } @@ -187,7 +191,7 @@ mod tests { let plan = sql_to_rel.sql_statement_to_plan(statement.clone()).unwrap(); // optimize the logical plan - let optimizer = DaskSqlOptimizer::new(); + let optimizer = DaskSqlOptimizer::new(None, None, None, None); optimizer.optimize(plan) } diff --git a/dask_planner/src/sql/optimizer/join_reorder.rs b/dask_planner/src/sql/optimizer/join_reorder.rs index 928f975c4..163ca2069 100644 --- a/dask_planner/src/sql/optimizer/join_reorder.rs +++ b/dask_planner/src/sql/optimizer/join_reorder.rs @@ -13,10 +13,10 @@ use log::warn; use crate::sql::table::DaskTableSource; pub struct JoinReorder { - /// Maximum number of fact tables to allow in a join - max_fact_tables: usize, /// Ratio of the size of the dimension tables to fact tables fact_dimension_ratio: f64, + /// Maximum number of fact tables to allow in a join + max_fact_tables: usize, /// Whether to preserve user-defined order of unfiltered dimensions preserve_user_order: bool, /// Constant to use when determining the number of rows produced by a @@ -26,28 +26,17 @@ pub struct JoinReorder { impl JoinReorder { pub fn new( - max_fact_tables: usize, - fact_dimension_ratio: f64, - preserve_user_order: bool, - filter_selectivity: f64, + fact_dimension_ratio: Option, + max_fact_tables: Option, + preserve_user_order: Option, + filter_selectivity: Option, ) -> Self { Self { - max_fact_tables, - fact_dimension_ratio, - preserve_user_order, - filter_selectivity, - } - } -} - -impl Default for JoinReorder { - fn default() -> Self { - Self { - max_fact_tables: 2, - // FIXME: fact_dimension_ratio should be 0.3 - fact_dimension_ratio: 0.7, - preserve_user_order: true, - filter_selectivity: 1.0, + // FIXME: Default value for fact_dimension_ratio should be 0.3, not 0.7 + fact_dimension_ratio: fact_dimension_ratio.unwrap_or(0.7), + max_fact_tables: max_fact_tables.unwrap_or(2), + preserve_user_order: preserve_user_order.unwrap_or(true), + filter_selectivity: filter_selectivity.unwrap_or(1.0), } } } diff --git a/dask_sql/context.py b/dask_sql/context.py index 6c443523a..c4617a594 100644 --- a/dask_sql/context.py +++ b/dask_sql/context.py @@ -103,6 +103,9 @@ def __init__(self, logging_level=logging.INFO): optimizer_config = DaskSQLOptimizerConfig( dask_config.get("sql.dynamic_partition_pruning"), dask_config.get("sql.fact_dimension_ratio"), + dask_config.get("sql.max_fact_tables"), + dask_config.get("sql.preserve_user_order"), + dask_config.get("sql.filter_selectivity"), ) # Create the `DaskSQLContext` Rust context @@ -806,8 +809,11 @@ def _get_ral(self, sql): optimizer_config = DaskSQLOptimizerConfig( dask_config.get("sql.dynamic_partition_pruning"), dask_config.get("sql.fact_dimension_ratio"), + dask_config.get("sql.max_fact_tables"), + dask_config.get("sql.preserve_user_order"), + dask_config.get("sql.filter_selectivity"), ) - self.context.update_optimizer_config(optimizer_config) + self.context.set_optimizer_config(optimizer_config) # get the schema of what we currently have registered schemas = self._prepare_schemas() diff --git a/dask_sql/sql-schema.yaml b/dask_sql/sql-schema.yaml index b4546a9c3..4cc66fae5 100644 --- a/dask_sql/sql-schema.yaml +++ b/dask_sql/sql-schema.yaml @@ -72,7 +72,26 @@ properties: fact_dimension_ratio: type: [number, "null"] description: | - Ratio of the size of the dimension tables to fact tables. + Ratio of the size of the dimension tables to fact tables. Parameter for Dynamic Partition + Pruning and Join Reorder optimization rules. + + max_fact_tables: + type: [integer, "null"] + description: | + Maximum number of fact tables to allow in a join. Parameter for Join Reorder optimization + rule. + + preserve_user_order: + type: [boolean, "null"] + description: | + Whether to preserve user-defined order of unfiltered dimensions. Parameter for Join + Reorder optimization rule. + + filter_selectivity: + type: [number, "null"] + description: | + Constant to use when determining the number of rows produced by a filtered relation. + Parameter for Join Reorder optimization rule. sort: type: object diff --git a/dask_sql/sql.yaml b/dask_sql/sql.yaml index d7f7389e5..ce07b6300 100644 --- a/dask_sql/sql.yaml +++ b/dask_sql/sql.yaml @@ -20,6 +20,12 @@ sql: fact_dimension_ratio: null + max_fact_tables: null + + preserve_user_order: null + + filter_selectivity: null + sort: topk-nelem-limit: 1000000 From ccb8d42aef123f7d269e2b2c8cc32cf81e597466 Mon Sep 17 00:00:00 2001 From: Sarah Yurick Date: Mon, 10 Jul 2023 12:44:57 -0700 Subject: [PATCH 03/20] add verbose_optimizer --- dask_planner/src/sql.rs | 24 +++++++++++++----------- dask_sql/context.py | 11 ++++++++++- dask_sql/sql-schema.yaml | 20 ++++++++++++++------ dask_sql/sql.yaml | 2 ++ 4 files changed, 39 insertions(+), 18 deletions(-) diff --git a/dask_planner/src/sql.rs b/dask_planner/src/sql.rs index 8a26f3c01..4cb7c75c6 100644 --- a/dask_planner/src/sql.rs +++ b/dask_planner/src/sql.rs @@ -523,7 +523,7 @@ impl DaskSQLContext { current_schema: default_schema_name.to_owned(), schemas: HashMap::new(), options: ConfigOptions::new(), - optimizer_config: optimizer_config, + optimizer_config, } } @@ -625,20 +625,22 @@ impl DaskSQLContext { self.optimizer_config.preserve_user_order, self.optimizer_config.filter_selectivity, ) - .optimize(existing_plan.original_plan) + .optimize(existing_plan.original_plan) + .map(|k| PyLogicalPlan { + original_plan: k, + current_node: None, + }) + .map_err(py_optimization_exp); + if self.optimizer_config.dynamic_partition_pruning { + optimizer::DaskSqlOptimizer::dynamic_partition_pruner( + self.optimizer_config.fact_dimension_ratio, + ) + .optimize_once(optimized_plan.unwrap().original_plan) .map(|k| PyLogicalPlan { original_plan: k, current_node: None, }) - .map_err(py_optimization_exp); - if self.optimizer_config.dynamic_partition_pruning { - optimizer::DaskSqlOptimizer::dynamic_partition_pruner(self.optimizer_config.fact_dimension_ratio) - .optimize_once(optimized_plan.unwrap().original_plan) - .map(|k| PyLogicalPlan { - original_plan: k, - current_node: None, - }) - .map_err(py_optimization_exp) + .map_err(py_optimization_exp) } else { optimized_plan } diff --git a/dask_sql/context.py b/dask_sql/context.py index c4617a594..a021cc9e2 100644 --- a/dask_sql/context.py +++ b/dask_sql/context.py @@ -109,7 +109,9 @@ def __init__(self, logging_level=logging.INFO): ) # Create the `DaskSQLContext` Rust context - self.context = DaskSQLContext(self.catalog_name, self.schema_name, optimizer_config) + self.context = DaskSQLContext( + self.catalog_name, self.schema_name, optimizer_config + ) self.context.register_schema(self.schema_name, DaskSchema(self.schema_name)) # # Register any default plugins, if nothing was registered before. @@ -548,11 +550,18 @@ def explain( :obj:`str`: a description of the created relational algebra. """ + dynamic_partition_pruning = dask_config.get("sql.dynamic_partition_pruning") + if dask_config.get("sql.verbose_optimizer"): + dask_config.set({"sql.dynamic_partition_pruning": True}) + else: + dask_config.set({"sql.dynamic_partition_pruning": False}) + if dataframes is not None: for df_name, df in dataframes.items(): self.create_table(df_name, df, gpu=gpu) _, rel_string = self._get_ral(sql) + dask_config.set({"sql.dynamic_partition_pruning": dynamic_partition_pruning}) return rel_string def visualize(self, sql: str, filename="mydask.png") -> None: # pragma: no cover diff --git a/dask_sql/sql-schema.yaml b/dask_sql/sql-schema.yaml index 4cc66fae5..9830974ca 100644 --- a/dask_sql/sql-schema.yaml +++ b/dask_sql/sql-schema.yaml @@ -69,29 +69,37 @@ properties: description: | Whether to apply the dynamic partition pruning optimizer rule. + verbose_optimizer: + type: boolean + description: | + The dynamic partition pruning optimizer rule can sometimes result in extremely long + c.explain() outputs which are not helpful to the user. Setting this option to true allows + the user to see the entire output, while setting it to false truncates the output. + Default is false. + fact_dimension_ratio: type: [number, "null"] description: | - Ratio of the size of the dimension tables to fact tables. Parameter for Dynamic Partition - Pruning and Join Reorder optimization rules. + Ratio of the size of the dimension tables to fact tables. Parameter for dynamic partition + pruning and join reorder optimizer rules. max_fact_tables: type: [integer, "null"] description: | - Maximum number of fact tables to allow in a join. Parameter for Join Reorder optimization + Maximum number of fact tables to allow in a join. Parameter for join reorder optimizer rule. preserve_user_order: type: [boolean, "null"] description: | - Whether to preserve user-defined order of unfiltered dimensions. Parameter for Join - Reorder optimization rule. + Whether to preserve user-defined order of unfiltered dimensions. Parameter for join + reorder optimizer rule. filter_selectivity: type: [number, "null"] description: | Constant to use when determining the number of rows produced by a filtered relation. - Parameter for Join Reorder optimization rule. + Parameter for join reorder optimizer rule. sort: type: object diff --git a/dask_sql/sql.yaml b/dask_sql/sql.yaml index ce07b6300..596fccb7b 100644 --- a/dask_sql/sql.yaml +++ b/dask_sql/sql.yaml @@ -18,6 +18,8 @@ sql: dynamic_partition_pruning: True + verbose_optimizer: False + fact_dimension_ratio: null max_fact_tables: null From 16b7a8f65a1fa298f861bf372949db3729692157 Mon Sep 17 00:00:00 2001 From: Sarah Yurick Date: Mon, 10 Jul 2023 14:22:25 -0700 Subject: [PATCH 04/20] add test_dynamic_partition_pruning --- dask_sql/context.py | 4 +-- tests/unit/test_config.py | 53 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 54 insertions(+), 3 deletions(-) diff --git a/dask_sql/context.py b/dask_sql/context.py index a021cc9e2..a4fc27c0e 100644 --- a/dask_sql/context.py +++ b/dask_sql/context.py @@ -551,9 +551,7 @@ def explain( """ dynamic_partition_pruning = dask_config.get("sql.dynamic_partition_pruning") - if dask_config.get("sql.verbose_optimizer"): - dask_config.set({"sql.dynamic_partition_pruning": True}) - else: + if not dask_config.get("sql.verbose_optimizer"): dask_config.set({"sql.dynamic_partition_pruning": False}) if dataframes is not None: diff --git a/tests/unit/test_config.py b/tests/unit/test_config.py index 56406244d..5200ed07d 100644 --- a/tests/unit/test_config.py +++ b/tests/unit/test_config.py @@ -1,10 +1,14 @@ import os from unittest import mock +import dask.dataframe as dd +import pandas as pd import pytest import yaml from dask import config as dask_config +from dask_sql import Context + # Required to instantiate default sql config import dask_sql # noqa: F401 @@ -96,3 +100,52 @@ def test_dask_setconfig(): assert dask_config.get("sql.foo") == {"bar": 1, "baz": "2"} assert dask_config.get("sql.foo") == {"bar": 1} dask_config.refresh() + + +def test_dynamic_partition_pruning(tmpdir): + c = Context() + + df1 = pd.DataFrame( + { + "x": [1, 2, 3], + "z": [7, 8, 9], + }, + ) + dd.from_pandas(df1, npartitions=3).to_parquet(os.path.join(tmpdir, "df1")) + df1 = dd.read_parquet(os.path.join(tmpdir, "df1")) + c.create_table("df1", df1) + + df2 = pd.DataFrame( + { + "x": [1, 2, 3] * 1000, + "y": [4, 5, 6] * 1000, + }, + ) + dd.from_pandas(df2, npartitions=3).to_parquet(os.path.join(tmpdir, "df2")) + df2 = dd.read_parquet(os.path.join(tmpdir, "df2")) + c.create_table("df2", df2) + + query = "SELECT * FROM df1, df2 WHERE df1.x = df2.x AND df1.z=7" + inlist_expr = "df2.x IN ([Int64(1)])" + + # Default value is False + dask_config.set({"sql.verbose_optimizer": True}) + + # When DPP is turned off, the explain output will not contain the INLIST expression + dask_config.set({"sql.dynamic_partition_pruning": False}) + explain_string = c.explain(query) + assert inlist_expr not in explain_string + + # When DPP is turned on but verbose_optimizer is off, the explain output will not contain the + # INLIST expression + dask_config.set({"sql.dynamic_partition_pruning": True}) + dask_config.set({"sql.verbose_optimizer": False}) + explain_string = c.explain(query) + assert inlist_expr not in explain_string + + # When both DPP and verbose_optimizer are turned on, the explain output will contain the INLIST + # expression + dask_config.set({"sql.dynamic_partition_pruning": True}) + dask_config.set({"sql.verbose_optimizer": True}) + explain_string = c.explain(query) + assert inlist_expr in explain_string From 28a5d7f124f3241a1b0f961c65e86670c36abe0b Mon Sep 17 00:00:00 2001 From: Sarah Yurick Date: Mon, 10 Jul 2023 14:55:32 -0700 Subject: [PATCH 05/20] skip 3.8 tests --- tests/unit/test_config.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/tests/unit/test_config.py b/tests/unit/test_config.py index 5200ed07d..e55b3404f 100644 --- a/tests/unit/test_config.py +++ b/tests/unit/test_config.py @@ -1,4 +1,5 @@ import os +import sys from unittest import mock import dask.dataframe as dd @@ -7,10 +8,9 @@ import yaml from dask import config as dask_config -from dask_sql import Context - # Required to instantiate default sql config import dask_sql # noqa: F401 +from dask_sql import Context def test_custom_yaml(tmpdir): @@ -102,6 +102,10 @@ def test_dask_setconfig(): dask_config.refresh() +@pytest.mark.skipif( + sys.version_info < (3, 9), + reason="Writing and reading the Dask DataFrame causes a ProtocolError", +) def test_dynamic_partition_pruning(tmpdir): c = Context() From f3e9f523889499cdb1581449546e815a70ba5bca Mon Sep 17 00:00:00 2001 From: Sarah Yurick Date: Tue, 11 Jul 2023 15:12:52 -0700 Subject: [PATCH 06/20] remove dpp default --- dask_planner/src/sql/optimizer.rs | 8 ++------ .../src/sql/optimizer/dynamic_partition_pruning.rs | 8 -------- 2 files changed, 2 insertions(+), 14 deletions(-) diff --git a/dask_planner/src/sql/optimizer.rs b/dask_planner/src/sql/optimizer.rs index 7dc8d7ddd..d1f6c83b3 100644 --- a/dask_planner/src/sql/optimizer.rs +++ b/dask_planner/src/sql/optimizer.rs @@ -102,12 +102,8 @@ impl DaskSqlOptimizer { // Create a separate instance of this optimization rule, since we want to ensure that it only // runs one time pub fn dynamic_partition_pruner(fact_dimension_ratio: Option) -> Self { - let rule: Vec>; - if let Some(f) = fact_dimension_ratio { - rule = vec![Arc::new(DynamicPartitionPruning::new(f))]; - } else { - rule = vec![Arc::new(DynamicPartitionPruning::default())]; - } + let rule: Vec> = + vec![Arc::new(DynamicPartitionPruning::new(fact_dimension_ratio.unwrap_or(0.3)))]; Self { optimizer: Optimizer::with_rules(rule), diff --git a/dask_planner/src/sql/optimizer/dynamic_partition_pruning.rs b/dask_planner/src/sql/optimizer/dynamic_partition_pruning.rs index 52b1a0acb..8abcdc172 100644 --- a/dask_planner/src/sql/optimizer/dynamic_partition_pruning.rs +++ b/dask_planner/src/sql/optimizer/dynamic_partition_pruning.rs @@ -49,14 +49,6 @@ impl DynamicPartitionPruning { } } -impl Default for DynamicPartitionPruning { - fn default() -> Self { - Self { - fact_dimension_ratio: 0.3, - } - } -} - impl OptimizerRule for DynamicPartitionPruning { fn name(&self) -> &str { "dynamic_partition_pruning" From fa4976b9984a1efcf5c8c99a37ac407e46db9ee7 Mon Sep 17 00:00:00 2001 From: Sarah Yurick Date: Tue, 11 Jul 2023 15:23:04 -0700 Subject: [PATCH 07/20] style --- dask_planner/src/sql/optimizer.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/dask_planner/src/sql/optimizer.rs b/dask_planner/src/sql/optimizer.rs index d1f6c83b3..fe10c3dc7 100644 --- a/dask_planner/src/sql/optimizer.rs +++ b/dask_planner/src/sql/optimizer.rs @@ -102,8 +102,9 @@ impl DaskSqlOptimizer { // Create a separate instance of this optimization rule, since we want to ensure that it only // runs one time pub fn dynamic_partition_pruner(fact_dimension_ratio: Option) -> Self { - let rule: Vec> = - vec![Arc::new(DynamicPartitionPruning::new(fact_dimension_ratio.unwrap_or(0.3)))]; + let rule: Vec> = vec![Arc::new( + DynamicPartitionPruning::new(fact_dimension_ratio.unwrap_or(0.3)), + )]; Self { optimizer: Optimizer::with_rules(rule), From c9c5a10f22ef9c309114d254cc2533e430bfb5b4 Mon Sep 17 00:00:00 2001 From: Sarah Yurick Date: Thu, 17 Aug 2023 13:31:23 -0700 Subject: [PATCH 08/20] temporarily remove rust changes --- dask_planner/src/lib.rs | 1 - dask_planner/src/sql.rs | 61 ++----------------- dask_planner/src/sql/optimizer.rs | 23 ++----- .../optimizer/dynamic_partition_pruning.rs | 18 +++--- .../src/sql/optimizer/join_reorder.rs | 23 +++---- 5 files changed, 29 insertions(+), 97 deletions(-) diff --git a/dask_planner/src/lib.rs b/dask_planner/src/lib.rs index 9263863c7..f5305d900 100644 --- a/dask_planner/src/lib.rs +++ b/dask_planner/src/lib.rs @@ -30,7 +30,6 @@ fn rust(py: Python, m: &PyModule) -> PyResult<()> { m.add_class::()?; m.add_class::()?; m.add_class::()?; - m.add_class::()?; // Exceptions m.add( diff --git a/dask_planner/src/sql.rs b/dask_planner/src/sql.rs index 4cb7c75c6..e05d547b9 100644 --- a/dask_planner/src/sql.rs +++ b/dask_planner/src/sql.rs @@ -99,37 +99,7 @@ pub struct DaskSQLContext { current_schema: String, schemas: HashMap, options: ConfigOptions, - optimizer_config: DaskSQLOptimizerConfig, -} - -#[pyclass(name = "DaskSQLOptimizerConfig", module = "dask_planner", subclass)] -#[derive(Debug, Clone)] -pub struct DaskSQLOptimizerConfig { dynamic_partition_pruning: bool, - fact_dimension_ratio: Option, - max_fact_tables: Option, - preserve_user_order: Option, - filter_selectivity: Option, -} - -#[pymethods] -impl DaskSQLOptimizerConfig { - #[new] - pub fn new( - dynamic_partition_pruning: bool, - fact_dimension_ratio: Option, - max_fact_tables: Option, - preserve_user_order: Option, - filter_selectivity: Option, - ) -> Self { - Self { - dynamic_partition_pruning, - fact_dimension_ratio, - max_fact_tables, - preserve_user_order, - filter_selectivity, - } - } } impl ContextProvider for DaskSQLContext { @@ -513,22 +483,18 @@ impl ContextProvider for DaskSQLContext { #[pymethods] impl DaskSQLContext { #[new] - pub fn new( - default_catalog_name: &str, - default_schema_name: &str, - optimizer_config: DaskSQLOptimizerConfig, - ) -> Self { + pub fn new(default_catalog_name: &str, default_schema_name: &str) -> Self { Self { current_catalog: default_catalog_name.to_owned(), current_schema: default_schema_name.to_owned(), schemas: HashMap::new(), options: ConfigOptions::new(), - optimizer_config, + dynamic_partition_pruning: false, } } - pub fn set_optimizer_config(&mut self, config: DaskSQLOptimizerConfig) -> PyResult<()> { - self.optimizer_config = config; + pub fn apply_dynamic_partition_pruning(&mut self, config: bool) -> PyResult<()> { + self.dynamic_partition_pruning = config; Ok(()) } @@ -619,23 +585,8 @@ impl DaskSQLContext { Ok(existing_plan) } _ => { - let optimized_plan = optimizer::DaskSqlOptimizer::new( - self.optimizer_config.fact_dimension_ratio, - self.optimizer_config.max_fact_tables, - self.optimizer_config.preserve_user_order, - self.optimizer_config.filter_selectivity, - ) - .optimize(existing_plan.original_plan) - .map(|k| PyLogicalPlan { - original_plan: k, - current_node: None, - }) - .map_err(py_optimization_exp); - if self.optimizer_config.dynamic_partition_pruning { - optimizer::DaskSqlOptimizer::dynamic_partition_pruner( - self.optimizer_config.fact_dimension_ratio, - ) - .optimize_once(optimized_plan.unwrap().original_plan) + let optimized_plan = optimizer::DaskSqlOptimizer::new() + .optimize(existing_plan.original_plan) .map(|k| PyLogicalPlan { original_plan: k, current_node: None, diff --git a/dask_planner/src/sql/optimizer.rs b/dask_planner/src/sql/optimizer.rs index fe10c3dc7..bdaa30ea7 100644 --- a/dask_planner/src/sql/optimizer.rs +++ b/dask_planner/src/sql/optimizer.rs @@ -39,12 +39,7 @@ pub struct DaskSqlOptimizer { impl DaskSqlOptimizer { /// Creates a new instance of the DaskSqlOptimizer with all the DataFusion desired /// optimizers as well as any custom `OptimizerRule` trait impls that might be desired. - pub fn new( - fact_dimension_ratio: Option, - max_fact_tables: Option, - preserve_user_order: Option, - filter_selectivity: Option, - ) -> Self { + pub fn new() -> Self { debug!("Creating new instance of DaskSqlOptimizer"); let rules: Vec> = vec![ @@ -77,12 +72,7 @@ impl DaskSqlOptimizer { Arc::new(PushDownFilter::new()), // Arc::new(SingleDistinctToGroupBy::new()), // Dask-SQL specific optimizations - Arc::new(JoinReorder::new( - fact_dimension_ratio, - max_fact_tables, - preserve_user_order, - filter_selectivity, - )), + Arc::new(JoinReorder::default()), // The previous optimizations added expressions and projections, // that might benefit from the following rules Arc::new(SimplifyExpressions::new()), @@ -101,10 +91,9 @@ impl DaskSqlOptimizer { // Create a separate instance of this optimization rule, since we want to ensure that it only // runs one time - pub fn dynamic_partition_pruner(fact_dimension_ratio: Option) -> Self { - let rule: Vec> = vec![Arc::new( - DynamicPartitionPruning::new(fact_dimension_ratio.unwrap_or(0.3)), - )]; + pub fn dynamic_partition_pruner() -> Self { + let rule: Vec> = + vec![Arc::new(DynamicPartitionPruning::new())]; Self { optimizer: Optimizer::with_rules(rule), @@ -188,7 +177,7 @@ mod tests { let plan = sql_to_rel.sql_statement_to_plan(statement.clone()).unwrap(); // optimize the logical plan - let optimizer = DaskSqlOptimizer::new(None, None, None, None); + let optimizer = DaskSqlOptimizer::new(); optimizer.optimize(plan) } diff --git a/dask_planner/src/sql/optimizer/dynamic_partition_pruning.rs b/dask_planner/src/sql/optimizer/dynamic_partition_pruning.rs index 8abcdc172..0ff48a682 100644 --- a/dask_planner/src/sql/optimizer/dynamic_partition_pruning.rs +++ b/dask_planner/src/sql/optimizer/dynamic_partition_pruning.rs @@ -36,16 +36,11 @@ use log::warn; use crate::sql::table::DaskTableSource; // Optimizer rule for dynamic partition pruning -pub struct DynamicPartitionPruning { - /// Ratio of the size of the dimension tables to fact tables - fact_dimension_ratio: f64, -} +pub struct DynamicPartitionPruning {} impl DynamicPartitionPruning { - pub fn new(fact_dimension_ratio: f64) -> Self { - Self { - fact_dimension_ratio, - } + pub fn new() -> Self { + Self {} } } @@ -111,6 +106,9 @@ impl OptimizerRule for DynamicPartitionPruning { (left_table.unwrap(), right_table.unwrap()); let (left_field, right_field) = (left_field.unwrap(), right_field.unwrap()); + // TODO: Consider allowing the fact_dimension_ratio to be configured by the + // user. See issue: https://github.com/dask-contrib/dask-sql/issues/1121 + let fact_dimension_ratio = 0.3; let (mut left_filtered_table, mut right_filtered_table) = (None, None); // Check if join uses an alias instead of the table name itself. Need to use @@ -138,7 +136,7 @@ impl OptimizerRule for DynamicPartitionPruning { .size .unwrap_or(largest_size as usize) as f64 / largest_size - < self.fact_dimension_ratio + < fact_dimension_ratio { left_filtered_table = read_table(left_table.clone(), left_field.clone(), tables.clone()); @@ -151,7 +149,7 @@ impl OptimizerRule for DynamicPartitionPruning { .size .unwrap_or(largest_size as usize) as f64 / largest_size - < self.fact_dimension_ratio + < fact_dimension_ratio { right_filtered_table = read_table(right_table.clone(), right_field.clone(), tables.clone()); diff --git a/dask_planner/src/sql/optimizer/join_reorder.rs b/dask_planner/src/sql/optimizer/join_reorder.rs index 163ca2069..8997f9d96 100644 --- a/dask_planner/src/sql/optimizer/join_reorder.rs +++ b/dask_planner/src/sql/optimizer/join_reorder.rs @@ -13,10 +13,10 @@ use log::warn; use crate::sql::table::DaskTableSource; pub struct JoinReorder { - /// Ratio of the size of the dimension tables to fact tables - fact_dimension_ratio: f64, /// Maximum number of fact tables to allow in a join max_fact_tables: usize, + /// Ratio of the size of the dimension tables to fact tables + fact_dimension_ratio: f64, /// Whether to preserve user-defined order of unfiltered dimensions preserve_user_order: bool, /// Constant to use when determining the number of rows produced by a @@ -24,19 +24,14 @@ pub struct JoinReorder { filter_selectivity: f64, } -impl JoinReorder { - pub fn new( - fact_dimension_ratio: Option, - max_fact_tables: Option, - preserve_user_order: Option, - filter_selectivity: Option, - ) -> Self { +impl Default for JoinReorder { + fn default() -> Self { Self { - // FIXME: Default value for fact_dimension_ratio should be 0.3, not 0.7 - fact_dimension_ratio: fact_dimension_ratio.unwrap_or(0.7), - max_fact_tables: max_fact_tables.unwrap_or(2), - preserve_user_order: preserve_user_order.unwrap_or(true), - filter_selectivity: filter_selectivity.unwrap_or(1.0), + max_fact_tables: 2, + // FIXME: fact_dimension_ratio should be 0.3 + fact_dimension_ratio: 0.7, + preserve_user_order: true, + filter_selectivity: 1.0, } } } From e527d837760edb23a26e36cd13a581ad3121c5b0 Mon Sep 17 00:00:00 2001 From: Sarah Yurick Date: Thu, 17 Aug 2023 13:32:36 -0700 Subject: [PATCH 09/20] temp remove sql.rs changes --- dask_planner/src/sql.rs | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/dask_planner/src/sql.rs b/dask_planner/src/sql.rs index e05d547b9..a0e238727 100644 --- a/dask_planner/src/sql.rs +++ b/dask_planner/src/sql.rs @@ -591,7 +591,15 @@ impl DaskSQLContext { original_plan: k, current_node: None, }) - .map_err(py_optimization_exp) + .map_err(py_optimization_exp); + if self.dynamic_partition_pruning { + optimizer::DaskSqlOptimizer::dynamic_partition_pruner() + .optimize_once(optimized_plan.unwrap().original_plan) + .map(|k| PyLogicalPlan { + original_plan: k, + current_node: None, + }) + .map_err(py_optimization_exp) } else { optimized_plan } From 35c363b0b5cb28df9a326d48a7991b500a08c373 Mon Sep 17 00:00:00 2001 From: Sarah Yurick Date: Thu, 17 Aug 2023 13:48:17 -0700 Subject: [PATCH 10/20] readd rust changes --- src/lib.rs | 1 + src/sql.rs | 76 ++++++++++++++----- src/sql/optimizer.rs | 23 ++++-- .../optimizer/dynamic_partition_pruning.rs | 18 +++-- src/sql/optimizer/join_reorder.rs | 23 +++--- 5 files changed, 98 insertions(+), 43 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 921478973..51b3ecfe4 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -29,6 +29,7 @@ fn _datafusion_lib(py: Python, m: &PyModule) -> PyResult<()> { m.add_class::()?; m.add_class::()?; m.add_class::()?; + m.add_class::()?; // Exceptions m.add( diff --git a/src/sql.rs b/src/sql.rs index c9a600225..a67a37989 100644 --- a/src/sql.rs +++ b/src/sql.rs @@ -85,7 +85,37 @@ pub struct DaskSQLContext { current_schema: String, schemas: HashMap, options: ConfigOptions, + optimizer_config: DaskSQLOptimizerConfig, +} + +#[pyclass(name = "DaskSQLOptimizerConfig", module = "dask_sql", subclass)] +#[derive(Debug, Clone)] +pub struct DaskSQLOptimizerConfig { dynamic_partition_pruning: bool, + fact_dimension_ratio: Option, + max_fact_tables: Option, + preserve_user_order: Option, + filter_selectivity: Option, +} + +#[pymethods] +impl DaskSQLOptimizerConfig { + #[new] + pub fn new( + dynamic_partition_pruning: bool, + fact_dimension_ratio: Option, + max_fact_tables: Option, + preserve_user_order: Option, + filter_selectivity: Option, + ) -> Self { + Self { + dynamic_partition_pruning, + fact_dimension_ratio, + max_fact_tables, + preserve_user_order, + filter_selectivity, + } + } } impl ContextProvider for DaskSQLContext { @@ -476,18 +506,22 @@ impl ContextProvider for DaskSQLContext { #[pymethods] impl DaskSQLContext { #[new] - pub fn new(default_catalog_name: &str, default_schema_name: &str) -> Self { + pub fn new( + default_catalog_name: &str, + default_schema_name: &str, + optimizer_config: DaskSQLOptimizerConfig, + ) -> Self { Self { current_catalog: default_catalog_name.to_owned(), current_schema: default_schema_name.to_owned(), schemas: HashMap::new(), options: ConfigOptions::new(), - dynamic_partition_pruning: false, + optimizer_config, } } - pub fn apply_dynamic_partition_pruning(&mut self, config: bool) -> PyResult<()> { - self.dynamic_partition_pruning = config; + pub fn set_optimizer_config(&mut self, config: DaskSQLOptimizerConfig) -> PyResult<()> { + self.optimizer_config = config; Ok(()) } @@ -578,26 +612,28 @@ impl DaskSQLContext { Ok(existing_plan) } _ => { - let optimized_plan = optimizer::DaskSqlOptimizer::new() - .optimize(existing_plan.original_plan) + let optimized_plan = optimizer::DaskSqlOptimizer::new( + self.optimizer_config.fact_dimension_ratio, + self.optimizer_config.max_fact_tables, + self.optimizer_config.preserve_user_order, + self.optimizer_config.filter_selectivity, + ) + .optimize(existing_plan.original_plan) + .map(|k| PyLogicalPlan { + original_plan: k, + current_node: None, + }) + .map_err(py_optimization_exp); + if self.optimizer_config.dynamic_partition_pruning { + optimizer::DaskSqlOptimizer::dynamic_partition_pruner( + self.optimizer_config.fact_dimension_ratio, + ) + .optimize_once(optimized_plan.unwrap().original_plan) .map(|k| PyLogicalPlan { original_plan: k, current_node: None, }) - .map_err(py_optimization_exp); - - if let Ok(optimized_plan) = optimized_plan { - if self.dynamic_partition_pruning { - optimizer::DaskSqlOptimizer::dynamic_partition_pruner() - .optimize_once(optimized_plan.original_plan) - .map(|k| PyLogicalPlan { - original_plan: k, - current_node: None, - }) - .map_err(py_optimization_exp) - } else { - Ok(optimized_plan) - } + .map_err(py_optimization_exp) } else { optimized_plan } diff --git a/src/sql/optimizer.rs b/src/sql/optimizer.rs index 85f335572..2c8be3a56 100644 --- a/src/sql/optimizer.rs +++ b/src/sql/optimizer.rs @@ -42,7 +42,12 @@ pub struct DaskSqlOptimizer { impl DaskSqlOptimizer { /// Creates a new instance of the DaskSqlOptimizer with all the DataFusion desired /// optimizers as well as any custom `OptimizerRule` trait impls that might be desired. - pub fn new() -> Self { + pub fn new( + fact_dimension_ratio: Option, + max_fact_tables: Option, + preserve_user_order: Option, + filter_selectivity: Option, + ) -> Self { debug!("Creating new instance of DaskSqlOptimizer"); let rules: Vec> = vec![ @@ -75,7 +80,12 @@ impl DaskSqlOptimizer { Arc::new(PushDownFilter::new()), // Arc::new(SingleDistinctToGroupBy::new()), // Dask-SQL specific optimizations - Arc::new(JoinReorder::default()), + Arc::new(JoinReorder::new( + fact_dimension_ratio, + max_fact_tables, + preserve_user_order, + filter_selectivity, + )), // The previous optimizations added expressions and projections, // that might benefit from the following rules Arc::new(SimplifyExpressions::new()), @@ -94,9 +104,10 @@ impl DaskSqlOptimizer { // Create a separate instance of this optimization rule, since we want to ensure that it only // runs one time - pub fn dynamic_partition_pruner() -> Self { - let rule: Vec> = - vec![Arc::new(DynamicPartitionPruning::new())]; + pub fn dynamic_partition_pruner(fact_dimension_ratio: Option) -> Self { + let rule: Vec> = vec![Arc::new( + DynamicPartitionPruning::new(fact_dimension_ratio.unwrap_or(0.3)), + )]; Self { optimizer: Optimizer::with_rules(rule), @@ -170,7 +181,7 @@ mod tests { let plan = sql_to_rel.sql_statement_to_plan(statement.clone()).unwrap(); // optimize the logical plan - let optimizer = DaskSqlOptimizer::new(); + let optimizer = DaskSqlOptimizer::new(None, None, None, None); optimizer.optimize(plan) } diff --git a/src/sql/optimizer/dynamic_partition_pruning.rs b/src/sql/optimizer/dynamic_partition_pruning.rs index d7e1a8be5..14cfc30d5 100644 --- a/src/sql/optimizer/dynamic_partition_pruning.rs +++ b/src/sql/optimizer/dynamic_partition_pruning.rs @@ -37,11 +37,16 @@ use log::warn; use crate::sql::table::DaskTableSource; // Optimizer rule for dynamic partition pruning -pub struct DynamicPartitionPruning {} +pub struct DynamicPartitionPruning { + /// Ratio of the size of the dimension tables to fact tables + fact_dimension_ratio: f64, +} impl DynamicPartitionPruning { - pub fn new() -> Self { - Self {} + pub fn new(fact_dimension_ratio: f64) -> Self { + Self { + fact_dimension_ratio, + } } } @@ -107,9 +112,6 @@ impl OptimizerRule for DynamicPartitionPruning { (left_table.unwrap(), right_table.unwrap()); let (left_field, right_field) = (left_field.unwrap(), right_field.unwrap()); - // TODO: Consider allowing the fact_dimension_ratio to be configured by the - // user. See issue: https://github.com/dask-contrib/dask-sql/issues/1121 - let fact_dimension_ratio = 0.3; let (mut left_filtered_table, mut right_filtered_table) = (None, None); // Check if join uses an alias instead of the table name itself. Need to use @@ -137,7 +139,7 @@ impl OptimizerRule for DynamicPartitionPruning { .size .unwrap_or(largest_size as usize) as f64 / largest_size - < fact_dimension_ratio + < self.fact_dimension_ratio { left_filtered_table = read_table(left_table.clone(), left_field.clone(), tables.clone()); @@ -150,7 +152,7 @@ impl OptimizerRule for DynamicPartitionPruning { .size .unwrap_or(largest_size as usize) as f64 / largest_size - < fact_dimension_ratio + < self.fact_dimension_ratio { right_filtered_table = read_table(right_table.clone(), right_field.clone(), tables.clone()); diff --git a/src/sql/optimizer/join_reorder.rs b/src/sql/optimizer/join_reorder.rs index 8997f9d96..163ca2069 100644 --- a/src/sql/optimizer/join_reorder.rs +++ b/src/sql/optimizer/join_reorder.rs @@ -13,10 +13,10 @@ use log::warn; use crate::sql::table::DaskTableSource; pub struct JoinReorder { - /// Maximum number of fact tables to allow in a join - max_fact_tables: usize, /// Ratio of the size of the dimension tables to fact tables fact_dimension_ratio: f64, + /// Maximum number of fact tables to allow in a join + max_fact_tables: usize, /// Whether to preserve user-defined order of unfiltered dimensions preserve_user_order: bool, /// Constant to use when determining the number of rows produced by a @@ -24,14 +24,19 @@ pub struct JoinReorder { filter_selectivity: f64, } -impl Default for JoinReorder { - fn default() -> Self { +impl JoinReorder { + pub fn new( + fact_dimension_ratio: Option, + max_fact_tables: Option, + preserve_user_order: Option, + filter_selectivity: Option, + ) -> Self { Self { - max_fact_tables: 2, - // FIXME: fact_dimension_ratio should be 0.3 - fact_dimension_ratio: 0.7, - preserve_user_order: true, - filter_selectivity: 1.0, + // FIXME: Default value for fact_dimension_ratio should be 0.3, not 0.7 + fact_dimension_ratio: fact_dimension_ratio.unwrap_or(0.7), + max_fact_tables: max_fact_tables.unwrap_or(2), + preserve_user_order: preserve_user_order.unwrap_or(true), + filter_selectivity: filter_selectivity.unwrap_or(1.0), } } } From c1a19e2a1fd892e92aed61d6c60370701dfb931c Mon Sep 17 00:00:00 2001 From: Sarah Yurick Date: Thu, 17 Aug 2023 15:24:14 -0700 Subject: [PATCH 11/20] check optimized_plan is Ok --- src/sql.rs | 24 ++++++++++++++---------- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/src/sql.rs b/src/sql.rs index a67a37989..8d48a71e4 100644 --- a/src/sql.rs +++ b/src/sql.rs @@ -624,16 +624,20 @@ impl DaskSQLContext { current_node: None, }) .map_err(py_optimization_exp); - if self.optimizer_config.dynamic_partition_pruning { - optimizer::DaskSqlOptimizer::dynamic_partition_pruner( - self.optimizer_config.fact_dimension_ratio, - ) - .optimize_once(optimized_plan.unwrap().original_plan) - .map(|k| PyLogicalPlan { - original_plan: k, - current_node: None, - }) - .map_err(py_optimization_exp) + if let Ok(optimized_plan) = optimized_plan { + if self.optimizer_config.dynamic_partition_pruning { + optimizer::DaskSqlOptimizer::dynamic_partition_pruner( + self.optimizer_config.fact_dimension_ratio, + ) + .optimize_once(optimized_plan.original_plan) + .map(|k| PyLogicalPlan { + original_plan: k, + current_node: None, + }) + .map_err(py_optimization_exp) + } else { + Ok(optimized_plan) + } } else { optimized_plan } From 67d425debf15c6220924130941839339e60830b6 Mon Sep 17 00:00:00 2001 From: Sarah Yurick Date: Wed, 15 Nov 2023 11:07:07 -0800 Subject: [PATCH 12/20] sql.dynamic_partition_pruning.verbose --- dask_sql/context.py | 2 +- dask_sql/sql-schema.yaml | 15 ++++++++------- dask_sql/sql.yaml | 3 +-- tests/unit/test_config.py | 10 +++++----- 4 files changed, 15 insertions(+), 15 deletions(-) diff --git a/dask_sql/context.py b/dask_sql/context.py index d7071c5e9..0e04f7a51 100644 --- a/dask_sql/context.py +++ b/dask_sql/context.py @@ -551,7 +551,7 @@ def explain( """ dynamic_partition_pruning = dask_config.get("sql.dynamic_partition_pruning") - if not dask_config.get("sql.verbose_optimizer"): + if not dask_config.get("sql.dynamic_partition_pruning.verbose"): dask_config.set({"sql.dynamic_partition_pruning": False}) if dataframes is not None: diff --git a/dask_sql/sql-schema.yaml b/dask_sql/sql-schema.yaml index 9830974ca..ec600301c 100644 --- a/dask_sql/sql-schema.yaml +++ b/dask_sql/sql-schema.yaml @@ -68,14 +68,15 @@ properties: type: boolean description: | Whether to apply the dynamic partition pruning optimizer rule. + properies: - verbose_optimizer: - type: boolean - description: | - The dynamic partition pruning optimizer rule can sometimes result in extremely long - c.explain() outputs which are not helpful to the user. Setting this option to true allows - the user to see the entire output, while setting it to false truncates the output. - Default is false. + verbose: + type: boolean + description: | + The dynamic partition pruning optimizer rule can sometimes result in extremely long + c.explain() outputs which are not helpful to the user. Setting this option to true + allows the user to see the entire output, while setting it to false truncates the + output. Default is false. fact_dimension_ratio: type: [number, "null"] diff --git a/dask_sql/sql.yaml b/dask_sql/sql.yaml index 596fccb7b..a0666d616 100644 --- a/dask_sql/sql.yaml +++ b/dask_sql/sql.yaml @@ -17,8 +17,7 @@ sql: predicate_pushdown: True dynamic_partition_pruning: True - - verbose_optimizer: False + verbose: False fact_dimension_ratio: null diff --git a/tests/unit/test_config.py b/tests/unit/test_config.py index e55b3404f..033d8b317 100644 --- a/tests/unit/test_config.py +++ b/tests/unit/test_config.py @@ -133,23 +133,23 @@ def test_dynamic_partition_pruning(tmpdir): inlist_expr = "df2.x IN ([Int64(1)])" # Default value is False - dask_config.set({"sql.verbose_optimizer": True}) + dask_config.set({"sql.dynamic_partition_pruning.verbose": True}) # When DPP is turned off, the explain output will not contain the INLIST expression dask_config.set({"sql.dynamic_partition_pruning": False}) explain_string = c.explain(query) assert inlist_expr not in explain_string - # When DPP is turned on but verbose_optimizer is off, the explain output will not contain the + # When DPP is turned on but sql.dynamic_partition_pruning.verbose is off, the explain output will not contain the # INLIST expression dask_config.set({"sql.dynamic_partition_pruning": True}) - dask_config.set({"sql.verbose_optimizer": False}) + dask_config.set({"sql.dynamic_partition_pruning.verbose": False}) explain_string = c.explain(query) assert inlist_expr not in explain_string - # When both DPP and verbose_optimizer are turned on, the explain output will contain the INLIST + # When both DPP and sql.dynamic_partition_pruning.verbose are turned on, the explain output will contain the INLIST # expression dask_config.set({"sql.dynamic_partition_pruning": True}) - dask_config.set({"sql.verbose_optimizer": True}) + dask_config.set({"sql.dynamic_partition_pruning.verbose": True}) explain_string = c.explain(query) assert inlist_expr in explain_string From 4e8bb93ed01e223685f9583b50e02f54908be9cb Mon Sep 17 00:00:00 2001 From: Sarah Yurick Date: Wed, 15 Nov 2023 11:13:42 -0800 Subject: [PATCH 13/20] edit yaml style --- dask_sql/sql-schema.yaml | 15 +++++++-------- dask_sql/sql.yaml | 3 ++- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/dask_sql/sql-schema.yaml b/dask_sql/sql-schema.yaml index ec600301c..79b4e6860 100644 --- a/dask_sql/sql-schema.yaml +++ b/dask_sql/sql-schema.yaml @@ -68,15 +68,14 @@ properties: type: boolean description: | Whether to apply the dynamic partition pruning optimizer rule. - properies: - verbose: - type: boolean - description: | - The dynamic partition pruning optimizer rule can sometimes result in extremely long - c.explain() outputs which are not helpful to the user. Setting this option to true - allows the user to see the entire output, while setting it to false truncates the - output. Default is false. + dynamic_partition_pruning.verbose: + type: boolean + description: | + The dynamic partition pruning optimizer rule can sometimes result in extremely long + c.explain() outputs which are not helpful to the user. Setting this option to true allows + the user to see the entire output, while setting it to false truncates the output. + Default is false. fact_dimension_ratio: type: [number, "null"] diff --git a/dask_sql/sql.yaml b/dask_sql/sql.yaml index a0666d616..aaa38e3c6 100644 --- a/dask_sql/sql.yaml +++ b/dask_sql/sql.yaml @@ -17,7 +17,8 @@ sql: predicate_pushdown: True dynamic_partition_pruning: True - verbose: False + + dynamic_partition_pruning.verbose: False fact_dimension_ratio: null From 33b36c6d63952eda6041b7c3b2d59ea8d28da955 Mon Sep 17 00:00:00 2001 From: Sarah Yurick Date: Wed, 15 Nov 2023 11:39:26 -0800 Subject: [PATCH 14/20] edit --- dask_sql/sql-schema.yaml | 16 ++++++++++------ dask_sql/sql.yaml | 3 ++- 2 files changed, 12 insertions(+), 7 deletions(-) diff --git a/dask_sql/sql-schema.yaml b/dask_sql/sql-schema.yaml index 79b4e6860..2070283b7 100644 --- a/dask_sql/sql-schema.yaml +++ b/dask_sql/sql-schema.yaml @@ -69,13 +69,17 @@ properties: description: | Whether to apply the dynamic partition pruning optimizer rule. - dynamic_partition_pruning.verbose: + dynamic_partition_pruning: type: boolean - description: | - The dynamic partition pruning optimizer rule can sometimes result in extremely long - c.explain() outputs which are not helpful to the user. Setting this option to true allows - the user to see the entire output, while setting it to false truncates the output. - Default is false. + properties: + + verbose: + type: boolean + description: | + The dynamic partition pruning optimizer rule can sometimes result in extremely long + c.explain() outputs which are not helpful to the user. Setting this option to true + allows the user to see the entire output, while setting it to false truncates the + output. Default is false. fact_dimension_ratio: type: [number, "null"] diff --git a/dask_sql/sql.yaml b/dask_sql/sql.yaml index aaa38e3c6..cc85b09ab 100644 --- a/dask_sql/sql.yaml +++ b/dask_sql/sql.yaml @@ -18,7 +18,8 @@ sql: dynamic_partition_pruning: True - dynamic_partition_pruning.verbose: False + dynamic_partition_pruning: + verbose: False fact_dimension_ratio: null From 53dea1b38cffd7f4b1cd919c74bf59d64650037d Mon Sep 17 00:00:00 2001 From: Sarah Yurick Date: Wed, 15 Nov 2023 11:51:35 -0800 Subject: [PATCH 15/20] sql.optimizer.verbose --- dask_sql/context.py | 2 +- dask_sql/sql-schema.yaml | 2 +- dask_sql/sql.yaml | 2 +- tests/unit/test_config.py | 10 +++++----- 4 files changed, 8 insertions(+), 8 deletions(-) diff --git a/dask_sql/context.py b/dask_sql/context.py index 0e04f7a51..f88389f63 100644 --- a/dask_sql/context.py +++ b/dask_sql/context.py @@ -551,7 +551,7 @@ def explain( """ dynamic_partition_pruning = dask_config.get("sql.dynamic_partition_pruning") - if not dask_config.get("sql.dynamic_partition_pruning.verbose"): + if not dask_config.get("sql.optimizer.verbose"): dask_config.set({"sql.dynamic_partition_pruning": False}) if dataframes is not None: diff --git a/dask_sql/sql-schema.yaml b/dask_sql/sql-schema.yaml index 2070283b7..b62ee8c90 100644 --- a/dask_sql/sql-schema.yaml +++ b/dask_sql/sql-schema.yaml @@ -69,7 +69,7 @@ properties: description: | Whether to apply the dynamic partition pruning optimizer rule. - dynamic_partition_pruning: + optimizer: type: boolean properties: diff --git a/dask_sql/sql.yaml b/dask_sql/sql.yaml index cc85b09ab..13082a85d 100644 --- a/dask_sql/sql.yaml +++ b/dask_sql/sql.yaml @@ -18,7 +18,7 @@ sql: dynamic_partition_pruning: True - dynamic_partition_pruning: + optimizer: verbose: False fact_dimension_ratio: null diff --git a/tests/unit/test_config.py b/tests/unit/test_config.py index 033d8b317..c8ff8f468 100644 --- a/tests/unit/test_config.py +++ b/tests/unit/test_config.py @@ -133,23 +133,23 @@ def test_dynamic_partition_pruning(tmpdir): inlist_expr = "df2.x IN ([Int64(1)])" # Default value is False - dask_config.set({"sql.dynamic_partition_pruning.verbose": True}) + dask_config.set({"sql.optimizer.verbose": True}) # When DPP is turned off, the explain output will not contain the INLIST expression dask_config.set({"sql.dynamic_partition_pruning": False}) explain_string = c.explain(query) assert inlist_expr not in explain_string - # When DPP is turned on but sql.dynamic_partition_pruning.verbose is off, the explain output will not contain the + # When DPP is turned on but sql.optimizer.verbose is off, the explain output will not contain the # INLIST expression dask_config.set({"sql.dynamic_partition_pruning": True}) - dask_config.set({"sql.dynamic_partition_pruning.verbose": False}) + dask_config.set({"sql.optimizer.verbose": False}) explain_string = c.explain(query) assert inlist_expr not in explain_string - # When both DPP and sql.dynamic_partition_pruning.verbose are turned on, the explain output will contain the INLIST + # When both DPP and sql.optimizer.verbose are turned on, the explain output will contain the INLIST # expression dask_config.set({"sql.dynamic_partition_pruning": True}) - dask_config.set({"sql.dynamic_partition_pruning.verbose": True}) + dask_config.set({"sql.optimizer.verbose": True}) explain_string = c.explain(query) assert inlist_expr in explain_string From d3826d9e69fd4051816fa287ddd0c857dcc7dcdf Mon Sep 17 00:00:00 2001 From: Sarah Yurick Date: Wed, 15 Nov 2023 13:24:56 -0800 Subject: [PATCH 16/20] lowercase --- dask_sql/sql.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dask_sql/sql.yaml b/dask_sql/sql.yaml index 13082a85d..5f65ceb2c 100644 --- a/dask_sql/sql.yaml +++ b/dask_sql/sql.yaml @@ -19,7 +19,7 @@ sql: dynamic_partition_pruning: True optimizer: - verbose: False + verbose: false fact_dimension_ratio: null From ff62770189f554bcece29581141cfc124f983525 Mon Sep 17 00:00:00 2001 From: Sarah Yurick Date: Wed, 15 Nov 2023 13:28:25 -0800 Subject: [PATCH 17/20] set object type --- dask_sql/sql-schema.yaml | 2 +- dask_sql/sql.yaml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/dask_sql/sql-schema.yaml b/dask_sql/sql-schema.yaml index b62ee8c90..ace253094 100644 --- a/dask_sql/sql-schema.yaml +++ b/dask_sql/sql-schema.yaml @@ -70,7 +70,7 @@ properties: Whether to apply the dynamic partition pruning optimizer rule. optimizer: - type: boolean + type: object properties: verbose: diff --git a/dask_sql/sql.yaml b/dask_sql/sql.yaml index 5f65ceb2c..13082a85d 100644 --- a/dask_sql/sql.yaml +++ b/dask_sql/sql.yaml @@ -19,7 +19,7 @@ sql: dynamic_partition_pruning: True optimizer: - verbose: false + verbose: False fact_dimension_ratio: null From 371103de5314669f26800dccd9b886d6198955bd Mon Sep 17 00:00:00 2001 From: Sarah Yurick Date: Wed, 10 Jan 2024 13:29:06 -0800 Subject: [PATCH 18/20] Update test_config.py --- tests/unit/test_config.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/unit/test_config.py b/tests/unit/test_config.py index c8ff8f468..5483df78f 100644 --- a/tests/unit/test_config.py +++ b/tests/unit/test_config.py @@ -103,7 +103,7 @@ def test_dask_setconfig(): @pytest.mark.skipif( - sys.version_info < (3, 9), + sys.version_info <= (3, 9), reason="Writing and reading the Dask DataFrame causes a ProtocolError", ) def test_dynamic_partition_pruning(tmpdir): From c3942adca0b38cedf079e12decf5b2aae093cec1 Mon Sep 17 00:00:00 2001 From: Sarah Yurick Date: Thu, 11 Jan 2024 13:55:17 -0800 Subject: [PATCH 19/20] update version --- tests/unit/test_config.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/unit/test_config.py b/tests/unit/test_config.py index 5483df78f..ad4fb2883 100644 --- a/tests/unit/test_config.py +++ b/tests/unit/test_config.py @@ -103,7 +103,7 @@ def test_dask_setconfig(): @pytest.mark.skipif( - sys.version_info <= (3, 9), + sys.version_info < (3, 10), reason="Writing and reading the Dask DataFrame causes a ProtocolError", ) def test_dynamic_partition_pruning(tmpdir): From 9d0a613e25cb6bc4bb77446baabb703f59404e28 Mon Sep 17 00:00:00 2001 From: Charles Blackmon-Luca <20627856+charlesbluca@users.noreply.github.com> Date: Wed, 31 Jan 2024 09:39:11 -0800 Subject: [PATCH 20/20] Unpin dask/distributed for development --- continuous_integration/docker/conda.txt | 2 +- continuous_integration/docker/main.dockerfile | 2 +- continuous_integration/environment-3.10.yaml | 2 +- continuous_integration/environment-3.11.yaml | 2 +- continuous_integration/environment-3.12.yaml | 2 +- continuous_integration/gpuci/environment-3.10.yaml | 2 +- continuous_integration/gpuci/environment-3.9.yaml | 2 +- continuous_integration/recipe/meta.yaml | 2 +- docs/environment.yml | 2 +- docs/requirements-docs.txt | 2 +- pyproject.toml | 4 ++-- 11 files changed, 12 insertions(+), 12 deletions(-) diff --git a/continuous_integration/docker/conda.txt b/continuous_integration/docker/conda.txt index 72ce2f1b9..270c2febd 100644 --- a/continuous_integration/docker/conda.txt +++ b/continuous_integration/docker/conda.txt @@ -1,5 +1,5 @@ python>=3.9 -dask>=2022.3.0,<=2024.1.1 +dask>=2022.3.0 pandas>=1.4.0 jpype1>=1.0.2 openjdk>=8 diff --git a/continuous_integration/docker/main.dockerfile b/continuous_integration/docker/main.dockerfile index 458735234..78cd46938 100644 --- a/continuous_integration/docker/main.dockerfile +++ b/continuous_integration/docker/main.dockerfile @@ -16,7 +16,7 @@ RUN mamba install -y \ # build requirements "maturin>=1.3,<1.4" \ # core dependencies - "dask>=2022.3.0,<=2024.1.1" \ + "dask>=2022.3.0" \ "pandas>=1.4.0" \ "fastapi>=0.92.0" \ "httpx>=0.24.1" \ diff --git a/continuous_integration/environment-3.10.yaml b/continuous_integration/environment-3.10.yaml index 54de0ce90..912e2c54e 100644 --- a/continuous_integration/environment-3.10.yaml +++ b/continuous_integration/environment-3.10.yaml @@ -3,7 +3,7 @@ channels: - conda-forge dependencies: - c-compiler -- dask>=2022.3.0,<=2024.1.1 +- dask>=2022.3.0 - fastapi>=0.92.0 - fugue>=0.7.3 - httpx>=0.24.1 diff --git a/continuous_integration/environment-3.11.yaml b/continuous_integration/environment-3.11.yaml index 882e225e7..cd77ac8d5 100644 --- a/continuous_integration/environment-3.11.yaml +++ b/continuous_integration/environment-3.11.yaml @@ -3,7 +3,7 @@ channels: - conda-forge dependencies: - c-compiler -- dask>=2022.3.0,<=2024.1.1 +- dask>=2022.3.0 - fastapi>=0.92.0 - fugue>=0.7.3 - httpx>=0.24.1 diff --git a/continuous_integration/environment-3.12.yaml b/continuous_integration/environment-3.12.yaml index 48d56068e..53b52e629 100644 --- a/continuous_integration/environment-3.12.yaml +++ b/continuous_integration/environment-3.12.yaml @@ -3,7 +3,7 @@ channels: - conda-forge dependencies: - c-compiler -- dask>=2022.3.0,<=2024.1.1 +- dask>=2022.3.0 - fastapi>=0.92.0 - fugue>=0.7.3 - httpx>=0.24.1 diff --git a/continuous_integration/gpuci/environment-3.10.yaml b/continuous_integration/gpuci/environment-3.10.yaml index 3f19cca23..8ad4e3fdf 100644 --- a/continuous_integration/gpuci/environment-3.10.yaml +++ b/continuous_integration/gpuci/environment-3.10.yaml @@ -9,7 +9,7 @@ channels: dependencies: - c-compiler - zlib -- dask>=2022.3.0,<=2024.1.1 +- dask>=2022.3.0 - fastapi>=0.92.0 - fugue>=0.7.3 - httpx>=0.24.1 diff --git a/continuous_integration/gpuci/environment-3.9.yaml b/continuous_integration/gpuci/environment-3.9.yaml index 703d5465e..96bec123d 100644 --- a/continuous_integration/gpuci/environment-3.9.yaml +++ b/continuous_integration/gpuci/environment-3.9.yaml @@ -9,7 +9,7 @@ channels: dependencies: - c-compiler - zlib -- dask>=2022.3.0,<=2024.1.1 +- dask>=2022.3.0 - fastapi>=0.92.0 - fugue>=0.7.3 - httpx>=0.24.1 diff --git a/continuous_integration/recipe/meta.yaml b/continuous_integration/recipe/meta.yaml index e30f53efd..60a5aa299 100644 --- a/continuous_integration/recipe/meta.yaml +++ b/continuous_integration/recipe/meta.yaml @@ -32,7 +32,7 @@ requirements: - xz # [linux64] run: - python - - dask >=2022.3.0,<=2024.1.1 + - dask >=2022.3.0 - pandas >=1.4.0 - fastapi >=0.92.0 - httpx >=0.24.1 diff --git a/docs/environment.yml b/docs/environment.yml index 10ab623d6..2d0e08ba0 100644 --- a/docs/environment.yml +++ b/docs/environment.yml @@ -6,7 +6,7 @@ dependencies: - sphinx>=4.0.0 - sphinx-tabs - dask-sphinx-theme>=2.0.3 - - dask>=2022.3.0,<=2024.1.1 + - dask>=2022.3.0 - pandas>=1.4.0 - fugue>=0.7.3 # FIXME: https://github.com/fugue-project/fugue/issues/526 diff --git a/docs/requirements-docs.txt b/docs/requirements-docs.txt index 9fd5d4738..1f2052a92 100644 --- a/docs/requirements-docs.txt +++ b/docs/requirements-docs.txt @@ -1,7 +1,7 @@ sphinx>=4.0.0 sphinx-tabs dask-sphinx-theme>=3.0.0 -dask>=2022.3.0,<=2024.1.1 +dask>=2022.3.0 pandas>=1.4.0 fugue>=0.7.3 # FIXME: https://github.com/fugue-project/fugue/issues/526 diff --git a/pyproject.toml b/pyproject.toml index 3d2ee4843..75ec4519f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -27,8 +27,8 @@ classifiers = [ readme = "README.md" requires-python = ">=3.9" dependencies = [ - "dask[dataframe]>=2022.3.0,<=2024.1.1", - "distributed>=2022.3.0,<=2024.1.1", + "dask[dataframe]>=2022.3.0", + "distributed>=2022.3.0", "pandas>=1.4.0", "fastapi>=0.92.0", "httpx>=0.24.1",