Support reverse parquet scan and fast parquet order inversion at row group level #18817
Conversation
Force-pushed from 3c23790 to e8588b1
run benchmarks

(BTW I am testing out some new scripts: #18115 (comment)) @zhuqi-lucas / @xudong963 let me know if you would like to be added to the whitelist

🤖: Benchmark completed. Details

run benchmark clickbench_partitioned

🤖: Benchmark completed. Details

Amazing @alamb, pretty cool! I'd appreciate being added, thanks!

I'll review another round today
xudong963 left a comment:
I think the PR, as an initial version, is very close.
/// - **Latency**: First batch available after reading complete first (reversed) row group
/// - **Throughput**: Near-native speed with ~5-10% overhead for reversal operations
/// - **Memory**: O(row_group_size), not O(file_size)
/// TODO should we support max cache size to limit memory usage further? But if we exceed the cache size we can't reverse properly, so we need to fall back to normal reading?
Could you please open an issue to track and describe the TODO?
Sure @xudong963 , created the follow-up issue:
#19048
/// ## Performance Characteristics
///
/// - **Latency**: First batch available after reading complete first (reversed) row group
/// - **Throughput**: Near-native speed with ~5-10% overhead for reversal operations
How did you get the "~5-10% overhead"?
This is from my estimation; I can create a benchmark for this operation as a follow-up.
///
/// - **Latency**: First batch available after reading complete first (reversed) row group
/// - **Throughput**: Near-native speed with ~5-10% overhead for reversal operations
/// - **Memory**: O(row_group_size), not O(file_size)
I'm a bit confused about the line, what does it mean?
It means the maximum memory use is bounded by one row group's size, not the whole file's size.
/// Requested: [number ASC]
/// Reversed: [number ASC, letter DESC] ✓ Prefix match!
/// ```
fn is_reverse_ordering(
I'm wondering if we could leverage the ordering equivalence to do the check rather than matching strings.
Addressed this in latest PR, thanks!
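The prefix-match check discussed above can be sketched with plain std types. This is an illustrative model, not DataFusion's actual code: `SortKey` and the `(name, key)` tuples stand in for `PhysicalSortExpr`, and as the review notes, the real fix uses ordering equivalences rather than name matching.

```rust
// Hypothetical model of "is the requested ordering the reverse of a prefix
// of the file's declared ordering?" — names here are illustrative only.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
struct SortKey {
    asc: bool,
    nulls_first: bool,
}

fn reverse(key: SortKey) -> SortKey {
    SortKey { asc: !key.asc, nulls_first: !key.nulls_first }
}

/// True if `requested` equals a prefix of the file ordering with every
/// direction flipped, i.e. a reverse scan would satisfy it.
fn is_reverse_ordering(file: &[(&str, SortKey)], requested: &[(&str, SortKey)]) -> bool {
    requested.len() <= file.len()
        && requested
            .iter()
            .zip(file)
            .all(|(r, f)| r.0 == f.0 && r.1 == reverse(f.1))
}

fn main() {
    let asc = SortKey { asc: true, nulls_first: false };
    let file = [("number", asc), ("letter", asc)];
    // Requested [number DESC] is a prefix of the reversed file ordering.
    assert!(is_reverse_ordering(&file, &[("number", reverse(asc))]));
    // Requested [number ASC] matches the file ordering, not its reverse.
    assert!(!is_reverse_ordering(&file, &[("number", asc)]));
}
```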
// Check if current row group is complete
if this.current_rg_rows_read >= this.current_rg_total_rows {
    this.finalize_current_row_group();
When finalize_current_row_group() encounters an error (e.g., during reverse_batch), it sets self.pending_error and returns early. However, it is called inside the loop in poll_next, which does NOT check whether pending_error was set.
Addressed this in latest PR, thanks!
    return Ok(batch);
}

let indices = UInt32Array::from_iter_values((0..num_rows as u32).rev());
The UInt32Array is allocated for every batch; I'm not sure whether that is a meaningful cost, but if it is, a cache may be good.
Good point @xudong963, I added the cache in the latest PR.
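The caching idea can be sketched with std types only. A plain `Vec<u32>` stands in for arrow's `UInt32Array`, and `ReverseIndexCache` is an illustrative name, not the PR's actual type: since most batches in a scan share the same row count, one cached index buffer serves nearly every `take`-style reversal.

```rust
// Sketch of caching the descending-index buffer so it is not rebuilt
// for every batch. Names are illustrative, not DataFusion's.
struct ReverseIndexCache {
    indices: Vec<u32>,
}

impl ReverseIndexCache {
    fn new() -> Self {
        Self { indices: Vec::new() }
    }

    /// Return descending indices [n-1, n-2, .., 0], rebuilding only
    /// when the batch length changes.
    fn get(&mut self, num_rows: usize) -> &[u32] {
        if self.indices.len() != num_rows {
            self.indices = (0..num_rows as u32).rev().collect();
        }
        &self.indices
    }
}

fn main() {
    let mut cache = ReverseIndexCache::new();
    assert_eq!(cache.get(4), &[3, 2, 1, 0]);
    // Same length: the existing buffer is reused, not reallocated.
    assert_eq!(cache.get(4), &[3, 2, 1, 0]);
    assert_eq!(cache.get(2), &[1, 0]);
}
```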
Thank you @xudong963 for the review, I will try to address it. We also need a benchmark for this PR to compare the performance with the original dynamic TopK.
Please don't hold this up for me (I'm on vacation until Thursday) but I would love to review this, it's very exciting work 🚀
Thank you @adriangb, I am working on the benchmark first.
@alamb @xudong963 @2010YOUY01 @adriangb |
Very cool work!
My main concern is https://github.com/apache/datafusion/pull/18817/files#r2582092389.
My intuition is that this would end in a better result if we split it into smaller pieces of work:
- Establish the high level API for sort pushdown and the optimizer rule. Only re-arrange files and return Inexact.
- Implement scan reversal for Parquet.
- Refactor ownership of ordering information from FileScanConfig into PartitionedFile and FileGroup.
- Any followups to reduce or cap memory use, etc.
/// Enable sort pushdown optimization for sorted Parquet files.
/// Currently, this optimization only has reverse order support.
/// When a query requires ordering that can be satisfied by reversing
/// the file's natural ordering, row groups and batches are read in
/// reverse order to eliminate sort operations.
/// Note: This buffers one row group at a time (typically ~128MB).
/// Default: true
Can this support non-reverse cases where the query order is the same as the file order? i.e. can we eliminate sorts in that simpler case as well? Or does that already happen / was already implemented?
pub fn with_reverse_scan(mut self, reverse_scan: bool) -> Self {
    self.reverse_scan = reverse_scan;
    self
}
Are these used from production code (the optimizers, etc.) or only from our tests?
Similar code has been in our production for a very long time, and it works well.
// Note: We ignore the specific `order` parameter here because the decision
// about whether we can reverse is made at the FileScanConfig level.
// This method creates a reversed version of the current ParquetSource,
// and the FileScanConfig will reverse both the file list and the declared ordering.
Just to clarify: since FileScanConfig is called first we are assuming that if we were called the intention is to reverse the order? I would rather verify this / decouple these two things. That is I'd rather parse the sort order here and do whatever we need to do internally to satisfy it (or respond None if we can't) instead of relying on FileScanConfig to do it for us.
// - batch_size
// - table_parquet_options
// - all other settings
let new_source = self.clone().with_reverse_scan(true);
If this is the only place with_reverse_scan is used I suggest we set the private field directly to avoid polluting the public API.
let schema = Arc::new(Schema::empty());
let source = ParquetSource::new(schema);

assert!(!source.reverse_scan());
We could just access the private field here since it's in the same module.
fn try_pushdown_sort(
    &self,
    _order: &[PhysicalSortExpr],
) -> Result<Option<Arc<dyn FileSource>>> {
I think it might be nice to make this an enum instead of an option. Specifically:

```rust
pub enum SortOrderPushdownResult<T> {
    Exact { inner: T },
    Inexact { inner: T },
    Unsupported,
}
```

The idea being that Inexact means "I changed myself to better satisfy the sort order, but cannot guarantee it's perfectly sorted" (e.g. only re-arranged files and row groups based on stats but not the actual data stream).
This matters because e.g. Inexact would already be a huge win for TopK + dynamic filter pushdown.
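To illustrate the proposed enum, here is a hedged sketch of how a caller such as the sort-pushdown rule might consume it. The `apply_pushdown` helper and its string placeholders for plans are hypothetical; the point is that Exact lets the sort be elided, Inexact keeps the sort but still adopts the improved source, and Unsupported leaves the plan untouched.

```rust
// Hypothetical consumer of the SortOrderPushdownResult enum proposed above.
// `&'static str` stands in for an ExecutionPlan; the bool is "keep SortExec?".
pub enum SortOrderPushdownResult<T> {
    Exact { inner: T },
    Inexact { inner: T },
    Unsupported,
}

fn apply_pushdown(result: SortOrderPushdownResult<&'static str>) -> (&'static str, bool) {
    match result {
        // Source now guarantees the requested order: elide the sort.
        SortOrderPushdownResult::Exact { inner } => (inner, false),
        // Source only re-arranged itself (e.g. by stats): keep the sort
        // on top, but TopK / dynamic filters still benefit.
        SortOrderPushdownResult::Inexact { inner } => (inner, true),
        // No change possible: keep the original plan and the sort.
        SortOrderPushdownResult::Unsupported => ("original", true),
    }
}

fn main() {
    assert_eq!(
        apply_pushdown(SortOrderPushdownResult::Exact { inner: "reversed_scan" }),
        ("reversed_scan", false)
    );
    assert_eq!(
        apply_pushdown(SortOrderPushdownResult::Inexact { inner: "regrouped_scan" }),
        ("regrouped_scan", true)
    );
}
```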
This is a great idea!
    &self,
    order: &[PhysicalSortExpr],
) -> Result<Option<Arc<dyn DataSource>>> {
    let current_ordering = match self.output_ordering.first() {
Nit for a bigger picture plan: I think we should re-arrange things such that the ordering of each file is recorded in PartitionedFile and any known ordering of groups in FileGroup. Then FileScanConfig should calculate its output ordering from that instead of being given one. And if sort pushdown is requested then FileScanConfig can:
- Try to re-arrange the order by reversing groups, or by re-creating groups entirely using stats. This is enough for Inexact in https://github.com/apache/datafusion/pull/18817/files#r2582092389.
- Try to push down the preferred order to the FileSource, which determines if the order of reading a given file can be reversed. Maybe it needs a reference to the files since it has to handle row group order, i.e. Parquet-specific stuff? But if it is able to reverse the order of the scans then the whole result becomes Exact.
fn try_pushdown_sort(
    &self,
    _order: &[PhysicalSortExpr],
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
I similarly think there should be the option to return a new plan that is optimized for the order but doesn't satisfy it enough to remove the sort (https://github.com/apache/datafusion/pull/18817/files#r2582092389)
Thank you @adriangb for the review, great idea, I will try to address this today.
cc @2010YOUY01 @adriangb @alamb @xudong963

Running `/Users/zhuqi/arrow-datafusion/target/release/dfbench clickbench --iterations 5 --path /Users/zhuqi/arrow-datafusion/benchmarks/data/hits_0_sorted.parquet --queries-path /Users/zhuqi/arrow-datafusion/benchmarks/queries/clickbench/queries/sorted_data --sorted-by EventTime --sort-order ASC -o /Users/zhuqi/arrow-datafusion/benchmarks/results/issue_19059/data_sorted_clickbench.json`
Running benchmarks with the following options: RunOpt { query: None, pushdown: false, common: CommonOpt { iterations: 5, partitions: None, batch_size: None, mem_pool_type: "fair", memory_limit: None, sort_spill_reservation_bytes: None, debug: false }, path: "/Users/zhuqi/arrow-datafusion/benchmarks/data/hits_0_sorted.parquet", queries_path: "/Users/zhuqi/arrow-datafusion/benchmarks/queries/clickbench/queries/sorted_data", output_path: Some("/Users/zhuqi/arrow-datafusion/benchmarks/results/issue_19059/data_sorted_clickbench.json"), sorted_by: Some("EventTime"), sort_order: "ASC" }
⚠️ Forcing target_partitions=1 to preserve sort order
⚠️ (Because we want to get the pure performance benefit of sorted data to compare)
📊 Session config target_partitions: 1
Registering table with sort order: EventTime ASC
Executing: CREATE EXTERNAL TABLE hits STORED AS PARQUET LOCATION '/Users/zhuqi/arrow-datafusion/benchmarks/data/hits_0_sorted.parquet' WITH ORDER ("EventTime" ASC)
Q0: -- Must set for ClickBench hits_partitioned dataset. See https://github.com/apache/datafusion/issues/16591
-- set datafusion.execution.parquet.binary_as_string = true
SELECT * FROM hits ORDER BY "EventTime" DESC limit 10;
Query 0 iteration 0 took 21.5 ms and returned 10 rows
Query 0 iteration 1 took 12.7 ms and returned 10 rows
Query 0 iteration 2 took 11.1 ms and returned 10 rows
Query 0 iteration 3 took 10.6 ms and returned 10 rows
Query 0 iteration 4 took 9.9 ms and returned 10 rows
Query 0 avg time: 13.17 ms
Done

So it's very close to the reverse parquet implementation:
- The main branch result is:
- The reverse parquet implementation in this PR is:
- The reverse row group with dynamic topk is #19064:
|
I plan to try and re-review this in detail tomorrow
Which issue does this PR close?
Closes #17172
Overview
This PR implements reverse scanning for Parquet files to optimize `ORDER BY ... DESC LIMIT N` queries on sorted data. When DataFusion detects that reversing the scan order would eliminate the need for a separate sort operation, it can now directly read Parquet files in reverse order.

Implementation Note: This PR implements Part 1 of the vision outlined in #17172 (Order Inversion at the DataFusion level).
Current implementation:
Future improvements (requires arrow-rs changes):
- ... (`take` kernel overhead)

These enhancements would further optimize memory usage and latency, but the current implementation already provides substantial benefits for most workloads.
Rationale for this change
Motivation
Currently, queries like `SELECT * FROM table ORDER BY sorted_column DESC LIMIT 100` require DataFusion to:

For files that are already sorted in ascending order, this is inefficient. With this optimization, DataFusion can:
Performance Benefits:
Scope and Limitations
This optimization applies to:
- `ORDER BY ... DESC` on pre-sorted columns
- `LIMIT` clauses (most beneficial for single-partition)
- Cases where `SortPreservingMerge` is still required

This optimization does NOT apply to:
Single-partition vs Multi-partition:
- Multi-partition: `SortPreservingMergeExec` is needed to combine streams, and the limit cannot be pushed to individual partitions.

Performance comparison:
- Single-partition `ORDER BY DESC LIMIT N` → Direct reverse scan with limit pushed down to DataSource
- Multi-partition `ORDER BY DESC LIMIT N` → Reverse scan per partition + `LocalLimitExec` + `SortPreservingMergeExec`

While multi-partition scans still require a merge operation, they benefit significantly from:
- `LocalLimitExec`

Configuration
This optimization is enabled by default but can be controlled via:
SQL:
Rust API:
When to disable:
Default: Enabled (true)
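The option named above can be toggled per session; a minimal example, assuming DataFusion's standard `SET` syntax for session configuration:

```sql
-- Disable reverse scanning for the current session
SET datafusion.execution.parquet.enable_reverse_scan = false;
```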
Implementation Details
Architecture
The implementation consists of four main components:
1. ParquetSource API (`source.rs`)
- Added `reverse_scan: bool` field to `ParquetSource`
- Added `with_reverse_scan()` and `reverse_scan()` methods

2. ParquetOpener (`opener.rs`)
- Added `reverse_scan: bool` field
- Reverses row group order via `row_group_indexes.reverse()`
- Depending on the `reverse_scan` flag, creates either a plain `RecordBatchStreamAdapter` or a `ReversedParquetStream` with row-group-level buffering

3. ReversedParquetStream (`opener.rs`)
opener.rs)A custom stream implementation that performs two-stage reversal with optional limit support:
Stage 1 - Row Reversal: Reverse rows within each batch using Arrow's `take` kernel
Stage 2 - Batch Reversal: Reverse the order of batches within each row group

Key Properties:
- Tracks `row_groups_reversed`, `batches_reversed`, and `reverse_time` metrics

4. Physical Optimizer (`reverse_order.rs`)
- New `ReverseOrder` optimization rule
- Detects `SortExec` with reversible input ordering
- Handles `GlobalLimitExec -> SortExec` patterns (most beneficial case)
- Uses a `TreeNodeRewriter` to push the reverse flag down to `ParquetSource`
- Only applies to single-partition `DataSourceExec` to avoid correctness issues with multi-partition scans

Why Row-Group-Level Buffering?
Row group buffering is necessary for correctness:
This is the minimal buffering granularity that ensures correct results while still being compatible with arrow-rs's existing parquet reader architecture.
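The two-stage reversal described above can be modeled with std types only. Each inner `Vec` stands in for a RecordBatch and the outer `Vec` for one buffered row group; `reverse_row_group` is an illustrative name, and stage 1 plays the role of arrow's `take` kernel:

```rust
// Std-only model of the two-stage reversal over one buffered row group.
fn reverse_row_group(mut batches: Vec<Vec<i32>>) -> Vec<Vec<i32>> {
    // Stage 1 - Row Reversal: reverse rows within each batch.
    for batch in &mut batches {
        batch.reverse();
    }
    // Stage 2 - Batch Reversal: reverse batch order within the row group.
    batches.reverse();
    batches
}

fn main() {
    // An ascending row group split into two batches.
    let row_group = vec![vec![1, 2, 3], vec![4, 5]];
    let reversed = reverse_row_group(row_group);
    // Concatenated output is globally descending: 5, 4, 3, 2, 1.
    assert_eq!(reversed, vec![vec![5, 4], vec![3, 2, 1]]);
}
```

This also shows why buffering below the row-group level is insufficient: stage 2 needs every batch of the group in hand before any of them can be emitted in reversed order.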
Memory Characteristics:
Why this is necessary:
Future Optimization: Page-level reverse scanning in arrow-rs could further reduce memory usage and improve latency by eliminating row-group buffering entirely.
What changes are included in this PR?
Core Implementation:
- `ParquetSource`: Added reverse scan flag and methods
- `ParquetOpener`: Row group reversal and stream creation logic
- `ReversedParquetStream`: Unified stream implementation with optional limit support

Physical Optimization:
- `ReverseOrder`: New optimizer rule for detecting and applying reverse scan optimization
- Handles `SortExec` and `GlobalLimitExec -> SortExec`

Configuration:
- `enable_reverse_scan` config option (default: true)

Metrics:
- `row_groups_reversed`: Count of reversed row groups
- `batches_reversed`: Count of reversed batches
- `reverse_time`: Time spent reversing data

Are these changes tested?
Yes, comprehensive tests added:
- Unit Tests (`opener.rs`)
- Integration Tests (`reverse_order.rs`)
- SQL Logic Tests (`.slt` files)
New Configuration Option:
- `datafusion.execution.parquet.enable_reverse_scan` (default: true)

Behavioral Changes:
- `ORDER BY ... DESC LIMIT N` on sorted single-partition Parquet files will automatically use reverse scanning when beneficial

Breaking Changes: