282 changes: 275 additions & 7 deletions datafusion/core/tests/physical_optimizer/pushdown_sort.rs
@@ -24,15 +24,14 @@
//! 4. Early termination is enabled for TopK queries
//! 5. Prefix matching works correctly

use std::sync::Arc;
use datafusion_physical_expr::expressions;
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
use datafusion_physical_expr_common::sort_expr::LexOrdering;
use datafusion_physical_optimizer::PhysicalOptimizerRule;
use datafusion_physical_optimizer::pushdown_sort::PushdownSort;

use crate::physical_optimizer::test_utils::{
OptimizationTest, coalesce_batches_exec, coalesce_partitions_exec, parquet_exec,
parquet_exec_with_sort, repartition_exec, schema, sort_exec, sort_exec_with_fetch,
sort_expr,
};
use crate::physical_optimizer::test_utils::{
OptimizationTest, coalesce_batches_exec, coalesce_partitions_exec, parquet_exec,
parquet_exec_with_sort, projection_exec, projection_exec_with_alias, repartition_exec,
schema, simple_projection_exec, sort_exec, sort_exec_with_fetch, sort_expr,
sort_expr_named,
};

#[test]
fn test_sort_pushdown_disabled() {
@@ -619,15 +618,15 @@ fn test_pushdown_through_blocking_node() {
// Middle: Aggregate (blocks pushdown from outer sort)
// GROUP BY a, COUNT(b)
let group_by = PhysicalGroupBy::new_single(vec![(
Arc::new(datafusion_physical_expr::expressions::Column::new("a", 0)) as _,
Arc::new(expressions::Column::new("a", 0)) as _,
"a".to_string(),
)]);

let count_expr = Arc::new(
AggregateExprBuilder::new(
count_udaf(),
vec![
Arc::new(datafusion_physical_expr::expressions::Column::new("b", 1)) as _,
Arc::new(expressions::Column::new("b", 1)) as _,
],
)
.schema(Arc::clone(&schema))
@@ -670,3 +669,272 @@ fn test_pushdown_through_blocking_node() {
"
);
}

// ============================================================================
// PROJECTION TESTS
// ============================================================================
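// In the snapshots below, a successful pushdown shows up as
// `reverse_row_groups=true` on the DataSourceExec (whose advertised
// `output_ordering` is dropped), while the SortExec above it is retained.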

#[test]
fn test_sort_pushdown_through_simple_projection() {
// Sort pushes through projection with simple column references
let schema = schema();

// Source has [a ASC] ordering
let a = sort_expr("a", &schema);
let source_ordering = LexOrdering::new(vec![a.clone()]).unwrap();
let source = parquet_exec_with_sort(schema.clone(), vec![source_ordering]);

// Projection: SELECT a, b (simple column references)
let projection = simple_projection_exec(source, vec![0, 1]); // columns a, b

// Request [a DESC] - should push through projection to source
let desc_ordering = LexOrdering::new(vec![a.reverse()]).unwrap();
let plan = sort_exec(desc_ordering, projection);

insta::assert_snapshot!(
OptimizationTest::new(plan, PushdownSort::new(), true),
@r"
OptimizationTest:
input:
- SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false]
- ProjectionExec: expr=[a@0 as a, b@1 as b]
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet
output:
Ok:
- SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false]
- ProjectionExec: expr=[a@0 as a, b@1 as b]
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet, reverse_row_groups=true
"
);
}

#[test]
fn test_sort_pushdown_through_projection_with_alias() {
// Sort pushes through projection with column aliases
let schema = schema();

// Source has [a ASC] ordering
let a = sort_expr("a", &schema);
let source_ordering = LexOrdering::new(vec![a.clone()]).unwrap();
let source = parquet_exec_with_sort(schema.clone(), vec![source_ordering]);

// Projection: SELECT a AS id, b AS value
let projection = projection_exec_with_alias(source, vec![(0, "id"), (1, "value")]);

// Request [id DESC] - should map to [a DESC] and push down
let id_expr = sort_expr_named("id", 0);
let desc_ordering = LexOrdering::new(vec![id_expr.reverse()]).unwrap();
let plan = sort_exec(desc_ordering, projection);

insta::assert_snapshot!(
OptimizationTest::new(plan, PushdownSort::new(), true),
@r"
OptimizationTest:
input:
- SortExec: expr=[id@0 DESC NULLS LAST], preserve_partitioning=[false]
- ProjectionExec: expr=[a@0 as id, b@1 as value]
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet
output:
Ok:
- SortExec: expr=[id@0 DESC NULLS LAST], preserve_partitioning=[false]
- ProjectionExec: expr=[a@0 as id, b@1 as value]
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet, reverse_row_groups=true
"
);
}

#[test]
fn test_no_sort_pushdown_through_computed_projection() {
use datafusion_expr::Operator;

// Sort should NOT push through projection with computed columns
let schema = schema();

// Source has [a ASC] ordering
let a = sort_expr("a", &schema);
let source_ordering = LexOrdering::new(vec![a.clone()]).unwrap();
let source = parquet_exec_with_sort(schema.clone(), vec![source_ordering]);

// Projection: SELECT a+b as sum, c
let projection = projection_exec(
vec![
(
Arc::new(expressions::BinaryExpr::new(
Arc::new(expressions::Column::new("a", 0)),
Operator::Plus,
Arc::new(expressions::Column::new("b", 1)),
)) as Arc<dyn PhysicalExpr>,
"sum".to_string(),
),
(
Arc::new(expressions::Column::new("c", 2)) as Arc<dyn PhysicalExpr>,
"c".to_string(),
),
],
source,
)
.unwrap();

// Request [sum DESC] - should NOT push down (sum is computed)
let sum_expr = sort_expr_named("sum", 0);
let desc_ordering = LexOrdering::new(vec![sum_expr.reverse()]).unwrap();
let plan = sort_exec(desc_ordering, projection);

insta::assert_snapshot!(
OptimizationTest::new(plan, PushdownSort::new(), true),
@r"
OptimizationTest:
input:
- SortExec: expr=[sum@0 DESC NULLS LAST], preserve_partitioning=[false]
- ProjectionExec: expr=[a@0 + b@1 as sum, c@2 as c]
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet
output:
Ok:
- SortExec: expr=[sum@0 DESC NULLS LAST], preserve_partitioning=[false]
- ProjectionExec: expr=[a@0 + b@1 as sum, c@2 as c]
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet
"
);
}

#[test]
fn test_sort_pushdown_projection_reordered_columns() {
// Sort pushes through projection that reorders columns
let schema = schema();

// Source has [a ASC] ordering
let a = sort_expr("a", &schema);
let source_ordering = LexOrdering::new(vec![a.clone()]).unwrap();
let source = parquet_exec_with_sort(schema.clone(), vec![source_ordering]);

// Projection: SELECT c, b, a (columns reordered)
let projection = simple_projection_exec(source, vec![2, 1, 0]); // c, b, a

// Request [a DESC] where a is now at index 2 in projection output
let a_expr_at_2 = sort_expr_named("a", 2);
let desc_ordering = LexOrdering::new(vec![a_expr_at_2.reverse()]).unwrap();
let plan = sort_exec(desc_ordering, projection);

insta::assert_snapshot!(
OptimizationTest::new(plan, PushdownSort::new(), true),
@r"
OptimizationTest:
input:
- SortExec: expr=[a@2 DESC NULLS LAST], preserve_partitioning=[false]
- ProjectionExec: expr=[c@2 as c, b@1 as b, a@0 as a]
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet
output:
Ok:
- SortExec: expr=[a@2 DESC NULLS LAST], preserve_partitioning=[false]
- ProjectionExec: expr=[c@2 as c, b@1 as b, a@0 as a]
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet, reverse_row_groups=true
"
);
}

#[test]
fn test_sort_pushdown_projection_with_limit() {
// Sort with LIMIT pushes through simple projection
let schema = schema();

// Source has [a ASC] ordering
let a = sort_expr("a", &schema);
let source_ordering = LexOrdering::new(vec![a.clone()]).unwrap();
let source = parquet_exec_with_sort(schema.clone(), vec![source_ordering]);

// Projection: SELECT a, b
let projection = simple_projection_exec(source, vec![0, 1]);

// Request [a DESC] with LIMIT 10
let desc_ordering = LexOrdering::new(vec![a.reverse()]).unwrap();
let plan = sort_exec_with_fetch(desc_ordering, Some(10), projection);

insta::assert_snapshot!(
OptimizationTest::new(plan, PushdownSort::new(), true),
@r"
OptimizationTest:
input:
- SortExec: TopK(fetch=10), expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false]
- ProjectionExec: expr=[a@0 as a, b@1 as b]
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet
output:
Ok:
- SortExec: TopK(fetch=10), expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false]
- ProjectionExec: expr=[a@0 as a, b@1 as b]
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet, reverse_row_groups=true
"
);
}

#[test]
fn test_sort_pushdown_through_projection_and_coalesce() {
// Sort pushes through both projection and coalesce batches
let schema = schema();

// Source has [a ASC] ordering
let a = sort_expr("a", &schema);
let source_ordering = LexOrdering::new(vec![a.clone()]).unwrap();
let source = parquet_exec_with_sort(schema.clone(), vec![source_ordering]);

let coalesce = coalesce_batches_exec(source, 1024);

// Projection: SELECT a, b
let projection = simple_projection_exec(coalesce, vec![0, 1]);

// Request [a DESC]
let desc_ordering = LexOrdering::new(vec![a.reverse()]).unwrap();
let plan = sort_exec(desc_ordering, projection);

insta::assert_snapshot!(
OptimizationTest::new(plan, PushdownSort::new(), true),
@r"
OptimizationTest:
input:
- SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false]
- ProjectionExec: expr=[a@0 as a, b@1 as b]
- CoalesceBatchesExec: target_batch_size=1024
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet
output:
Ok:
- SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false]
- ProjectionExec: expr=[a@0 as a, b@1 as b]
- CoalesceBatchesExec: target_batch_size=1024
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet, reverse_row_groups=true
"
);
}

#[test]
fn test_sort_pushdown_projection_subset_of_columns() {
// Sort pushes through a projection that selects a subset of columns
let schema = schema();

// Source has [a ASC, b ASC] ordering
let a = sort_expr("a", &schema);
let b = sort_expr("b", &schema);
let source_ordering = LexOrdering::new(vec![a.clone(), b.clone()]).unwrap();
let source = parquet_exec_with_sort(schema.clone(), vec![source_ordering]);

// Projection: SELECT a (subset of columns)
let projection = simple_projection_exec(source, vec![0]);

// Request [a DESC]
let desc_ordering = LexOrdering::new(vec![a.reverse()]).unwrap();
let plan = sort_exec(desc_ordering, projection);

insta::assert_snapshot!(
OptimizationTest::new(plan, PushdownSort::new(), true),
@r"
OptimizationTest:
input:
- SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false]
- ProjectionExec: expr=[a@0 as a]
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC, b@1 ASC], file_type=parquet
output:
Ok:
- SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false]
- ProjectionExec: expr=[a@0 as a]
- DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet, reverse_row_groups=true
"
);
}
47 changes: 47 additions & 0 deletions datafusion/core/tests/physical_optimizer/test_utils.rs
@@ -774,3 +774,50 @@ pub fn format_execution_plan(plan: &Arc<dyn ExecutionPlan>) -> Vec<String> {
fn format_lines(s: &str) -> Vec<String> {
s.trim().split('\n').map(|s| s.to_string()).collect()
}

/// Create a ProjectionExec that projects the given column indices, keeping the source column names.
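/// For the test schema, `simple_projection_exec(source, vec![0, 1])` yields
/// `ProjectionExec: expr=[a@0 as a, b@1 as b]`.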
pub fn simple_projection_exec(
input: Arc<dyn ExecutionPlan>,
columns: Vec<usize>,
) -> Arc<dyn ExecutionPlan> {
let schema = input.schema();
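// Build a (column expression, output name) pair for each requested index.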
let exprs: Vec<(Arc<dyn PhysicalExpr>, String)> = columns
.iter()
.map(|&i| {
let field = schema.field(i);
(
Arc::new(expressions::Column::new(field.name(), i)) as Arc<dyn PhysicalExpr>,
field.name().to_string(),
)
})
.collect();

projection_exec(exprs, input).unwrap()
}

/// Create a ProjectionExec with column aliases
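/// For the test schema, `projection_exec_with_alias(source, vec![(0, "id"), (1, "value")])`
/// yields `ProjectionExec: expr=[a@0 as id, b@1 as value]`.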
pub fn projection_exec_with_alias(
input: Arc<dyn ExecutionPlan>,
columns: Vec<(usize, &str)>,
) -> Arc<dyn ExecutionPlan> {
let schema = input.schema();
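// Keep the source column's name and index in the expression, but emit it under the alias.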
let exprs: Vec<(Arc<dyn PhysicalExpr>, String)> = columns
.iter()
.map(|&(i, alias)| {
(
Arc::new(expressions::Column::new(schema.field(i).name(), i)) as Arc<dyn PhysicalExpr>,
alias.to_string(),
)
})
.collect();

projection_exec(exprs, input).unwrap()
}

/// Create a sort expression with a custom column name and index.
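/// Uses default `SortOptions`; useful when the sort key refers to a projection
/// output column (e.g. an alias such as `id` or `sum`) rather than a source column.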
pub fn sort_expr_named(name: &str, index: usize) -> PhysicalSortExpr {
PhysicalSortExpr {
expr: Arc::new(expressions::Column::new(name, index)),
options: SortOptions::default(),
}
}