Skip to content

Commit 435e83f

Browse files
committed
fix
1 parent 6af7b28 commit 435e83f

3 files changed

Lines changed: 133 additions & 91 deletions

File tree

datafusion/datasource-parquet/src/opener.rs

Lines changed: 91 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
2020
use crate::page_filter::PagePruningAccessPlanFilter;
2121
use crate::row_group_filter::RowGroupAccessPlanFilter;
22+
use crate::selectivity::PartitionedFilters;
2223
use crate::{
2324
ParquetAccessPlan, ParquetFileMetrics, ParquetFileReaderFactory,
2425
apply_file_schema_type_coercions, coerce_int96_to_resolution, row_filter,
@@ -122,7 +123,7 @@ pub(super) struct ParquetOpener {
122123
/// Shared selectivity tracker for adaptive filter reordering.
123124
/// Each opener reads stats and decides which filters to push down.
124125
pub selectivity_tracker:
125-
Arc<std::sync::RwLock<crate::selectivity::SelectivityTracker>>,
126+
Arc<parking_lot::RwLock<crate::selectivity::SelectivityTracker>>,
126127
}
127128

128129
/// Represents a prepared access plan with optional row selection
@@ -439,30 +440,60 @@ impl FileOpener for ParquetOpener {
439440
reader_metadata,
440441
);
441442

442-
let indices = projection.column_indices();
443-
444-
let mask = ProjectionMask::roots(builder.parquet_schema(), indices);
445-
446443
// Filter pushdown: evaluate predicates during scan
447444
// First, partition filters based on selectivity tracking
448445
// filter_metrics will be populated if we successfully build a row filter
449446
let mut filter_metrics: Vec<row_filter::FilterMetrics> = vec![];
450447

451-
let post_scan_filters: Vec<Arc<dyn PhysicalExpr>> = if let Some(predicate) =
452-
pushdown_filters.then_some(predicate.as_ref()).flatten()
453-
{
454-
// Split predicate into conjuncts and partition based on selectivity
455-
let conjuncts: Vec<Arc<dyn PhysicalExpr>> = split_conjunction(predicate)
456-
.into_iter()
457-
.map(Arc::clone)
458-
.collect();
459-
460-
// Acquire the tracker lock for both partitioning and row filter building
461-
let tracker = selectivity_tracker.read().unwrap();
462-
let crate::selectivity::PartitionedFilters {
448+
// Acquire tracker lock once for both partitioning and row filter building.
449+
// We hold it through the projection extension code (which doesn't need it)
450+
// to avoid the deadlock from release-and-reacquire pattern.
451+
let (post_scan_filters, original_projection_len, projection, mask) = {
452+
let tracker = selectivity_tracker.read();
453+
454+
let PartitionedFilters {
463455
row_filters,
464456
post_scan,
465-
} = tracker.partition_filters(conjuncts);
457+
} = if let Some(predicate) =
458+
pushdown_filters.then_some(predicate.as_ref()).flatten()
459+
{
460+
// Split predicate into conjuncts and partition based on selectivity
461+
let conjuncts: Vec<Arc<dyn PhysicalExpr>> =
462+
split_conjunction(predicate)
463+
.into_iter()
464+
.map(Arc::clone)
465+
.collect();
466+
467+
tracker.partition_filters(conjuncts)
468+
} else {
469+
PartitionedFilters {
470+
row_filters: vec![],
471+
post_scan: vec![],
472+
}
473+
};
474+
475+
// Extend projection with post-scan filter expressions BEFORE computing
476+
// column indices, so the mask includes columns needed by filters.
477+
let original_projection_len = projection.as_ref().len();
478+
let projection = if post_scan.is_empty() {
479+
projection
480+
} else {
481+
let mut extended_exprs: Vec<ProjectionExpr> =
482+
projection.iter().cloned().collect();
483+
484+
for (i, filter) in post_scan.iter().enumerate() {
485+
extended_exprs.push(ProjectionExpr {
486+
expr: Arc::clone(filter),
487+
alias: format!("__filter_{i}"),
488+
});
489+
}
490+
491+
ProjectionExprs::new(extended_exprs)
492+
};
493+
494+
// Now compute column indices (includes filter columns)
495+
let indices = projection.column_indices();
496+
let mask = ProjectionMask::roots(builder.parquet_schema(), indices);
466497

467498
// Build row filter with only the high-effectiveness filters
468499
if !row_filters.is_empty() {
@@ -487,13 +518,11 @@ impl FileOpener for ParquetOpener {
487518
};
488519
}
489520

490-
post_scan
491-
} else {
492-
vec![]
493-
};
521+
(post_scan, original_projection_len, projection, mask)
522+
}; // tracker lock released here
494523
if force_filter_selections {
495524
builder =
496-
builder.with_row_selection_policy(RowSelectionPolicy::Selectors);
525+
builder.with_row_selection_policy(RowSelectionPolicy::default());
497526
}
498527

499528
// Determine which row groups to actually read. The idea is to skip
@@ -616,29 +645,6 @@ impl FileOpener for ParquetOpener {
616645
// Rebase column indices to match the narrowed stream schema.
617646
// The projection expressions have indices based on physical_file_schema,
618647
// but the stream only contains the columns selected by the ProjectionMask.
619-
//
620-
// Extend projection with post-scan filter expressions (if any).
621-
// These will be evaluated as part of the projection and then
622-
// used to filter the batch before removing the filter columns.
623-
let original_projection_len = projection.as_ref().len();
624-
625-
let projection = if post_scan_filters.is_empty() {
626-
projection
627-
} else {
628-
// Create extended projection with filter expressions
629-
let mut extended_exprs: Vec<ProjectionExpr> =
630-
projection.iter().cloned().collect();
631-
632-
for (i, filter) in post_scan_filters.iter().enumerate() {
633-
extended_exprs.push(ProjectionExpr {
634-
expr: Arc::clone(filter),
635-
alias: format!("__filter_{i}"),
636-
});
637-
}
638-
639-
ProjectionExprs::new(extended_exprs)
640-
};
641-
642648
let projection = projection
643649
.try_map_exprs(|expr| reassign_expr_columns(expr, &stream_schema))?;
644650

@@ -732,7 +738,7 @@ fn apply_post_scan_filters(
732738
batch: RecordBatch,
733739
original_projection_len: usize,
734740
filter_exprs: &[Arc<dyn PhysicalExpr>],
735-
selectivity_tracker: &std::sync::RwLock<crate::selectivity::SelectivityTracker>,
741+
selectivity_tracker: &parking_lot::RwLock<crate::selectivity::SelectivityTracker>,
736742
) -> Result<RecordBatch> {
737743
use arrow::array::{BooleanArray, as_boolean_array};
738744
use arrow::compute::{and, filter_record_batch};
@@ -749,12 +755,11 @@ fn apply_post_scan_filters(
749755
// This gives us accurate marginal selectivity since all filters see the same input.
750756
let input_rows = num_rows as u64;
751757
if input_rows > 0 {
752-
if let Ok(mut tracker) = selectivity_tracker.write() {
753-
for (expr, col) in filter_exprs.iter().zip(filter_columns.iter()) {
754-
let bool_arr = as_boolean_array(col.as_ref());
755-
let rows_matched = bool_arr.true_count() as u64;
756-
tracker.update(expr, rows_matched, input_rows);
757-
}
758+
let mut tracker = selectivity_tracker.write();
759+
for (expr, col) in filter_exprs.iter().zip(filter_columns.iter()) {
760+
let bool_arr = as_boolean_array(col.as_ref());
761+
let rows_matched = bool_arr.true_count() as u64;
762+
tracker.update(expr, rows_matched, input_rows);
758763
}
759764
}
760765

@@ -933,52 +938,59 @@ where
933938
}
934939
}
935940

936-
/// A stream wrapper that updates the [`SelectivityTracker`] when the inner stream completes.
941+
/// A stream wrapper that updates the [`SelectivityTracker`] after each batch.
937942
///
938943
/// This captures per-filter metrics during stream processing and updates the shared
939-
/// selectivity tracker when the stream ends (either normally or due to early termination).
940-
/// The metrics are used to adaptively reorder filters in subsequent file scans.
944+
/// selectivity tracker incrementally after each batch. This allows the system to
945+
/// learn filter effectiveness quickly, potentially promoting effective filters
946+
/// to row filters mid-stream for subsequent files.
941947
struct SelectivityUpdatingStream<S> {
942948
/// The inner stream producing record batches
943949
inner: S,
944950
/// Has the stream finished processing?
945951
done: bool,
946952
/// Per-filter metrics collected during stream processing
947953
filter_metrics: Vec<row_filter::FilterMetrics>,
954+
/// Last reported values for each filter (to compute deltas)
955+
last_reported: Vec<(u64, u64)>, // (matched, total) per filter
948956
/// Shared selectivity tracker to update when stream completes
949-
selectivity_tracker: Arc<std::sync::RwLock<crate::selectivity::SelectivityTracker>>,
957+
selectivity_tracker: Arc<parking_lot::RwLock<crate::selectivity::SelectivityTracker>>,
950958
}
951959

952960
impl<S> SelectivityUpdatingStream<S> {
953961
fn new(
954962
stream: S,
955963
filter_metrics: Vec<row_filter::FilterMetrics>,
956964
selectivity_tracker: Arc<
957-
std::sync::RwLock<crate::selectivity::SelectivityTracker>,
965+
parking_lot::RwLock<crate::selectivity::SelectivityTracker>,
958966
>,
959967
) -> Self {
968+
let last_reported = vec![(0, 0); filter_metrics.len()];
960969
Self {
961970
inner: stream,
962971
done: false,
963972
filter_metrics,
973+
last_reported,
964974
selectivity_tracker,
965975
}
966976
}
967977

968-
/// Update the selectivity tracker with the collected metrics.
969-
/// Called when the stream completes (either normally or due to early termination).
970-
fn update_selectivity(&self) {
971-
// Try to acquire write lock; if we can't, skip the update
972-
// (this is a best-effort optimization, not critical)
973-
if let Ok(mut tracker) = self.selectivity_tracker.write() {
974-
for metrics in &self.filter_metrics {
975-
let matched = metrics.get_rows_matched() as u64;
976-
let total = metrics.get_rows_total() as u64;
977-
978-
// Only update if we actually processed some rows
979-
if total > 0 {
980-
tracker.update(&metrics.expr, matched, total);
981-
}
978+
/// Update the selectivity tracker with metrics accumulated since last update.
979+
/// Uses delta tracking to avoid double-counting rows.
980+
fn update_selectivity(&mut self) {
981+
let mut tracker = self.selectivity_tracker.write();
982+
for (i, metrics) in self.filter_metrics.iter().enumerate() {
983+
let current_matched = metrics.get_rows_matched() as u64;
984+
let current_total = metrics.get_rows_total() as u64;
985+
986+
let (last_matched, last_total) = self.last_reported[i];
987+
let delta_matched = current_matched - last_matched;
988+
let delta_total = current_total - last_total;
989+
990+
// Only update if we have new rows since last update
991+
if delta_total > 0 {
992+
tracker.update(&metrics.expr, delta_matched, delta_total);
993+
self.last_reported[i] = (current_matched, current_total);
982994
}
983995
}
984996
}
@@ -1000,12 +1012,16 @@ where
10001012

10011013
match ready!(self.inner.poll_next_unpin(cx)) {
10021014
None => {
1003-
// Stream completed - update selectivity tracker
1015+
// Stream completed - final update to selectivity tracker
10041016
self.done = true;
10051017
self.update_selectivity();
10061018
Poll::Ready(None)
10071019
}
1008-
Some(result) => Poll::Ready(Some(result)),
1020+
Some(result) => {
1021+
// Update selectivity after each batch for faster learning
1022+
self.update_selectivity();
1023+
Poll::Ready(Some(result))
1024+
}
10091025
}
10101026
}
10111027
}
@@ -1370,7 +1386,7 @@ mod test {
13701386
encryption_factory: None,
13711387
max_predicate_cache_size: self.max_predicate_cache_size,
13721388
reverse_row_groups: self.reverse_row_groups,
1373-
selectivity_tracker: Arc::new(std::sync::RwLock::new(
1389+
selectivity_tracker: Arc::new(parking_lot::RwLock::new(
13741390
crate::selectivity::SelectivityTracker::default(),
13751391
)),
13761392
}

datafusion/datasource-parquet/src/selectivity.rs

Lines changed: 38 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -48,8 +48,27 @@ pub struct PartitionedFilters {
4848
///
4949
/// This is needed because `Arc<dyn PhysicalExpr>` uses pointer equality by default,
5050
/// but we want to use the structural equality provided by `DynEq` and `DynHash`.
51+
///
52+
/// For dynamic expressions (like `DynamicFilterPhysicalExpr`), we use the snapshot
53+
/// of the expression to ensure stable hash/eq values even as the dynamic expression
54+
/// updates. This is critical for HashMap correctness.
5155
#[derive(Clone, Debug)]
52-
pub struct ExprKey(pub Arc<dyn PhysicalExpr>);
56+
pub struct ExprKey(Arc<dyn PhysicalExpr>);
57+
58+
impl ExprKey {
59+
/// Create a new ExprKey from an expression.
60+
///
61+
/// For dynamic expressions, this takes a snapshot to ensure stable hash/eq.
62+
pub fn new(expr: &Arc<dyn PhysicalExpr>) -> Self {
63+
// Try to get a snapshot; if available, use it for stable hash/eq
64+
let stable_expr = expr
65+
.snapshot()
66+
.ok()
67+
.flatten()
68+
.unwrap_or_else(|| Arc::clone(expr));
69+
Self(stable_expr)
70+
}
71+
}
5372

5473
impl Hash for ExprKey {
5574
fn hash<H: Hasher>(&self, state: &mut H) {
@@ -146,7 +165,7 @@ impl SelectivityTracker {
146165

147166
/// Get the effectiveness for a filter expression, if known.
148167
pub fn get_effectiveness(&self, expr: &Arc<dyn PhysicalExpr>) -> Option<f64> {
149-
let key = ExprKey(Arc::clone(expr));
168+
let key = ExprKey::new(expr);
150169
self.stats.get(&key).map(|s| s.effectiveness())
151170
}
152171

@@ -167,7 +186,7 @@ impl SelectivityTracker {
167186
let mut post_scan = Vec::new();
168187

169188
for filter in filters {
170-
let key = ExprKey(Arc::clone(&filter));
189+
let key = ExprKey::new(&filter);
171190
match self.stats.get(&key) {
172191
Some(stats) if stats.effectiveness() >= self.threshold => {
173192
// Known to be effective - promote to row filter
@@ -188,15 +207,22 @@ impl SelectivityTracker {
188207

189208
/// Update stats for a filter expression after processing a file.
190209
pub fn update(&mut self, expr: &Arc<dyn PhysicalExpr>, matched: u64, total: u64) {
191-
let key = ExprKey(Arc::clone(expr));
210+
let key = ExprKey::new(expr);
192211
self.stats.entry(key).or_default().update(matched, total);
193212
}
194213

195214
/// Get the current stats for a filter expression, if any.
196215
pub fn get_stats(&self, expr: &Arc<dyn PhysicalExpr>) -> Option<&SelectivityStats> {
197-
let key = ExprKey(Arc::clone(expr));
216+
let key = ExprKey::new(expr);
198217
self.stats.get(&key)
199218
}
219+
220+
/// Iterate all known selectivities.
221+
pub fn iter(
222+
&self,
223+
) -> impl Iterator<Item = (&Arc<dyn PhysicalExpr>, &SelectivityStats)> {
224+
self.stats.iter().map(|(key, stats)| (&key.0, stats))
225+
}
200226
}
201227

202228
#[cfg(test)]
@@ -229,9 +255,9 @@ mod tests {
229255
let filter2 = make_filter("a", 5);
230256
let filter3 = make_filter("a", 10);
231257

232-
let key1 = ExprKey(filter1);
233-
let key2 = ExprKey(filter2);
234-
let key3 = ExprKey(filter3);
258+
let key1 = ExprKey::new(&filter1);
259+
let key2 = ExprKey::new(&filter2);
260+
let key3 = ExprKey::new(&filter3);
235261

236262
// Same expression structure should be equal
237263
assert_eq!(key1, key2);
@@ -246,8 +272,8 @@ mod tests {
246272
let filter1 = make_filter("a", 5);
247273
let filter2 = make_filter("a", 5);
248274

249-
let key1 = ExprKey(filter1);
250-
let key2 = ExprKey(filter2);
275+
let key1 = ExprKey::new(&filter1);
276+
let key2 = ExprKey::new(&filter2);
251277

252278
let mut hasher1 = DefaultHasher::new();
253279
let mut hasher2 = DefaultHasher::new();
@@ -336,13 +362,13 @@ mod tests {
336362
assert!(
337363
row_filters
338364
.iter()
339-
.any(|f| ExprKey(f.clone()) == ExprKey(filter1.clone()))
365+
.any(|f| ExprKey::new(f) == ExprKey::new(&filter1))
340366
);
341367
// The unknown filter should be in post_scan
342368
assert!(
343369
post_scan
344370
.iter()
345-
.any(|f| ExprKey(f.clone()) == ExprKey(filter2.clone()))
371+
.any(|f| ExprKey::new(f) == ExprKey::new(&filter2))
346372
);
347373
}
348374

0 commit comments

Comments (0)