Skip to content

Commit fc538fb

Browse files
committed
move statistics handling into FileScanConfig
1 parent 0cfce35 commit fc538fb

File tree

12 files changed

+46 additions, -173 deletions (lines changed)

datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs

Lines changed: 0 additions & 17 deletions
Original file line number | Diff line number | Diff line change
@@ -105,7 +105,6 @@ impl FileOpener for TestOpener {
105105
pub struct TestSource {
106106
support: bool,
107107
predicate: Option<Arc<dyn PhysicalExpr>>,
108-
statistics: Option<Statistics>,
109108
batch_size: Option<usize>,
110109
batches: Vec<RecordBatch>,
111110
schema: SchemaRef,
@@ -125,7 +124,6 @@ impl TestSource {
125124
metrics: ExecutionPlanMetricsSet::new(),
126125
batches,
127126
predicate: None,
128-
statistics: None,
129127
batch_size: None,
130128
projection: None,
131129
schema_adapter_factory: None,
@@ -165,25 +163,10 @@ impl FileSource for TestSource {
165163
})
166164
}
167165

168-
fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource> {
169-
Arc::new(TestSource {
170-
statistics: Some(statistics),
171-
..self.clone()
172-
})
173-
}
174-
175166
fn metrics(&self) -> &ExecutionPlanMetricsSet {
176167
&self.metrics
177168
}
178169

179-
fn statistics(&self) -> Result<Statistics> {
180-
Ok(self
181-
.statistics
182-
.as_ref()
183-
.expect("statistics not set")
184-
.clone())
185-
}
186-
187170
fn file_type(&self) -> &str {
188171
"test"
189172
}

datafusion/core/tests/physical_optimizer/test_utils.rs

Lines changed: 1 addition & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -131,10 +131,7 @@ pub(crate) fn parquet_exec_with_stats(file_size: u64) -> Arc<DataSourceExec> {
131131
.with_statistics(statistics)
132132
.build();
133133

134-
assert_eq!(
135-
config.file_source.statistics().unwrap().num_rows,
136-
Precision::Inexact(10000)
137-
);
134+
assert_eq!(config.statistics().num_rows, Precision::Inexact(10000));
138135
DataSourceExec::from_data_source(config)
139136
}
140137

datafusion/datasource-arrow/src/source.rs

Lines changed: 6 additions & 21 deletions
Original file line number | Diff line number | Diff line change
@@ -40,7 +40,7 @@ use datafusion_datasource::{as_file_source, TableSchema};
4040
use arrow::buffer::Buffer;
4141
use arrow::ipc::reader::{FileDecoder, FileReader, StreamReader};
4242
use datafusion_common::error::Result;
43-
use datafusion_common::{exec_datafusion_err, Statistics};
43+
use datafusion_common::exec_datafusion_err;
4444
use datafusion_datasource::file::FileSource;
4545
use datafusion_datasource::file_scan_config::FileScanConfig;
4646
use datafusion_datasource::projection::{ProjectionOpener, SplitProjection};
@@ -246,14 +246,12 @@ impl FileOpener for ArrowFileOpener {
246246
}
247247
}
248248

249-
250249
/// `FileSource` for both Arrow IPC file and stream formats
251250
#[derive(Clone)]
252251
pub struct ArrowSource {
253252
format: ArrowFormat,
254253
table_schema: TableSchema,
255254
metrics: ExecutionPlanMetricsSet,
256-
projected_statistics: Option<Statistics>,
257255
schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>,
258256
projection: SplitProjection,
259257
}
@@ -265,7 +263,6 @@ impl ArrowSource {
265263
Self {
266264
format: ArrowFormat::File,
267265
metrics: ExecutionPlanMetricsSet::new(),
268-
projected_statistics: None,
269266
schema_adapter_factory: None,
270267
projection: SplitProjection::unprojected(&table_schema),
271268
table_schema,
@@ -278,7 +275,6 @@ impl ArrowSource {
278275
Self {
279276
format: ArrowFormat::Stream,
280277
metrics: ExecutionPlanMetricsSet::new(),
281-
projected_statistics: None,
282278
schema_adapter_factory: None,
283279
projection: SplitProjection::unprojected(&table_schema),
284280
table_schema,
@@ -315,24 +311,10 @@ impl FileSource for ArrowSource {
315311
Arc::new(Self { ..self.clone() })
316312
}
317313

318-
fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource> {
319-
Arc::new(Self {
320-
projected_statistics: Some(statistics),
321-
..self.clone()
322-
})
323-
}
324-
325314
fn metrics(&self) -> &ExecutionPlanMetricsSet {
326315
&self.metrics
327316
}
328317

329-
fn statistics(&self) -> Result<Statistics> {
330-
Ok(self
331-
.projected_statistics
332-
.clone()
333-
.expect("projected_statistics must be set"))
334-
}
335-
336318
fn file_type(&self) -> &str {
337319
match self.format {
338320
ArrowFormat::File => "arrow",
@@ -378,7 +360,9 @@ impl FileSource for ArrowSource {
378360
// Use the default trait implementation logic for file format
379361
use datafusion_datasource::file_groups::FileGroupPartitioner;
380362

381-
if config.file_compression_type.is_compressed() || config.new_lines_in_values {
363+
if config.file_compression_type.is_compressed()
364+
|| config.new_lines_in_values
365+
{
382366
return Ok(None);
383367
}
384368

@@ -388,7 +372,8 @@ impl FileSource for ArrowSource {
388372
.with_preserve_order_within_groups(output_ordering.is_some())
389373
.repartition_file_groups(&config.file_groups);
390374

391-
if let Some(repartitioned_file_groups) = repartitioned_file_groups_option {
375+
if let Some(repartitioned_file_groups) = repartitioned_file_groups_option
376+
{
392377
let mut source = config.clone();
393378
source.file_groups = repartitioned_file_groups;
394379
return Ok(Some(source));

datafusion/datasource-avro/src/source.rs

Lines changed: 0 additions & 16 deletions
Original file line number | Diff line number | Diff line change
@@ -23,7 +23,6 @@ use std::sync::Arc;
2323
use crate::avro_to_arrow::Reader as AvroReader;
2424

2525
use datafusion_common::error::Result;
26-
use datafusion_common::Statistics;
2726
use datafusion_datasource::file::FileSource;
2827
use datafusion_datasource::file_scan_config::FileScanConfig;
2928
use datafusion_datasource::file_stream::FileOpener;
@@ -43,7 +42,6 @@ pub struct AvroSource {
4342
batch_size: Option<usize>,
4443
projection: SplitProjection,
4544
metrics: ExecutionPlanMetricsSet,
46-
projected_statistics: Option<Statistics>,
4745
schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>,
4846
}
4947

@@ -56,7 +54,6 @@ impl AvroSource {
5654
table_schema,
5755
batch_size: None,
5856
metrics: ExecutionPlanMetricsSet::new(),
59-
projected_statistics: None,
6057
schema_adapter_factory: None,
6158
}
6259
}
@@ -108,12 +105,6 @@ impl FileSource for AvroSource {
108105
Arc::new(conf)
109106
}
110107

111-
fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource> {
112-
let mut conf = self.clone();
113-
conf.projected_statistics = Some(statistics);
114-
Arc::new(conf)
115-
}
116-
117108
fn try_pushdown_projection(
118109
&self,
119110
projection: &ProjectionExprs,
@@ -134,13 +125,6 @@ impl FileSource for AvroSource {
134125
&self.metrics
135126
}
136127

137-
fn statistics(&self) -> Result<Statistics> {
138-
let statistics = &self.projected_statistics;
139-
Ok(statistics
140-
.clone()
141-
.expect("projected_statistics must be set"))
142-
}
143-
144128
fn file_type(&self) -> &str {
145129
"avro"
146130
}

datafusion/datasource-csv/src/source.rs

Lines changed: 1 addition & 15 deletions
Original file line number | Diff line number | Diff line change
@@ -36,7 +36,7 @@ use datafusion_datasource::{
3636

3737
use arrow::csv;
3838
use datafusion_common::config::CsvOptions;
39-
use datafusion_common::{DataFusionError, Result, Statistics};
39+
use datafusion_common::{DataFusionError, Result};
4040
use datafusion_common_runtime::JoinSet;
4141
use datafusion_datasource::file::FileSource;
4242
use datafusion_datasource::file_scan_config::FileScanConfig;
@@ -92,7 +92,6 @@ pub struct CsvSource {
9292
table_schema: TableSchema,
9393
projection: SplitProjection,
9494
metrics: ExecutionPlanMetricsSet,
95-
projected_statistics: Option<Statistics>,
9695
schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>,
9796
}
9897

@@ -106,7 +105,6 @@ impl CsvSource {
106105
table_schema,
107106
batch_size: None,
108107
metrics: ExecutionPlanMetricsSet::new(),
109-
projected_statistics: None,
110108
schema_adapter_factory: None,
111109
}
112110
}
@@ -269,12 +267,6 @@ impl FileSource for CsvSource {
269267
Arc::new(conf)
270268
}
271269

272-
fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource> {
273-
let mut conf = self.clone();
274-
conf.projected_statistics = Some(statistics);
275-
Arc::new(conf)
276-
}
277-
278270
fn try_pushdown_projection(
279271
&self,
280272
projection: &ProjectionExprs,
@@ -294,12 +286,6 @@ impl FileSource for CsvSource {
294286
fn metrics(&self) -> &ExecutionPlanMetricsSet {
295287
&self.metrics
296288
}
297-
fn statistics(&self) -> Result<Statistics> {
298-
let statistics = &self.projected_statistics;
299-
Ok(statistics
300-
.clone()
301-
.expect("projected_statistics must be set"))
302-
}
303289
fn file_type(&self) -> &str {
304290
"csv"
305291
}

datafusion/datasource-json/src/source.rs

Lines changed: 0 additions & 16 deletions
Original file line number | Diff line number | Diff line change
@@ -39,7 +39,6 @@ use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties};
3939

4040
use arrow::json::ReaderBuilder;
4141
use arrow::{datatypes::SchemaRef, json};
42-
use datafusion_common::Statistics;
4342
use datafusion_datasource::file::FileSource;
4443
use datafusion_datasource::file_scan_config::FileScanConfig;
4544
use datafusion_execution::TaskContext;
@@ -81,7 +80,6 @@ pub struct JsonSource {
8180
table_schema: datafusion_datasource::TableSchema,
8281
batch_size: Option<usize>,
8382
metrics: ExecutionPlanMetricsSet,
84-
projected_statistics: Option<Statistics>,
8583
schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>,
8684
projection: SplitProjection,
8785
}
@@ -95,7 +93,6 @@ impl JsonSource {
9593
table_schema,
9694
batch_size: None,
9795
metrics: ExecutionPlanMetricsSet::new(),
98-
projected_statistics: None,
9996
schema_adapter_factory: None,
10097
}
10198
}
@@ -148,12 +145,6 @@ impl FileSource for JsonSource {
148145
Arc::new(conf)
149146
}
150147

151-
fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource> {
152-
let mut conf = self.clone();
153-
conf.projected_statistics = Some(statistics);
154-
Arc::new(conf)
155-
}
156-
157148
fn try_pushdown_projection(
158149
&self,
159150
projection: &ProjectionExprs,
@@ -174,13 +165,6 @@ impl FileSource for JsonSource {
174165
&self.metrics
175166
}
176167

177-
fn statistics(&self) -> Result<Statistics> {
178-
let statistics = &self.projected_statistics;
179-
Ok(statistics
180-
.clone()
181-
.expect("projected_statistics must be set to call"))
182-
}
183-
184168
fn file_type(&self) -> &str {
185169
"json"
186170
}

datafusion/datasource-parquet/src/source.rs

Lines changed: 1 addition & 26 deletions
Original file line number | Diff line number | Diff line change
@@ -38,7 +38,7 @@ use datafusion_datasource::schema_adapter::{
3838

3939
use arrow::datatypes::TimeUnit;
4040
use datafusion_common::config::TableParquetOptions;
41-
use datafusion_common::{DataFusionError, Statistics};
41+
use datafusion_common::DataFusionError;
4242
use datafusion_datasource::file::FileSource;
4343
use datafusion_datasource::file_scan_config::FileScanConfig;
4444
use datafusion_datasource::TableSchema;
@@ -288,7 +288,6 @@ pub struct ParquetSource {
288288
pub(crate) batch_size: Option<usize>,
289289
/// Optional hint for the size of the parquet metadata
290290
pub(crate) metadata_size_hint: Option<usize>,
291-
pub(crate) projected_statistics: Option<Statistics>,
292291
/// Projection information for column pushdown
293292
pub(crate) projection: SplitProjection,
294293
#[cfg(feature = "parquet_encryption")]
@@ -313,7 +312,6 @@ impl ParquetSource {
313312
schema_adapter_factory: None,
314313
batch_size: None,
315314
metadata_size_hint: None,
316-
projected_statistics: None,
317315
#[cfg(feature = "parquet_encryption")]
318316
encryption_factory: None,
319317
}
@@ -630,12 +628,6 @@ impl FileSource for ParquetSource {
630628
Arc::new(conf)
631629
}
632630

633-
fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource> {
634-
let mut conf = self.clone();
635-
conf.projected_statistics = Some(statistics);
636-
Arc::new(conf)
637-
}
638-
639631
fn try_pushdown_projection(
640632
&self,
641633
projection: &ProjectionExprs,
@@ -656,23 +648,6 @@ impl FileSource for ParquetSource {
656648
&self.metrics
657649
}
658650

659-
fn statistics(&self) -> datafusion_common::Result<Statistics> {
660-
let statistics = &self.projected_statistics;
661-
let statistics = statistics
662-
.clone()
663-
.expect("projected_statistics must be set");
664-
// When filters are pushed down, we have no way of knowing the exact statistics.
665-
// Note that pruning predicate is also a kind of filter pushdown.
666-
// (bloom filters use `pruning_predicate` too).
667-
// Because filter pushdown may happen dynamically as long as there is a predicate
668-
// if we have *any* predicate applied, we can't guarantee the statistics are exact.
669-
if self.filter().is_some() {
670-
Ok(statistics.to_inexact())
671-
} else {
672-
Ok(statistics)
673-
}
674-
}
675-
676651
fn file_type(&self) -> &str {
677652
"parquet"
678653
}

datafusion/datasource/src/file.rs

Lines changed: 1 addition & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -27,7 +27,7 @@ use crate::file_scan_config::FileScanConfig;
2727
use crate::file_stream::FileOpener;
2828
use crate::schema_adapter::SchemaAdapterFactory;
2929
use datafusion_common::config::ConfigOptions;
30-
use datafusion_common::{not_impl_err, Result, Statistics};
30+
use datafusion_common::{not_impl_err, Result};
3131
use datafusion_physical_expr::projection::ProjectionExprs;
3232
use datafusion_physical_expr::{LexOrdering, PhysicalExpr};
3333
use datafusion_physical_plan::filter_pushdown::{FilterPushdownPropagation, PushedDown};
@@ -67,8 +67,6 @@ pub trait FileSource: Send + Sync {
6767
fn table_schema(&self) -> &crate::table_schema::TableSchema;
6868
/// Initialize new type with batch size configuration
6969
fn with_batch_size(&self, batch_size: usize) -> Arc<dyn FileSource>;
70-
/// Initialize new instance with projected statistics
71-
fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource>;
7270
/// Returns the filter expression that will be applied during the file scan.
7371
fn filter(&self) -> Option<Arc<dyn PhysicalExpr>> {
7472
None
@@ -79,8 +77,6 @@ pub trait FileSource: Send + Sync {
7977
}
8078
/// Return execution plan metrics
8179
fn metrics(&self) -> &ExecutionPlanMetricsSet;
82-
/// Return projected statistics
83-
fn statistics(&self) -> Result<Statistics>;
8480
/// String representation of file source such as "csv", "json", "parquet"
8581
fn file_type(&self) -> &str;
8682
/// Format FileType specific information

0 commit comments

Comments
 (0)