diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs
index a9834da92e5a4..cbff518946955 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -712,7 +712,7 @@ impl ListingOptions {
 /// # Ok(())
 /// # }
 /// ```
-#[derive(Debug)]
+#[derive(Debug, Clone)]
 pub struct ListingTable {
     table_paths: Vec<ListingTableUrl>,
     /// `file_schema` contains only the columns physically stored in the data files themselves.
diff --git a/datafusion/core/src/datasource/physical_plan/arrow_file.rs b/datafusion/core/src/datasource/physical_plan/arrow_file.rs
index f0a1f94d87e1f..5a237c29372f2 100644
--- a/datafusion/core/src/datasource/physical_plan/arrow_file.rs
+++ b/datafusion/core/src/datasource/physical_plan/arrow_file.rs
@@ -42,6 +42,7 @@ use datafusion_physical_plan::{
 };
 
 use datafusion_datasource::file_groups::FileGroup;
+use datafusion_physical_plan::statistics::PartitionedStatistics;
 use futures::StreamExt;
 use itertools::Itertools;
 use object_store::{GetOptions, GetRange, GetResultPayload, ObjectStore};
@@ -190,6 +191,11 @@ impl ExecutionPlan for ArrowExec {
     fn statistics(&self) -> Result<Statistics> {
         self.inner.statistics()
     }
+
+    fn statistics_by_partition(&self) -> Result<PartitionedStatistics> {
+        self.inner.statistics_by_partition()
+    }
+
     fn fetch(&self) -> Option<usize> {
         self.inner.fetch()
     }
diff --git a/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-01/j5fUeSDQo22oPyPU.parquet b/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-01/j5fUeSDQo22oPyPU.parquet
new file mode 100644
index 0000000000000..ec164c6df7b5e
Binary files /dev/null and b/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-01/j5fUeSDQo22oPyPU.parquet differ
diff --git a/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-02/j5fUeSDQo22oPyPU.parquet b/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-02/j5fUeSDQo22oPyPU.parquet
new file mode 100644
index 0000000000000..4b78cf963c111
Binary files /dev/null and b/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-02/j5fUeSDQo22oPyPU.parquet differ
diff --git a/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-03/j5fUeSDQo22oPyPU.parquet b/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-03/j5fUeSDQo22oPyPU.parquet
new file mode 100644
index 0000000000000..09a01771d503c
Binary files /dev/null and b/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-03/j5fUeSDQo22oPyPU.parquet differ
diff --git a/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-04/j5fUeSDQo22oPyPU.parquet b/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-04/j5fUeSDQo22oPyPU.parquet
new file mode 100644
index 0000000000000..6398cc43a2f5d
Binary files /dev/null and b/datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-04/j5fUeSDQo22oPyPU.parquet differ
diff --git a/datafusion/core/tests/physical_optimizer/mod.rs b/datafusion/core/tests/physical_optimizer/mod.rs
index 6643e7fd59b7a..c115a5253adcc 100644
--- a/datafusion/core/tests/physical_optimizer/mod.rs
+++ b/datafusion/core/tests/physical_optimizer/mod.rs
@@ -24,6 +24,7 @@ mod enforce_sorting;
 mod join_selection;
 mod limit_pushdown;
 mod limited_distinct_aggregation;
+mod partition_statistics;
 mod projection_pushdown;
 mod push_down_filter;
 mod replace_with_order_preserving_variants;
diff --git a/datafusion/core/tests/physical_optimizer/partition_statistics.rs b/datafusion/core/tests/physical_optimizer/partition_statistics.rs
new file mode 100644
index 0000000000000..22f1404b94f16
--- /dev/null
+++ b/datafusion/core/tests/physical_optimizer/partition_statistics.rs
@@ -0,0 +1,460 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#[cfg(test)]
+mod test {
+    use arrow::array::{Int32Array, RecordBatch};
+    use arrow_schema::{DataType, Field, Schema, SortOptions};
+    use datafusion::datasource::listing::ListingTable;
+    use datafusion::prelude::SessionContext;
+    use datafusion_catalog::TableProvider;
+    use datafusion_common::stats::Precision;
+    use datafusion_common::Result;
+    use datafusion_common::{ColumnStatistics, ScalarValue, Statistics};
+    use datafusion_execution::config::SessionConfig;
+    use datafusion_execution::TaskContext;
+    use datafusion_expr_common::operator::Operator;
+    use datafusion_physical_expr::expressions::{binary, lit, Column};
+    use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
+    use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr};
+    use datafusion_physical_plan::coalesce_batches::CoalesceBatchesExec;
+    use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec;
+    use datafusion_physical_plan::filter::FilterExec;
+    use datafusion_physical_plan::joins::CrossJoinExec;
+    use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
+    use datafusion_physical_plan::projection::ProjectionExec;
+    use datafusion_physical_plan::sorts::sort::SortExec;
+    use datafusion_physical_plan::union::UnionExec;
+    use datafusion_physical_plan::{execute_stream_partitioned, ExecutionPlan};
+    use futures::TryStreamExt;
+    use std::sync::Arc;
+
+    /// Creates a test table with statistics from the test data directory.
+    ///
+    /// This function:
+    /// - Creates an external table from './tests/data/test_statistics_per_partition'
+    /// - With `target_partition` set to 2, the data is read as 2 partitions, each with 2 rows
+    /// - Each partition has an "id" column (INT) with the following values:
+    ///   - First partition: [3, 4]
+    ///   - Second partition: [1, 2]
+    /// - Each row is 110 bytes in size
+    ///
+    /// @param target_partition Optional parameter to set the target partitions
+    /// @return ExecutionPlan representing the scan of the table with statistics
+    async fn create_scan_exec_with_statistics(
+        create_table_sql: Option<&str>,
+        target_partition: Option<usize>,
+    ) -> Arc<dyn ExecutionPlan> {
+        let mut session_config = SessionConfig::new().with_collect_statistics(true);
+        if let Some(partition) = target_partition {
+            session_config = session_config.with_target_partitions(partition);
+        }
+        let ctx = SessionContext::new_with_config(session_config);
+        // Create table with partition
+        let create_table_sql = create_table_sql.unwrap_or(
+            "CREATE EXTERNAL TABLE t1 (id INT NOT NULL, date DATE) \
+             STORED AS PARQUET LOCATION './tests/data/test_statistics_per_partition' \
+             PARTITIONED BY (date) \
+             WITH ORDER (id ASC);",
+        );
+        // Get table name from `create_table_sql`
+        let table_name = create_table_sql
+            .split_whitespace()
+            .nth(3)
+            .unwrap_or("t1")
+            .to_string();
+        ctx.sql(create_table_sql)
+            .await
+            .unwrap()
+            .collect()
+            .await
+            .unwrap();
+        let table = ctx.table_provider(table_name.as_str()).await.unwrap();
+        let listing_table = table
+            .as_any()
+            .downcast_ref::<ListingTable>()
+            .unwrap()
+            .clone();
+        listing_table
+            .scan(&ctx.state(), None, &[], None)
+            .await
+            .unwrap()
+    }
+
+    /// Helper function to create expected statistics for a partition with Int32 column
+    fn create_partition_statistics(
+        num_rows: usize,
+        total_byte_size: usize,
+        min_value: i32,
+        max_value: i32,
+        include_date_column: bool,
+    ) -> Statistics {
+        let mut column_stats = vec![ColumnStatistics {
+            null_count: Precision::Exact(0),
+            max_value: Precision::Exact(ScalarValue::Int32(Some(max_value))),
+            min_value: Precision::Exact(ScalarValue::Int32(Some(min_value))),
+            sum_value: Precision::Absent,
+            distinct_count: Precision::Absent,
+        }];
+
+        if include_date_column {
+            column_stats.push(ColumnStatistics {
+                null_count: Precision::Absent,
+                max_value: Precision::Absent,
+                min_value: Precision::Absent,
+                sum_value: Precision::Absent,
+                distinct_count: Precision::Absent,
+            });
+        }
+
+        Statistics {
+            num_rows: Precision::Exact(num_rows),
+            total_byte_size: Precision::Exact(total_byte_size),
+            column_statistics: column_stats,
+        }
+    }
+
+    /// Helper function to validate that statistics from statistics_by_partition match the actual data
+    async fn validate_statistics_with_data(
+        plan: Arc<dyn ExecutionPlan>,
+        expected_stats: Vec<(i32, i32, usize)>, // (min_id, max_id, row_count)
+        id_column_index: usize,
+    ) -> Result<()> {
+        let ctx = TaskContext::default();
+        let partitions = execute_stream_partitioned(plan, Arc::new(ctx))?;
+
+        let mut actual_stats = Vec::new();
+        for partition_stream in partitions.into_iter() {
+            let result: Vec<RecordBatch> = partition_stream.try_collect().await?;
+
+            let mut min_id = i32::MAX;
+            let mut max_id = i32::MIN;
+            let mut row_count = 0;
+
+            for batch in result {
+                if batch.num_columns() > id_column_index {
+                    let id_array = batch
+                        .column(id_column_index)
+                        .as_any()
+                        .downcast_ref::<Int32Array>()
+                        .unwrap();
+                    for i in 0..batch.num_rows() {
+                        let id_value = id_array.value(i);
+                        min_id = min_id.min(id_value);
+                        max_id = max_id.max(id_value);
+                        row_count += 1;
+                    }
+                }
} + } + + if row_count > 0 { + actual_stats.push((min_id, max_id, row_count)); + } + } + + // Compare actual data with expected statistics + assert_eq!( + actual_stats.len(), + expected_stats.len(), + "Number of partitions with data doesn't match expected" + ); + for i in 0..actual_stats.len() { + assert_eq!( + actual_stats[i], expected_stats[i], + "Partition {} data doesn't match statistics", + i + ); + } + + Ok(()) + } + + #[tokio::test] + async fn test_statistics_by_partition_of_data_source() -> Result<()> { + let scan = create_scan_exec_with_statistics(None, Some(2)).await; + let statistics = scan.statistics_by_partition()?; + let expected_statistic_partition_1 = + create_partition_statistics(2, 110, 3, 4, true); + let expected_statistic_partition_2 = + create_partition_statistics(2, 110, 1, 2, true); + // Check the statistics of each partition + assert_eq!(statistics.len(), 2); + assert_eq!(statistics[0], expected_statistic_partition_1); + assert_eq!(statistics[1], expected_statistic_partition_2); + + // Check the statistics_by_partition with real results + let expected_stats = vec![ + (3, 4, 2), // (min_id, max_id, row_count) for first partition + (1, 2, 2), // (min_id, max_id, row_count) for second partition + ]; + validate_statistics_with_data(scan, expected_stats, 0).await?; + + Ok(()) + } + + #[tokio::test] + async fn test_statistics_by_partition_of_projection() -> Result<()> { + let scan = create_scan_exec_with_statistics(None, Some(2)).await; + // Add projection execution plan + let exprs: Vec<(Arc, String)> = + vec![(Arc::new(Column::new("id", 0)), "id".to_string())]; + let projection = ProjectionExec::try_new(exprs, scan)?; + let statistics = projection.statistics_by_partition()?; + let expected_statistic_partition_1 = + create_partition_statistics(2, 8, 3, 4, false); + let expected_statistic_partition_2 = + create_partition_statistics(2, 8, 1, 2, false); + // Check the statistics of each partition + assert_eq!(statistics.len(), 2); + assert_eq!(statistics[0], expected_statistic_partition_1); + assert_eq!(statistics[1], expected_statistic_partition_2); + + // Check the statistics_by_partition with real results + let expected_stats = vec![(3, 4, 2), (1, 2, 2)]; + validate_statistics_with_data(Arc::new(projection), expected_stats, 0).await?; + Ok(()) + } + + #[tokio::test] + async fn test_statistics_by_partition_of_sort() -> Result<()> { + let scan_1 = create_scan_exec_with_statistics(None, Some(1)).await; + // Add sort execution plan + let sort = SortExec::new( + LexOrdering::new(vec![PhysicalSortExpr { + expr: Arc::new(Column::new("id", 0)), + options: SortOptions { + descending: false, + nulls_first: false, + }, + }]), + scan_1, + ); + let sort_exec = Arc::new(sort.clone()); + let statistics = sort_exec.statistics_by_partition()?; + let expected_statistic_partition = + create_partition_statistics(4, 220, 1, 4, true); + assert_eq!(statistics.len(), 1); + assert_eq!(statistics[0], expected_statistic_partition); + // Check the statistics_by_partition with real results + let expected_stats = vec![(1, 4, 4)]; + validate_statistics_with_data(sort_exec.clone(), expected_stats, 0).await?; + + // Sort with preserve_partitioning + let scan_2 = create_scan_exec_with_statistics(None, Some(2)).await; + // Add sort execution plan + let sort_exec = Arc::new( + SortExec::new( + LexOrdering::new(vec![PhysicalSortExpr { + expr: Arc::new(Column::new("id", 0)), + options: SortOptions { + descending: false, + nulls_first: false, + }, + }]), + scan_2, + ) + .with_preserve_partitioning(true), + ); 
+        let expected_statistic_partition_1 =
+            create_partition_statistics(2, 110, 3, 4, true);
+        let expected_statistic_partition_2 =
+            create_partition_statistics(2, 110, 1, 2, true);
+        let statistics = sort_exec.statistics_by_partition()?;
+        assert_eq!(statistics.len(), 2);
+        assert_eq!(statistics[0], expected_statistic_partition_1);
+        assert_eq!(statistics[1], expected_statistic_partition_2);
+
+        // Check the statistics_by_partition with real results
+        let expected_stats = vec![(3, 4, 2), (1, 2, 2)];
+        validate_statistics_with_data(sort_exec, expected_stats, 0).await?;
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_statistics_by_partition_of_filter() -> Result<()> {
+        let scan = create_scan_exec_with_statistics(None, Some(2)).await;
+        let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
+        let predicate = binary(
+            Arc::new(Column::new("id", 0)),
+            Operator::Lt,
+            lit(1i32),
+            &schema,
+        )?;
+        let filter: Arc<dyn ExecutionPlan> =
+            Arc::new(FilterExec::try_new(predicate, scan)?);
+        let full_statistics = filter.statistics()?;
+        let expected_full_statistic = Statistics {
+            num_rows: Precision::Inexact(0),
+            total_byte_size: Precision::Inexact(0),
+            column_statistics: vec![
+                ColumnStatistics {
+                    null_count: Precision::Exact(0),
+                    max_value: Precision::Exact(ScalarValue::Null),
+                    min_value: Precision::Exact(ScalarValue::Null),
+                    sum_value: Precision::Exact(ScalarValue::Null),
+                    distinct_count: Precision::Exact(0),
+                },
+                ColumnStatistics {
+                    null_count: Precision::Exact(0),
+                    max_value: Precision::Exact(ScalarValue::Null),
+                    min_value: Precision::Exact(ScalarValue::Null),
+                    sum_value: Precision::Exact(ScalarValue::Null),
+                    distinct_count: Precision::Exact(0),
+                },
+            ],
+        };
+        assert_eq!(full_statistics, expected_full_statistic);
+
+        let statistics = filter.statistics_by_partition()?;
+        assert_eq!(statistics.len(), 2);
+        assert_eq!(statistics[0], expected_full_statistic);
+        assert_eq!(statistics[1], expected_full_statistic);
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_statistic_by_partition_of_union() -> Result<()> {
+        let scan = create_scan_exec_with_statistics(None, Some(2)).await;
+        let union_exec = Arc::new(UnionExec::new(vec![scan.clone(), scan]));
+        let statistics = union_exec.statistics_by_partition()?;
+        // Check that we have 4 partitions (2 from each scan)
+        assert_eq!(statistics.len(), 4);
+        let expected_statistic_partition_1 =
+            create_partition_statistics(2, 110, 3, 4, true);
+        let expected_statistic_partition_2 =
+            create_partition_statistics(2, 110, 1, 2, true);
+        // Verify first partition (from first scan)
+        assert_eq!(statistics[0], expected_statistic_partition_1);
+        // Verify second partition (from first scan)
+        assert_eq!(statistics[1], expected_statistic_partition_2);
+        // Verify third partition (from second scan - same as first partition)
+        assert_eq!(statistics[2], expected_statistic_partition_1);
+        // Verify fourth partition (from second scan - same as second partition)
+        assert_eq!(statistics[3], expected_statistic_partition_2);
+
+        // Check the statistics_by_partition with real results
+        let expected_stats = vec![(3, 4, 2), (1, 2, 2), (3, 4, 2), (1, 2, 2)];
+        validate_statistics_with_data(union_exec, expected_stats, 0).await?;
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_statistic_by_partition_of_cross_join() -> Result<()> {
+        let left_scan = create_scan_exec_with_statistics(None, Some(1)).await;
+        let right_create_table_sql = "CREATE EXTERNAL TABLE t2 (id INT NOT NULL) \
+             STORED AS PARQUET LOCATION './tests/data/test_statistics_per_partition' \
+             WITH ORDER (id ASC);";
+        let right_scan =
+            create_scan_exec_with_statistics(Some(right_create_table_sql), Some(2)).await;
+        let cross_join = CrossJoinExec::new(left_scan, right_scan);
+        let statistics = cross_join.statistics_by_partition()?;
+        // Check that we have 2 partitions
+        assert_eq!(statistics.len(), 2);
+        let mut expected_statistic_partition_1 =
+            create_partition_statistics(8, 48400, 1, 4, true);
+        expected_statistic_partition_1
+            .column_statistics
+            .push(ColumnStatistics {
+                null_count: Precision::Exact(0),
+                max_value: Precision::Exact(ScalarValue::Int32(Some(4))),
+                min_value: Precision::Exact(ScalarValue::Int32(Some(3))),
+                sum_value: Precision::Absent,
+                distinct_count: Precision::Absent,
+            });
+        let mut expected_statistic_partition_2 =
+            create_partition_statistics(8, 48400, 1, 4, true);
+        expected_statistic_partition_2
+            .column_statistics
+            .push(ColumnStatistics {
+                null_count: Precision::Exact(0),
+                max_value: Precision::Exact(ScalarValue::Int32(Some(2))),
+                min_value: Precision::Exact(ScalarValue::Int32(Some(1))),
+                sum_value: Precision::Absent,
+                distinct_count: Precision::Absent,
+            });
+        assert_eq!(statistics[0], expected_statistic_partition_1);
+        assert_eq!(statistics[1], expected_statistic_partition_2);
+
+        // Check the statistics_by_partition with real results
+        let expected_stats = vec![(1, 4, 8), (1, 4, 8)];
+        validate_statistics_with_data(Arc::new(cross_join), expected_stats, 0).await?;
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_statistic_by_partition_of_coalesce_batches() -> Result<()> {
+        let scan = create_scan_exec_with_statistics(None, Some(2)).await;
+        let coalesce_batches = CoalesceBatchesExec::new(scan, 2);
+        let expected_statistic_partition_1 =
+            create_partition_statistics(2, 110, 3, 4, true);
+        let expected_statistic_partition_2 =
+            create_partition_statistics(2, 110, 1, 2, true);
+        let statistics = coalesce_batches.statistics_by_partition()?;
+        assert_eq!(statistics.len(), 2);
+        assert_eq!(statistics[0], expected_statistic_partition_1);
+        assert_eq!(statistics[1], expected_statistic_partition_2);
+
+        // Check the statistics_by_partition with real results
+        let expected_stats = vec![(3, 4, 2), (1, 2, 2)];
+        validate_statistics_with_data(Arc::new(coalesce_batches), expected_stats, 0)
+            .await?;
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_statistic_by_partition_of_coalesce_partitions() -> Result<()> {
+        let scan = create_scan_exec_with_statistics(None, Some(2)).await;
+        let coalesce_partitions = CoalescePartitionsExec::new(scan);
+        let expected_statistic_partition =
+            create_partition_statistics(4, 220, 1, 4, true);
+        let statistics = coalesce_partitions.statistics_by_partition()?;
+        assert_eq!(statistics.len(), 1);
+        assert_eq!(statistics[0], expected_statistic_partition);
+
+        // Check the statistics_by_partition with real results
+        let expected_stats = vec![(1, 4, 4)];
+        validate_statistics_with_data(Arc::new(coalesce_partitions), expected_stats, 0)
+            .await?;
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_statistic_by_partition_of_local_limit() -> Result<()> {
+        let scan = create_scan_exec_with_statistics(None, Some(2)).await;
+        let local_limit = LocalLimitExec::new(scan.clone(), 1);
+        let statistics = local_limit.statistics_by_partition()?;
+        assert_eq!(statistics.len(), 2);
+        let schema = scan.schema();
+        let mut expected_statistic_partition = Statistics::new_unknown(&schema);
+        expected_statistic_partition.num_rows = Precision::Exact(1);
+        assert_eq!(statistics[0], expected_statistic_partition);
+        assert_eq!(statistics[1], expected_statistic_partition);
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_statistic_by_partition_of_global_limit_partitions() -> Result<()> {
+        let scan = create_scan_exec_with_statistics(None, Some(2)).await;
+        let global_limit = GlobalLimitExec::new(scan.clone(), 0, Some(2));
+        let statistics = global_limit.statistics_by_partition()?;
+        assert_eq!(statistics.len(), 1);
+        let mut expected_statistic_partition = Statistics::new_unknown(&scan.schema());
+        expected_statistic_partition.num_rows = Precision::Exact(2);
+        assert_eq!(statistics[0], expected_statistic_partition);
+        Ok(())
+    }
+}
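Reviewer note (not part of the patch): a minimal sketch of how the new API is exercised end to end, assuming the same table layout the tests above use; `inspect_partition_stats` is an illustrative name.

    use std::sync::Arc;

    use datafusion::error::Result;
    use datafusion::physical_plan::ExecutionPlan;
    use datafusion::prelude::{SessionConfig, SessionContext};

    async fn inspect_partition_stats() -> Result<()> {
        // Statistics collection must be enabled, as in the tests above.
        let config = SessionConfig::new().with_collect_statistics(true);
        let ctx = SessionContext::new_with_config(config);
        ctx.sql(
            "CREATE EXTERNAL TABLE t1 (id INT NOT NULL, date DATE) \
             STORED AS PARQUET LOCATION './tests/data/test_statistics_per_partition' \
             PARTITIONED BY (date) WITH ORDER (id ASC);",
        )
        .await?
        .collect()
        .await?;
        let plan: Arc<dyn ExecutionPlan> = ctx
            .sql("SELECT id FROM t1")
            .await?
            .create_physical_plan()
            .await?;
        // One `Statistics` entry per output partition of the plan.
        let stats = plan.statistics_by_partition()?;
        assert_eq!(stats.len(), plan.properties().partitioning.partition_count());
        Ok(())
    }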
diff --git a/datafusion/datasource-avro/src/source.rs b/datafusion/datasource-avro/src/source.rs
index ce3722e7b11ee..1a0feea3f1535 100644
--- a/datafusion/datasource-avro/src/source.rs
+++ b/datafusion/datasource-avro/src/source.rs
@@ -40,6 +40,7 @@ use datafusion_physical_plan::{
     DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties,
 };
 
+use datafusion_physical_plan::statistics::PartitionedStatistics;
 use object_store::ObjectStore;
 
 /// Execution plan for scanning Avro data source
@@ -141,6 +142,10 @@ impl ExecutionPlan for AvroExec {
         self.inner.statistics()
     }
 
+    fn statistics_by_partition(&self) -> Result<PartitionedStatistics> {
+        self.inner.statistics_by_partition()
+    }
+
     fn metrics(&self) -> Option<MetricsSet> {
         self.inner.metrics()
     }
diff --git a/datafusion/datasource-csv/src/source.rs b/datafusion/datasource-csv/src/source.rs
index f5d45cd3fc881..28aad5e43f147 100644
--- a/datafusion/datasource-csv/src/source.rs
+++ b/datafusion/datasource-csv/src/source.rs
@@ -51,6 +51,7 @@ use datafusion_physical_plan::{
 
 use crate::file_format::CsvDecoder;
 use datafusion_datasource::file_groups::FileGroup;
+use datafusion_physical_plan::statistics::PartitionedStatistics;
 use futures::{StreamExt, TryStreamExt};
 use object_store::buffered::BufWriter;
 use object_store::{GetOptions, GetResultPayload, ObjectStore};
@@ -381,6 +382,10 @@ impl ExecutionPlan for CsvExec {
         self.inner.statistics()
     }
 
+    fn statistics_by_partition(&self) -> Result<PartitionedStatistics> {
+        self.inner.statistics_by_partition()
+    }
+
     fn metrics(&self) -> Option<MetricsSet> {
         self.inner.metrics()
     }
diff --git a/datafusion/datasource/src/file_groups.rs b/datafusion/datasource/src/file_groups.rs
index 15c86427ed00a..907ea62e4e7e3 100644
--- a/datafusion/datasource/src/file_groups.rs
+++ b/datafusion/datasource/src/file_groups.rs
@@ -421,7 +421,12 @@ impl FileGroup {
     }
 
     /// Get the statistics for this group
-    pub fn statistics(&self) -> Option<&Statistics> {
+    pub fn statistics(&self) -> &Option<Arc<Statistics>> {
+        &self.statistics
+    }
+
+    /// Get the statistics for this group as a plain reference
+    pub fn statistics_ref(&self) -> Option<&Statistics> {
         self.statistics.as_deref()
     }
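Note on the `FileGroup::statistics` change above: the signature now returns `&Option<Arc<Statistics>>` so that `DataSourceExec` (next hunk) can hand out per-partition statistics with a cheap `Arc::clone`, while `statistics_ref` preserves the old borrowed shape. A hedged migration sketch (`group_row_count` is an illustrative helper; it assumes `Precision::get_value` from `datafusion_common`):

    use std::sync::Arc;

    use datafusion_common::Statistics;
    use datafusion_datasource::file_groups::FileGroup;

    fn group_row_count(group: &FileGroup) -> Option<usize> {
        // New shape: clone the Option to share ownership of the Arc...
        let _shared: Option<Arc<Statistics>> = group.statistics().clone();
        // ...or keep the old Option<&Statistics> view via `statistics_ref`.
        group
            .statistics_ref()
            .and_then(|stats| stats.num_rows.get_value().copied())
    }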
diff --git a/datafusion/datasource/src/source.rs b/datafusion/datasource/src/source.rs
index 2d6ea1a8b3915..deff4cb205f0a 100644
--- a/datafusion/datasource/src/source.rs
+++ b/datafusion/datasource/src/source.rs
@@ -25,6 +25,7 @@ use std::sync::Arc;
 use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType};
 use datafusion_physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
 use datafusion_physical_plan::projection::ProjectionExec;
+use datafusion_physical_plan::statistics::PartitionedStatistics;
 use datafusion_physical_plan::{
     DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties,
 };
@@ -188,6 +189,26 @@ impl ExecutionPlan for DataSourceExec {
         self.data_source.statistics()
     }
 
+    fn statistics_by_partition(&self) -> Result<PartitionedStatistics> {
+        let mut statistics = {
+            let mut v =
+                Vec::with_capacity(self.properties().partitioning.partition_count());
+            (0..self.properties().partitioning.partition_count())
+                .for_each(|_| v.push(Arc::new(Statistics::new_unknown(&self.schema()))));
+            v
+        };
+        if let Some(file_config) =
+            self.data_source.as_any().downcast_ref::<FileScanConfig>()
+        {
+            for (idx, file_group) in file_config.file_groups.iter().enumerate() {
+                if let Some(stat) = file_group.statistics() {
+                    statistics[idx] = Arc::clone(stat);
+                }
+            }
+        }
+        Ok(PartitionedStatistics::new(statistics))
+    }
+
     fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
         let data_source = self.data_source.with_fetch(limit)?;
         let cache = self.cache.clone();
diff --git a/datafusion/datasource/src/statistics.rs b/datafusion/datasource/src/statistics.rs
index 8a04d77b273d4..192bb43c85953 100644
--- a/datafusion/datasource/src/statistics.rs
+++ b/datafusion/datasource/src/statistics.rs
@@ -476,7 +476,7 @@ pub fn compute_all_files_statistics(
     // Then summary statistics across all file groups
     let file_groups_statistics = file_groups_with_stats
         .iter()
-        .filter_map(|file_group| file_group.statistics());
+        .filter_map(|file_group| file_group.statistics().as_deref());
 
     let mut statistics =
         Statistics::try_merge_iter(file_groups_statistics, &table_schema)?;
diff --git a/datafusion/physical-optimizer/src/enforce_sorting/replace_with_order_preserving_variants.rs b/datafusion/physical-optimizer/src/enforce_sorting/replace_with_order_preserving_variants.rs
index 65a6198040bea..3c2b66af8539d 100644
--- a/datafusion/physical-optimizer/src/enforce_sorting/replace_with_order_preserving_variants.rs
+++ b/datafusion/physical-optimizer/src/enforce_sorting/replace_with_order_preserving_variants.rs
@@ -34,7 +34,7 @@ use datafusion_physical_plan::execution_plan::EmissionType;
 use datafusion_physical_plan::repartition::RepartitionExec;
 use datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
 use datafusion_physical_plan::tree_node::PlanContext;
-use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties};
+use datafusion_physical_plan::ExecutionPlanProperties;
 
 use itertools::izip;
 
@@ -205,9 +205,8 @@ pub fn plan_with_order_breaking_variants(
         // Replace `SortPreservingMergeExec` with a `CoalescePartitionsExec`
         // SPM may have `fetch`, so pass it to the `CoalescePartitionsExec`
         let child = Arc::clone(&sort_input.children[0].plan);
-        let coalesce = CoalescePartitionsExec::new(child)
-            .with_fetch(plan.fetch())
-            .unwrap();
+        let coalesce =
+            Arc::new(CoalescePartitionsExec::new(child).with_fetch(plan.fetch()));
         sort_input.plan = coalesce;
     } else {
         return sort_input.update_plan_from_children();
diff --git a/datafusion/physical-plan/src/coalesce_batches.rs b/datafusion/physical-plan/src/coalesce_batches.rs
index faab5fdc5eb6c..888cea6d9cdb0 100644
--- a/datafusion/physical-plan/src/coalesce_batches.rs
+++ b/datafusion/physical-plan/src/coalesce_batches.rs
@@ -24,6 +24,7 @@ use std::task::{Context, Poll};
 
 use super::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
 use super::{DisplayAs, ExecutionPlanProperties, PlanProperties, Statistics};
+use crate::statistics::PartitionedStatistics;
 use crate::{
     DisplayFormatType, ExecutionPlan, RecordBatchStream, SendableRecordBatchStream,
 };
@@ -199,6 +200,26 @@ impl ExecutionPlan for CoalesceBatchesExec {
         Statistics::with_fetch(self.input.statistics()?, self.schema(), self.fetch, 0, 1)
     }
 
+    fn statistics_by_partition(&self) -> Result<PartitionedStatistics> {
+        let input_stats = self.input.statistics_by_partition()?;
+
+        let stats: Result<Vec<Arc<Statistics>>> = input_stats
+            .iter()
+            .map(|stat| {
+                let fetched_stat = Statistics::with_fetch(
+                    stat.clone(),
+                    self.schema(),
+                    self.fetch,
+                    0,
+                    1,
+                )?;
+                Ok(Arc::new(fetched_stat))
+            })
+            .collect();
+
+        Ok(PartitionedStatistics::new(stats?))
+    }
+
     fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
         Some(Arc::new(CoalesceBatchesExec {
             input: Arc::clone(&self.input),
diff --git a/datafusion/physical-plan/src/coalesce_partitions.rs b/datafusion/physical-plan/src/coalesce_partitions.rs
index 715dd159e7e8c..8906aa1b04b69 100644
--- a/datafusion/physical-plan/src/coalesce_partitions.rs
+++ b/datafusion/physical-plan/src/coalesce_partitions.rs
@@ -31,6 +31,7 @@ use crate::execution_plan::CardinalityEffect;
 use crate::projection::{make_with_child, ProjectionExec};
 use crate::{DisplayFormatType, ExecutionPlan, Partitioning};
 
+use crate::statistics::PartitionedStatistics;
 use datafusion_common::{internal_err, Result};
 use datafusion_execution::TaskContext;
 
@@ -199,6 +200,12 @@ impl ExecutionPlan for CoalescePartitionsExec {
         Statistics::with_fetch(self.input.statistics()?, self.schema(), self.fetch, 0, 1)
     }
 
+    fn statistics_by_partition(&self) -> Result<PartitionedStatistics> {
+        Ok(PartitionedStatistics::new(vec![Arc::new(
+            self.statistics()?,
+        )]))
+    }
+
     fn supports_limit_pushdown(&self) -> bool {
         true
     }
diff --git a/datafusion/physical-plan/src/execution_plan.rs b/datafusion/physical-plan/src/execution_plan.rs
index 2b6eac7be0675..cc17ebef26e89 100644
--- a/datafusion/physical-plan/src/execution_plan.rs
+++ b/datafusion/physical-plan/src/execution_plan.rs
@@ -54,6 +54,7 @@ use datafusion_execution::TaskContext;
 use datafusion_physical_expr::{EquivalenceProperties, LexOrdering};
 use datafusion_physical_expr_common::sort_expr::LexRequirement;
 
+use crate::statistics::PartitionedStatistics;
 use futures::stream::{StreamExt, TryStreamExt};
 
 /// Represent nodes in the DataFusion Physical Plan.
@@ -430,6 +431,19 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
         Ok(Statistics::new_unknown(&self.schema()))
     }
 
+    /// Returns statistics for each partition of this `ExecutionPlan` node.
+    /// If statistics are not available, returns an array of
+    /// [`Statistics::new_unknown`] for each partition.
+    fn statistics_by_partition(&self) -> Result<PartitionedStatistics> {
+        Ok(PartitionedStatistics::new({
+            let mut v =
+                Vec::with_capacity(self.properties().partitioning.partition_count());
+            (0..self.properties().partitioning.partition_count())
+                .for_each(|_| v.push(Arc::new(Statistics::new_unknown(&self.schema()))));
+            v
+        }))
+    }
+
     /// Returns `true` if a limit can be safely pushed down through this
     /// `ExecutionPlan` node.
     ///
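The trait default above deliberately returns `Statistics::new_unknown` once per output partition, so existing operators remain correct without any changes; an operator overrides the method only when it can propagate something better. A sketch of the simplest override, for a hypothetical `MyPassThroughExec` that keeps rows, schema, and partitioning intact:

    // Inside `impl ExecutionPlan for MyPassThroughExec` (hypothetical operator):
    fn statistics_by_partition(&self) -> Result<PartitionedStatistics> {
        // Pass-through operators can simply delegate to their input.
        self.input.statistics_by_partition()
    }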
diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs
index 95fa67025e90d..59df4a5f3f861 100644
--- a/datafusion/physical-plan/src/filter.rs
+++ b/datafusion/physical-plan/src/filter.rs
@@ -58,6 +58,7 @@ use datafusion_physical_expr::{
     ExprBoundaries, PhysicalExpr,
 };
 
+use crate::statistics::PartitionedStatistics;
 use datafusion_physical_expr_common::physical_expr::fmt_sql;
 use futures::stream::{Stream, StreamExt};
 use log::trace;
@@ -174,12 +175,11 @@ impl FilterExec {
     /// Calculates `Statistics` for `FilterExec`, by applying selectivity (either default, or estimated) to input statistics.
     fn statistics_helper(
-        input: &Arc<dyn ExecutionPlan>,
+        schema: SchemaRef,
+        input_stats: Statistics,
         predicate: &Arc<dyn PhysicalExpr>,
         default_selectivity: u8,
     ) -> Result<Statistics> {
-        let input_stats = input.statistics()?;
-        let schema = input.schema();
         if !check_support(predicate, &schema) {
             let selectivity = default_selectivity as f64 / 100.0;
             let mut stats = input_stats.to_inexact();
@@ -193,7 +193,7 @@ impl FilterExec {
         let num_rows = input_stats.num_rows;
         let total_byte_size = input_stats.total_byte_size;
         let input_analysis_ctx = AnalysisContext::try_from_statistics(
-            &input.schema(),
+            &schema,
             &input_stats.column_statistics,
         )?;
 
@@ -260,7 +260,12 @@ impl FilterExec {
     ) -> Result<PlanProperties> {
         // Combine the equal predicates with the input equivalence properties
         // to construct the equivalence properties:
-        let stats = Self::statistics_helper(input, predicate, default_selectivity)?;
+        let stats = Self::statistics_helper(
+            input.schema(),
+            input.statistics()?,
+            predicate,
+            default_selectivity,
+        )?;
         let mut eq_properties = input.equivalence_properties().clone();
         let (equal_pairs, _) = collect_columns_from_predicate(predicate);
         for (lhs, rhs) in equal_pairs {
@@ -401,13 +406,34 @@ impl ExecutionPlan for FilterExec {
     /// predicate's selectivity value can be determined for the incoming data.
     fn statistics(&self) -> Result<Statistics> {
         let stats = Self::statistics_helper(
-            &self.input,
+            self.schema(),
+            self.input().statistics()?,
             self.predicate(),
             self.default_selectivity,
         )?;
         Ok(stats.project(self.projection.as_ref()))
     }
 
+    fn statistics_by_partition(&self) -> Result<PartitionedStatistics> {
+        let input_stats = self.input.statistics_by_partition()?;
+
+        let stats: Result<Vec<Arc<Statistics>>> = input_stats
+            .iter()
+            .map(|stat| {
+                let stat = Self::statistics_helper(
+                    self.schema(),
+                    stat.clone(),
+                    self.predicate(),
+                    self.default_selectivity,
+                )
+                .map(|stat| stat.project(self.projection.as_ref()))?;
+                Ok(Arc::new(stat))
+            })
+            .collect();
+
+        Ok(PartitionedStatistics::new(stats?))
+    }
+
     fn cardinality_effect(&self) -> CardinalityEffect {
         CardinalityEffect::LowerEqual
     }
diff --git a/datafusion/physical-plan/src/joins/cross_join.rs b/datafusion/physical-plan/src/joins/cross_join.rs
index 8dd1addff15ce..4e14e41178b58 100644
--- a/datafusion/physical-plan/src/joins/cross_join.rs
+++ b/datafusion/physical-plan/src/joins/cross_join.rs
@@ -46,6 +46,7 @@ use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation};
 use datafusion_execution::TaskContext;
 use datafusion_physical_expr::equivalence::join_equivalence_properties;
 
+use crate::statistics::{compute_summary_statistics, PartitionedStatistics};
 use async_trait::async_trait;
 use futures::{ready, Stream, StreamExt, TryStreamExt};
 
@@ -343,6 +344,31 @@ impl ExecutionPlan for CrossJoinExec {
         ))
     }
 
+    fn statistics_by_partition(&self) -> Result<PartitionedStatistics> {
+        let left_stats = self.left.statistics_by_partition()?;
+        let right_stats = self.right.statistics_by_partition()?;
+
+        if left_stats.is_empty() || right_stats.is_empty() {
+            return Ok(PartitionedStatistics::new(vec![]));
+        }
+
+        // Summarize the `left_stats`
+        let statistics = compute_summary_statistics(
+            left_stats.iter(),
+            self.left.schema().fields().len(),
+            |stats| Some(stats),
+        );
+
+        Ok(PartitionedStatistics::new(
+            right_stats
+                .iter()
+                .map(|right| {
+                    Arc::new(stats_cartesian_product(statistics.clone(), right.clone()))
+                })
+                .collect(),
+        ))
+    }
+
     /// Tries to swap the projection with its input [`CrossJoinExec`]. If it can be done,
     /// it returns the new swapped version having the [`CrossJoinExec`] as the top plan.
     /// Otherwise, it returns None.
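`compute_summary_statistics` is the same public helper the cross join uses above to collapse the left side's per-partition statistics into a single summary before taking the cartesian product with each right partition. A hedged sketch of calling it directly (`summarize` is an illustrative wrapper):

    use datafusion_common::Statistics;
    use datafusion_physical_plan::statistics::{
        compute_summary_statistics, PartitionedStatistics,
    };

    fn summarize(parts: &PartitionedStatistics, column_count: usize) -> Statistics {
        // Adds up num_rows/total_byte_size and widens per-column min/max,
        // downgrading Precision to Inexact where inputs disagree or are Absent.
        compute_summary_statistics(parts.iter(), column_count, |stats| Some(stats))
    }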
diff --git a/datafusion/physical-plan/src/joins/hash_join.rs b/datafusion/physical-plan/src/joins/hash_join.rs
index e8904db0f3eaf..50282e056777d 100644
--- a/datafusion/physical-plan/src/joins/hash_join.rs
+++ b/datafusion/physical-plan/src/joins/hash_join.rs
@@ -883,8 +883,8 @@ impl ExecutionPlan for HashJoinExec {
         // There are some special cases though, for example:
         // - `A LEFT JOIN B ON A.col=B.col` with `COUNT_DISTINCT(B.col)=COUNT(B.col)`
         let stats = estimate_join_statistics(
-            Arc::clone(&self.left),
-            Arc::clone(&self.right),
+            self.left.statistics()?,
+            self.right.statistics()?,
             self.on.clone(),
             &self.join_type,
             &self.join_schema,
diff --git a/datafusion/physical-plan/src/joins/nested_loop_join.rs b/datafusion/physical-plan/src/joins/nested_loop_join.rs
index b902795950966..94b6725bb19cf 100644
--- a/datafusion/physical-plan/src/joins/nested_loop_join.rs
+++ b/datafusion/physical-plan/src/joins/nested_loop_join.rs
@@ -568,8 +568,8 @@ impl ExecutionPlan for NestedLoopJoinExec {
 
     fn statistics(&self) -> Result<Statistics> {
         estimate_join_statistics(
-            Arc::clone(&self.left),
-            Arc::clone(&self.right),
+            self.left.statistics()?,
+            self.right.statistics()?,
             vec![],
             &self.join_type,
             &self.join_schema,
diff --git a/datafusion/physical-plan/src/joins/sort_merge_join.rs b/datafusion/physical-plan/src/joins/sort_merge_join.rs
index 89f2e3c911f89..28612540bce5b 100644
--- a/datafusion/physical-plan/src/joins/sort_merge_join.rs
+++ b/datafusion/physical-plan/src/joins/sort_merge_join.rs
@@ -518,8 +518,8 @@ impl ExecutionPlan for SortMergeJoinExec {
         // There are some special cases though, for example:
         // - `A LEFT JOIN B ON A.col=B.col` with `COUNT_DISTINCT(B.col)=COUNT(B.col)`
         estimate_join_statistics(
-            Arc::clone(&self.left),
-            Arc::clone(&self.right),
+            self.left.statistics()?,
+            self.right.statistics()?,
             self.on.clone(),
             &self.join_type,
             &self.schema,
diff --git a/datafusion/physical-plan/src/joins/utils.rs b/datafusion/physical-plan/src/joins/utils.rs
index 5516f172d5101..ce1860d20565c 100644
--- a/datafusion/physical-plan/src/joins/utils.rs
+++ b/datafusion/physical-plan/src/joins/utils.rs
@@ -403,15 +403,12 @@ struct PartialJoinStatistics {
 
 /// Estimate the statistics for the given join's output.
 pub(crate) fn estimate_join_statistics(
-    left: Arc<dyn ExecutionPlan>,
-    right: Arc<dyn ExecutionPlan>,
+    left_stats: Statistics,
+    right_stats: Statistics,
     on: JoinOn,
     join_type: &JoinType,
     schema: &Schema,
 ) -> Result<Statistics> {
-    let left_stats = left.statistics()?;
-    let right_stats = right.statistics()?;
-
     let join_stats = estimate_join_cardinality(join_type, left_stats, right_stats, &on);
     let (num_rows, column_statistics) = match join_stats {
         Some(stats) => (Precision::Inexact(stats.num_rows), stats.column_statistics),
diff --git a/datafusion/physical-plan/src/lib.rs b/datafusion/physical-plan/src/lib.rs
index a1862554b303e..4493569ae6ffd 100644
--- a/datafusion/physical-plan/src/lib.rs
+++ b/datafusion/physical-plan/src/lib.rs
@@ -92,5 +92,6 @@ pub mod udaf {
 }
 
 pub mod coalesce;
+pub mod statistics;
 #[cfg(test)]
 pub mod test;
diff --git a/datafusion/physical-plan/src/limit.rs b/datafusion/physical-plan/src/limit.rs
index 89cf47a6d6508..b2eedff1a63b1 100644
--- a/datafusion/physical-plan/src/limit.rs
+++ b/datafusion/physical-plan/src/limit.rs
@@ -35,6 +35,7 @@ use arrow::record_batch::RecordBatch;
 use datafusion_common::{internal_err, Result};
 use datafusion_execution::TaskContext;
 
+use crate::statistics::PartitionedStatistics;
 use futures::stream::{Stream, StreamExt};
 use log::trace;
 
@@ -202,6 +203,12 @@ impl ExecutionPlan for GlobalLimitExec {
         )
     }
 
+    fn statistics_by_partition(&self) -> Result<PartitionedStatistics> {
+        Ok(PartitionedStatistics::new(vec![Arc::new(
+            self.statistics()?,
+        )]))
+    }
+
     fn fetch(&self) -> Option<usize> {
         self.fetch
     }
@@ -343,6 +350,26 @@ impl ExecutionPlan for LocalLimitExec {
         )
     }
 
+    fn statistics_by_partition(&self) -> Result<PartitionedStatistics> {
+        let input_stats = self.input.statistics_by_partition()?;
+
+        let stats: Result<Vec<Arc<Statistics>>> = input_stats
+            .iter()
+            .map(|stat| {
+                let fetched_stat = Statistics::with_fetch(
+                    stat.clone(),
+                    self.schema(),
+                    Some(self.fetch),
+                    0,
+                    1,
+                )?;
+                Ok(Arc::new(fetched_stat))
+            })
+            .collect();
+
+        Ok(PartitionedStatistics::new(stats?))
+    }
+
     fn fetch(&self) -> Option<usize> {
         Some(self.fetch)
     }
diff --git a/datafusion/physical-plan/src/projection.rs b/datafusion/physical-plan/src/projection.rs
index 72934c74446eb..831fb46ed3e86 100644
--- a/datafusion/physical-plan/src/projection.rs
+++ b/datafusion/physical-plan/src/projection.rs
@@ -48,6 +48,7 @@ use datafusion_physical_expr::equivalence::ProjectionMapping;
 use datafusion_physical_expr::utils::collect_columns;
 use datafusion_physical_expr::PhysicalExprRef;
 
+use crate::statistics::PartitionedStatistics;
 use datafusion_physical_expr_common::physical_expr::fmt_sql;
 use futures::stream::{Stream, StreamExt};
 use itertools::Itertools;
@@ -251,6 +252,22 @@ impl ExecutionPlan for ProjectionExec {
         ))
     }
 
+    fn statistics_by_partition(&self) -> Result<PartitionedStatistics> {
+        Ok(PartitionedStatistics::new(
+            self.input
+                .statistics_by_partition()?
+                .iter()
+                .map(|input_stats| {
+                    Arc::new(stats_projection(
+                        input_stats.clone(),
+                        self.expr.iter().map(|(e, _)| Arc::clone(e)),
+                        Arc::clone(&self.schema),
+                    ))
+                })
+                .collect(),
+        ))
+    }
+
     fn supports_limit_pushdown(&self) -> bool {
         true
     }
diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs
index 9d0f34cc7f0fd..62b6e040031a9 100644
--- a/datafusion/physical-plan/src/sorts/sort.rs
+++ b/datafusion/physical-plan/src/sorts/sort.rs
@@ -60,6 +60,7 @@ use datafusion_execution::TaskContext;
 use datafusion_physical_expr::LexOrdering;
 use datafusion_physical_expr_common::sort_expr::LexRequirement;
 
+use crate::statistics::PartitionedStatistics;
 use futures::{StreamExt, TryStreamExt};
 use log::{debug, trace};
 
@@ -1300,6 +1301,31 @@ impl ExecutionPlan for SortExec {
         Statistics::with_fetch(self.input.statistics()?, self.schema(), self.fetch, 0, 1)
     }
 
+    fn statistics_by_partition(&self) -> Result<PartitionedStatistics> {
+        if !self.preserve_partitioning() {
+            return Ok(PartitionedStatistics::new(vec![Arc::new(
+                self.statistics()?,
+            )]));
+        }
+        let input_stats = self.input.statistics_by_partition()?;
+
+        let stats: Result<Vec<Arc<Statistics>>> = input_stats
+            .iter()
+            .map(|stat| {
+                let fetched_stat = Statistics::with_fetch(
+                    stat.clone(),
+                    self.schema(),
+                    self.fetch,
+                    0,
+                    1,
+                )?;
+                Ok(Arc::new(fetched_stat))
+            })
+            .collect();
+
+        Ok(PartitionedStatistics::new(stats?))
+    }
+
     fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
         Some(Arc::new(SortExec::with_fetch(self, limit)))
     }
diff --git a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs
index b987dff36441d..8d44962cdb33c 100644
--- a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs
+++ b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs
@@ -36,6 +36,7 @@ use datafusion_execution::TaskContext;
 use datafusion_physical_expr::PhysicalSortExpr;
 use datafusion_physical_expr_common::sort_expr::{LexOrdering, LexRequirement};
 
+use crate::statistics::PartitionedStatistics;
 use log::{debug, trace};
 
 /// Sort preserving merge execution plan
@@ -346,6 +347,12 @@ impl ExecutionPlan for SortPreservingMergeExec {
         self.input.statistics()
     }
 
+    fn statistics_by_partition(&self) -> Result<PartitionedStatistics> {
+        Ok(PartitionedStatistics::new(vec![Arc::new(
+            self.statistics()?,
+        )]))
+    }
+
     fn supports_limit_pushdown(&self) -> bool {
         true
     }
diff --git a/datafusion/physical-plan/src/statistics.rs b/datafusion/physical-plan/src/statistics.rs
new file mode 100644
index 0000000000000..d005d861c9321
--- /dev/null
+++ b/datafusion/physical-plan/src/statistics.rs
@@ -0,0 +1,196 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Defines [`PartitionedStatistics`], which holds per-partition [`Statistics`],
+//! along with helpers for summarizing statistics across partitions
+
+use datafusion_common::stats::Precision;
+use datafusion_common::{ColumnStatistics, ScalarValue, Statistics};
+use std::mem;
+use std::ops::Index;
+use std::sync::Arc;
+
+/// Represents statistics data grouped by partition.
+///
+/// This structure maintains a collection of statistics, one for each partition
+/// of a distributed dataset, allowing access to statistics by partition index.
+#[derive(Debug, Clone)]
+pub struct PartitionedStatistics {
+    inner: Vec<Arc<Statistics>>,
+}
+
+impl PartitionedStatistics {
+    pub fn new(statistics: Vec<Arc<Statistics>>) -> Self {
+        Self { inner: statistics }
+    }
+
+    pub fn statistics(&self, partition_idx: usize) -> &Statistics {
+        &self.inner[partition_idx]
+    }
+
+    pub fn get_statistics(&self, partition_idx: usize) -> Option<&Statistics> {
+        self.inner.get(partition_idx).map(|arc| arc.as_ref())
+    }
+
+    pub fn iter(&self) -> impl Iterator<Item = &Statistics> {
+        self.inner.iter().map(|arc| arc.as_ref())
+    }
+
+    pub fn is_empty(&self) -> bool {
+        self.inner.is_empty()
+    }
+
+    pub fn len(&self) -> usize {
+        self.inner.len()
+    }
+}
+
+impl Index<usize> for PartitionedStatistics {
+    type Output = Statistics;
+
+    fn index(&self, partition_idx: usize) -> &Self::Output {
+        self.statistics(partition_idx)
+    }
+}
+
+/// Generic function to compute statistics across multiple items that have statistics
+pub fn compute_summary_statistics<T, I>(
+    items: I,
+    column_count: usize,
+    stats_extractor: impl Fn(&T) -> Option<&Statistics>,
+) -> Statistics
+where
+    I: IntoIterator<Item = T>,
+{
+    let mut col_stats_set = vec![ColumnStatistics::default(); column_count];
+    let mut num_rows = Precision::<usize>::Absent;
+    let mut total_byte_size = Precision::<usize>::Absent;
+
+    for (idx, item) in items.into_iter().enumerate() {
+        if let Some(item_stats) = stats_extractor(&item) {
+            if idx == 0 {
+                // First item, set values directly
+                num_rows = item_stats.num_rows;
+                total_byte_size = item_stats.total_byte_size;
+                for (index, column_stats) in
+                    item_stats.column_statistics.iter().enumerate()
+                {
+                    col_stats_set[index].null_count = column_stats.null_count;
+                    col_stats_set[index].max_value = column_stats.max_value.clone();
+                    col_stats_set[index].min_value = column_stats.min_value.clone();
+                    col_stats_set[index].sum_value = column_stats.sum_value.clone();
+                }
+                continue;
+            }
+
+            // Accumulate statistics for subsequent items
+            num_rows = add_row_stats(item_stats.num_rows, num_rows);
+            total_byte_size = add_row_stats(item_stats.total_byte_size, total_byte_size);
+
+            for (item_col_stats, col_stats) in item_stats
+                .column_statistics
+                .iter()
+                .zip(col_stats_set.iter_mut())
+            {
+                col_stats.null_count =
+                    add_row_stats(item_col_stats.null_count, col_stats.null_count);
+                set_max_if_greater(&item_col_stats.max_value, &mut col_stats.max_value);
+                set_min_if_lesser(&item_col_stats.min_value, &mut col_stats.min_value);
+                col_stats.sum_value = item_col_stats.sum_value.add(&col_stats.sum_value);
+            }
+        }
+    }
+
+    Statistics {
+        num_rows,
+        total_byte_size,
+        column_statistics: col_stats_set,
+    }
+}
+
+/// If the given value is numerically greater than the original maximum value,
+/// return the new maximum value with appropriate exactness information.
+pub fn set_max_if_greater(
+    max_nominee: &Precision<ScalarValue>,
+    max_value: &mut Precision<ScalarValue>,
+) {
+    match (&max_value, max_nominee) {
+        (Precision::Exact(val1), Precision::Exact(val2)) if val1 < val2 => {
+            *max_value = max_nominee.clone();
+        }
+        (Precision::Exact(val1), Precision::Inexact(val2))
+        | (Precision::Inexact(val1), Precision::Inexact(val2))
+        | (Precision::Inexact(val1), Precision::Exact(val2))
+            if val1 < val2 =>
+        {
+            *max_value = max_nominee.clone().to_inexact();
+        }
+        (Precision::Exact(_), Precision::Absent) => {
+            let exact_max = mem::take(max_value);
+            *max_value = exact_max.to_inexact();
+        }
+        (Precision::Absent, Precision::Exact(_)) => {
+            *max_value = max_nominee.clone().to_inexact();
+        }
+        (Precision::Absent, Precision::Inexact(_)) => {
+            *max_value = max_nominee.clone();
+        }
+        _ => {}
+    }
+}
+
+/// If the given value is numerically lesser than the original minimum value,
+/// return the new minimum value with appropriate exactness information.
+pub fn set_min_if_lesser(
+    min_nominee: &Precision<ScalarValue>,
+    min_value: &mut Precision<ScalarValue>,
+) {
+    match (&min_value, min_nominee) {
+        (Precision::Exact(val1), Precision::Exact(val2)) if val1 > val2 => {
+            *min_value = min_nominee.clone();
+        }
+        (Precision::Exact(val1), Precision::Inexact(val2))
+        | (Precision::Inexact(val1), Precision::Inexact(val2))
+        | (Precision::Inexact(val1), Precision::Exact(val2))
+            if val1 > val2 =>
+        {
+            *min_value = min_nominee.clone().to_inexact();
+        }
+        (Precision::Exact(_), Precision::Absent) => {
+            let exact_min = mem::take(min_value);
+            *min_value = exact_min.to_inexact();
+        }
+        (Precision::Absent, Precision::Exact(_)) => {
+            *min_value = min_nominee.clone().to_inexact();
+        }
+        (Precision::Absent, Precision::Inexact(_)) => {
+            *min_value = min_nominee.clone();
+        }
+        _ => {}
+    }
+}
+
+pub fn add_row_stats(
+    file_num_rows: Precision<usize>,
+    num_rows: Precision<usize>,
+) -> Precision<usize> {
+    match (file_num_rows, &num_rows) {
+        (Precision::Absent, _) => num_rows.to_inexact(),
+        (lhs, Precision::Absent) => lhs.to_inexact(),
+        (lhs, rhs) => lhs.add(rhs),
+    }
+}
diff --git a/datafusion/physical-plan/src/union.rs b/datafusion/physical-plan/src/union.rs
index 2b666093f29e0..c310ff5692e8d 100644
--- a/datafusion/physical-plan/src/union.rs
+++ b/datafusion/physical-plan/src/union.rs
@@ -46,6 +46,7 @@ use datafusion_common::{exec_err, internal_err, DataFusionError, Result};
 use datafusion_execution::TaskContext;
 use datafusion_physical_expr::{calculate_union, EquivalenceProperties};
 
+use crate::statistics::PartitionedStatistics;
 use futures::Stream;
 use itertools::Itertools;
 use log::{debug, trace, warn};
@@ -270,6 +271,21 @@ impl ExecutionPlan for UnionExec {
             .unwrap_or_else(|| Statistics::new_unknown(&self.schema())))
     }
 
+    fn statistics_by_partition(&self) -> Result<PartitionedStatistics> {
+        let input_stats_vec = self
+            .inputs
+            .iter()
+            .map(|input| input.statistics_by_partition())
+            .collect::<Result<Vec<_>>>()?;
+
+        let all_stats: Vec<_> = input_stats_vec
+            .iter()
+            .flat_map(|input_stats| input_stats.iter().map(|stat| Arc::new(stat.clone())))
+            .collect();
+
+        Ok(PartitionedStatistics::new(all_stats))
+    }
+
     fn benefits_from_input_partitioning(&self) -> Vec<bool> {
         vec![false; self.children().len()]
     }
diff --git a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs
index 92138bf6a7a1a..36dc2a3799c8d 100644
--- a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs
+++ b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs
@@ -63,6 +63,7 @@ use datafusion_physical_expr::window::{
 use datafusion_physical_expr::PhysicalExpr;
 use datafusion_physical_expr_common::sort_expr::{LexOrdering, LexRequirement};
 
+use crate::statistics::PartitionedStatistics;
 use futures::stream::Stream;
 use futures::{ready, StreamExt};
 use hashbrown::hash_table::HashTable;
@@ -226,6 +227,23 @@ impl BoundedWindowAggExec {
             .unwrap_or_else(Vec::new)
         }
     }
+
+    fn statistics_helper(&self, statistics: Statistics) -> Result<Statistics> {
+        let win_cols = self.window_expr.len();
+        let input_cols = self.input.schema().fields().len();
+        // TODO stats: some windowing function will maintain invariants such as min, max...
+        let mut column_statistics = Vec::with_capacity(win_cols + input_cols);
+        // copy stats of the input to the beginning of the schema.
+        column_statistics.extend(statistics.column_statistics);
+        for _ in 0..win_cols {
+            column_statistics.push(ColumnStatistics::new_unknown())
+        }
+        Ok(Statistics {
+            num_rows: statistics.num_rows,
+            column_statistics,
+            total_byte_size: Precision::Absent,
+        })
+    }
 }
 
 impl DisplayAs for BoundedWindowAggExec {
@@ -344,20 +362,19 @@ impl ExecutionPlan for BoundedWindowAggExec {
 
     fn statistics(&self) -> Result<Statistics> {
         let input_stat = self.input.statistics()?;
-        let win_cols = self.window_expr.len();
-        let input_cols = self.input.schema().fields().len();
-        // TODO stats: some windowing function will maintain invariants such as min, max...
-        let mut column_statistics = Vec::with_capacity(win_cols + input_cols);
-        // copy stats of the input to the beginning of the schema.
-        column_statistics.extend(input_stat.column_statistics);
-        for _ in 0..win_cols {
-            column_statistics.push(ColumnStatistics::new_unknown())
-        }
-        Ok(Statistics {
-            num_rows: input_stat.num_rows,
-            column_statistics,
-            total_byte_size: Precision::Absent,
-        })
+        self.statistics_helper(input_stat)
+    }
+
+    fn statistics_by_partition(&self) -> Result<PartitionedStatistics> {
+        let input_stats = self.input.statistics_by_partition()?;
+        let stats: Result<Vec<Arc<Statistics>>> = input_stats
+            .iter()
+            .map(|stat| {
+                let stat = self.statistics_helper(stat.clone())?;
+                Ok(Arc::new(stat))
+            })
+            .collect();
+        Ok(PartitionedStatistics::new(stats?))
+    }
 }
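For reference, the access patterns `PartitionedStatistics` supports, mirroring the API added in `statistics.rs` above (`describe` is an illustrative helper):

    use datafusion_physical_plan::statistics::PartitionedStatistics;

    fn describe(stats: &PartitionedStatistics) {
        for i in 0..stats.len() {
            // Indexing panics on out-of-range, like a slice...
            let s = &stats[i];
            // ...while `get_statistics` is the non-panicking variant.
            assert!(stats.get_statistics(i).is_some());
            println!("partition {i}: num_rows = {:?}", s.num_rows);
        }
    }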