Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
124 changes: 121 additions & 3 deletions datafusion/datasource/src/file_scan_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -598,8 +598,39 @@ impl DataSource for FileScanConfig {
SchedulingType::Cooperative
}

fn statistics(&self) -> Result<Statistics> {
Ok(self.projected_stats())
fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
if let Some(partition) = partition {
// Get statistics for a specific partition
if let Some(file_group) = self.file_groups.get(partition) {
if let Some(stat) = file_group.file_statistics(None) {
// Project the statistics based on the projection
let table_cols_stats = self
.projection_indices()
.into_iter()
.map(|idx| {
if idx < self.file_schema().fields().len() {
stat.column_statistics[idx].clone()
} else {
// TODO provide accurate stat for partition column
// See https://github.com/apache/datafusion/issues/1186
ColumnStatistics::new_unknown()
}
})
.collect();

return Ok(Statistics {
num_rows: stat.num_rows,
total_byte_size: stat.total_byte_size,
column_statistics: table_cols_stats,
});
}
}
// If no statistics available for this partition, return unknown
Ok(Statistics::new_unknown(&self.projected_schema()))
} else {
// Return aggregate statistics across all partitions
Ok(self.projected_stats())
}
}

fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn DataSource>> {
Expand Down Expand Up @@ -1603,7 +1634,7 @@ mod tests {
);

let source_statistics = conf.file_source.statistics().unwrap();
let conf_stats = conf.statistics().unwrap();
let conf_stats = conf.partition_statistics(None).unwrap();

// projection should be reflected in the file source statistics
assert_eq!(conf_stats.num_rows, Precision::Inexact(3));
Expand Down Expand Up @@ -2510,4 +2541,91 @@ mod tests {

Ok(())
}

#[test]
fn test_partition_statistics_projection() {
Comment on lines +2545 to +2546
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test demonstrates the bug and fails on main

// This test verifies that partition_statistics applies projection correctly.
// The old implementation had a bug where it returned file group statistics
// without applying the projection, returning all column statistics instead
// of just the projected ones.

use crate::source::DataSourceExec;
use datafusion_physical_plan::ExecutionPlan;

// Create a schema with 4 columns
let schema = Arc::new(Schema::new(vec![
Field::new("col0", DataType::Int32, false),
Field::new("col1", DataType::Int32, false),
Field::new("col2", DataType::Int32, false),
Field::new("col3", DataType::Int32, false),
]));

// Create statistics for all 4 columns
let file_group_stats = Statistics {
num_rows: Precision::Exact(100),
total_byte_size: Precision::Exact(1024),
column_statistics: vec![
ColumnStatistics {
null_count: Precision::Exact(0),
..ColumnStatistics::new_unknown()
},
ColumnStatistics {
null_count: Precision::Exact(5),
..ColumnStatistics::new_unknown()
},
ColumnStatistics {
null_count: Precision::Exact(10),
..ColumnStatistics::new_unknown()
},
ColumnStatistics {
null_count: Precision::Exact(15),
..ColumnStatistics::new_unknown()
},
],
};

// Create a file group with statistics
let file_group = FileGroup::new(vec![PartitionedFile::new("test.parquet", 1024)])
.with_statistics(Arc::new(file_group_stats));

// Create a FileScanConfig with projection: only keep columns 0 and 2
let config = FileScanConfigBuilder::new(
ObjectStoreUrl::parse("test:///").unwrap(),
Arc::clone(&schema),
Arc::new(MockSource::default()),
)
.with_projection(Some(vec![0, 2])) // Only project columns 0 and 2
.with_file_groups(vec![file_group])
.build();

// Create a DataSourceExec from the config
let exec = DataSourceExec::from_data_source(config);

// Get statistics for partition 0
let partition_stats = exec.partition_statistics(Some(0)).unwrap();

// Verify that only 2 columns are in the statistics (the projected ones)
assert_eq!(
partition_stats.column_statistics.len(),
2,
"Expected 2 column statistics (projected), but got {}",
partition_stats.column_statistics.len()
);

// Verify the column statistics are for columns 0 and 2
assert_eq!(
partition_stats.column_statistics[0].null_count,
Precision::Exact(0),
"First projected column should be col0 with 0 nulls"
);
assert_eq!(
partition_stats.column_statistics[1].null_count,
Precision::Exact(10),
"Second projected column should be col2 with 10 nulls"
);

// Verify row count and byte size are preserved
assert_eq!(partition_stats.num_rows, Precision::Exact(100));
assert_eq!(partition_stats.total_byte_size, Precision::Exact(1024));
}
}
28 changes: 22 additions & 6 deletions datafusion/datasource/src/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use std::collections::BinaryHeap;
use std::fmt;
use std::fmt::Debug;
use std::ops::Deref;
use std::slice::from_ref;
use std::sync::Arc;

use crate::sink::DataSink;
Expand Down Expand Up @@ -192,12 +193,27 @@ impl DataSource for MemorySourceConfig {
SchedulingType::Cooperative
}

fn statistics(&self) -> Result<Statistics> {
Ok(common::compute_record_batch_statistics(
&self.partitions,
&self.schema,
self.projection.clone(),
))
fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
if let Some(partition) = partition {
// Compute statistics for a specific partition
if let Some(batches) = self.partitions.get(partition) {
Ok(common::compute_record_batch_statistics(
from_ref(batches),
&self.schema,
self.projection.clone(),
))
} else {
// Invalid partition index
Ok(Statistics::new_unknown(&self.projected_schema))
}
} else {
// Compute statistics across all partitions
Ok(common::compute_record_batch_statistics(
&self.partitions,
&self.schema,
self.projection.clone(),
))
}
}

fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn DataSource>> {
Expand Down
32 changes: 16 additions & 16 deletions datafusion/datasource/src/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,21 @@ pub trait DataSource: Send + Sync + Debug {
fn scheduling_type(&self) -> SchedulingType {
SchedulingType::NonCooperative
}
fn statistics(&self) -> Result<Statistics>;

/// Returns statistics for a specific partition, or aggregate statistics
/// across all partitions if `partition` is `None`.
fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics>;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is an API changing, I think we should make statistics as deprecated, and add a new partition_statistics API

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good call. I added fn statistics() back to DataSource trait as a deprecated method and made it default to partition_statistics(None). This should make it fully backwards compatible for all implementations.


/// Returns aggregate statistics across all partitions.
///
/// # Deprecated
/// Use [`Self::partition_statistics`] instead, which provides more fine-grained
/// control over statistics retrieval (per-partition or aggregate).
#[deprecated(since = "51.0.0", note = "Use partition_statistics instead")]
fn statistics(&self) -> Result<Statistics> {
self.partition_statistics(None)
}

/// Return a copy of this DataSource with a new fetch limit
fn with_fetch(&self, _limit: Option<usize>) -> Option<Arc<dyn DataSource>>;
fn fetch(&self) -> Option<usize>;
Expand Down Expand Up @@ -285,21 +299,7 @@ impl ExecutionPlan for DataSourceExec {
}

fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
if let Some(partition) = partition {
let mut statistics = Statistics::new_unknown(&self.schema());
if let Some(file_config) =
self.data_source.as_any().downcast_ref::<FileScanConfig>()
{
if let Some(file_group) = file_config.file_groups.get(partition) {
if let Some(stat) = file_group.file_statistics(None) {
statistics = stat.clone();
}
}
}
Ok(statistics)
} else {
Ok(self.data_source.statistics()?)
}
Comment on lines -288 to -302
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was clearly buggy: in the partition = None path it returns projected statistics (see current implementation in FileScanConfig::statistics), if it took the Some(partition) path it calculated the statistics but then never projected them.

self.data_source.partition_statistics(partition)
}

fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
Expand Down