Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 101 additions & 3 deletions src/materialized/dependencies.rs
Original file line number Diff line number Diff line change
Expand Up @@ -603,6 +603,21 @@ fn pushdown_projection_inexact(plan: LogicalPlan, indices: &HashSet<usize>) -> R
.map(Expr::Column)
.collect_vec();

// GUARD: if after pushdown the set of relevant unnest columns is empty,
// avoid constructing an Unnest node with zero exec columns (which will
// later error in Unnest::try_new). Instead, simply project the
// desired output columns from the child plan (after pushing down the child projection).
// Related PR: https://github.com/apache/datafusion/pull/16632, after that we must
// also check for empty exec columns here.
if columns_to_unnest.is_empty() {
return LogicalPlanBuilder::from(pushdown_projection_inexact(
Arc::unwrap_or_clone(unnest.input),
&child_indices,
)?)
.project(columns_to_project)?
.build();
}

LogicalPlanBuilder::from(pushdown_projection_inexact(
Arc::unwrap_or_clone(unnest.input),
&child_indices,
Expand Down Expand Up @@ -922,16 +937,17 @@ mod test {
use std::{any::Any, collections::HashSet, sync::Arc};

use arrow::util::pretty::pretty_format_batches;
use arrow_schema::SchemaRef;
use arrow_schema::{DataType, Field, FieldRef, Fields, SchemaRef};
use datafusion::{
assert_batches_eq, assert_batches_sorted_eq,
catalog::{Session, TableProvider},
datasource::listing::ListingTableUrl,
execution::session_state::SessionStateBuilder,
prelude::{DataFrame, SessionConfig, SessionContext},
};
use datafusion_common::{Column, Result, ScalarValue};
use datafusion_expr::{Expr, JoinType, LogicalPlan, TableType};
use datafusion_common::{Column, DFSchema, Result, ScalarValue};
use datafusion_expr::builder::unnest;
use datafusion_expr::{EmptyRelation, Expr, JoinType, LogicalPlan, TableType};
use datafusion_physical_plan::ExecutionPlan;
use itertools::Itertools;

Expand Down Expand Up @@ -1837,4 +1853,86 @@ mod test {

Ok(())
}

#[test]
fn test_pushdown_unnest_guard_partition_date_only() -> Result<()> {
// This test simulates a simplified MV scenario:
//
// WITH events_structs AS (
// SELECT id, date, unnest(events) AS evs
// FROM base_table
// ),
// flattened_events AS (
// SELECT id, date, evs.event_type, evs.event_time
// FROM events_structs
// ),
// SELECT id, date, MAX(...) ...
// GROUP BY id, date
//
// The partition column is "date". During dependency plan
// building we only request "date" from this subtree,
// so pushdown_projection_inexact receives indices for
// the `date` column only. The guard must kick in:
// unnest(events) becomes unused, and the plan should
// collapse to just projecting `date` from the child.

// 1. Build schema for base table
let id = Field::new("id", DataType::Utf8, true);
let date = Field::new("date", DataType::Utf8, true);

// events: list<struct<event_type, event_time>>
let event_type = Field::new("event_type", DataType::Utf8, true);
let event_time = Field::new("event_time", DataType::Utf8, true);
let events_struct = Field::new(
"item",
DataType::Struct(Fields::from(vec![event_type, event_time])),
true,
);
let events = Field::new(
"events",
DataType::List(FieldRef::from(Box::new(events_struct))),
true,
);

// Build DFSchema: (id, date, events)
let qualified_fields = vec![
(None, Arc::new(id.clone())),
(None, Arc::new(date.clone())),
(None, Arc::new(events.clone())),
];
let df_schema =
DFSchema::new_with_metadata(qualified_fields, std::collections::HashMap::new())?;

// 2. Build a dummy child plan (EmptyRelation with the schema)
let empty = LogicalPlan::EmptyRelation(EmptyRelation {
produce_one_row: false,
schema: Arc::new(df_schema),
});

// 3. Wrap it with an Unnest node on the "events" column
let events_col = Column::from_name("events");
let unnest_plan = unnest(empty.clone(), vec![events_col.clone()])?;

// 4. Partition column is "date". Look up its actual index dynamically.
let date_idx = unnest_plan
.schema()
.index_of_column(&Column::from_name("date"))?;
let mut indices: HashSet<usize> = HashSet::new();
indices.insert(date_idx);

// 5. Call pushdown_projection_inexact with {date}
let res = pushdown_projection_inexact(unnest_plan, &indices)?;

// 6. Assert the result schema only contains `date`
let cols: Vec<String> = res
.schema()
.fields()
.iter()
.map(|f| f.name().to_string())
.collect();

assert_eq!(cols, vec!["date"]);

Ok(())
}
}