Skip to content
6 changes: 2 additions & 4 deletions datafusion/core/benches/dataframe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,7 @@ fn run(column_count: u32, ctx: Arc<SessionContext>) {

data_frame = data_frame
.with_column_renamed(field_name, new_field_name)
.unwrap();
data_frame = data_frame
.unwrap()
.with_column(new_field_name, btrim(vec![col(new_field_name)]))
.unwrap();
}
Expand All @@ -68,8 +67,7 @@ fn run(column_count: u32, ctx: Arc<SessionContext>) {
}

fn criterion_benchmark(c: &mut Criterion) {
// 500 takes far too long right now
for column_count in [10, 100, 200 /* 500 */] {
for column_count in [10, 100, 200, 500] {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🎉

let ctx = create_context(column_count).unwrap();

c.bench_function(&format!("with_column_{column_count}"), |b| {
Expand Down
53 changes: 43 additions & 10 deletions datafusion/core/src/dataframe/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,8 @@ pub struct DataFrame {
// Box the (large) SessionState to reduce the size of DataFrame on the stack
session_state: Box<SessionState>,
plan: LogicalPlan,
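// Whether expressions projected from this DataFrame must still be validated
// (normalized and columnized). `false` means a later projection may skip that
// work, e.g. right after `with_column` or `with_column_renamed` built the plan.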
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you add some additional comments here about what circumstances permit validation to be skipped?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated, please review text when you have a chance.

projection_requires_validation: bool,
}

impl DataFrame {
Expand All @@ -195,6 +197,7 @@ impl DataFrame {
Self {
session_state: Box::new(session_state),
plan,
projection_requires_validation: true,
}
}

Expand Down Expand Up @@ -332,6 +335,7 @@ impl DataFrame {
Ok(DataFrame {
session_state: self.session_state,
plan: project_plan,
projection_requires_validation: false,
})
}

Expand Down Expand Up @@ -437,6 +441,7 @@ impl DataFrame {
Ok(DataFrame {
session_state: self.session_state,
plan,
projection_requires_validation: true,
})
}

Expand Down Expand Up @@ -477,6 +482,7 @@ impl DataFrame {
Ok(DataFrame {
session_state: self.session_state,
plan,
projection_requires_validation: true,
})
}

Expand Down Expand Up @@ -547,6 +553,7 @@ impl DataFrame {
Ok(DataFrame {
session_state: self.session_state,
plan,
projection_requires_validation: !is_grouping_set,
})
}

Expand All @@ -559,6 +566,7 @@ impl DataFrame {
Ok(DataFrame {
session_state: self.session_state,
plan,
projection_requires_validation: true,
})
}

Expand Down Expand Up @@ -597,6 +605,7 @@ impl DataFrame {
Ok(DataFrame {
session_state: self.session_state,
plan,
projection_requires_validation: self.projection_requires_validation,
})
}

Expand Down Expand Up @@ -634,6 +643,7 @@ impl DataFrame {
Ok(DataFrame {
session_state: self.session_state,
plan,
projection_requires_validation: true,
})
}

Expand Down Expand Up @@ -672,6 +682,7 @@ impl DataFrame {
Ok(DataFrame {
session_state: self.session_state,
plan,
projection_requires_validation: true,
})
}

Expand Down Expand Up @@ -703,6 +714,7 @@ impl DataFrame {
Ok(DataFrame {
session_state: self.session_state,
plan,
projection_requires_validation: true,
})
}

Expand Down Expand Up @@ -744,6 +756,7 @@ impl DataFrame {
Ok(DataFrame {
session_state: self.session_state,
plan,
projection_requires_validation: true,
})
}

Expand Down Expand Up @@ -944,6 +957,7 @@ impl DataFrame {
Ok(DataFrame {
session_state: self.session_state,
plan,
projection_requires_validation: self.projection_requires_validation,
})
}

Expand Down Expand Up @@ -993,6 +1007,7 @@ impl DataFrame {
Ok(DataFrame {
session_state: self.session_state,
plan,
projection_requires_validation: self.projection_requires_validation,
})
}

Expand Down Expand Up @@ -1060,6 +1075,7 @@ impl DataFrame {
Ok(DataFrame {
session_state: self.session_state,
plan,
projection_requires_validation: true,
})
}

Expand Down Expand Up @@ -1119,6 +1135,7 @@ impl DataFrame {
Ok(DataFrame {
session_state: self.session_state,
plan,
projection_requires_validation: true,
})
}

Expand Down Expand Up @@ -1154,6 +1171,7 @@ impl DataFrame {
Ok(DataFrame {
session_state: self.session_state,
plan,
projection_requires_validation: true,
})
}

Expand Down Expand Up @@ -1425,6 +1443,7 @@ impl DataFrame {
Ok(DataFrame {
session_state: self.session_state,
plan,
projection_requires_validation: self.projection_requires_validation,
})
}

Expand Down Expand Up @@ -1477,6 +1496,7 @@ impl DataFrame {
Ok(DataFrame {
session_state: self.session_state,
plan,
projection_requires_validation: true,
})
}

Expand Down Expand Up @@ -1512,6 +1532,7 @@ impl DataFrame {
Ok(DataFrame {
session_state: self.session_state,
plan,
projection_requires_validation: true,
})
}

Expand Down Expand Up @@ -1557,6 +1578,7 @@ impl DataFrame {
DataFrame {
session_state: self.session_state,
plan,
projection_requires_validation: self.projection_requires_validation,
}
.collect()
.await
Expand Down Expand Up @@ -1626,6 +1648,7 @@ impl DataFrame {
DataFrame {
session_state: self.session_state,
plan,
projection_requires_validation: self.projection_requires_validation,
}
.collect()
.await
Expand Down Expand Up @@ -1695,12 +1718,13 @@ impl DataFrame {
DataFrame {
session_state: self.session_state,
plan,
projection_requires_validation: self.projection_requires_validation,
}
.collect()
.await
}

/// Add an additional column to the DataFrame.
/// Add or replace a column in the DataFrame.
///
/// # Example
/// ```
Expand Down Expand Up @@ -1728,33 +1752,36 @@ impl DataFrame {

let mut col_exists = false;
let new_column = expr.alias(name);
let mut fields: Vec<Expr> = plan
let mut fields: Vec<(Expr, bool)> = plan
.schema()
.iter()
.filter_map(|(qualifier, field)| {
if field.name() == name {
col_exists = true;
Some(new_column.clone())
Some((new_column.clone(), true))
} else {
let e = col(Column::from((qualifier, field)));
window_fn_str
.as_ref()
.filter(|s| *s == &e.to_string())
.is_none()
.then_some(e)
.then_some((e, self.projection_requires_validation))
}
})
.collect();

if !col_exists {
fields.push(new_column);
fields.push((new_column, true));
}

let project_plan = LogicalPlanBuilder::from(plan).project(fields)?.build()?;
let project_plan = LogicalPlanBuilder::from(plan)
.project_with_validation(fields)?
.build()?;

Ok(DataFrame {
session_state: self.session_state,
plan: project_plan,
projection_requires_validation: false,
})
}

Expand Down Expand Up @@ -1811,19 +1838,23 @@ impl DataFrame {
.iter()
.map(|(qualifier, field)| {
if qualifier.eq(&qualifier_rename) && field.as_ref() == field_rename {
col(Column::from((qualifier, field)))
.alias_qualified(qualifier.cloned(), new_name)
(
col(Column::from((qualifier, field)))
.alias_qualified(qualifier.cloned(), new_name),
false,
)
} else {
col(Column::from((qualifier, field)))
(col(Column::from((qualifier, field))), false)
}
})
.collect::<Vec<_>>();
let project_plan = LogicalPlanBuilder::from(self.plan)
.project(projection)?
.project_with_validation(projection)?
.build()?;
Ok(DataFrame {
session_state: self.session_state,
plan: project_plan,
projection_requires_validation: false,
})
}

Expand Down Expand Up @@ -1889,6 +1920,7 @@ impl DataFrame {
Ok(DataFrame {
session_state: self.session_state,
plan,
projection_requires_validation: self.projection_requires_validation,
})
}

Expand Down Expand Up @@ -1924,6 +1956,7 @@ impl DataFrame {
Ok(DataFrame {
session_state: self.session_state,
plan,
projection_requires_validation: self.projection_requires_validation,
})
}
}
Expand Down
1 change: 1 addition & 0 deletions datafusion/core/src/dataframe/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ impl DataFrame {
DataFrame {
session_state: self.session_state,
plan,
projection_requires_validation: self.projection_requires_validation,
}
.collect()
.await
Expand Down
33 changes: 31 additions & 2 deletions datafusion/expr/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -498,6 +498,15 @@ impl LogicalPlanBuilder {
project(Arc::unwrap_or_clone(self.plan), expr).map(Self::new)
}

/// Apply a projection without alias, where every expression is paired
/// with a flag selecting whether it is validated (`true`) or used
/// without validation (`false`).
pub fn project_with_validation(
    self,
    expr: Vec<(impl Into<Expr>, bool)>,
) -> Result<Self> {
    // Take ownership of the input plan, cloning only if the Arc is shared.
    let input = Arc::unwrap_or_clone(self.plan);
    let projected = project_with_validation(input, expr)?;
    Ok(Self::new(projected))
}

/// Select the given column indices
pub fn select(self, indices: impl IntoIterator<Item = usize>) -> Result<Self> {
let exprs: Vec<_> = indices
Expand Down Expand Up @@ -1612,13 +1621,33 @@ pub fn union_by_name(
pub fn project(
    plan: LogicalPlan,
    expr: impl IntoIterator<Item = impl Into<Expr>>,
) -> Result<LogicalPlan> {
    // Plain `project` always validates: tag every expression with `true`
    // and hand off to the flag-aware implementation.
    let always_validated = expr.into_iter().map(|e| (e, true));
    project_with_validation(plan, always_validated)
}

/// Create Projection. Similar to [`project`], except that each expression
/// is paired with a flag indicating whether it requires validation
/// (normalize & columnize): `true` to validate, `false` to skip.
/// # Errors
/// This function errors under any of the following conditions:
/// * Two or more expressions have the same name
/// * An invalid expression is used (e.g. a `sort` expression)
pub fn project_with_validation(
plan: LogicalPlan,
expr: impl IntoIterator<Item = (impl Into<Expr>, bool)>,
) -> Result<LogicalPlan> {
let mut projected_expr = vec![];
for e in expr {
for (e, validate) in expr {
let e = e.into();
match e {
Expr::Wildcard { .. } => projected_expr.push(e),
_ => projected_expr.push(columnize_expr(normalize_col(e, &plan)?, &plan)?),
_ => {
if validate {
projected_expr.push(columnize_expr(normalize_col(e, &plan)?, &plan)?)
} else {
projected_expr.push(e)
}
}
}
}
validate_unique_names("Projections", projected_expr.iter())?;
Expand Down
Loading