Skip to content

Commit 9fb8eae

Browse files
authored
Dataframe with_column and with_column_renamed performance improvements (#14653)
* POC for with_column improvements.
* Updates. Assumptions are still not valid here.
* Added flag to indicate whether it is safe to project without validation or not.
* Updated documentation for DataFrame.projection_requires_validation field.
* project_with_validation is no longer public.
1 parent a28f283 commit 9fb8eae

4 files changed

Lines changed: 91 additions & 16 deletions

File tree

datafusion/core/benches/dataframe.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -56,8 +56,7 @@ fn run(column_count: u32, ctx: Arc<SessionContext>) {
5656

5757
data_frame = data_frame
5858
.with_column_renamed(field_name, new_field_name)
59-
.unwrap();
60-
data_frame = data_frame
59+
.unwrap()
6160
.with_column(new_field_name, btrim(vec![col(new_field_name)]))
6261
.unwrap();
6362
}
@@ -68,8 +67,7 @@ fn run(column_count: u32, ctx: Arc<SessionContext>) {
6867
}
6968

7069
fn criterion_benchmark(c: &mut Criterion) {
71-
// 500 takes far too long right now
72-
for column_count in [10, 100, 200 /* 500 */] {
70+
for column_count in [10, 100, 200, 500] {
7371
let ctx = create_context(column_count).unwrap();
7472

7573
c.bench_function(&format!("with_column_{column_count}"), |b| {

datafusion/core/src/dataframe/mod.rs

Lines changed: 57 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -188,6 +188,22 @@ pub struct DataFrame {
188188
// Box the (large) SessionState to reduce the size of DataFrame on the stack
189189
session_state: Box<SessionState>,
190190
plan: LogicalPlan,
191+
// Whether projection ops must validate (columnize and normalize) existing
192+
// expressions. When this flag is false, the `with_column` and `with_column_renamed`
193+
// functions can skip the recursive work required to columnize and normalize
194+
// those expressions. Since these function calls are often chained or
195+
// called many times in dataframe operations this can result in a significant
196+
// performance gain.
197+
//
198+
// This can be set to false only when the dataframe function
199+
// call results in the last operation being a
200+
// `LogicalPlanBuilder::from(plan).project(fields)?.build()` or
201+
// `LogicalPlanBuilder::from(plan).project_with_validation(fields)?.build()`
202+
// call. This requirement guarantees that the plan has had all columnization
203+
// and normalization applied to existing expressions and only new expressions
204+
// will require that work. Any operation that updates the plan in any way
205+
// via anything other than a `project` call should set this to true.
206+
projection_requires_validation: bool,
191207
}
192208

193209
impl DataFrame {
@@ -200,6 +216,7 @@ impl DataFrame {
200216
Self {
201217
session_state: Box::new(session_state),
202218
plan,
219+
projection_requires_validation: true,
203220
}
204221
}
205222

@@ -337,6 +354,7 @@ impl DataFrame {
337354
Ok(DataFrame {
338355
session_state: self.session_state,
339356
plan: project_plan,
357+
projection_requires_validation: false,
340358
})
341359
}
342360

@@ -442,6 +460,7 @@ impl DataFrame {
442460
Ok(DataFrame {
443461
session_state: self.session_state,
444462
plan,
463+
projection_requires_validation: true,
445464
})
446465
}
447466

@@ -482,6 +501,7 @@ impl DataFrame {
482501
Ok(DataFrame {
483502
session_state: self.session_state,
484503
plan,
504+
projection_requires_validation: true,
485505
})
486506
}
487507

@@ -555,6 +575,7 @@ impl DataFrame {
555575
Ok(DataFrame {
556576
session_state: self.session_state,
557577
plan,
578+
projection_requires_validation: !is_grouping_set,
558579
})
559580
}
560581

@@ -567,6 +588,7 @@ impl DataFrame {
567588
Ok(DataFrame {
568589
session_state: self.session_state,
569590
plan,
591+
projection_requires_validation: true,
570592
})
571593
}
572594

@@ -605,6 +627,7 @@ impl DataFrame {
605627
Ok(DataFrame {
606628
session_state: self.session_state,
607629
plan,
630+
projection_requires_validation: self.projection_requires_validation,
608631
})
609632
}
610633

@@ -642,6 +665,7 @@ impl DataFrame {
642665
Ok(DataFrame {
643666
session_state: self.session_state,
644667
plan,
668+
projection_requires_validation: true,
645669
})
646670
}
647671

@@ -680,6 +704,7 @@ impl DataFrame {
680704
Ok(DataFrame {
681705
session_state: self.session_state,
682706
plan,
707+
projection_requires_validation: true,
683708
})
684709
}
685710

@@ -711,6 +736,7 @@ impl DataFrame {
711736
Ok(DataFrame {
712737
session_state: self.session_state,
713738
plan,
739+
projection_requires_validation: true,
714740
})
715741
}
716742

@@ -752,6 +778,7 @@ impl DataFrame {
752778
Ok(DataFrame {
753779
session_state: self.session_state,
754780
plan,
781+
projection_requires_validation: true,
755782
})
756783
}
757784

@@ -952,6 +979,7 @@ impl DataFrame {
952979
Ok(DataFrame {
953980
session_state: self.session_state,
954981
plan,
982+
projection_requires_validation: self.projection_requires_validation,
955983
})
956984
}
957985

@@ -1001,6 +1029,7 @@ impl DataFrame {
10011029
Ok(DataFrame {
10021030
session_state: self.session_state,
10031031
plan,
1032+
projection_requires_validation: self.projection_requires_validation,
10041033
})
10051034
}
10061035

@@ -1068,6 +1097,7 @@ impl DataFrame {
10681097
Ok(DataFrame {
10691098
session_state: self.session_state,
10701099
plan,
1100+
projection_requires_validation: true,
10711101
})
10721102
}
10731103

@@ -1127,6 +1157,7 @@ impl DataFrame {
11271157
Ok(DataFrame {
11281158
session_state: self.session_state,
11291159
plan,
1160+
projection_requires_validation: true,
11301161
})
11311162
}
11321163

@@ -1162,6 +1193,7 @@ impl DataFrame {
11621193
Ok(DataFrame {
11631194
session_state: self.session_state,
11641195
plan,
1196+
projection_requires_validation: true,
11651197
})
11661198
}
11671199

@@ -1433,6 +1465,7 @@ impl DataFrame {
14331465
Ok(DataFrame {
14341466
session_state: self.session_state,
14351467
plan,
1468+
projection_requires_validation: self.projection_requires_validation,
14361469
})
14371470
}
14381471

@@ -1485,6 +1518,7 @@ impl DataFrame {
14851518
Ok(DataFrame {
14861519
session_state: self.session_state,
14871520
plan,
1521+
projection_requires_validation: true,
14881522
})
14891523
}
14901524

@@ -1520,6 +1554,7 @@ impl DataFrame {
15201554
Ok(DataFrame {
15211555
session_state: self.session_state,
15221556
plan,
1557+
projection_requires_validation: true,
15231558
})
15241559
}
15251560

@@ -1565,6 +1600,7 @@ impl DataFrame {
15651600
DataFrame {
15661601
session_state: self.session_state,
15671602
plan,
1603+
projection_requires_validation: self.projection_requires_validation,
15681604
}
15691605
.collect()
15701606
.await
@@ -1634,6 +1670,7 @@ impl DataFrame {
16341670
DataFrame {
16351671
session_state: self.session_state,
16361672
plan,
1673+
projection_requires_validation: self.projection_requires_validation,
16371674
}
16381675
.collect()
16391676
.await
@@ -1703,12 +1740,13 @@ impl DataFrame {
17031740
DataFrame {
17041741
session_state: self.session_state,
17051742
plan,
1743+
projection_requires_validation: self.projection_requires_validation,
17061744
}
17071745
.collect()
17081746
.await
17091747
}
17101748

1711-
/// Add an additional column to the DataFrame.
1749+
/// Add or replace a column in the DataFrame.
17121750
///
17131751
/// # Example
17141752
/// ```
@@ -1736,33 +1774,36 @@ impl DataFrame {
17361774

17371775
let mut col_exists = false;
17381776
let new_column = expr.alias(name);
1739-
let mut fields: Vec<Expr> = plan
1777+
let mut fields: Vec<(Expr, bool)> = plan
17401778
.schema()
17411779
.iter()
17421780
.filter_map(|(qualifier, field)| {
17431781
if field.name() == name {
17441782
col_exists = true;
1745-
Some(new_column.clone())
1783+
Some((new_column.clone(), true))
17461784
} else {
17471785
let e = col(Column::from((qualifier, field)));
17481786
window_fn_str
17491787
.as_ref()
17501788
.filter(|s| *s == &e.to_string())
17511789
.is_none()
1752-
.then_some(e)
1790+
.then_some((e, self.projection_requires_validation))
17531791
}
17541792
})
17551793
.collect();
17561794

17571795
if !col_exists {
1758-
fields.push(new_column);
1796+
fields.push((new_column, true));
17591797
}
17601798

1761-
let project_plan = LogicalPlanBuilder::from(plan).project(fields)?.build()?;
1799+
let project_plan = LogicalPlanBuilder::from(plan)
1800+
.project_with_validation(fields)?
1801+
.build()?;
17621802

17631803
Ok(DataFrame {
17641804
session_state: self.session_state,
17651805
plan: project_plan,
1806+
projection_requires_validation: false,
17661807
})
17671808
}
17681809

@@ -1819,19 +1860,23 @@ impl DataFrame {
18191860
.iter()
18201861
.map(|(qualifier, field)| {
18211862
if qualifier.eq(&qualifier_rename) && field.as_ref() == field_rename {
1822-
col(Column::from((qualifier, field)))
1823-
.alias_qualified(qualifier.cloned(), new_name)
1863+
(
1864+
col(Column::from((qualifier, field)))
1865+
.alias_qualified(qualifier.cloned(), new_name),
1866+
false,
1867+
)
18241868
} else {
1825-
col(Column::from((qualifier, field)))
1869+
(col(Column::from((qualifier, field))), false)
18261870
}
18271871
})
18281872
.collect::<Vec<_>>();
18291873
let project_plan = LogicalPlanBuilder::from(self.plan)
1830-
.project(projection)?
1874+
.project_with_validation(projection)?
18311875
.build()?;
18321876
Ok(DataFrame {
18331877
session_state: self.session_state,
18341878
plan: project_plan,
1879+
projection_requires_validation: false,
18351880
})
18361881
}
18371882

@@ -1897,6 +1942,7 @@ impl DataFrame {
18971942
Ok(DataFrame {
18981943
session_state: self.session_state,
18991944
plan,
1945+
projection_requires_validation: self.projection_requires_validation,
19001946
})
19011947
}
19021948

@@ -1932,6 +1978,7 @@ impl DataFrame {
19321978
Ok(DataFrame {
19331979
session_state: self.session_state,
19341980
plan,
1981+
projection_requires_validation: self.projection_requires_validation,
19351982
})
19361983
}
19371984

datafusion/core/src/dataframe/parquet.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,7 @@ impl DataFrame {
9393
DataFrame {
9494
session_state: self.session_state,
9595
plan,
96+
projection_requires_validation: self.projection_requires_validation,
9697
}
9798
.collect()
9899
.await

datafusion/expr/src/logical_plan/builder.rs

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -528,6 +528,15 @@ impl LogicalPlanBuilder {
528528
project(Arc::unwrap_or_clone(self.plan), expr).map(Self::new)
529529
}
530530

531+
/// Apply a projection without alias, with optional per-expression validation
532+
/// (each expression is paired with a flag: true to validate, false to not validate)
533+
pub fn project_with_validation(
534+
self,
535+
expr: Vec<(impl Into<Expr>, bool)>,
536+
) -> Result<Self> {
537+
project_with_validation(Arc::unwrap_or_clone(self.plan), expr).map(Self::new)
538+
}
539+
531540
/// Select the given column indices
532541
pub fn select(self, indices: impl IntoIterator<Item = usize>) -> Result<Self> {
533542
let exprs: Vec<_> = indices
@@ -1647,13 +1656,33 @@ pub fn union_by_name(
16471656
pub fn project(
16481657
plan: LogicalPlan,
16491658
expr: impl IntoIterator<Item = impl Into<Expr>>,
1659+
) -> Result<LogicalPlan> {
1660+
project_with_validation(plan, expr.into_iter().map(|e| (e, true)))
1661+
}
1662+
1663+
/// Create Projection. Similar to project except that the expressions
1664+
/// passed in have a flag to indicate if that expression requires
1665+
/// validation (normalize & columnize) (true) or not (false)
1666+
/// # Errors
1667+
/// This function errors under any of the following conditions:
1668+
/// * Two or more expressions have the same name
1669+
/// * An invalid expression is used (e.g. a `sort` expression)
1670+
fn project_with_validation(
1671+
plan: LogicalPlan,
1672+
expr: impl IntoIterator<Item = (impl Into<Expr>, bool)>,
16501673
) -> Result<LogicalPlan> {
16511674
let mut projected_expr = vec![];
1652-
for e in expr {
1675+
for (e, validate) in expr {
16531676
let e = e.into();
16541677
match e {
16551678
Expr::Wildcard { .. } => projected_expr.push(e),
1656-
_ => projected_expr.push(columnize_expr(normalize_col(e, &plan)?, &plan)?),
1679+
_ => {
1680+
if validate {
1681+
projected_expr.push(columnize_expr(normalize_col(e, &plan)?, &plan)?)
1682+
} else {
1683+
projected_expr.push(e)
1684+
}
1685+
}
16571686
}
16581687
}
16591688
validate_unique_names("Projections", projected_expr.iter())?;

0 commit comments

Comments
 (0)