Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions datafusion/common/src/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -451,6 +451,9 @@ impl Statistics {

/// Summarize zero or more statistics into a single `Statistics` instance.
///
/// The method assumes that all statistics are for the same schema.
/// If not, maybe you can call `SchemaMapper::map_column_statistics` to make them consistent.
///
/// Returns an error if the statistics do not match the specified schemas.
pub fn try_merge_iter<'a, I>(items: I, schema: &Schema) -> Result<Statistics>
where
Expand Down
13 changes: 12 additions & 1 deletion datafusion/core/src/datasource/listing/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ use crate::execution::context::SessionState;
use datafusion_catalog::TableProvider;
use datafusion_common::{config_err, DataFusionError, Result};
use datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder};
use datafusion_datasource::schema_adapter::DefaultSchemaAdapterFactory;
use datafusion_expr::dml::InsertOp;
use datafusion_expr::{utils::conjunction, Expr, TableProviderFilterPushDown};
use datafusion_expr::{SortExpr, TableType};
Expand Down Expand Up @@ -1129,7 +1130,17 @@ impl ListingTable {
let (file_group, inexact_stats) =
get_files_with_limit(files, limit, self.options.collect_stat).await?;

let file_groups = file_group.split_files(self.options.target_partitions);
let mut file_groups = file_group.split_files(self.options.target_partitions);
let (schema_mapper, _) = DefaultSchemaAdapterFactory::from_schema(self.schema())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think using the default schema mapper makes sense for now / in this PR, but in general I think it would make sense to allow the user to provide their own schema mapping rules here (so a default value that is not NULL can be used, for example) via their own mapper.

However, we would have to add a schema mapper factory to ListingOptions

https://github.com/apache/datafusion/blob/f1bbb1d636650c7f28f52dc507f36e64d71e1aa8/datafusion/core/src/datasource/listing/table.rs#L256-L255

(this is not a change needed for this PR, I just noticed it while reviewing this PR)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Make sense, I filed an issue to track: #15889

Comment on lines +1133 to +1134
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@alamb While I was working on #15852, I found that the listing table does not, in fact, have the issue described in #15689: all files here have the same schema, because when the table is created, all fetched files already go through the SchemaMapper to reorder their schema, see here: https://github.com/apache/datafusion/blob/main/datafusion/datasource-parquet/src/opener.rs#L206.

What we should fix is making the file schema match the listing table schema: usually, if users specify partition columns, the table schema carries the extra partition column info. So I moved the mapper down to the compute_all_files_statistics method in commit 689fc66.

.map_schema(self.file_schema.as_ref())?;
// Use schema_mapper to map each file-level column statistics to table-level column statistics
file_groups.iter_mut().try_for_each(|file_group| {
if let Some(stat) = file_group.statistics_mut() {
stat.column_statistics =
schema_mapper.map_column_statistics(&stat.column_statistics)?;
}
Ok::<_, DataFusionError>(())
})?;
compute_all_files_statistics(
file_groups,
self.schema(),
Expand Down
7 changes: 7 additions & 0 deletions datafusion/core/src/datasource/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -264,5 +264,12 @@ mod tests {

Ok(RecordBatch::try_new(schema, new_columns).unwrap())
}

/// Statistics mapping is not implemented for this mapper: the body is
/// `unimplemented!()`, so calling this method panics instead of returning.
fn map_column_statistics(
&self,
_file_col_statistics: &[datafusion_common::ColumnStatistics],
) -> datafusion_common::Result<Vec<datafusion_common::ColumnStatistics>> {
unimplemented!()
}
}
}
5 changes: 5 additions & 0 deletions datafusion/datasource/src/file_groups.rs
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,11 @@ impl FileGroup {
self.statistics.as_deref()
}

/// Returns a mutable reference to the statistics of this group, or `None`
/// when the group has no statistics.
///
/// The statistics are held behind an `Arc` and are accessed through
/// [`Arc::make_mut`]: if the `Arc` is currently shared, the statistics are
/// cloned first so that the returned reference points at a copy owned by
/// this group only.
pub fn statistics_mut(&mut self) -> Option<&mut Statistics> {
    let shared = self.statistics.as_mut()?;
    Some(Arc::make_mut(shared))
}

/// Partition the list of files into `n` groups
pub fn split_files(mut self, n: usize) -> Vec<FileGroup> {
if self.is_empty() {
Expand Down
130 changes: 129 additions & 1 deletion datafusion/datasource/src/schema_adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
use arrow::array::{new_null_array, RecordBatch, RecordBatchOptions};
use arrow::compute::{can_cast_types, cast};
use arrow::datatypes::{Schema, SchemaRef};
use datafusion_common::plan_err;
use datafusion_common::{plan_err, ColumnStatistics};
use std::fmt::Debug;
use std::sync::Arc;

Expand Down Expand Up @@ -96,6 +96,12 @@ pub trait SchemaAdapter: Send + Sync {
pub trait SchemaMapper: Debug + Send + Sync {
/// Adapts a `RecordBatch` to match the `table_schema`
fn map_batch(&self, batch: RecordBatch) -> datafusion_common::Result<RecordBatch>;

/// Adapts file-level column `Statistics` to match the `table_schema`
///
/// `file_col_statistics` holds the per-column statistics of a single file.
/// The returned vector is the table-level view of those statistics.
///
/// NOTE(review): the `SchemaMapping` implementation in this file looks up
/// each entry by file column index and emits unknown statistics for table
/// columns that the file does not contain; other implementations are
/// presumably expected to follow the same convention - confirm.
fn map_column_statistics(
&self,
file_col_statistics: &[ColumnStatistics],
) -> datafusion_common::Result<Vec<ColumnStatistics>>;
}

/// Default [`SchemaAdapterFactory`] for mapping schemas.
Expand Down Expand Up @@ -334,4 +340,126 @@ impl SchemaMapper for SchemaMapping {
let record_batch = RecordBatch::try_new_with_options(schema, cols, &options)?;
Ok(record_batch)
}

/// Adapts file-level column `Statistics` to match the `table_schema`
fn map_column_statistics(
&self,
file_col_statistics: &[ColumnStatistics],
) -> datafusion_common::Result<Vec<ColumnStatistics>> {
let mut table_col_statistics = vec![];

// Map the statistics for each field in the file schema to the corresponding field in the
// table schema, if a field is not present in the file schema, we need to fill it with `ColumnStatistics::new_unknown`
for (_, file_col_idx) in self
.projected_table_schema
.fields()
.iter()
.zip(&self.field_mappings)
{
if let Some(file_col_idx) = file_col_idx {
table_col_statistics.push(
file_col_statistics
.get(*file_col_idx)
.cloned()
.unwrap_or_default(),
);
} else {
table_col_statistics.push(ColumnStatistics::new_unknown());
}
}
Comment on lines +351 to +369
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is new_unknown "all nulls"? Because that's what gets filled into the data by default. I wonder how this would interact with #15261?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you mean "all nulls", from the code I think it means Absent

    pub fn new_unknown() -> Self {
        Self {
            null_count: Precision::Absent,
            max_value: Precision::Absent,
            min_value: Precision::Absent,
            sum_value: Precision::Absent,
            distinct_count: Precision::Absent,
        }
    }

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What I mean is that I think we have enough information to make the stats null_count: Precision::Exact(row_count) and absent for the rest


Ok(table_col_statistics)
}
}

#[cfg(test)]
mod tests {
use arrow::datatypes::{DataType, Field};
use datafusion_common::{stats::Precision, Statistics};

use super::*;

#[test]
fn test_schema_mapping_map_statistics_basic() {
    // The table exposes (a, b, c); the file only stores (b, a), i.e. the
    // columns are reordered and `c` is missing.
    let table_fields = vec![
        Field::new("a", DataType::Int32, true),
        Field::new("b", DataType::Utf8, true),
        Field::new("c", DataType::Float64, true),
    ];
    let table_schema = Arc::new(Schema::new(table_fields));
    let file_schema = Schema::new(vec![
        Field::new("b", DataType::Utf8, true),
        Field::new("a", DataType::Int32, true),
    ]);

    let adapter = DefaultSchemaAdapter {
        projected_table_schema: Arc::clone(&table_schema),
    };
    let (mapper, projection) = adapter.map_schema(&file_schema).unwrap();

    // Both file columns (indices 0 and 1) are part of the projection
    assert_eq!(projection, vec![0, 1]);

    // Column statistics that differ only in their exact null count
    let with_null_count = |nulls: usize| ColumnStatistics {
        null_count: Precision::Exact(nulls),
        ..Default::default()
    };

    // File statistics are listed in file column order: b (idx 0), a (idx 1)
    let file_stats = Statistics {
        column_statistics: vec![with_null_count(5), with_null_count(10)],
        ..Default::default()
    };

    let table_col_stats = mapper
        .map_column_statistics(&file_stats.column_statistics)
        .unwrap();

    // Expected null counts in table column order:
    // a (from file idx 1), b (from file idx 0), c (missing in file => unknown)
    let expected_null_counts: [Precision<usize>; 3] =
        [Precision::Exact(10), Precision::Exact(5), Precision::Absent];
    assert_eq!(table_col_stats.len(), 3);
    for (mapped, expected) in table_col_stats.iter().zip(&expected_null_counts) {
        assert_eq!(&mapped.null_count, expected);
    }
}

#[test]
fn test_schema_mapping_map_statistics_empty() {
    // Table and file share the exact same layout: (a, b)
    let fields = || {
        vec![
            Field::new("a", DataType::Int32, true),
            Field::new("b", DataType::Utf8, true),
        ]
    };
    let table_schema = Arc::new(Schema::new(fields()));
    let file_schema = Schema::new(fields());

    let adapter = DefaultSchemaAdapter {
        projected_table_schema: Arc::clone(&table_schema),
    };
    let (mapper, _) = adapter.map_schema(&file_schema).unwrap();

    // Default statistics are passed in, so the mapper has no per-column
    // entry to pick up for either column
    let file_stats = Statistics::default();
    let table_col_stats = mapper
        .map_column_statistics(&file_stats.column_statistics)
        .unwrap();

    // One entry per table column, each of them unknown
    assert_eq!(table_col_stats.len(), 2);
    for mapped in &table_col_stats {
        assert_eq!(mapped, &ColumnStatistics::new_unknown());
    }
}
}