29 commits
- c3647cd Test for sorted data (zhuqi-lucas, Dec 1, 2025)
- 627f081 draft implementation for sorted data benchmark (zhuqi-lucas, Dec 2, 2025)
- 6c0afd6 fix (zhuqi-lucas, Dec 2, 2025)
- 0ee89f5 fix (zhuqi-lucas, Dec 2, 2025)
- acf6e24 mege (zhuqi-lucas, Dec 2, 2025)
- c654246 fix (zhuqi-lucas, Dec 2, 2025)
- 68e72f1 fix (zhuqi-lucas, Dec 2, 2025)
- 401cb8e fix (zhuqi-lucas, Dec 2, 2025)
- 88f84d9 better (zhuqi-lucas, Dec 2, 2025)
- 2392655 Update benchmarks/sort_clickbench.py (zhuqi-lucas, Dec 2, 2025)
- 413142a Update benchmarks/sort_clickbench.py (zhuqi-lucas, Dec 2, 2025)
- ecbe8d0 Update benchmarks/sort_clickbench.py (zhuqi-lucas, Dec 2, 2025)
- bc11193 Update benchmarks/sort_clickbench.py (zhuqi-lucas, Dec 2, 2025)
- fce2ccc Update benchmarks/sort_clickbench.py (zhuqi-lucas, Dec 2, 2025)
- c62c0fa Address new comments (zhuqi-lucas, Dec 2, 2025)
- 2dcbee2 Update benchmarks/src/clickbench.rs (zhuqi-lucas, Dec 4, 2025)
- ba45f6a Update benchmarks/src/clickbench.rs (zhuqi-lucas, Dec 4, 2025)
- c547cd3 Address comments (zhuqi-lucas, Dec 4, 2025)
- 39e6a5c Merge remote-tracking branch 'upstream/main' into issue_18976 (zhuqi-lucas, Dec 4, 2025)
- 140d4ea Update benchmarks/src/clickbench.rs (zhuqi-lucas, Dec 4, 2025)
- 5702ed6 Update benchmarks/README.md (zhuqi-lucas, Dec 5, 2025)
- 4ddcbdf Address new comments. (zhuqi-lucas, Dec 5, 2025)
- 022547e fix (zhuqi-lucas, Dec 5, 2025)
- ceac596 fix (zhuqi-lucas, Dec 5, 2025)
- e4826dc Merge branch 'main' into issue_18976 (zhuqi-lucas, Dec 5, 2025)
- 5d6c28e Merge branch 'main' into issue_18976 (alamb, Dec 5, 2025)
- 8f2a4e0 Address comments. (zhuqi-lucas, Dec 6, 2025)
- e1a434d Merge branch 'main' into issue_18976 (zhuqi-lucas, Dec 6, 2025)
- d247fd4 Add duration time (zhuqi-lucas, Dec 6, 2025)
38 changes: 38 additions & 0 deletions benchmarks/README.md
@@ -787,3 +787,41 @@ Getting results...
cancelling thread
done dropping runtime in 83.531417ms
```

## Sorted Data Benchmarks

### Sorted ClickBench (`clickbench_sorted`)

Benchmark for queries on pre-sorted data, used to test sort order optimization.
This benchmark uses the ClickBench dataset (hits.parquet, ~14GB) pre-sorted by the EventTime column. The queries are designed to test DataFusion's performance when the data is already sorted, as is common in time-series workloads.

The benchmark includes queries that:
- Scan pre-sorted data with ORDER BY clauses that match the sort order
- Test reverse scans on sorted data
- Verify that the sort-elimination optimization improves performance

#### Generating Sorted Data

The sorted dataset is generated automatically from the ClickBench dataset:

```bash
./bench.sh data clickbench_sorted
```

The memory used during the sorting process can be configured with the `DATAFUSION_MEMORY_GB` environment variable (the default limit is 12GB). For example, to sort with 16GB of memory, run:

```bash
DATAFUSION_MEMORY_GB=16 ./bench.sh data clickbench_sorted
```

This command will:
1. Download the single-file ClickBench dataset (hits.parquet) if not already present
2. Sort hits.parquet by EventTime in ascending order
3. Save the sorted file as hits_sorted.parquet

#### Running the Benchmark

```bash
./bench.sh run clickbench_sorted
```

This runs queries against the pre-sorted dataset with the `--sorted-by EventTime` flag, which informs DataFusion that the data is pre-sorted, allowing it to optimize away redundant sort operations.
118 changes: 117 additions & 1 deletion benchmarks/bench.sh
@@ -99,6 +99,9 @@ clickbench_partitioned: ClickBench queries against partitioned (100 files) parquet
clickbench_pushdown: ClickBench queries against partitioned (100 files) parquet w/ filter_pushdown enabled
clickbench_extended: ClickBench \"inspired\" queries against a single parquet (DataFusion specific)

# Sorted Data Benchmarks (ORDER BY Optimization)
clickbench_sorted: ClickBench queries on pre-sorted data using prefer_existing_sort (tests sort elimination optimization)

# H2O.ai Benchmarks (Group By, Join, Window)
h2o_small: h2oai benchmark with small dataset (1e7 rows) for groupby, default file format is csv
h2o_medium: h2oai benchmark with medium dataset (1e8 rows) for groupby, default file format is csv
@@ -318,6 +321,9 @@ main() {
compile_profile)
data_tpch "1" "parquet"
;;
clickbench_sorted)
clickbench_sorted
;;
*)
echo "Error: unknown benchmark '$BENCHMARK' for data generation"
usage
@@ -449,7 +455,7 @@ main() {
h2o_medium_window)
run_h2o_window "MEDIUM" "CSV" "window"
;;
h2o_big_window)
h2o_big_window)
run_h2o_window "BIG" "CSV" "window"
;;
h2o_small_parquet)
@@ -501,6 +507,9 @@ main() {
compile_profile)
run_compile_profile "${PROFILE_ARGS[@]}"
;;
clickbench_sorted)
run_clickbench_sorted
;;
*)
echo "Error: unknown benchmark '$BENCHMARK' for run"
usage
@@ -1201,6 +1210,113 @@ compare_benchmarks() {

}

# Creates sorted ClickBench data from hits.parquet (full dataset)
# The data is sorted by EventTime in ascending order
# Uses datafusion-cli to reduce dependencies
clickbench_sorted() {
    SORTED_FILE="${DATA_DIR}/hits_sorted.parquet"
    ORIGINAL_FILE="${DATA_DIR}/hits.parquet"

    # Default memory limit is 12GB; can be overridden with the DATAFUSION_MEMORY_GB env var
    MEMORY_LIMIT_GB=${DATAFUSION_MEMORY_GB:-12}

    echo "Creating sorted ClickBench dataset from hits.parquet..."
    echo "Configuration:"
    echo "  Memory limit: ${MEMORY_LIMIT_GB}G"
    echo "  Row group size: 64K rows"
    echo "  Compression: uncompressed"

    if [ ! -f "${ORIGINAL_FILE}" ]; then
        echo "hits.parquet not found. Running data_clickbench_1 first..."
        data_clickbench_1
    fi

    if [ -f "${SORTED_FILE}" ]; then
        echo "Sorted hits.parquet already exists at ${SORTED_FILE}"
        return 0
    fi

    echo "Sorting hits.parquet by EventTime (this may take several minutes)..."

    pushd "${DATAFUSION_DIR}" > /dev/null
    echo "Building datafusion-cli..."
    cargo build --release --bin datafusion-cli
> **Reviewer comment (Contributor):** nice
    DATAFUSION_CLI="${DATAFUSION_DIR}/target/release/datafusion-cli"
    popd > /dev/null

    START_TIME=$(date +%s)
    echo "Start time: $(date '+%Y-%m-%d %H:%M:%S')"
    echo "Using datafusion-cli to create sorted parquet file..."
    "${DATAFUSION_CLI}" << EOF
-- Memory and performance configuration
SET datafusion.runtime.memory_limit = '${MEMORY_LIMIT_GB}G';
> **Author comment (zhuqi-lucas):** I already set the memory limit here @alamb; I think it's similar to the `-m` limit.
SET datafusion.execution.spill_compression = 'uncompressed';
SET datafusion.execution.sort_spill_reservation_bytes = 10485760; -- 10MB
SET datafusion.execution.batch_size = 8192;
SET datafusion.execution.target_partitions = 1;
> **Author comment (zhuqi-lucas):** @alamb I tried target_partitions locally; it only avoids OOM when set to 1 (even 2 OOMs), so I set it to 1 here. I am not sure why.
> **Author comment (zhuqi-lucas):** But it works for the huge data set:
>
> ```text
>    Running `/Users/zhuqi/arrow-datafusion/target/release/dfbench clickbench --iterations 5 --path /Users/zhuqi/arrow-datafusion/benchmarks/data/hits_sorted.parquet --queries-path /Users/zhuqi/arrow-datafusion/benchmarks/queries/clickbench/queries/sorted_data --sorted-by EventTime -c datafusion.optimizer.prefer_existing_sort=true -o /Users/zhuqi/arrow-datafusion/benchmarks/results/issue_18976/data_sorted_clickbench.json`
> Running benchmarks with the following options: RunOpt { query: None, pushdown: false, common: CommonOpt { iterations: 5, partitions: None, batch_size: None, mem_pool_type: "fair", memory_limit: None, sort_spill_reservation_bytes: None, debug: false }, path: "/Users/zhuqi/arrow-datafusion/benchmarks/data/hits_sorted.parquet", queries_path: "/Users/zhuqi/arrow-datafusion/benchmarks/queries/clickbench/queries/sorted_data", output_path: Some("/Users/zhuqi/arrow-datafusion/benchmarks/results/issue_18976/data_sorted_clickbench.json"), sorted_by: Some("EventTime"), sort_order: "ASC", config_options: ["datafusion.optimizer.prefer_existing_sort=true"] }
> ℹ️  Data is registered with sort order
> Setting config: datafusion.optimizer.prefer_existing_sort = true
> Registering table with sort order: EventTime ASC
> Executing: CREATE EXTERNAL TABLE hits STORED AS PARQUET LOCATION '/Users/zhuqi/arrow-datafusion/benchmarks/data/hits_sorted.parquet' WITH ORDER ("EventTime" ASC)
> Q0: -- Must set for ClickBench hits_partitioned dataset. See https://github.com/apache/datafusion/issues/16591
> -- set datafusion.execution.parquet.binary_as_string = true
> SELECT * FROM hits ORDER BY "EventTime" DESC limit 10;
>
> Query 0 iteration 0 took 2388.0 ms and returned 10 rows
> Query 0 iteration 1 took 1789.9 ms and returned 10 rows
> Query 0 iteration 2 took 1844.1 ms and returned 10 rows
> Query 0 iteration 3 took 1816.4 ms and returned 10 rows
> Query 0 iteration 4 took 1808.9 ms and returned 10 rows
> Query 0 avg time: 1929.46 ms
> + set +x
> Done
> ```
> **Reviewer comment (alamb):** Replying to "it only avoids OOM when set to 1 (even 2 OOMs), so I set it to 1 here. I am not sure why."
>
> I think by default there is no memory limit.
>
> ```text
> andrewlamb@Andrews-MacBook-Pro-3:~/Software/datafusion$ datafusion-cli --help
> Command Line Client for DataFusion query engine.
> ...
>   -m, --memory-limit <MEMORY_LIMIT>
>           The memory pool limitation (e.g. '10g'), default to None (no limit)
> ```
>
> You can potentially limit the memory usage like this:
>
> ```diff
> diff --git a/benchmarks/bench.sh b/benchmarks/bench.sh
> index e79cca1a2a..d227fffde6 100755
> --- a/benchmarks/bench.sh
> +++ b/benchmarks/bench.sh
> @@ -1244,8 +1244,8 @@ data_sorted_clickbench() {
>      DATAFUSION_CLI="${DATAFUSION_DIR}/target/release/datafusion-cli"
>      popd > /dev/null
>
> -    echo "Using datafusion-cli to create sorted parquet file..."
> -    "${DATAFUSION_CLI}" << EOF
> +    echo "Using datafusion-cli (4GB memory) to create sorted parquet file..."
> +    "${DATAFUSION_CLI}"  -m 4 << EOF
>  -- Memory and performance configuration
>  SET datafusion.runtime.memory_limit = '${MEMORY_LIMIT_GB}G';
>  SET datafusion.execution.spill_compression = 'uncompressed';
> ```
>
> However, when I tried that it still wasn't able to re-sort the data 🤔
> **Author comment (zhuqi-lucas, Dec 6, 2025):** I already set the memory limit here:
>
> ```sql
> SET datafusion.runtime.memory_limit = '${MEMORY_LIMIT_GB}G';
> ```
>
> I think we can keep target_partitions at 1 for the first step; I can investigate more as a follow-up, and maybe we can speed it up.
> **Author comment (zhuqi-lucas, Dec 6, 2025):**
>
> ```text
> Resources exhausted: Additional allocation failed for ExternalSorterMerge[1] with top memory consumers (across reservations) as:
>   ExternalSorter[2]#13(can spill: true) consumed 3.7 GB, peak 4.8 GB,
>   ExternalSorter[3]#15(can spill: true) consumed 3.5 GB, peak 4.4 GB,
>   ExternalSorterMerge[2]#14(can spill: false) consumed 2.3 GB, peak 2.3 GB,
>   ExternalSorterMerge[1]#12(can spill: false) consumed 1004.2 MB, peak 1694.0 MB,
>   ExternalSorterMerge[3]#16(can spill: false) consumed 845.7 MB, peak 1798.9 MB.
> Error: Failed to allocate additional 12.7 MB for ExternalSorterMerge[1] with 998.9 MB already allocated for this reservation - 689.7 KB remain available for the total pool
> \q
> ```
>
> ExternalSorterMerge (which cannot spill) seems to cause the resources-exhausted error when we have more than one partition.
> **Author comment (zhuqi-lucas):** Updated. @alamb I added duration logs in the latest PR for the default behavior (12GB memory, 1 target partition); the run is fast on my local Mac, less than 5 minutes:
>
> ```text
> +----------+
> | count    |
> +----------+
> | 99997497 |
> +----------+
> 1 row(s) fetched.
> Elapsed 278.468 seconds.
>
> \q
> End time: 2025-12-06 16:27:54
> Successfully created sorted ClickBench dataset
>   Input:  14095 MB
>   Output: 36159 MB
>
> Time Statistics:
>   Total duration: 280 seconds (00:04:40)
>   Throughput: 50 MB/s
> ```

-- Parquet output configuration
SET datafusion.execution.parquet.max_row_group_size = 65536;
SET datafusion.execution.parquet.compression = 'uncompressed';

-- Execute sort and write
COPY (SELECT * FROM '${ORIGINAL_FILE}' ORDER BY "EventTime")
TO '${SORTED_FILE}'
STORED AS PARQUET;
EOF

    local result=$?

    END_TIME=$(date +%s)
    DURATION=$((END_TIME - START_TIME))
    echo "End time: $(date '+%Y-%m-%d %H:%M:%S')"

    if [ $result -eq 0 ]; then
        echo "✓ Successfully created sorted ClickBench dataset"

        INPUT_SIZE=$(stat -f%z "${ORIGINAL_FILE}" 2>/dev/null || stat -c%s "${ORIGINAL_FILE}" 2>/dev/null)
        OUTPUT_SIZE=$(stat -f%z "${SORTED_FILE}" 2>/dev/null || stat -c%s "${SORTED_FILE}" 2>/dev/null)
        INPUT_MB=$((INPUT_SIZE / 1024 / 1024))
        OUTPUT_MB=$((OUTPUT_SIZE / 1024 / 1024))

        echo "  Input:  ${INPUT_MB} MB"
> **Reviewer comment (Contributor):** I ran this and it showed
>
> ```text
> ✓ Successfully created sorted ClickBench dataset
>   Input:  14095 MB
>   Output: 36159 MB
> ```
>
> I think that is due to the lack of compression.

> **Author comment (zhuqi-lucas, Dec 6, 2025):** Yes @alamb, it was suggested by @2010YOUY01 here: #19042 (comment)
>
> We want to speed up the sort for data generation and don't care about compression here, so I set it to uncompressed:
>
> ```sql
> SET datafusion.execution.parquet.compression = 'uncompressed';
> ```

echo " Output: ${OUTPUT_MB} MB"

echo ""
echo "Time Statistics:"
echo " Total duration: ${DURATION} seconds ($(printf '%02d:%02d:%02d' $((DURATION/3600)) $((DURATION%3600/60)) $((DURATION%60))))"
echo " Throughput: $((INPUT_MB / DURATION)) MB/s"

return 0
else
echo "✗ Error: Failed to create sorted dataset"
echo "💡 Tip: Try increasing memory with: DATAFUSION_MEMORY_GB=16 ./bench.sh data clickbench_sorted"
return 1
fi
}
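The duration and throughput reporting above is plain shell arithmetic; a quick standalone sanity check, using the numbers from the sample run in the review thread (14095 MB sorted in 280 s):

```bash
# Standalone check of the HH:MM:SS formatting and MB/s throughput math used above
DURATION=280      # seconds, taken from the sample run log
INPUT_MB=14095    # input size in MB, taken from the sample run log

hms=$(printf '%02d:%02d:%02d' \
  $((DURATION / 3600)) $((DURATION % 3600 / 60)) $((DURATION % 60)))
throughput=$((INPUT_MB / DURATION))   # integer division, rounds down

echo "Total duration: ${DURATION} seconds (${hms})"   # ... (00:04:40)
echo "Throughput: ${throughput} MB/s"                 # 50 MB/s
```

Note that `%` and `/` have equal precedence in shell arithmetic and associate left to right, so `DURATION % 3600 / 60` computes `(DURATION % 3600) / 60` as intended. The `stat -f%z … || stat -c%s …` fallback earlier in the function covers both BSD/macOS and GNU `stat`.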

# Runs the sorted data benchmark with prefer_existing_sort configuration
run_clickbench_sorted() {
    RESULTS_FILE="${RESULTS_DIR}/clickbench_sorted.json"
    echo "RESULTS_FILE: ${RESULTS_FILE}"
    echo "Running sorted data benchmark with prefer_existing_sort optimization..."

    # Ensure sorted data exists
    clickbench_sorted

    # Run benchmark with prefer_existing_sort configuration
    # This allows DataFusion to optimize away redundant sorts while maintaining parallelism
    debug_run $CARGO_COMMAND --bin dfbench -- clickbench \
        --iterations 5 \
        --path "${DATA_DIR}/hits_sorted.parquet" \
        --queries-path "${SCRIPT_DIR}/queries/clickbench/queries/sorted_data" \
        --sorted-by "EventTime" \
        -c datafusion.optimizer.prefer_existing_sort=true \
        -o "${RESULTS_FILE}" \
        ${QUERY_ARG}
}

setup_venv() {
    python3 -m venv "$VIRTUAL_ENV"
    PATH=$VIRTUAL_ENV/bin:$PATH python3 -m pip install -r requirements.txt
3 changes: 3 additions & 0 deletions benchmarks/queries/clickbench/queries/sorted_data/q0.sql
@@ -0,0 +1,3 @@
-- Must set for ClickBench hits_partitioned dataset. See https://github.com/apache/datafusion/issues/16591
-- set datafusion.execution.parquet.binary_as_string = true
SELECT * FROM hits ORDER BY "EventTime" DESC limit 10;
115 changes: 106 additions & 9 deletions benchmarks/src/clickbench.rs
@@ -78,6 +78,27 @@ pub struct RunOpt {
    /// If present, write results json here
    #[structopt(parse(from_os_str), short = "o", long = "output")]
    output_path: Option<PathBuf>,

    /// Column name that the data is sorted by (e.g., "EventTime").
    /// If specified, DataFusion will be informed that the data has this sort order
    /// using CREATE EXTERNAL TABLE with a WITH ORDER clause.
    ///
    /// Recommended to use with: -c datafusion.optimizer.prefer_existing_sort=true
    /// This allows DataFusion to optimize away redundant sorts while maintaining
    /// multi-core parallelism for other operations.
    #[structopt(long = "sorted-by")]
    sorted_by: Option<String>,

    /// Sort order: ASC or DESC (default: ASC)
    #[structopt(long = "sort-order", default_value = "ASC")]
    sort_order: String,

    /// Configuration options in the format key=value.
    /// Can be specified multiple times.
    ///
    /// Example: -c datafusion.optimizer.prefer_existing_sort=true
    #[structopt(short = "c", long = "config")]
    config_options: Vec<String>,
}

/// Get the SQL file path
@@ -125,6 +146,37 @@ impl RunOpt {

        // configure parquet options
        let mut config = self.common.config()?;

        if self.sorted_by.is_some() {
            println!("ℹ️  Data is registered with sort order");

            let has_prefer_sort = self
                .config_options
                .iter()
                .any(|opt| opt.contains("prefer_existing_sort=true"));

            if !has_prefer_sort {
                println!("ℹ️  Consider using -c datafusion.optimizer.prefer_existing_sort=true");
                println!("ℹ️  to optimize queries while maintaining parallelism");
            }
        }

        // Apply user-provided configuration options
        for config_opt in &self.config_options {
            let parts: Vec<&str> = config_opt.splitn(2, '=').collect();
            if parts.len() != 2 {
                return Err(exec_datafusion_err!(
                    "Invalid config option format: '{}'. Expected 'key=value'",
                    config_opt
                ));
            }
            let key = parts[0];
            let value = parts[1];

            println!("Setting config: {key} = {value}");
            config = config.set_str(key, value);
        }
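A side note on the `splitn(2, '=')` parsing above: splitting on only the first `=` means option values may themselves contain `=`. The same first-`=` split can be sketched with shell parameter expansion (illustrative only, not part of the PR):

```bash
# Mirror Rust's splitn(2, '=') in shell: split on the FIRST '=' only
opt='datafusion.optimizer.prefer_existing_sort=true'
key="${opt%%=*}"    # strip the longest '=*' suffix: everything before the first '='
value="${opt#*=}"   # strip the shortest '*=' prefix: everything after the first '='

echo "$key -> $value"   # datafusion.optimizer.prefer_existing_sort -> true
```

A value such as `a=b=c` splits into key `a` and value `b=c`, matching the intent that only the first `=` acts as a delimiter.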

        {
            let parquet_options = &mut config.options_mut().execution.parquet;
            // The hits_partitioned dataset specifies string columns
@@ -136,10 +188,18 @@
                parquet_options.pushdown_filters = true;
                parquet_options.reorder_filters = true;
            }

            if self.sorted_by.is_some() {
                // We should compare the dynamic topk optimization when data is sorted, so we make the
                // assumption that filter pushdown is also enabled in this case.
                parquet_options.pushdown_filters = true;
                parquet_options.reorder_filters = true;
            }
        }

        let rt_builder = self.common.runtime_env_builder()?;
        let ctx = SessionContext::new_with_config_rt(config, rt_builder.build_arc()?);

        self.register_hits(&ctx).await?;

        let mut benchmark_run = BenchmarkRun::new();
@@ -214,17 +274,54 @@
}

    /// Registers the `hits.parquet` as a table named `hits`.
    /// If sorted_by is specified, uses CREATE EXTERNAL TABLE with WITH ORDER.
    async fn register_hits(&self, ctx: &SessionContext) -> Result<()> {
        let path = self.path.as_os_str().to_str().unwrap();

        // If sorted_by is specified, use CREATE EXTERNAL TABLE with WITH ORDER
        if let Some(ref sort_column) = self.sorted_by {
            println!(
                "Registering table with sort order: {} {}",
                sort_column, self.sort_order
            );

            // Escape column name with double quotes
            let escaped_column = if sort_column.contains('"') {
                sort_column.clone()
            } else {
                format!("\"{sort_column}\"")
            };

            // Build CREATE EXTERNAL TABLE DDL with WITH ORDER clause
            // Schema will be automatically inferred from the Parquet file
            let create_table_sql = format!(
                "CREATE EXTERNAL TABLE hits \
                 STORED AS PARQUET \
                 LOCATION '{}' \
                 WITH ORDER ({} {})",
                path,
                escaped_column,
                self.sort_order.to_uppercase()
            );

            println!("Executing: {create_table_sql}");

            // Execute the CREATE EXTERNAL TABLE statement
            ctx.sql(&create_table_sql).await?.collect().await?;

            Ok(())
        } else {
            // Original registration without sort order
            let options = Default::default();
            ctx.register_parquet("hits", path, options)
                .await
                .map_err(|e| {
                    DataFusionError::Context(
                        format!("Registering 'hits' as {path}"),
                        Box::new(e),
                    )
                })
        }
    }
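The quoting heuristic in `register_hits` (wrap the column name in double quotes unless it already contains one) can be mirrored in shell; a sketch with an illustrative `quote_ident` helper (not part of the PR):

```bash
# Quote a SQL identifier unless it already contains a double quote,
# mirroring the escaped_column logic in register_hits
quote_ident() {
  case "$1" in
    *\"*) printf '%s\n' "$1" ;;     # already contains quotes: pass through as-is
    *)    printf '"%s"\n' "$1" ;;   # otherwise wrap in double quotes
  esac
}

quote_ident EventTime       # "EventTime"
quote_ident '"EventTime"'   # "EventTime" (unchanged)
```

Note this pass-through is a heuristic rather than full escaping: a name containing a single embedded quote is forwarded unchanged and would produce invalid DDL.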

    fn iterations(&self) -> usize {