Skip to content

Commit b3ff6d8

Browse files
adriangbJefffrey
andauthored
Misc improvements to ProjectionExprs (#18719)
## Summary This PR enhances the physical-expr projection handling with several improvements needed for better projection management in datasources. ## Changes 1. **Add trait implementations**: - Added `PartialEq` and `Eq` for `ProjectionExpr` - Added `PartialEq` and `Eq` for `ProjectionExprs` 2. **Add `project_batch()` method**: - Efficiently projects `RecordBatch` with pre-computed schema - Handles empty projections correctly - Reduces schema projection overhead for repeated calls 3. **Fix `update_expr()` bug**: - **Bug**: Previously returned `None` for literal expressions (no column references) - **Fix**: Now returns `Some(expr)` for both `Unchanged` and `RewrittenValid` states - **Impact**: Critical for queries like `SELECT 1 FROM table` where no file columns are needed 4. **Change `from_indices()` signature**: - Changed from `&SchemaRef` to `&Schema` for consistency 5. **Add comprehensive tests**: - `test_merge_empty_projection_with_literal()` - Reproduces roundtrip issue - `test_update_expr_with_literal()` - Tests literal handling - `test_update_expr_with_complex_literal_expr()` - Tests mixed expressions ## Part of This PR is part of #18627 - a larger effort to refactor projection handling in DataFusion. ## Testing All tests pass: - ✅ New projection tests - ✅ Existing physical-expr test suite - ✅ Doc tests ## AI use I asked Claude to extract this change from #18627 --------- Co-authored-by: Jeffrey Vo <[email protected]>
1 parent 1b42a7c commit b3ff6d8

File tree

2 files changed

+216
-47
lines changed

2 files changed

+216
-47
lines changed

datafusion/physical-expr/src/projection.rs

Lines changed: 175 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ use crate::expressions::Column;
2222
use crate::utils::collect_columns;
2323
use crate::PhysicalExpr;
2424

25+
use arrow::array::{RecordBatch, RecordBatchOptions};
2526
use arrow::datatypes::{Field, Schema, SchemaRef};
2627
use datafusion_common::stats::{ColumnStatistics, Precision};
2728
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
@@ -30,6 +31,7 @@ use datafusion_common::{
3031
};
3132

3233
use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr};
34+
use datafusion_physical_expr_common::utils::evaluate_expressions_to_arrays;
3335
use indexmap::IndexMap;
3436
use itertools::Itertools;
3537

@@ -49,6 +51,15 @@ pub struct ProjectionExpr {
4951
pub alias: String,
5052
}
5153

54+
impl PartialEq for ProjectionExpr {
55+
fn eq(&self, other: &Self) -> bool {
56+
let ProjectionExpr { expr, alias } = self;
57+
expr.eq(&other.expr) && *alias == other.alias
58+
}
59+
}
60+
61+
impl Eq for ProjectionExpr {}
62+
5263
impl std::fmt::Display for ProjectionExpr {
5364
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
5465
if self.expr.to_string() == self.alias {
@@ -101,7 +112,7 @@ impl From<ProjectionExpr> for (Arc<dyn PhysicalExpr>, String) {
101112
/// This struct encapsulates multiple `ProjectionExpr` instances,
102113
/// representing a complete projection operation and provides
103114
/// methods to manipulate and analyze the projection as a whole.
104-
#[derive(Debug, Clone)]
115+
#[derive(Debug, Clone, PartialEq, Eq)]
105116
pub struct ProjectionExprs {
106117
exprs: Vec<ProjectionExpr>,
107118
}
@@ -194,7 +205,7 @@ impl ProjectionExprs {
194205
/// assert_eq!(projection_with_dups.as_ref()[1].alias, "a"); // duplicate
195206
/// assert_eq!(projection_with_dups.as_ref()[2].alias, "b");
196207
/// ```
197-
pub fn from_indices(indices: &[usize], schema: &SchemaRef) -> Self {
208+
pub fn from_indices(indices: &[usize], schema: &Schema) -> Self {
198209
let projection_exprs = indices.iter().map(|&i| {
199210
let field = schema.field(i);
200211
ProjectionExpr {
@@ -398,6 +409,22 @@ impl ProjectionExprs {
398409
))
399410
}
400411

412+
/// Create a new [`Projector`] from this projection and an input schema.
413+
///
414+
/// A [`Projector`] can be used to apply this projection to record batches.
415+
///
416+
/// # Errors
417+
/// This function returns an error if the output schema cannot be constructed from the input schema
418+
/// with the given projection expressions.
419+
/// For example, if an expression only works with integer columns but the input schema has a string column at that index.
420+
pub fn make_projector(&self, input_schema: &Schema) -> Result<Projector> {
421+
let output_schema = Arc::new(self.project_schema(input_schema)?);
422+
Ok(Projector {
423+
projection: self.clone(),
424+
output_schema,
425+
})
426+
}
427+
401428
/// Project statistics according to this projection.
402429
/// For example, for a projection `SELECT a AS x, b + 1 AS y`, where `a` is at index 0 and `b` is at index 1,
403430
/// if the input statistics has column statistics for columns `a`, `b`, and `c`, the output statistics would have column statistics for columns `x` and `y`.
@@ -446,6 +473,57 @@ impl<'a> IntoIterator for &'a ProjectionExprs {
446473
}
447474
}
448475

476+
/// Applies a projection to record batches.
477+
///
478+
/// A [`Projector`] uses a set of projection expressions to transform
479+
/// and a pre-computed output schema to project record batches accordingly.
480+
///
481+
/// The main reason to use a `Projector` is to avoid repeatedly computing
482+
/// the output schema for each batch, which can be costly if the projection
483+
/// expressions are complex.
484+
#[derive(Clone, Debug)]
485+
pub struct Projector {
486+
projection: ProjectionExprs,
487+
output_schema: SchemaRef,
488+
}
489+
490+
impl Projector {
491+
/// Project a record batch according to this projector's expressions.
492+
///
493+
/// # Errors
494+
/// This function returns an error if any expression evaluation fails
495+
/// or if the output schema of the resulting record batch does not match
496+
/// the pre-computed output schema of the projector.
497+
pub fn project_batch(&self, batch: &RecordBatch) -> Result<RecordBatch> {
498+
let arrays = evaluate_expressions_to_arrays(
499+
self.projection.exprs.iter().map(|p| &p.expr),
500+
batch,
501+
)?;
502+
503+
if arrays.is_empty() {
504+
let options =
505+
RecordBatchOptions::new().with_row_count(Some(batch.num_rows()));
506+
RecordBatch::try_new_with_options(
507+
Arc::clone(&self.output_schema),
508+
arrays,
509+
&options,
510+
)
511+
.map_err(Into::into)
512+
} else {
513+
RecordBatch::try_new(Arc::clone(&self.output_schema), arrays)
514+
.map_err(Into::into)
515+
}
516+
}
517+
518+
pub fn output_schema(&self) -> &SchemaRef {
519+
&self.output_schema
520+
}
521+
522+
pub fn projection(&self) -> &ProjectionExprs {
523+
&self.projection
524+
}
525+
}
526+
449527
impl IntoIterator for ProjectionExprs {
450528
type Item = ProjectionExpr;
451529
type IntoIter = std::vec::IntoIter<ProjectionExpr>;
@@ -547,7 +625,13 @@ pub fn update_expr(
547625
})
548626
.data()?;
549627

550-
Ok((state == RewriteState::RewrittenValid).then_some(new_expr))
628+
match state {
629+
RewriteState::RewrittenInvalid => Ok(None),
630+
// Both Unchanged and RewrittenValid are valid:
631+
// - Unchanged means no columns to rewrite (e.g., literals)
632+
// - RewrittenValid means columns were successfully rewritten
633+
RewriteState::Unchanged | RewriteState::RewrittenValid => Ok(Some(new_expr)),
634+
}
551635
}
552636

553637
/// Stores target expressions, along with their indices, that associate with a
@@ -2009,6 +2093,94 @@ pub(crate) mod tests {
20092093
);
20102094
}
20112095

2096+
#[test]
2097+
fn test_merge_empty_projection_with_literal() -> Result<()> {
2098+
// This test reproduces the issue from roundtrip_empty_projection test
2099+
// Query like: SELECT 1 FROM table
2100+
// where the file scan needs no columns (empty projection)
2101+
// but we project a literal on top
2102+
2103+
// Empty base projection (no columns needed from file)
2104+
let base_projection = ProjectionExprs::new(vec![]);
2105+
2106+
// Top projection with a literal expression: SELECT 1
2107+
let top_projection = ProjectionExprs::new(vec![ProjectionExpr {
2108+
expr: Arc::new(Literal::new(ScalarValue::Int64(Some(1)))),
2109+
alias: "Int64(1)".to_string(),
2110+
}]);
2111+
2112+
// This should succeed - literals don't reference columns so they should
2113+
// pass through unchanged when merged with an empty projection
2114+
let merged = base_projection.try_merge(&top_projection)?;
2115+
assert_snapshot!(format!("{merged}"), @"Projection[1 AS Int64(1)]");
2116+
2117+
Ok(())
2118+
}
2119+
2120+
#[test]
2121+
fn test_update_expr_with_literal() -> Result<()> {
2122+
// Test that update_expr correctly handles expressions without column references
2123+
let literal_expr: Arc<dyn PhysicalExpr> =
2124+
Arc::new(Literal::new(ScalarValue::Int64(Some(42))));
2125+
let empty_projection: Vec<ProjectionExpr> = vec![];
2126+
2127+
// Updating a literal with an empty projection should return the literal unchanged
2128+
let result = update_expr(&literal_expr, &empty_projection, true)?;
2129+
assert!(result.is_some(), "Literal expression should be valid");
2130+
2131+
let result_expr = result.unwrap();
2132+
assert_eq!(
2133+
result_expr
2134+
.as_any()
2135+
.downcast_ref::<Literal>()
2136+
.unwrap()
2137+
.value(),
2138+
&ScalarValue::Int64(Some(42))
2139+
);
2140+
2141+
Ok(())
2142+
}
2143+
2144+
#[test]
2145+
fn test_update_expr_with_complex_literal_expr() -> Result<()> {
2146+
// Test update_expr with an expression containing both literals and a column
2147+
// This tests the case where we have: literal + column
2148+
let expr: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
2149+
Arc::new(Literal::new(ScalarValue::Int64(Some(10)))),
2150+
Operator::Plus,
2151+
Arc::new(Column::new("x", 0)),
2152+
));
2153+
2154+
// Base projection that maps column 0 to a different expression
2155+
let base_projection = vec![ProjectionExpr {
2156+
expr: Arc::new(Column::new("a", 5)),
2157+
alias: "x".to_string(),
2158+
}];
2159+
2160+
// The expression should be updated: 10 + x@0 becomes 10 + a@5
2161+
let result = update_expr(&expr, &base_projection, true)?;
2162+
assert!(result.is_some(), "Expression should be valid");
2163+
2164+
let result_expr = result.unwrap();
2165+
let binary = result_expr
2166+
.as_any()
2167+
.downcast_ref::<BinaryExpr>()
2168+
.expect("Should be a BinaryExpr");
2169+
2170+
// Left side should still be the literal
2171+
assert!(binary.left().as_any().downcast_ref::<Literal>().is_some());
2172+
2173+
// Right side should be updated to reference column at index 5
2174+
let right_col = binary
2175+
.right()
2176+
.as_any()
2177+
.downcast_ref::<Column>()
2178+
.expect("Right should be a Column");
2179+
assert_eq!(right_col.index(), 5);
2180+
2181+
Ok(())
2182+
}
2183+
20122184
#[test]
20132185
fn test_project_schema_simple_columns() -> Result<()> {
20142186
// Input schema: [col0: Int64, col1: Utf8, col2: Float32]

0 commit comments

Comments
 (0)