Add sorted data benchmark. #19042
Co-authored-by: Martin Grigorov <[email protected]>
Thank you @martin-g for the review.
2010YOUY01 left a comment
LGTM, thank you!
I tested locally and it's working as expected. I have left a few minor cleanup suggestions.
benchmarks/src/clickbench.rs
Outdated
    let rt_builder = self.common.runtime_env_builder()?;
    let ctx = SessionContext::new_with_config_rt(config, rt_builder.build_arc()?);

    // Debug: print actual target_partitions being used
Looks like it's for debug, should we remove it?
Yes, I will remove it.
benchmarks/sort_clickbench.py
Outdated
    from pathlib import Path

    try:
        import pyarrow.parquet as pq
We can add the dependencies to venv like #10894
benchmarks/sort_clickbench.py
Outdated
    def main():
        parser = argparse.ArgumentParser(
Looks like it's not used, and it's using defaults. Should we remove it?
benchmarks/sort_clickbench.py
Outdated
    parser.add_argument(
        '--compression',
        choices=['snappy', 'gzip', 'brotli', 'lz4', 'zstd', 'none'],
        default='zstd',
I suggest using none here. zstd is quite heavyweight, and significant time will be spent on decompression; I believe we want to focus on the sort part here.
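As an illustration of the suggestion above, here is a minimal sketch of the option with the default switched to `none`. The codec list is copied from the snippet under review; the helper name `build_parser` is hypothetical, and this is not necessarily how the merged script ended up.

```python
import argparse

# Codec names copied from the reviewed snippet. The 'none' default reflects
# the reviewer's suggestion and is an assumption about the final script.
CODECS = ["snappy", "gzip", "brotli", "lz4", "zstd", "none"]


def build_parser() -> argparse.ArgumentParser:
    """Build a parser whose output codec defaults to no compression."""
    parser = argparse.ArgumentParser(description="Sort a parquet file (sketch)")
    parser.add_argument(
        "--compression",
        choices=CODECS,
        default="none",
        help="Parquet compression codec for the sorted output",
    )
    return parser


# With no arguments the benchmark data is written uncompressed, so query time
# is not dominated by decompression.
args = build_parser().parse_args([])
print(args.compression)
```

Callers who do want a smaller file can still pass `--compression zstd` explicitly.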
Good point, I agree.
benchmarks/bench.sh
Outdated
    # Ensure virtual environment exists and has pyarrow
    if [ ! -d "$VIRTUAL_ENV" ]; then
        echo "Creating virtual environment at $VIRTUAL_ENV..."
        python3 -m venv "$VIRTUAL_ENV"
    fi

    # Activate virtual environment
    source "$VIRTUAL_ENV/bin/activate"

    # Check and install pyarrow if needed
    if ! python3 -c "import pyarrow" 2>/dev/null; then
        echo "Installing pyarrow (this may take a minute)..."
        pip install --quiet pyarrow
    fi

    # Use the standalone Python script to sort
    python3 "${SCRIPT_DIR}"/sort_clickbench.py "${ORIGINAL_FILE}" "${SORTED_FILE}"
    local result=$?

    # Deactivate virtual environment
    deactivate
I believe users are supposed to activate the venv externally, so we only have to add pyarrow to requirements.txt, and remove the dependency installation steps here.
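If the environment is managed externally as suggested, the script itself could fail fast with a helpful message instead of installing anything. The following is only a sketch of that idea: the function names `missing_modules` and `require` are hypothetical, and the hint text assumes pyarrow has been added to requirements.txt.

```python
import importlib.util


def missing_modules(names):
    """Return the top-level module names that cannot be found on sys.path."""
    return [name for name in names if importlib.util.find_spec(name) is None]


def require(names):
    """Exit with an actionable message if any required module is missing."""
    missing = missing_modules(names)
    if missing:
        raise SystemExit(
            "Missing Python packages: " + ", ".join(missing)
            + ". Activate the benchmark venv and run: pip install -r requirements.txt"
        )


# Example use at the top of a data-generation script:
#     require(["pyarrow"])
```

This keeps bench.sh free of install logic while still telling the user what to do when the dependency is absent.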
Good idea, I will do this!
and update the doc here 👉🏼 https://github.com/apache/datafusion/blob/main/benchmarks/README.md
Co-authored-by: Yongting You <[email protected]>
Pull request overview
This PR adds a new benchmark for measuring DataFusion's performance on pre-sorted data. It introduces infrastructure to create sorted ClickBench datasets and run queries that can benefit from sort order information, demonstrating up to 30X performance improvements for queries on sorted data.
- Adds command-line options to specify sort column and order in the clickbench benchmark
- Provides a Python utility script to sort ClickBench parquet files by EventTime
- Integrates sorted data benchmark generation and execution into bench.sh workflow
Reviewed changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 8 comments.
| File | Description |
|---|---|
| benchmarks/src/clickbench.rs | Adds --sorted-by and --sort-order CLI options; modifies table registration to use CREATE EXTERNAL TABLE with WITH ORDER clause when sort information is provided |
| benchmarks/sort_clickbench.py | New Python script for sorting ClickBench parquet files by EventTime with configurable row group size and compression options |
| benchmarks/queries/clickbench/queries/sorted_data/q0.sql | New benchmark query that tests reverse order scan on sorted data (ORDER BY DESC with LIMIT) |
| benchmarks/bench.sh | Adds data_sorted_clickbench data generation function and run_data_sorted_clickbench benchmark execution function |
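The table registration change summarized above (CREATE EXTERNAL TABLE with a WITH ORDER clause when sort information is provided) can be sketched as a small string builder. The real change lives in Rust in benchmarks/src/clickbench.rs; this Python version only mirrors the statement shape printed in the benchmark logs in this thread, and the function name `create_table_sql` is hypothetical.

```python
from typing import Optional


def create_table_sql(path: str, sorted_by: Optional[str] = None,
                     sort_order: str = "ASC") -> str:
    """Build the registration statement, adding WITH ORDER only when sorted."""
    sql = f"CREATE EXTERNAL TABLE hits STORED AS PARQUET LOCATION '{path}'"
    if sorted_by is not None:
        order = sort_order.upper()
        if order not in ("ASC", "DESC"):
            raise ValueError(f"unsupported sort order: {sort_order}")
        # Quote the column so the mixed-case name keeps its case.
        sql += f' WITH ORDER ("{sorted_by}" {order})'
    return sql


print(create_table_sql("data/hits_0_sorted.parquet", "EventTime"))
print(create_table_sql("data/hits.parquet"))
```

Declaring the order at registration time is what lets the planner know the file is already sorted; without the clause the table is registered as unordered data.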
Thank you @2010YOUY01 for the review, addressed all comments in the latest PR!
alamb left a comment
Thank you @zhuqi-lucas @2010YOUY01 and @xudong963 -- this is really nice 👌
I tested it locally like
    ./bench.sh data data_sorted_clickbench
And then
    ./bench.sh run data_sorted_clickbench
It seems to have worked great
⚠️ Overriding target_partitions=1 to preserve sort order
⚠️ (Because we want to get the pure performance benefit of sorted data to compare)
📊 Session config target_partitions: 1
Registering table with sort order: EventTime ASC
Executing: CREATE EXTERNAL TABLE hits STORED AS PARQUET LOCATION '/Users/andrewlamb/Software/datafusion2/benchmarks/data/hits_0_sorted.parquet' WITH ORDER ("EventTime" ASC)
Q0: -- Must set for ClickBench hits_partitioned dataset. See https://github.com/apache/datafusion/issues/16591
-- set datafusion.execution.parquet.binary_as_string = true
SELECT * FROM hits ORDER BY "EventTime" DESC limit 10;
Query 0 iteration 0 took 228.9 ms and returned 10 rows
Query 0 iteration 1 took 178.1 ms and returned 10 rows
Query 0 iteration 2 took 178.5 ms and returned 10 rows
Query 0 iteration 3 took 177.8 ms and returned 10 rows
Query 0 iteration 4 took 179.1 ms and returned 10 rows
Query 0 avg time: 188.49 ms
+ set +x
Done
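To compare runs like the one above, the per-iteration timings can be pulled out of the log and averaged. This is a sketch only: the regular expression assumes the exact "Query N iteration M took X ms" line format shown in this log, and the helper name `iteration_times` is hypothetical.

```python
import re

# Timing lines copied from the log above.
LOG = """\
Query 0 iteration 0 took 228.9 ms and returned 10 rows
Query 0 iteration 1 took 178.1 ms and returned 10 rows
Query 0 iteration 2 took 178.5 ms and returned 10 rows
Query 0 iteration 3 took 177.8 ms and returned 10 rows
Query 0 iteration 4 took 179.1 ms and returned 10 rows
"""

PATTERN = re.compile(r"Query (\d+) iteration (\d+) took ([\d.]+) ms")


def iteration_times(log: str):
    """Return the elapsed milliseconds of every iteration found in the log."""
    return [float(m.group(3)) for m in PATTERN.finditer(log)]


times = iteration_times(LOG)
mean_all = sum(times) / len(times)             # includes the slower first run
mean_warm = sum(times[1:]) / len(times[1:])    # skips iteration 0
print(f"all: {mean_all:.2f} ms, warm: {mean_warm:.2f} ms")
```

The mean of the printed values is 188.48 ms, which differs from the reported 188.49 ms in the last digit, presumably because the benchmark averages unrounded timings. Skipping iteration 0 gives a steadier figure of about 178.4 ms.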
I have a few suggestions, but nothing that is necessary in my opinion.
benchmarks/sort_clickbench.py
Outdated
    print(f" Compression: {compression}")

    # Write sorted table with optimized settings
    pq.write_table(
Rather than using pq, would it be possible to use datafusion-cli for this (to reduce the number of dependencies)?
https://datafusion.apache.org/user-guide/sql/dml.html#copy
For example, something like
> COPY (SELECT * from 'hits.parquet' ORDER BY "EventTime") TO 'output.parquet' OPTIONS (MAX_ROW_GROUP_SIZE 64000);
I think that is pretty much equivalent
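A driver script could generate that COPY statement and pipe it to datafusion-cli on stdin, which is how bench.sh feeds it via a heredoc elsewhere in this thread. This is a sketch under assumptions: the function names and the `cli_path` parameter are hypothetical, and only the statement shape is taken from the comment above.

```python
import subprocess


def copy_sorted_sql(src: str, dst: str, column: str,
                    max_row_group_size: int = 64000) -> str:
    """Build a COPY statement that rewrites `src` sorted by `column`."""
    return (
        f"COPY (SELECT * FROM '{src}' ORDER BY \"{column}\") "
        f"TO '{dst}' OPTIONS (MAX_ROW_GROUP_SIZE {max_row_group_size});"
    )


def run_with_cli(cli_path: str, sql: str) -> None:
    """Feed the SQL to a datafusion-cli binary on stdin (not called here)."""
    subprocess.run([cli_path], input=sql + "\n", text=True, check=True)


print(copy_sorted_sql("hits.parquet", "output.parquet", "EventTime"))
```

Generating the statement in one place keeps the sort column and row group size easy to change without touching the shell script.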
Great idea @alamb , addressed in latest PR, thanks!
benchmarks/bench.sh
Outdated
    SORTED_FILE="${DATA_DIR}/hits_0_sorted.parquet"
    ORIGINAL_FILE="${DATA_DIR}/hits_partitioned/hits_0.parquet"

    echo "Creating sorted ClickBench dataset from hits_0.parquet..."
Another thing you could do is sort hits.parquet (the entire dataset) rather than just 1% of the data.
I tried this, but it hit OOM on my local Mac. I tried again today with target_partitions set to 1, and it works now.
Addressed in latest PR, thanks @alamb !
benchmarks/src/clickbench.rs
Outdated
    // configure parquet options
    let mut config = self.common.config()?;

    // CRITICAL: If sorted_by is specified, force target_partitions=1
I recommend using a different option, datafusion.optimizer.prefer_existing_sort, which I think is closer to what real systems would use: it has the same effect but still allows other parts of the query to use more than one core.
https://datafusion.apache.org/user-guide/configs.html
| key | default | description |
|---|---|---|
| datafusion.optimizer.prefer_existing_sort | false | When true, DataFusion will opportunistically remove sorts when the data is already sorted, (i.e. setting preserve_order to true on RepartitionExec and using SortPreservingMergeExec) When false, DataFusion will maximize plan parallelism using RepartitionExec even if this requires subsequently resorting data using a SortExec. |
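In practice the option is passed to the benchmark binary as a `-c key=value` override rather than forcing a single partition. The sketch below assembles such a command line; the flags are the ones visible in the dfbench invocation logged later in this thread, while the function name `dfbench_args` and the example paths are hypothetical.

```python
def dfbench_args(data_path: str, queries_path: str, sorted_by: str,
                 iterations: int = 5, prefer_existing_sort: bool = True):
    """Assemble a dfbench clickbench command line for sorted data."""
    args = [
        "dfbench", "clickbench",
        "--iterations", str(iterations),
        "--path", data_path,
        "--queries-path", queries_path,
        "--sorted-by", sorted_by,
    ]
    if prefer_existing_sort:
        # Keep the existing order instead of repartitioning and re-sorting,
        # without capping target_partitions at 1.
        args += ["-c", "datafusion.optimizer.prefer_existing_sort=true"]
    return args


print(" ".join(dfbench_args("data/hits_sorted.parquet",
                            "queries/clickbench/queries/sorted_data",
                            "EventTime")))
```

Leaving target_partitions at its default means operators that do not depend on the sort order can still run in parallel.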
Good point @alamb , addressed in latest PR, thanks!
Co-authored-by: Andrew Lamb <[email protected]>
Thank you @alamb for the review, addressed the comments in the latest PR.
    SET datafusion.execution.spill_compression = 'uncompressed';
    SET datafusion.execution.sort_spill_reservation_bytes = 10485760; -- 10MB
    SET datafusion.execution.batch_size = 8192;
    SET datafusion.execution.target_partitions = 1;
@alamb I tried different target_partitions values locally: only 1 avoids the OOM, and even 2 runs out of memory, so I set it to 1 here. I am not sure why.
But it works for the huge data set:
Running `/Users/zhuqi/arrow-datafusion/target/release/dfbench clickbench --iterations 5 --path /Users/zhuqi/arrow-datafusion/benchmarks/data/hits_sorted.parquet --queries-path /Users/zhuqi/arrow-datafusion/benchmarks/queries/clickbench/queries/sorted_data --sorted-by EventTime -c datafusion.optimizer.prefer_existing_sort=true -o /Users/zhuqi/arrow-datafusion/benchmarks/results/issue_18976/data_sorted_clickbench.json`
Running benchmarks with the following options: RunOpt { query: None, pushdown: false, common: CommonOpt { iterations: 5, partitions: None, batch_size: None, mem_pool_type: "fair", memory_limit: None, sort_spill_reservation_bytes: None, debug: false }, path: "/Users/zhuqi/arrow-datafusion/benchmarks/data/hits_sorted.parquet", queries_path: "/Users/zhuqi/arrow-datafusion/benchmarks/queries/clickbench/queries/sorted_data", output_path: Some("/Users/zhuqi/arrow-datafusion/benchmarks/results/issue_18976/data_sorted_clickbench.json"), sorted_by: Some("EventTime"), sort_order: "ASC", config_options: ["datafusion.optimizer.prefer_existing_sort=true"] }
ℹ️ Data is registered with sort order
Setting config: datafusion.optimizer.prefer_existing_sort = true
Registering table with sort order: EventTime ASC
Executing: CREATE EXTERNAL TABLE hits STORED AS PARQUET LOCATION '/Users/zhuqi/arrow-datafusion/benchmarks/data/hits_sorted.parquet' WITH ORDER ("EventTime" ASC)
Q0: -- Must set for ClickBench hits_partitioned dataset. See https://github.com/apache/datafusion/issues/16591
-- set datafusion.execution.parquet.binary_as_string = true
SELECT * FROM hits ORDER BY "EventTime" DESC limit 10;
Query 0 iteration 0 took 2388.0 ms and returned 10 rows
Query 0 iteration 1 took 1789.9 ms and returned 10 rows
Query 0 iteration 2 took 1844.1 ms and returned 10 rows
Query 0 iteration 3 took 1816.4 ms and returned 10 rows
Query 0 iteration 4 took 1808.9 ms and returned 10 rows
Query 0 avg time: 1929.46 ms
+ set +x
Done
> @alamb I tried locally for target_partitions, it only not OOM for setting to 1, even for 2 it will OOM, so i setting 1 here. I am not sure why.
I think by default there is no memory limit.
andrewlamb@Andrews-MacBook-Pro-3:~/Software/datafusion$ datafusion-cli --help
Command Line Client for DataFusion query engine.
...
-m, --memory-limit <MEMORY_LIMIT>
The memory pool limitation (e.g. '10g'), default to None (no limit)
You can potentially limit the memory usage like this
diff --git a/benchmarks/bench.sh b/benchmarks/bench.sh
index e79cca1a2a..d227fffde6 100755
--- a/benchmarks/bench.sh
+++ b/benchmarks/bench.sh
@@ -1244,8 +1244,8 @@ data_sorted_clickbench() {
DATAFUSION_CLI="${DATAFUSION_DIR}/target/release/datafusion-cli"
popd > /dev/null
- echo "Using datafusion-cli to create sorted parquet file..."
- "${DATAFUSION_CLI}" << EOF
+ echo "Using datafusion-cli (4GB memory) to create sorted parquet file..."
+ "${DATAFUSION_CLI}" -m 4 << EOF
-- Memory and performance configuration
SET datafusion.runtime.memory_limit = '${MEMORY_LIMIT_GB}G';
SET datafusion.execution.spill_compression = 'uncompressed';
However, when I tried that it still wasn't able to re-sort the data 🤔
I already set the memory limit here:
SET datafusion.runtime.memory_limit = '${MEMORY_LIMIT_GB}G';
I think we can keep target_partitions at 1 as a first step. I can investigate more as a follow-up; maybe we can speed it up.
Resources exhausted: Additional allocation failed for ExternalSorterMerge[1] with top memory consumers (across reservations) as:
ExternalSorter[2]#13(can spill: true) consumed 3.7 GB, peak 4.8 GB,
ExternalSorter[3]#15(can spill: true) consumed 3.5 GB, peak 4.4 GB,
ExternalSorterMerge[2]#14(can spill: false) consumed 2.3 GB, peak 2.3 GB,
ExternalSorterMerge[1]#12(can spill: false) consumed 1004.2 MB, peak 1694.0 MB,
ExternalSorterMerge[3]#16(can spill: false) consumed 845.7 MB, peak 1798.9 MB.
Error: Failed to allocate additional 12.7 MB for ExternalSorterMerge[1] with 998.9 MB already allocated for this reservation - 689.7 KB remain available for the total pool
\q
ExternalSorterMerge seems to cause the Resources exhausted error when we have more than one partition.
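To see how the pool was split at the time of the failure, the consumer lines from the error above can be summed by whether they can spill. This is a rough sketch: it assumes 1 GB = 1024 MB in the log's formatting, and the regular expression is written only against the lines shown here.

```python
import re

# Consumer lines copied from the error message above.
ERROR_LOG = """\
ExternalSorter[2]#13(can spill: true) consumed 3.7 GB, peak 4.8 GB,
ExternalSorter[3]#15(can spill: true) consumed 3.5 GB, peak 4.4 GB,
ExternalSorterMerge[2]#14(can spill: false) consumed 2.3 GB, peak 2.3 GB,
ExternalSorterMerge[1]#12(can spill: false) consumed 1004.2 MB, peak 1694.0 MB,
ExternalSorterMerge[3]#16(can spill: false) consumed 845.7 MB, peak 1798.9 MB.
"""

# Assumption: binary units (1 GB = 1024 MB).
UNIT_MB = {"KB": 1 / 1024, "MB": 1.0, "GB": 1024.0}
LINE = re.compile(
    r"(\w+)\[(\d+)\]#\d+\(can spill: (true|false)\) consumed ([\d.]+) (KB|MB|GB)"
)


def consumed_mb(log: str):
    """Sum currently consumed memory, split by spillable vs. unspillable."""
    totals = {"spillable": 0.0, "unspillable": 0.0}
    for _name, _partition, can_spill, value, unit in LINE.findall(log):
        key = "spillable" if can_spill == "true" else "unspillable"
        totals[key] += float(value) * UNIT_MB[unit]
    return totals


totals = consumed_mb(ERROR_LOG)
print({k: round(v / 1024, 2) for k, v in totals.items()})  # values in GB
```

Under that unit assumption, roughly 7.2 GB is held by spillable sorters and about 4.1 GB by merge reservations that cannot spill, about 11.3 GB in total. That is consistent with the observation that the merge step is what exhausts the pool when more than one partition is sorting at once.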
Updated. @alamb, I added duration logs in the latest PR for the default behavior (12GB memory and 1 target partition). It runs quickly on my local Mac, in less than 5 minutes:
+----------+
| count |
+----------+
| 99997497 |
+----------+
1 row(s) fetched.
Elapsed 278.468 seconds.
\q
End time: 2025-12-06 16:27:54
✓ Successfully created sorted ClickBench dataset
Input: 14095 MB
Output: 36159 MB
Time Statistics:
Total duration: 280 seconds (00:04:40)
Throughput: 50 MB/s
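The summary figures above can be reproduced from the three raw numbers. This sketch assumes throughput is input size divided by elapsed seconds using integer arithmetic, as shell `$(( ))` expansion would do; the function name `summarize` is hypothetical.

```python
def summarize(input_mb: int, output_mb: int, seconds: int):
    """Derive duration, throughput, and size ratio from the raw statistics."""
    hours, rem = divmod(seconds, 3600)
    minutes, secs = divmod(rem, 60)
    return {
        "duration": f"{hours:02d}:{minutes:02d}:{secs:02d}",
        # Integer division, matching shell arithmetic (an assumption).
        "throughput_mb_s": input_mb // seconds,
        # How much larger the uncompressed sorted output is than the input.
        "size_ratio": round(output_mb / input_mb, 2),
    }


stats = summarize(input_mb=14095, output_mb=36159, seconds=280)
print(stats)
```

The output file is about 2.57 times the input size, which matches the explanation elsewhere in this thread that the sorted file is written uncompressed.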
benchmarks/bench.sh
Outdated
    clickbench_extended: ClickBench \"inspired\" queries against a single parquet (DataFusion specific)
    # Sorted Data Benchmarks (ORDER BY Optimization)
    data_sorted_clickbench: ClickBench queries on pre-sorted data using prefer_existing_sort (tests sort elimination optimization)
I realize it is a bit late, but maybe calling this clickbench_sorted would be more consistent with the benchmarks above
Good point @alamb, it's not too late. I addressed it in the latest PR, thanks!
    pushd "${DATAFUSION_DIR}" > /dev/null
    echo "Building datafusion-cli..."
    cargo build --release --bin datafusion-cli
nice
    INPUT_MB=$((INPUT_SIZE / 1024 / 1024))
    OUTPUT_MB=$((OUTPUT_SIZE / 1024 / 1024))

    echo " Input: ${INPUT_MB} MB"
I ran this and it showed
✓ Successfully created sorted ClickBench dataset
Input: 14095 MB
Output: 36159 MB
I think that is due to the lack of compression
Yes @alamb , it was suggested by @2010YOUY01 here: #19042 (comment)
Because we want to speed up the sort during data generation and don't care about compression here, I set it to uncompressed.
SET datafusion.execution.parquet.compression = 'uncompressed';

    echo "Using datafusion-cli to create sorted parquet file..."
    "${DATAFUSION_CLI}" << EOF
    -- Memory and performance configuration
    SET datafusion.runtime.memory_limit = '${MEMORY_LIMIT_GB}G';
I already set the memory limit here @alamb; I think it's equivalent to the -m limit.
Which issue does this PR close?
Add sorted data benchmark.
Rationale for this change
What changes are included in this PR?
Are these changes tested?
Yes. Test results from the reverse parquet PR show it is 30X faster than the main branch on sorted data:
#18817
And the main branch result:
Are there any user-facing changes?