diff --git a/datafusion/datasource/src/file_scan_config.rs b/datafusion/datasource/src/file_scan_config.rs index 4dfb6a4ec3d33..695252803bae7 100644 --- a/datafusion/datasource/src/file_scan_config.rs +++ b/datafusion/datasource/src/file_scan_config.rs @@ -598,8 +598,39 @@ impl DataSource for FileScanConfig { SchedulingType::Cooperative } - fn statistics(&self) -> Result<Statistics> { - Ok(self.projected_stats()) + fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> { + if let Some(partition) = partition { + // Get statistics for a specific partition + if let Some(file_group) = self.file_groups.get(partition) { + if let Some(stat) = file_group.file_statistics(None) { + // Project the statistics based on the projection + let table_cols_stats = self + .projection_indices() + .into_iter() + .map(|idx| { + if idx < self.file_schema().fields().len() { + stat.column_statistics[idx].clone() + } else { + // TODO provide accurate stat for partition column + // See https://github.com/apache/datafusion/issues/1186 + ColumnStatistics::new_unknown() + } + }) + .collect(); + + return Ok(Statistics { + num_rows: stat.num_rows, + total_byte_size: stat.total_byte_size, + column_statistics: table_cols_stats, + }); + } + } + // If no statistics available for this partition, return unknown + Ok(Statistics::new_unknown(&self.projected_schema())) + } else { + // Return aggregate statistics across all partitions + Ok(self.projected_stats()) + } } fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn DataSource>> { @@ -1603,7 +1634,7 @@ mod tests { ); let source_statistics = conf.file_source.statistics().unwrap(); - let conf_stats = conf.statistics().unwrap(); + let conf_stats = conf.partition_statistics(None).unwrap(); // projection should be reflected in the file source statistics assert_eq!(conf_stats.num_rows, Precision::Inexact(3)); @@ -2510,4 +2541,91 @@ mod tests { Ok(()) } + + #[test] + fn test_partition_statistics_projection() { + // This test verifies that partition_statistics applies 
projection correctly. + // The old implementation had a bug where it returned file group statistics + // without applying the projection, returning all column statistics instead + // of just the projected ones. + + use crate::source::DataSourceExec; + use datafusion_physical_plan::ExecutionPlan; + + // Create a schema with 4 columns + let schema = Arc::new(Schema::new(vec![ + Field::new("col0", DataType::Int32, false), + Field::new("col1", DataType::Int32, false), + Field::new("col2", DataType::Int32, false), + Field::new("col3", DataType::Int32, false), + ])); + + // Create statistics for all 4 columns + let file_group_stats = Statistics { + num_rows: Precision::Exact(100), + total_byte_size: Precision::Exact(1024), + column_statistics: vec![ + ColumnStatistics { + null_count: Precision::Exact(0), + ..ColumnStatistics::new_unknown() + }, + ColumnStatistics { + null_count: Precision::Exact(5), + ..ColumnStatistics::new_unknown() + }, + ColumnStatistics { + null_count: Precision::Exact(10), + ..ColumnStatistics::new_unknown() + }, + ColumnStatistics { + null_count: Precision::Exact(15), + ..ColumnStatistics::new_unknown() + }, + ], + }; + + // Create a file group with statistics + let file_group = FileGroup::new(vec![PartitionedFile::new("test.parquet", 1024)]) + .with_statistics(Arc::new(file_group_stats)); + + // Create a FileScanConfig with projection: only keep columns 0 and 2 + let config = FileScanConfigBuilder::new( + ObjectStoreUrl::parse("test:///").unwrap(), + Arc::clone(&schema), + Arc::new(MockSource::default()), + ) + .with_projection(Some(vec![0, 2])) // Only project columns 0 and 2 + .with_file_groups(vec![file_group]) + .build(); + + // Create a DataSourceExec from the config + let exec = DataSourceExec::from_data_source(config); + + // Get statistics for partition 0 + let partition_stats = exec.partition_statistics(Some(0)).unwrap(); + + // Verify that only 2 columns are in the statistics (the projected ones) + assert_eq!( + 
partition_stats.column_statistics.len(), 2, "Expected 2 column statistics (projected), but got {}", partition_stats.column_statistics.len() ); + + // Verify the column statistics are for columns 0 and 2 + assert_eq!( + partition_stats.column_statistics[0].null_count, + Precision::Exact(0), + "First projected column should be col0 with 0 nulls" + ); + assert_eq!( + partition_stats.column_statistics[1].null_count, + Precision::Exact(10), + "Second projected column should be col2 with 10 nulls" + ); + + // Verify row count and byte size are preserved + assert_eq!(partition_stats.num_rows, Precision::Exact(100)); + assert_eq!(partition_stats.total_byte_size, Precision::Exact(1024)); + } } diff --git a/datafusion/datasource/src/memory.rs b/datafusion/datasource/src/memory.rs index eb55aa9b0b0d2..7d5c8c4834ead 100644 --- a/datafusion/datasource/src/memory.rs +++ b/datafusion/datasource/src/memory.rs @@ -21,6 +21,7 @@ use std::collections::BinaryHeap; use std::fmt; use std::fmt::Debug; use std::ops::Deref; +use std::slice::from_ref; use std::sync::Arc; use crate::sink::DataSink; @@ -192,12 +193,27 @@ impl DataSource for MemorySourceConfig { SchedulingType::Cooperative } - fn statistics(&self) -> Result<Statistics> { - Ok(common::compute_record_batch_statistics( - &self.partitions, - &self.schema, - self.projection.clone(), - )) + fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> { + if let Some(partition) = partition { + // Compute statistics for a specific partition + if let Some(batches) = self.partitions.get(partition) { + Ok(common::compute_record_batch_statistics( + from_ref(batches), + &self.schema, + self.projection.clone(), + )) + } else { + // Invalid partition index + Ok(Statistics::new_unknown(&self.projected_schema)) + } + } else { + // Compute statistics across all partitions + Ok(common::compute_record_batch_statistics( + &self.partitions, + &self.schema, + self.projection.clone(), + )) + } } fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn DataSource>> { diff --git 
a/datafusion/datasource/src/source.rs b/datafusion/datasource/src/source.rs index 20d9a1d6e53f0..11a8a3867b809 100644 --- a/datafusion/datasource/src/source.rs +++ b/datafusion/datasource/src/source.rs @@ -151,7 +151,21 @@ pub trait DataSource: Send + Sync + Debug { fn scheduling_type(&self) -> SchedulingType { SchedulingType::NonCooperative } - fn statistics(&self) -> Result<Statistics>; + + /// Returns statistics for a specific partition, or aggregate statistics + /// across all partitions if `partition` is `None`. + fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics>; + + /// Returns aggregate statistics across all partitions. + /// + /// # Deprecated + /// Use [`Self::partition_statistics`] instead, which provides more fine-grained + /// control over statistics retrieval (per-partition or aggregate). + #[deprecated(since = "51.0.0", note = "Use partition_statistics instead")] + fn statistics(&self) -> Result<Statistics> { + self.partition_statistics(None) + } + /// Return a copy of this DataSource with a new fetch limit fn with_fetch(&self, _limit: Option<usize>) -> Option<Arc<dyn DataSource>>; fn fetch(&self) -> Option<usize>; @@ -285,21 +299,7 @@ impl ExecutionPlan for DataSourceExec { } fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> { - if let Some(partition) = partition { - let mut statistics = Statistics::new_unknown(&self.schema()); - if let Some(file_config) = - self.data_source.as_any().downcast_ref::<FileScanConfig>() - { - if let Some(file_group) = file_config.file_groups.get(partition) { - if let Some(stat) = file_group.file_statistics(None) { - statistics = stat.clone(); - } - } - } - Ok(statistics) - } else { - Ok(self.data_source.statistics()?) - } + self.data_source.partition_statistics(partition) } fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {