Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
7c66b6f
create optimizer_config
sarahyurick Jul 7, 2023
6f460fa
add join_reorder configs
sarahyurick Jul 10, 2023
ccb8d42
add verbose_optimizer
sarahyurick Jul 10, 2023
16b7a8f
add test_dynamic_partition_pruning
sarahyurick Jul 10, 2023
28a5d7f
skip 3.8 tests
sarahyurick Jul 10, 2023
f3e9f52
remove dpp default
sarahyurick Jul 11, 2023
fa4976b
style
sarahyurick Jul 11, 2023
c9c5a10
temporarily remove rust changes
sarahyurick Aug 17, 2023
e527d83
temp remove sql.rs changes
sarahyurick Aug 17, 2023
3fcf740
Merge branch 'main' into optimizer_improvements
sarahyurick Aug 17, 2023
35c363b
readd rust changes
sarahyurick Aug 17, 2023
c1a19e2
check optimized_plan is Ok
sarahyurick Aug 17, 2023
74a222e
Merge branch 'main' into optimizer_improvements
sarahyurick Aug 30, 2023
4a06278
Merge branch 'main' into optimizer_improvements
sarahyurick Nov 7, 2023
f2da571
Merge branch 'main' into optimizer_improvements
sarahyurick Nov 13, 2023
74f04b5
Merge branch 'main' into optimizer_improvements
sarahyurick Nov 13, 2023
bdb287c
Merge branch 'main' into optimizer_improvements
sarahyurick Nov 14, 2023
67d425d
sql.dynamic_partition_pruning.verbose
sarahyurick Nov 15, 2023
4e8bb93
edit yaml style
sarahyurick Nov 15, 2023
33b36c6
edit
sarahyurick Nov 15, 2023
53dea1b
sql.optimizer.verbose
sarahyurick Nov 15, 2023
d3826d9
lowercase
sarahyurick Nov 15, 2023
ff62770
set object type
sarahyurick Nov 15, 2023
24c4c8f
Merge branch 'main' into optimizer_improvements
sarahyurick Nov 28, 2023
2caaa9b
Merge branch 'main' into optimizer_improvements
sarahyurick Dec 15, 2023
3f3d401
Merge branch 'main' into optimizer_improvements
sarahyurick Jan 9, 2024
371103d
Update test_config.py
sarahyurick Jan 10, 2024
c3942ad
update version
sarahyurick Jan 11, 2024
6c995c2
Merge branch 'main' into optimizer_improvements
sarahyurick Jan 23, 2024
4ed7416
Merge remote-tracking branch 'upstream/main' into pr/sarahyurick/1199
charlesbluca Jan 31, 2024
9d0a613
Unpin dask/distributed for development
charlesbluca Jan 31, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions dask_planner/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ fn rust(py: Python, m: &PyModule) -> PyResult<()> {
m.add_class::<sql::function::DaskFunction>()?;
m.add_class::<sql::table::DaskStatistics>()?;
m.add_class::<sql::logical::PyLogicalPlan>()?;
m.add_class::<sql::DaskSQLOptimizerConfig>()?;

// Exceptions
m.add(
Expand Down
71 changes: 56 additions & 15 deletions dask_planner/src/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,37 @@ pub struct DaskSQLContext {
current_schema: String,
schemas: HashMap<String, schema::DaskSchema>,
options: ConfigOptions,
optimizer_config: DaskSQLOptimizerConfig,
}

#[pyclass(name = "DaskSQLOptimizerConfig", module = "dask_planner", subclass)]
#[derive(Debug, Clone)]
pub struct DaskSQLOptimizerConfig {
dynamic_partition_pruning: bool,
fact_dimension_ratio: Option<f64>,
max_fact_tables: Option<usize>,
preserve_user_order: Option<bool>,
filter_selectivity: Option<f64>,
}

#[pymethods]
impl DaskSQLOptimizerConfig {
#[new]
pub fn new(
dynamic_partition_pruning: bool,
fact_dimension_ratio: Option<f64>,
max_fact_tables: Option<usize>,
preserve_user_order: Option<bool>,
filter_selectivity: Option<f64>,
) -> Self {
Self {
dynamic_partition_pruning,
fact_dimension_ratio,
max_fact_tables,
preserve_user_order,
filter_selectivity,
}
}
}

impl ContextProvider for DaskSQLContext {
Expand Down Expand Up @@ -483,18 +513,22 @@ impl ContextProvider for DaskSQLContext {
#[pymethods]
impl DaskSQLContext {
#[new]
pub fn new(default_catalog_name: &str, default_schema_name: &str) -> Self {
pub fn new(
default_catalog_name: &str,
default_schema_name: &str,
optimizer_config: DaskSQLOptimizerConfig,
) -> Self {
Self {
current_catalog: default_catalog_name.to_owned(),
current_schema: default_schema_name.to_owned(),
schemas: HashMap::new(),
options: ConfigOptions::new(),
dynamic_partition_pruning: false,
optimizer_config,
}
}

pub fn apply_dynamic_partition_pruning(&mut self, config: bool) -> PyResult<()> {
self.dynamic_partition_pruning = config;
pub fn set_optimizer_config(&mut self, config: DaskSQLOptimizerConfig) -> PyResult<()> {
self.optimizer_config = config;
Ok(())
}

Expand Down Expand Up @@ -585,21 +619,28 @@ impl DaskSQLContext {
Ok(existing_plan)
}
_ => {
let optimized_plan = optimizer::DaskSqlOptimizer::new()
.optimize(existing_plan.original_plan)
let optimized_plan = optimizer::DaskSqlOptimizer::new(
self.optimizer_config.fact_dimension_ratio,
self.optimizer_config.max_fact_tables,
self.optimizer_config.preserve_user_order,
self.optimizer_config.filter_selectivity,
)
.optimize(existing_plan.original_plan)
.map(|k| PyLogicalPlan {
original_plan: k,
current_node: None,
})
.map_err(py_optimization_exp);
if self.optimizer_config.dynamic_partition_pruning {
optimizer::DaskSqlOptimizer::dynamic_partition_pruner(
self.optimizer_config.fact_dimension_ratio,
)
.optimize_once(optimized_plan.unwrap().original_plan)
.map(|k| PyLogicalPlan {
original_plan: k,
current_node: None,
})
.map_err(py_optimization_exp);
if self.dynamic_partition_pruning {
optimizer::DaskSqlOptimizer::dynamic_partition_pruner()
.optimize_once(optimized_plan.unwrap().original_plan)
.map(|k| PyLogicalPlan {
original_plan: k,
current_node: None,
})
.map_err(py_optimization_exp)
.map_err(py_optimization_exp)
} else {
optimized_plan
}
Expand Down
26 changes: 20 additions & 6 deletions dask_planner/src/sql/optimizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,12 @@ pub struct DaskSqlOptimizer {
impl DaskSqlOptimizer {
/// Creates a new instance of the DaskSqlOptimizer with all the DataFusion desired
/// optimizers as well as any custom `OptimizerRule` trait impls that might be desired.
pub fn new() -> Self {
pub fn new(
fact_dimension_ratio: Option<f64>,
max_fact_tables: Option<usize>,
preserve_user_order: Option<bool>,
filter_selectivity: Option<f64>,
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's kind of annoying/bulky to have to pass in these parameters every time we do a DaskSqlOptimizer::new(), but since there's really only 2 places in our code where we do this, I figured it's not too bad.

) -> Self {
debug!("Creating new instance of DaskSqlOptimizer");

let rules: Vec<Arc<dyn OptimizerRule + Sync + Send>> = vec![
Expand Down Expand Up @@ -72,7 +77,12 @@ impl DaskSqlOptimizer {
Arc::new(PushDownFilter::new()),
// Arc::new(SingleDistinctToGroupBy::new()),
// Dask-SQL specific optimizations
Arc::new(JoinReorder::default()),
Arc::new(JoinReorder::new(
fact_dimension_ratio,
max_fact_tables,
preserve_user_order,
filter_selectivity,
)),
// The previous optimizations added expressions and projections,
// that might benefit from the following rules
Arc::new(SimplifyExpressions::new()),
Expand All @@ -91,9 +101,13 @@ impl DaskSqlOptimizer {

// Create a separate instance of this optimization rule, since we want to ensure that it only
// runs one time
pub fn dynamic_partition_pruner() -> Self {
let rule: Vec<Arc<dyn OptimizerRule + Sync + Send>> =
vec![Arc::new(DynamicPartitionPruning::new())];
pub fn dynamic_partition_pruner(fact_dimension_ratio: Option<f64>) -> Self {
let rule: Vec<Arc<dyn OptimizerRule + Sync + Send>>;
if let Some(f) = fact_dimension_ratio {
rule = vec![Arc::new(DynamicPartitionPruning::new(f))];
} else {
rule = vec![Arc::new(DynamicPartitionPruning::default())];
}

Self {
optimizer: Optimizer::with_rules(rule),
Expand Down Expand Up @@ -177,7 +191,7 @@ mod tests {
let plan = sql_to_rel.sql_statement_to_plan(statement.clone()).unwrap();

// optimize the logical plan
let optimizer = DaskSqlOptimizer::new();
let optimizer = DaskSqlOptimizer::new(None, None, None, None);
optimizer.optimize(plan)
}

Expand Down
26 changes: 18 additions & 8 deletions dask_planner/src/sql/optimizer/dynamic_partition_pruning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,24 @@ use log::warn;
use crate::sql::table::DaskTableSource;

// Optimizer rule for dynamic partition pruning
pub struct DynamicPartitionPruning {}
pub struct DynamicPartitionPruning {
/// Ratio of the size of the dimension tables to fact tables
fact_dimension_ratio: f64,
}

impl DynamicPartitionPruning {
pub fn new() -> Self {
Self {}
pub fn new(fact_dimension_ratio: f64) -> Self {
Self {
fact_dimension_ratio,
}
}
}

impl Default for DynamicPartitionPruning {
fn default() -> Self {
Self {
fact_dimension_ratio: 0.3,
}
}
}

Expand Down Expand Up @@ -106,9 +119,6 @@ impl OptimizerRule for DynamicPartitionPruning {
(left_table.unwrap(), right_table.unwrap());
let (left_field, right_field) = (left_field.unwrap(), right_field.unwrap());

// TODO: Consider allowing the fact_dimension_ratio to be configured by the
// user. See issue: https://github.com/dask-contrib/dask-sql/issues/1121
let fact_dimension_ratio = 0.3;
let (mut left_filtered_table, mut right_filtered_table) = (None, None);

// Check if join uses an alias instead of the table name itself. Need to use
Expand Down Expand Up @@ -136,7 +146,7 @@ impl OptimizerRule for DynamicPartitionPruning {
.size
.unwrap_or(largest_size as usize) as f64
/ largest_size
< fact_dimension_ratio
< self.fact_dimension_ratio
{
left_filtered_table =
read_table(left_table.clone(), left_field.clone(), tables.clone());
Expand All @@ -149,7 +159,7 @@ impl OptimizerRule for DynamicPartitionPruning {
.size
.unwrap_or(largest_size as usize) as f64
/ largest_size
< fact_dimension_ratio
< self.fact_dimension_ratio
{
right_filtered_table =
read_table(right_table.clone(), right_field.clone(), tables.clone());
Expand Down
23 changes: 14 additions & 9 deletions dask_planner/src/sql/optimizer/join_reorder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,25 +13,30 @@ use log::warn;
use crate::sql::table::DaskTableSource;

pub struct JoinReorder {
/// Maximum number of fact tables to allow in a join
max_fact_tables: usize,
/// Ratio of the size of the dimension tables to fact tables
fact_dimension_ratio: f64,
/// Maximum number of fact tables to allow in a join
max_fact_tables: usize,
/// Whether to preserve user-defined order of unfiltered dimensions
preserve_user_order: bool,
/// Constant to use when determining the number of rows produced by a
/// filtered relation
filter_selectivity: f64,
}

impl Default for JoinReorder {
fn default() -> Self {
impl JoinReorder {
pub fn new(
fact_dimension_ratio: Option<f64>,
max_fact_tables: Option<usize>,
preserve_user_order: Option<bool>,
filter_selectivity: Option<f64>,
) -> Self {
Self {
max_fact_tables: 2,
// FIXME: fact_dimension_ratio should be 0.3
fact_dimension_ratio: 0.7,
preserve_user_order: true,
filter_selectivity: 1.0,
// FIXME: Default value for fact_dimension_ratio should be 0.3, not 0.7
fact_dimension_ratio: fact_dimension_ratio.unwrap_or(0.7),
max_fact_tables: max_fact_tables.unwrap_or(2),
preserve_user_order: preserve_user_order.unwrap_or(true),
filter_selectivity: filter_selectivity.unwrap_or(1.0),
}
}
}
Expand Down
33 changes: 26 additions & 7 deletions dask_sql/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from dask_planner.rust import (
DaskSchema,
DaskSQLContext,
DaskSQLOptimizerConfig,
DaskTable,
DFOptimizationException,
DFParsingException,
Expand Down Expand Up @@ -98,13 +99,20 @@ def __init__(self, logging_level=logging.INFO):
# A started SQL server (useful for jupyter notebooks)
self.sql_server = None

# Create the `DaskSQLContext` Rust context
self.context = DaskSQLContext(self.catalog_name, self.schema_name)
self.context.register_schema(self.schema_name, DaskSchema(self.schema_name))
# Create the `DaskSQLOptimizerConfig` Rust context
optimizer_config = DaskSQLOptimizerConfig(
dask_config.get("sql.dynamic_partition_pruning"),
dask_config.get("sql.fact_dimension_ratio"),
dask_config.get("sql.max_fact_tables"),
dask_config.get("sql.preserve_user_order"),
dask_config.get("sql.filter_selectivity"),
)

self.context.apply_dynamic_partition_pruning(
dask_config.get("sql.dynamic_partition_pruning")
# Create the `DaskSQLContext` Rust context
self.context = DaskSQLContext(
self.catalog_name, self.schema_name, optimizer_config
)
self.context.register_schema(self.schema_name, DaskSchema(self.schema_name))

# # Register any default plugins, if nothing was registered before.
RelConverter.add_plugin_class(logical.DaskAggregatePlugin, replace=False)
Expand Down Expand Up @@ -542,11 +550,16 @@ def explain(
:obj:`str`: a description of the created relational algebra.

"""
dynamic_partition_pruning = dask_config.get("sql.dynamic_partition_pruning")
if not dask_config.get("sql.verbose_optimizer"):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What am I missing here? Seems like getting the config value for sql.verbose_optimizer but then setting values related to sql.dynamic_partition_pruning?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So when I was working on the DPP PR, I found it pretty annoying that the c.explain() output would contain such long INLIST expressions (hundreds and thousands of values), which aren't very useful for a user who maybe wouldn't even understand where all the values are coming from. With verbose_optimizer set to False, though, it just affects the c.explain() function, temporarily turning off DPP so that we don't get the long INLIST expressions and instead just get what the LogicalPlan would be if DPP was turned off. Then at the end we switch DPP back on (if it was originally set on), so that the query itself can still run with DPP.

So if DPP is on and verbose_optimizer is False, DPP will still run with c.sql(), but c.explain() won't contain the DPP values.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Conceptually I like what this is aiming to accomplish, but don't really like the idea of implicitly skipping optimizations without warning the user. Feel like maybe it might make more sense to document the introduction of the verbose logical plans in the schema for sql.dynamic_partition_pruning, with a suggestion to disable this config for c.explain() if this behavior isn't desired.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right now the default is to run DPP but to not have it in c.explain(). This is because having it in c.explain() would introduce filters that are not specified by the user (i.e., filters that are added by DPP) which potentially makes the LogicalPlan more difficult for the user to interpret. So we're not actually skipping DPP, we're just not printing how it affects the LogicalPlan for the user to see.

I like the idea of renaming it to sql.explain.skipped_optimizers or sql.dynamic_partition_pruning.verbose to make it more clear, although I do prefer having it default to not having DPP printed in c.explain() (but still run DPP by default).

dask_config.set({"sql.dynamic_partition_pruning": False})

if dataframes is not None:
for df_name, df in dataframes.items():
self.create_table(df_name, df, gpu=gpu)

_, rel_string = self._get_ral(sql)
dask_config.set({"sql.dynamic_partition_pruning": dynamic_partition_pruning})
return rel_string

def visualize(self, sql: str, filename="mydask.png") -> None: # pragma: no cover
Expand Down Expand Up @@ -799,9 +812,15 @@ def _get_ral(self, sql):
"""Helper function to turn the sql query into a relational algebra and resulting column names"""

logger.debug(f"Entering _get_ral('{sql}')")
self.context.apply_dynamic_partition_pruning(
dask_config.get("sql.dynamic_partition_pruning")

optimizer_config = DaskSQLOptimizerConfig(
dask_config.get("sql.dynamic_partition_pruning"),
dask_config.get("sql.fact_dimension_ratio"),
dask_config.get("sql.max_fact_tables"),
dask_config.get("sql.preserve_user_order"),
dask_config.get("sql.filter_selectivity"),
)
self.context.set_optimizer_config(optimizer_config)

# get the schema of what we currently have registered
schemas = self._prepare_schemas()
Expand Down
32 changes: 32 additions & 0 deletions dask_sql/sql-schema.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,38 @@ properties:
description: |
Whether to apply the dynamic partition pruning optimizer rule.

verbose_optimizer:
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I really don't like the name verbose_optimizer, but I couldn't really think of a better name...

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I'm no naming expert. I like the feature though.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Generally would rather avoid this configuration for now, but if we do have it maybe it would make sense as something like sql.explain.skipped_optimizers? Then it could be a list where users pass any arbitrary rules, but it defaults to whatever rules we always want disabled for explains.

type: boolean
description: |
The dynamic partition pruning optimizer rule can sometimes result in extremely long
c.explain() outputs which are not helpful to the user. Setting this option to true allows
the user to see the entire output, while setting it to false truncates the output.
Default is false.

fact_dimension_ratio:
type: [number, "null"]
description: |
Ratio of the size of the dimension tables to fact tables. Parameter for dynamic partition
pruning and join reorder optimizer rules.

max_fact_tables:
type: [integer, "null"]
description: |
Maximum number of fact tables to allow in a join. Parameter for join reorder optimizer
rule.

preserve_user_order:
type: [boolean, "null"]
description: |
Whether to preserve user-defined order of unfiltered dimensions. Parameter for join
reorder optimizer rule.

filter_selectivity:
type: [number, "null"]
description: |
Constant to use when determining the number of rows produced by a filtered relation.
Parameter for join reorder optimizer rule.

sort:
type: object
properties:
Expand Down
10 changes: 10 additions & 0 deletions dask_sql/sql.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,16 @@ sql:

dynamic_partition_pruning: True

verbose_optimizer: False

fact_dimension_ratio: null

max_fact_tables: null

preserve_user_order: null

filter_selectivity: null

sort:
topk-nelem-limit: 1000000

Expand Down
Loading