Skip to content

Commit 5717991

Browse files
Copy limits before repartitions apache#20736/20752 (#402/#403)
Also makes topk module public for downstream access to TopKDynamicFilters.
1 parent d2fee74 commit 5717991

16 files changed

Lines changed: 809 additions & 28 deletions

File tree

Cargo.lock

Lines changed: 19 additions & 17 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -91,19 +91,19 @@ ahash = { version = "0.8", default-features = false, features = [
9191
"runtime-rng",
9292
] }
9393
apache-avro = { version = "0.20", default-features = false }
94-
arrow = { git = "https://github.com/Coralogix/arrow-rs.git", rev = "7d5c1c973", features = [
94+
arrow = { git = "https://github.com/Coralogix/arrow-rs.git", rev = "3cc86c18912a603c36bcafd976e1c75a9071d7b7", features = [
9595
"prettyprint",
9696
"chrono-tz",
9797
] }
98-
arrow-buffer = { git = "https://github.com/Coralogix/arrow-rs.git", rev = "7d5c1c973", default-features = false }
99-
arrow-flight = { git = "https://github.com/Coralogix/arrow-rs.git", rev = "7d5c1c973", features = [
98+
arrow-buffer = { git = "https://github.com/Coralogix/arrow-rs.git", rev = "3cc86c18912a603c36bcafd976e1c75a9071d7b7", default-features = false }
99+
arrow-flight = { git = "https://github.com/Coralogix/arrow-rs.git", rev = "3cc86c18912a603c36bcafd976e1c75a9071d7b7", features = [
100100
"flight-sql-experimental",
101101
] }
102-
arrow-ipc = { git = "https://github.com/Coralogix/arrow-rs.git", rev = "7d5c1c973", default-features = false, features = [
102+
arrow-ipc = { git = "https://github.com/Coralogix/arrow-rs.git", rev = "3cc86c18912a603c36bcafd976e1c75a9071d7b7", default-features = false, features = [
103103
"lz4",
104104
] }
105-
arrow-ord = { git = "https://github.com/Coralogix/arrow-rs.git", rev = "7d5c1c973", default-features = false }
106-
arrow-schema = { git = "https://github.com/Coralogix/arrow-rs.git", rev = "7d5c1c973", default-features = false }
105+
arrow-ord = { git = "https://github.com/Coralogix/arrow-rs.git", rev = "3cc86c18912a603c36bcafd976e1c75a9071d7b7", default-features = false }
106+
arrow-schema = { git = "https://github.com/Coralogix/arrow-rs.git", rev = "3cc86c18912a603c36bcafd976e1c75a9071d7b7", default-features = false }
107107
async-trait = "0.1.89"
108108
bigdecimal = "0.4.8"
109109
bytes = "1.10"
@@ -162,7 +162,7 @@ log = "^0.4"
162162
num-traits = { version = "0.2" }
163163
object_store = { version = ">=0.12.4, <0.13", default-features = false }
164164
parking_lot = "0.12"
165-
parquet = { git = "https://github.com/Coralogix/arrow-rs.git", rev = "7d5c1c973", default-features = false, features = [
165+
parquet = { git = "https://github.com/Coralogix/arrow-rs.git", rev = "3cc86c18912a603c36bcafd976e1c75a9071d7b7", default-features = false, features = [
166166
"arrow",
167167
"async",
168168
"object_store",

datafusion/common/src/config.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -861,6 +861,11 @@ config_namespace! {
861861
/// into the file scan phase.
862862
pub enable_join_dynamic_filter_pushdown: bool, default = true
863863

864+
/// When set to true, the optimizer will push TopK (Sort with fetch)
865+
/// below hash repartition when the partition key is a prefix of the
866+
/// sort key, reducing data volume before the shuffle.
867+
pub enable_topk_repartition: bool, default = true
868+
864869
/// When set to true attempts to push down dynamic filters generated by operators (topk & join) into the file scan phase.
865870
/// For example, for a query such as `SELECT * FROM t ORDER BY timestamp DESC LIMIT 10`, the optimizer
866871
/// will attempt to push down the current top 10 timestamps that the TopK operator references into the file scans.

datafusion/core/Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -227,6 +227,10 @@ name = "struct_query_sql"
227227
harness = false
228228
name = "window_query_sql"
229229

230+
[[bench]]
231+
harness = false
232+
name = "topk_repartition"
233+
230234
[[bench]]
231235
harness = false
232236
name = "scalar"
Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
//! Benchmark for the TopKRepartition optimizer rule.
19+
//!
20+
//! Measures the benefit of pushing TopK (Sort with fetch) below hash
21+
//! repartition when running partitioned window functions with LIMIT.
22+
23+
mod data_utils;
24+
25+
use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};
26+
use data_utils::create_table_provider;
27+
use datafusion::prelude::{SessionConfig, SessionContext};
28+
use parking_lot::Mutex;
29+
use std::hint::black_box;
30+
use std::sync::Arc;
31+
use tokio::runtime::Runtime;
32+
33+
#[expect(clippy::needless_pass_by_value)]
34+
fn query(ctx: Arc<Mutex<SessionContext>>, rt: &Runtime, sql: &str) {
35+
let df = rt.block_on(ctx.lock().sql(sql)).unwrap();
36+
black_box(rt.block_on(df.collect()).unwrap());
37+
}
38+
39+
fn create_context(
40+
partitions_len: usize,
41+
target_partitions: usize,
42+
enable_topk_repartition: bool,
43+
) -> Arc<Mutex<SessionContext>> {
44+
let array_len = 1024 * 1024;
45+
let batch_size = 8 * 1024;
46+
let mut config = SessionConfig::new().with_target_partitions(target_partitions);
47+
config.options_mut().optimizer.enable_topk_repartition = enable_topk_repartition;
48+
let ctx = SessionContext::new_with_config(config);
49+
let rt = Runtime::new().unwrap();
50+
rt.block_on(async {
51+
let provider =
52+
create_table_provider(partitions_len, array_len, batch_size).unwrap();
53+
ctx.register_table("t", provider).unwrap();
54+
});
55+
Arc::new(Mutex::new(ctx))
56+
}
57+
58+
fn criterion_benchmark(c: &mut Criterion) {
59+
let rt = Runtime::new().unwrap();
60+
61+
let limits = [10, 1_000, 10_000, 100_000];
62+
let scans = 16;
63+
let target_partitions = 4;
64+
65+
let group = format!("topk_repartition_{scans}_to_{target_partitions}");
66+
let mut group = c.benchmark_group(group);
67+
for limit in limits {
68+
let sql = format!(
69+
"SELECT \
70+
SUM(f64) OVER (PARTITION BY u64_narrow ORDER BY u64_wide ROWS UNBOUNDED PRECEDING) \
71+
FROM t \
72+
ORDER BY u64_narrow, u64_wide \
73+
LIMIT {limit}"
74+
);
75+
76+
let ctx_disabled = create_context(scans, target_partitions, false);
77+
group.bench_function(BenchmarkId::new("disabled", limit), |b| {
78+
b.iter(|| query(ctx_disabled.clone(), &rt, &sql))
79+
});
80+
81+
let ctx_enabled = create_context(scans, target_partitions, true);
82+
group.bench_function(BenchmarkId::new("enabled", limit), |b| {
83+
b.iter(|| query(ctx_enabled.clone(), &rt, &sql))
84+
});
85+
}
86+
group.finish();
87+
}
88+
89+
criterion_group!(benches, criterion_benchmark);
90+
criterion_main!(benches);

datafusion/physical-optimizer/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,5 +56,7 @@ recursive = { workspace = true, optional = true }
5656
[dev-dependencies]
5757
datafusion-expr = { workspace = true }
5858
datafusion-functions = { workspace = true }
59+
datafusion-functions-nested = { workspace = true }
60+
datafusion-functions-window = { workspace = true }
5961
insta = { workspace = true }
6062
tokio = { workspace = true }

datafusion/physical-optimizer/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ pub mod projection_pushdown;
4242
pub use datafusion_pruning as pruning;
4343
pub mod sanity_checker;
4444
pub mod topk_aggregation;
45+
pub mod topk_repartition;
4546
pub mod update_aggr_exprs;
4647
pub mod utils;
4748

0 commit comments

Comments
 (0)