Skip to content

Commit 3c21b54

Browse files
committed
move projection handling into FileSource
1 parent 76b9e12 commit 3c21b54

File tree

58 files changed

+1625
-1090
lines changed

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

58 files changed

+1625
-1090
lines changed

Cargo.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion-examples/examples/custom_data_source/csv_json_opener.rs

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -64,22 +64,22 @@ async fn csv_opener() -> Result<()> {
6464
..Default::default()
6565
};
6666

67-
let scan_config = FileScanConfigBuilder::new(
68-
ObjectStoreUrl::local_filesystem(),
69-
Arc::new(CsvSource::new(Arc::clone(&schema)).with_csv_options(options.clone())),
70-
)
71-
.with_projection_indices(Some(vec![12, 0]))
72-
.with_limit(Some(5))
73-
.with_file(PartitionedFile::new(path.display().to_string(), 10))
74-
.build();
75-
76-
let config = CsvSource::new(Arc::clone(&schema))
67+
let source = CsvSource::new(Arc::clone(&schema))
7768
.with_csv_options(options)
7869
.with_comment(Some(b'#'))
79-
.with_batch_size(8192)
80-
.with_projection(&scan_config);
70+
.with_batch_size(8192);
71+
72+
let scan_config =
73+
FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), source)
74+
.with_projection_indices(Some(vec![12, 0]))?
75+
.with_limit(Some(5))
76+
.with_file(PartitionedFile::new(path.display().to_string(), 10))
77+
.build();
8178

82-
let opener = config.create_file_opener(object_store, &scan_config, 0);
79+
let opener =
80+
scan_config
81+
.file_source()
82+
.create_file_opener(object_store, &scan_config, 0)?;
8383

8484
let mut result = vec![];
8585
let mut stream =
@@ -133,7 +133,7 @@ async fn json_opener() -> Result<()> {
133133
ObjectStoreUrl::local_filesystem(),
134134
Arc::new(JsonSource::new(schema)),
135135
)
136-
.with_projection_indices(Some(vec![1, 0]))
136+
.with_projection_indices(Some(vec![1, 0]))?
137137
.with_limit(Some(5))
138138
.with_file(PartitionedFile::new(path.to_string(), 10))
139139
.build();

datafusion-examples/examples/custom_data_source/default_column_values.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -258,7 +258,7 @@ impl TableProvider for DefaultValueTableProvider {
258258
ObjectStoreUrl::parse("memory://")?,
259259
Arc::new(parquet_source),
260260
)
261-
.with_projection_indices(projection.cloned())
261+
.with_projection_indices(projection.cloned())?
262262
.with_limit(limit)
263263
.with_file_group(file_group)
264264
.with_expr_adapter(Some(Arc::new(DefaultValuePhysicalExprAdapterFactory) as _));

datafusion-examples/examples/data_io/parquet_advanced_index.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -502,7 +502,7 @@ impl TableProvider for IndexTableProvider {
502502
);
503503
let file_scan_config = FileScanConfigBuilder::new(object_store_url, file_source)
504504
.with_limit(limit)
505-
.with_projection_indices(projection.cloned())
505+
.with_projection_indices(projection.cloned())?
506506
.with_file(partitioned_file)
507507
.build();
508508

datafusion-examples/examples/data_io/parquet_index.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -247,7 +247,7 @@ impl TableProvider for IndexTableProvider {
247247
Arc::new(ParquetSource::new(self.schema()).with_predicate(predicate));
248248
let mut file_scan_config_builder =
249249
FileScanConfigBuilder::new(object_store_url, source)
250-
.with_projection_indices(projection.cloned())
250+
.with_projection_indices(projection.cloned())?
251251
.with_limit(limit);
252252

253253
// Transform to the format needed to pass to DataSourceExec

datafusion/catalog-listing/src/table.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -504,7 +504,7 @@ impl TableProvider for ListingTable {
504504
.with_file_groups(partitioned_file_lists)
505505
.with_constraints(self.constraints.clone())
506506
.with_statistics(statistics)
507-
.with_projection_indices(projection)
507+
.with_projection_indices(projection)?
508508
.with_limit(limit)
509509
.with_output_ordering(output_ordering)
510510
.with_expr_adapter(self.expr_adapter_factory.clone())

datafusion/core/src/dataframe/parquet.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,7 @@ mod tests {
150150
let plan = df.explain(false, false)?.collect().await?;
151151
// Filters all the way to Parquet
152152
let formatted = pretty::pretty_format_batches(&plan)?.to_string();
153-
assert!(formatted.contains("FilterExec: id@0 = 1"));
153+
assert!(formatted.contains("FilterExec: id@0 = 1"), "{formatted}");
154154

155155
Ok(())
156156
}

datafusion/core/src/datasource/file_format/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ pub(crate) mod test_util {
9292
)
9393
.with_file_groups(file_groups)
9494
.with_statistics(statistics)
95-
.with_projection_indices(projection)
95+
.with_projection_indices(projection)?
9696
.with_limit(limit)
9797
.build(),
9898
)

datafusion/core/src/datasource/physical_plan/avro.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ mod tests {
8484
let source = Arc::new(AvroSource::new(Arc::clone(&file_schema)));
8585
let conf = FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), source)
8686
.with_file(meta.into())
87-
.with_projection_indices(Some(vec![0, 1, 2]))
87+
.with_projection_indices(Some(vec![0, 1, 2]))?
8888
.build();
8989

9090
let source_exec = DataSourceExec::from_data_source(conf);
@@ -156,7 +156,7 @@ mod tests {
156156
let source = Arc::new(AvroSource::new(Arc::clone(&file_schema)));
157157
let conf = FileScanConfigBuilder::new(object_store_url, source)
158158
.with_file(meta.into())
159-
.with_projection_indices(projection)
159+
.with_projection_indices(projection)?
160160
.build();
161161

162162
let source_exec = DataSourceExec::from_data_source(conf);
@@ -231,7 +231,7 @@ mod tests {
231231
let conf = FileScanConfigBuilder::new(object_store_url, source)
232232
// select specific columns of the files as well as the partitioning
233233
// column which is supposed to be the last column in the table schema.
234-
.with_projection_indices(projection)
234+
.with_projection_indices(projection)?
235235
.with_file(partitioned_file)
236236
.build();
237237

datafusion/core/src/datasource/physical_plan/csv.rs

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -126,10 +126,10 @@ mod tests {
126126
let source =
127127
Arc::new(CsvSource::new(table_schema.clone()).with_csv_options(options));
128128
let config =
129-
FileScanConfigBuilder::from(partitioned_csv_config(file_groups, source))
129+
FileScanConfigBuilder::from(partitioned_csv_config(file_groups, source)?)
130130
.with_file_compression_type(file_compression_type)
131131
.with_newlines_in_values(false)
132-
.with_projection_indices(Some(vec![0, 2, 4]))
132+
.with_projection_indices(Some(vec![0, 2, 4]))?
133133
.build();
134134

135135
assert_eq!(13, config.file_schema().fields().len());
@@ -199,10 +199,10 @@ mod tests {
199199
let source =
200200
Arc::new(CsvSource::new(table_schema.clone()).with_csv_options(options));
201201
let config =
202-
FileScanConfigBuilder::from(partitioned_csv_config(file_groups, source))
202+
FileScanConfigBuilder::from(partitioned_csv_config(file_groups, source)?)
203203
.with_newlines_in_values(false)
204204
.with_file_compression_type(file_compression_type.to_owned())
205-
.with_projection_indices(Some(vec![4, 0, 2]))
205+
.with_projection_indices(Some(vec![4, 0, 2]))?
206206
.build();
207207
assert_eq!(13, config.file_schema().fields().len());
208208
let csv = DataSourceExec::from_data_source(config);
@@ -271,7 +271,7 @@ mod tests {
271271
let source =
272272
Arc::new(CsvSource::new(table_schema.clone()).with_csv_options(options));
273273
let config =
274-
FileScanConfigBuilder::from(partitioned_csv_config(file_groups, source))
274+
FileScanConfigBuilder::from(partitioned_csv_config(file_groups, source)?)
275275
.with_newlines_in_values(false)
276276
.with_file_compression_type(file_compression_type.to_owned())
277277
.with_limit(Some(5))
@@ -342,7 +342,7 @@ mod tests {
342342
let source =
343343
Arc::new(CsvSource::new(table_schema.clone()).with_csv_options(options));
344344
let config =
345-
FileScanConfigBuilder::from(partitioned_csv_config(file_groups, source))
345+
FileScanConfigBuilder::from(partitioned_csv_config(file_groups, source)?)
346346
.with_newlines_in_values(false)
347347
.with_file_compression_type(file_compression_type.to_owned())
348348
.with_limit(Some(5))
@@ -411,12 +411,12 @@ mod tests {
411411
let source =
412412
Arc::new(CsvSource::new(table_schema.clone()).with_csv_options(options));
413413
let config =
414-
FileScanConfigBuilder::from(partitioned_csv_config(file_groups, source))
414+
FileScanConfigBuilder::from(partitioned_csv_config(file_groups, source)?)
415415
.with_newlines_in_values(false)
416416
.with_file_compression_type(file_compression_type.to_owned())
417417
// We should be able to project on the partition column
418418
// Which is supposed to be after the file fields
419-
.with_projection_indices(Some(vec![0, num_file_schema_fields]))
419+
.with_projection_indices(Some(vec![0, num_file_schema_fields]))?
420420
.build();
421421

422422
// we don't have `/date=xx/` in the path but that is ok because
@@ -517,7 +517,7 @@ mod tests {
517517
let source =
518518
Arc::new(CsvSource::new(table_schema.clone()).with_csv_options(options));
519519
let config =
520-
FileScanConfigBuilder::from(partitioned_csv_config(file_groups, source))
520+
FileScanConfigBuilder::from(partitioned_csv_config(file_groups, source)?)
521521
.with_newlines_in_values(false)
522522
.with_file_compression_type(file_compression_type.to_owned())
523523
.build();

0 commit comments

Comments
 (0)