66 changes: 55 additions & 11 deletions datafusion/datasource/src/file.rs
@@ -17,26 +17,28 @@

//! Common behaviors that every file format needs to implement

use datafusion_physical_plan::DisplayAs;
use std::any::Any;
use std::fmt;
use std::fmt::Formatter;
use std::sync::Arc;

use crate::file_groups::FileGroupPartitioner;
use crate::file_scan_config::{
get_projected_output_ordering, FileScanConfig, FileScanConfigBuilder,
};
use crate::file_stream::{FileOpener, FileStream};
use crate::schema_adapter::SchemaAdapterFactory;
use arrow::datatypes::SchemaRef;
use datafusion_common::config::ConfigOptions;
use datafusion_common::{not_impl_err, Result, Statistics};
use datafusion_common::{
not_impl_err, ColumnStatistics, Constraints, Result, Statistics,
};
use datafusion_physical_expr::{
EquivalenceProperties, LexOrdering, Partitioning, PhysicalExpr,
};
use datafusion_physical_plan::filter_pushdown::{FilterPushdownPropagation, PushedDown};
use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
use datafusion_physical_plan::DisplayAs;
use datafusion_physical_plan::{DisplayFormatType, ExecutionPlan};
use std::any::Any;
use std::fmt;
use std::fmt::Formatter;
use std::sync::Arc;

use crate::display::FileGroupsDisplay;
use crate::source::{DataSource, DataSourceExec};
@@ -164,6 +166,28 @@ pub trait FileSource: fmt::Debug + Send + Sync {
fn schema_adapter_factory(&self) -> Option<Arc<dyn SchemaAdapterFactory>> {
None
}

/// Project the schema, constraints, and the statistics on the given column indices
fn project(&self) -> (SchemaRef, Constraints, Statistics, Vec<LexOrdering>) {
let config = self.config();

if config.projection.is_none() && config.table_partition_cols.is_empty() {
return (
Arc::clone(&config.file_schema),
config.constraints.clone(),
self.file_source_statistics(),
config.output_ordering.clone(),
);
}

let schema = config.projected_schema();
let constraints = config.projected_constraints();
let stats = self.as_data_source().data_source_statistics();

let output_ordering = get_projected_output_ordering(config, &schema);

(schema, constraints, stats, output_ordering)
}
}

impl<T: FileSource + 'static> DataSource for T {
@@ -260,8 +284,7 @@ impl<T: FileSource + 'static> DataSource for T {
}

fn eq_properties(&self) -> EquivalenceProperties {
let (schema, constraints, _, orderings) =
self.config().project(self.file_source_statistics());
let (schema, constraints, _, orderings) = self.project();
EquivalenceProperties::new_with_orderings(schema, orderings)
.with_constraints(constraints)
}
@@ -270,8 +293,29 @@ impl<T: FileSource + 'static> DataSource for T {
SchedulingType::Cooperative
}

fn data_source_statistics(&self) -> Result<Statistics> {
Ok(self.config().projected_stats(self.file_source_statistics()))
fn data_source_statistics(&self) -> Statistics {
let file_source_statistics = self.file_source_statistics();
[Review comment — Contributor (Author)] I don't love data_source_statistics and file_source_statistics, curious if people had thoughts on better names here

[Reply — Member] Both statistics()s are good to me

let table_cols_stats = self
.config()
.projection_indices()
.into_iter()
.map(|idx| {
if idx < self.config().file_schema.fields().len() {
file_source_statistics.column_statistics[idx].clone()
} else {
// TODO provide accurate stat for partition column (#1186)
ColumnStatistics::new_unknown()
}
})
.collect();

Statistics {
num_rows: file_source_statistics.num_rows,
// TODO correct byte size: https://github.com/apache/datafusion/issues/14936
total_byte_size: file_source_statistics.total_byte_size,
column_statistics: table_cols_stats,
}
}

fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn DataSource>> {
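For orientation, here is a rough standalone sketch of the projection step that the blanket `data_source_statistics` impl above performs (previously `FileScanConfig::projected_stats`). The free function `projected_statistics` and its parameters are simplifications invented for illustration; only the index-mapping logic comes from the diff:

```rust
use datafusion_common::{ColumnStatistics, Statistics};

/// Hypothetical free-function version of the projection step shown in the
/// diff: map file-level column statistics onto the projected output columns.
fn projected_statistics(
    file_stats: &Statistics,
    num_file_columns: usize,
    projection_indices: &[usize],
) -> Statistics {
    let column_statistics = projection_indices
        .iter()
        .map(|&idx| {
            if idx < num_file_columns {
                // Column comes from the file itself: reuse its statistics.
                file_stats.column_statistics[idx].clone()
            } else {
                // Table partition column: no accurate stats yet (see the
                // TODO referencing #1186 in the diff).
                ColumnStatistics::new_unknown()
            }
        })
        .collect();

    Statistics {
        // A column projection does not change the row count; the byte size
        // is kept as-is even though it is approximate (see the TODO in the
        // diff pointing at apache/datafusion#14936).
        num_rows: file_stats.num_rows.clone(),
        total_byte_size: file_stats.total_byte_size.clone(),
        column_statistics,
    }
}
```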
62 changes: 6 additions & 56 deletions datafusion/datasource/src/file_scan_config.rs
@@ -41,8 +41,7 @@ use arrow::{
datatypes::{ArrowNativeType, DataType, Field, Schema, SchemaRef, UInt16Type},
};
use datafusion_common::{
exec_err, ColumnStatistics, Constraints, DataFusionError, Result, ScalarValue,
Statistics,
exec_err, Constraints, DataFusionError, Result, ScalarValue, Statistics,
};
use datafusion_execution::object_store::ObjectStoreUrl;
use datafusion_physical_expr::expressions::Column;
@@ -466,7 +465,7 @@ impl From<FileScanConfig> for FileScanConfigBuilder {
}

impl FileScanConfig {
fn projection_indices(&self) -> Vec<usize> {
pub(crate) fn projection_indices(&self) -> Vec<usize> {
match &self.projection {
Some(proj) => proj.clone(),
None => (0..self.file_schema.fields().len()
@@ -475,29 +474,6 @@ impl FileScanConfig {
}
}

/// make sure to call file_source.file_source_statistics()
pub fn projected_stats(&self, file_source_statistics: Statistics) -> Statistics {
let table_cols_stats = self
.projection_indices()
.into_iter()
.map(|idx| {
if idx < self.file_schema.fields().len() {
file_source_statistics.column_statistics[idx].clone()
} else {
// TODO provide accurate stat for partition column (#1186)
ColumnStatistics::new_unknown()
}
})
.collect();

Statistics {
num_rows: file_source_statistics.num_rows,
// TODO correct byte size: https://github.com/apache/datafusion/issues/14936
total_byte_size: file_source_statistics.total_byte_size,
column_statistics: table_cols_stats,
}
}

pub fn projected_schema(&self) -> Arc<Schema> {
let table_fields: Vec<_> = self
.projection_indices()
@@ -535,30 +511,6 @@ impl FileScanConfig {
pub fn newlines_in_values(&self) -> bool {
self.new_lines_in_values
}

/// Project the schema, constraints, and the statistics on the given column indices
pub fn project(
&self,
file_source_statistics: Statistics,
) -> (SchemaRef, Constraints, Statistics, Vec<LexOrdering>) {
if self.projection.is_none() && self.table_partition_cols.is_empty() {
return (
Arc::clone(&self.file_schema),
self.constraints.clone(),
file_source_statistics,
self.output_ordering.clone(),
);
}

let schema = self.projected_schema();
let constraints = self.projected_constraints();
let stats = self.projected_stats(file_source_statistics);

let output_ordering = get_projected_output_ordering(self, &schema);

(schema, constraints, stats, output_ordering)
}

pub fn projected_file_column_names(&self) -> Option<Vec<String>> {
self.projection.as_ref().map(|p| {
p.iter()
@@ -1198,7 +1150,7 @@ mod tests {
use crate::source::DataSource;
use arrow::array::{Int32Array, RecordBatch};
use datafusion_common::stats::Precision;
use datafusion_common::{assert_batches_eq, internal_err};
use datafusion_common::{assert_batches_eq, internal_err, ColumnStatistics};
use datafusion_expr::SortExpr;
use datafusion_physical_expr::create_physical_sort_expr;

@@ -1220,8 +1172,7 @@ mod tests {
)]),
);

let (proj_schema, _, proj_statistics, _) =
source.config().project(source.file_source_statistics());
let (proj_schema, _, proj_statistics, _) = source.project();
assert_eq!(proj_schema.fields().len(), file_schema.fields().len() + 1);
assert_eq!(
proj_schema.field(file_schema.fields().len()).name(),
@@ -1294,8 +1245,7 @@ mod tests {
)]),
);

let (proj_schema, _, proj_statistics, _) =
source.config().project(source.file_source_statistics());
let (proj_schema, _, proj_statistics, _) = source.project();
assert_eq!(
columns(&proj_schema),
vec!["date".to_owned(), "c1".to_owned()]
@@ -1356,7 +1306,7 @@
);

let source_statistics = source.file_source_statistics();
let conf_stats = source.data_source_statistics().unwrap();
let conf_stats = source.data_source_statistics();

// projection should be reflected in the file source statistics
assert_eq!(conf_stats.num_rows, Precision::Inexact(3));
6 changes: 3 additions & 3 deletions datafusion/datasource/src/memory.rs
@@ -194,12 +194,12 @@ impl DataSource for MemorySourceConfig {
SchedulingType::Cooperative
}

fn data_source_statistics(&self) -> Result<Statistics> {
Ok(common::compute_record_batch_statistics(
fn data_source_statistics(&self) -> Statistics {
common::compute_record_batch_statistics(
&self.partitions,
&self.schema,
self.projection.clone(),
))
)
}

fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn DataSource>> {
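Since `MemorySourceConfig::data_source_statistics` is now infallible, the underlying helper can be exercised directly. A minimal sketch, assuming `compute_record_batch_statistics` lives at `datafusion_physical_plan::common` with the `(batches, schema, projection)` shape used in the diff; the schema and batch here are made up for illustration:

```rust
use std::sync::Arc;

use arrow::array::{Int32Array, RecordBatch};
use arrow::datatypes::{DataType, Field, Schema};
use datafusion_physical_plan::common::compute_record_batch_statistics;

fn main() {
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
    let batch = RecordBatch::try_new(
        Arc::clone(&schema),
        vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
    )
    .expect("column should match schema");

    // One partition holding one batch, projected to column 0 — the same
    // (&partitions, &schema, projection) shape as the memory.rs call site.
    let stats = compute_record_batch_statistics(&[vec![batch]], &schema, Some(vec![0]));
    println!("{stats:?}");
}
```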
4 changes: 2 additions & 2 deletions datafusion/datasource/src/source.rs
@@ -155,7 +155,7 @@ pub trait DataSource: Send + Sync + Debug {
fn scheduling_type(&self) -> SchedulingType {
SchedulingType::NonCooperative
}
fn data_source_statistics(&self) -> Result<Statistics>;
fn data_source_statistics(&self) -> Statistics;
/// Return a copy of this DataSource with a new fetch limit
fn with_fetch(&self, _limit: Option<usize>) -> Option<Arc<dyn DataSource>>;
fn fetch(&self) -> Option<usize>;
@@ -305,7 +305,7 @@ impl ExecutionPlan for DataSourceExec {
}
Ok(statistics)
} else {
Ok(self.data_source.data_source_statistics()?)
Ok(self.data_source.data_source_statistics())
}
}

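The net effect of the source.rs hunks is that `DataSource` implementations outside this repo must drop the `Result` wrapper when they rebase. A minimal sketch of the adjusted method on a hypothetical source (`CachedSource` and its `stats` field are not from the PR):

```rust
use arrow::datatypes::Schema;
use datafusion_common::Statistics;

/// Hypothetical source used only to illustrate the new, infallible signature.
#[derive(Debug)]
struct CachedSource {
    stats: Statistics,
}

impl CachedSource {
    // Before this PR: fn data_source_statistics(&self) -> Result<Statistics>
    // After:          fn data_source_statistics(&self) -> Statistics
    // A source that cannot compute statistics now returns
    // Statistics::new_unknown(&schema) rather than an Err.
    fn data_source_statistics(&self) -> Statistics {
        self.stats.clone()
    }
}

fn main() {
    let schema = Schema::empty();
    let source = CachedSource {
        stats: Statistics::new_unknown(&schema),
    };
    println!("{:?}", source.data_source_statistics());
}
```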