Skip to content

Commit b0d47cf

Browse files
committed
feat(optimize): configure DataFusion spill settings for optimize operations
Signed-off-by: Florian Valeye <[email protected]>
1 parent acd75d6 commit b0d47cf

File tree

5 files changed

+121
-30
lines changed

5 files changed

+121
-30
lines changed

crates/core/src/operations/optimize.rs

Lines changed: 34 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ use arrow_array::RecordBatch;
2929
use arrow_schema::SchemaRef as ArrowSchemaRef;
3030
use datafusion::catalog::Session;
3131
use datafusion::execution::context::SessionState;
32+
use datafusion::execution::disk_manager::DiskManagerBuilder;
3233
use datafusion::execution::memory_pool::FairSpillPool;
3334
use datafusion::execution::runtime_env::RuntimeEnvBuilder;
3435
use datafusion::execution::SessionStateBuilder;
@@ -215,8 +216,6 @@ pub struct OptimizeBuilder<'a> {
215216
preserve_insertion_order: bool,
216217
/// Maximum number of concurrent tasks (default is number of cpus)
217218
max_concurrent_tasks: usize,
218-
/// Maximum number of bytes allowed in memory before spilling to disk
219-
max_spill_size: usize,
220219
/// Optimize type
221220
optimize_type: OptimizeType,
222221
/// Datafusion session state relevant for executing the input plan
@@ -234,6 +233,38 @@ impl super::Operation<()> for OptimizeBuilder<'_> {
234233
}
235234
}
236235

236+
/// Create a SessionState configured for optimize operations with custom spill settings.
237+
///
238+
/// This is the recommended way to configure memory and disk limits for optimize operations.
239+
/// The created SessionState should be passed to [`OptimizeBuilder`] via [`with_session_state`](OptimizeBuilder::with_session_state).
240+
///
241+
/// # Arguments
242+
/// * `max_spill_size` - Maximum bytes in memory before spilling to disk. If `None`, uses DataFusion's default memory pool.
243+
/// * `max_temp_directory_size` - Maximum disk space for temporary spill files. If `None`, uses DataFusion's default disk manager.
244+
pub fn create_session_state_for_optimize(
245+
max_spill_size: Option<usize>,
246+
max_temp_directory_size: Option<u64>,
247+
) -> SessionState {
248+
let mut runtime_builder = RuntimeEnvBuilder::new();
249+
250+
if let Some(spill_size) = max_spill_size {
251+
let memory_pool = FairSpillPool::new(spill_size);
252+
runtime_builder = runtime_builder.with_memory_pool(Arc::new(memory_pool));
253+
}
254+
255+
if let Some(directory_size) = max_temp_directory_size {
256+
let disk_manager_builder =
257+
DiskManagerBuilder::default().with_max_temp_directory_size(directory_size);
258+
runtime_builder = runtime_builder.with_disk_manager_builder(disk_manager_builder);
259+
}
260+
261+
let runtime = runtime_builder.build_arc().unwrap();
262+
SessionStateBuilder::new()
263+
.with_default_features()
264+
.with_runtime_env(runtime)
265+
.build()
266+
}
267+
237268
impl<'a> OptimizeBuilder<'a> {
238269
/// Create a new [`OptimizeBuilder`]
239270
pub fn new(log_store: LogStoreRef, snapshot: EagerSnapshot) -> Self {
@@ -246,7 +277,6 @@ impl<'a> OptimizeBuilder<'a> {
246277
commit_properties: CommitProperties::default(),
247278
preserve_insertion_order: false,
248279
max_concurrent_tasks: num_cpus::get(),
249-
max_spill_size: 20 * 1024 * 1024 * 1024, // 20 GB.
250280
optimize_type: OptimizeType::Compact,
251281
min_commit_interval: None,
252282
session: None,
@@ -296,16 +326,6 @@ impl<'a> OptimizeBuilder<'a> {
296326
self
297327
}
298328

299-
/// Max spill size
300-
#[deprecated(
301-
since = "0.29.0",
302-
note = "Pass in a `SessionState` configured with a `RuntimeEnv` and a `FairSpillPool`"
303-
)]
304-
pub fn with_max_spill_size(mut self, max_spill_size: usize) -> Self {
305-
self.max_spill_size = max_spill_size;
306-
self
307-
}
308-
309329
/// Min commit interval
310330
pub fn with_min_commit_interval(mut self, min_commit_interval: Duration) -> Self {
311331
self.min_commit_interval = Some(min_commit_interval);
@@ -349,17 +369,7 @@ impl<'a> std::future::IntoFuture for OptimizeBuilder<'a> {
349369
let session = this
350370
.session
351371
.and_then(|session| session.as_any().downcast_ref::<SessionState>().cloned())
352-
.unwrap_or_else(|| {
353-
let memory_pool = FairSpillPool::new(this.max_spill_size);
354-
let runtime = RuntimeEnvBuilder::new()
355-
.with_memory_pool(Arc::new(memory_pool))
356-
.build_arc()
357-
.unwrap();
358-
SessionStateBuilder::new()
359-
.with_default_features()
360-
.with_runtime_env(runtime)
361-
.build()
362-
});
372+
.unwrap_or_else(|| create_session_state_for_optimize(None, None));
363373
let plan = create_merge_plan(
364374
&this.log_store,
365375
this.optimize_type,

python/deltalake/_internal.pyi

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,8 @@ class RawDeltaTable:
117117
partition_filters: PartitionFilterType | None,
118118
target_size: int | None,
119119
max_concurrent_tasks: int | None,
120+
max_spill_size: int | None,
121+
max_temp_directory_size: int | None,
120122
min_commit_interval: int | None,
121123
writer_properties: WriterProperties | None,
122124
commit_properties: CommitProperties | None,
@@ -129,6 +131,7 @@ class RawDeltaTable:
129131
target_size: int | None,
130132
max_concurrent_tasks: int | None,
131133
max_spill_size: int | None,
134+
max_temp_directory_size: int | None,
132135
min_commit_interval: int | None,
133136
writer_properties: WriterProperties | None,
134137
commit_properties: CommitProperties | None,

python/deltalake/table.py

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1934,6 +1934,8 @@ def compact(
19341934
partition_filters: FilterConjunctionType | None = None,
19351935
target_size: int | None = None,
19361936
max_concurrent_tasks: int | None = None,
1937+
max_spill_size: int | None = None,
1938+
max_temp_directory_size: int | None = None,
19371939
min_commit_interval: int | timedelta | None = None,
19381940
writer_properties: WriterProperties | None = None,
19391941
post_commithook_properties: PostCommitHookProperties | None = None,
@@ -1956,6 +1958,8 @@ def compact(
19561958
max_concurrent_tasks: the maximum number of concurrent tasks to use for
19571959
file compaction. Defaults to number of CPUs. More concurrent tasks can make compaction
19581960
faster, but will also use more memory.
1961+
max_spill_size: the maximum number of bytes allowed in memory before spilling to disk. If not specified, uses DataFusion's default.
1962+
max_temp_directory_size: the maximum disk space for temporary spill files. If not specified, uses DataFusion's default.
19591963
min_commit_interval: minimum interval in seconds or as timedeltas before a new commit is
19601964
created. Interval is useful for long running executions. Set to 0 or timedelta(0), if you
19611965
want a commit per partition.
@@ -1989,6 +1993,8 @@ def compact(
19891993
self.table._stringify_partition_values(partition_filters),
19901994
target_size,
19911995
max_concurrent_tasks,
1996+
max_spill_size,
1997+
max_temp_directory_size,
19921998
min_commit_interval,
19931999
writer_properties,
19942000
commit_properties,
@@ -2003,7 +2009,8 @@ def z_order(
20032009
partition_filters: FilterConjunctionType | None = None,
20042010
target_size: int | None = None,
20052011
max_concurrent_tasks: int | None = None,
2006-
max_spill_size: int = 20 * 1024 * 1024 * 1024,
2012+
max_spill_size: int | None = None,
2013+
max_temp_directory_size: int | None = None,
20072014
min_commit_interval: int | timedelta | None = None,
20082015
writer_properties: WriterProperties | None = None,
20092016
post_commithook_properties: PostCommitHookProperties | None = None,
@@ -2023,7 +2030,8 @@ def z_order(
20232030
max_concurrent_tasks: the maximum number of concurrent tasks to use for
20242031
file compaction. Defaults to number of CPUs. More concurrent tasks can make compaction
20252032
faster, but will also use more memory.
2026-
max_spill_size: the maximum number of bytes allowed in memory before spilling to disk. Defaults to 20GB.
2033+
max_spill_size: the maximum number of bytes allowed in memory before spilling to disk. If not specified, uses DataFusion's default.
2034+
max_temp_directory_size: the maximum disk space for temporary spill files. If not specified, uses DataFusion's default.
20272035
min_commit_interval: minimum interval in seconds or as timedeltas before a new commit is
20282036
created. Interval is useful for long running executions. Set to 0 or timedelta(0), if you
20292037
want a commit per partition.
@@ -2059,6 +2067,7 @@ def z_order(
20592067
target_size,
20602068
max_concurrent_tasks,
20612069
max_spill_size,
2070+
max_temp_directory_size,
20622071
min_commit_interval,
20632072
writer_properties,
20642073
commit_properties,

python/src/lib.rs

Lines changed: 27 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,9 @@ use deltalake::operations::delete::DeleteBuilder;
4040
use deltalake::operations::drop_constraints::DropConstraintBuilder;
4141
use deltalake::operations::filesystem_check::FileSystemCheckBuilder;
4242
use deltalake::operations::load_cdf::CdfLoadBuilder;
43-
use deltalake::operations::optimize::{OptimizeBuilder, OptimizeType};
43+
use deltalake::operations::optimize::{
44+
create_session_state_for_optimize, OptimizeBuilder, OptimizeType,
45+
};
4446
use deltalake::operations::restore::RestoreBuilder;
4547
use deltalake::operations::set_tbl_properties::SetTablePropertiesBuilder;
4648
use deltalake::operations::update::UpdateBuilder;
@@ -592,6 +594,8 @@ impl RawDeltaTable {
592594
partition_filters = None,
593595
target_size = None,
594596
max_concurrent_tasks = None,
597+
max_spill_size = None,
598+
max_temp_directory_size = None,
595599
min_commit_interval = None,
596600
writer_properties=None,
597601
commit_properties=None,
@@ -604,6 +608,8 @@ impl RawDeltaTable {
604608
partition_filters: Option<Vec<(PyBackedStr, PyBackedStr, PartitionFilterValue)>>,
605609
target_size: Option<u64>,
606610
max_concurrent_tasks: Option<usize>,
611+
max_spill_size: Option<usize>,
612+
max_temp_directory_size: Option<u64>,
607613
min_commit_interval: Option<u64>,
608614
writer_properties: Option<PyWriterProperties>,
609615
commit_properties: Option<PyCommitProperties>,
@@ -612,6 +618,14 @@ impl RawDeltaTable {
612618
let (table, metrics) = py.allow_threads(|| {
613619
let mut cmd = OptimizeBuilder::new(self.log_store()?, self.cloned_state()?)
614620
.with_max_concurrent_tasks(max_concurrent_tasks.unwrap_or_else(num_cpus::get));
621+
622+
// Configure session with custom spill settings if provided
623+
if max_spill_size.is_some() || max_temp_directory_size.is_some() {
624+
let session =
625+
create_session_state_for_optimize(max_spill_size, max_temp_directory_size);
626+
cmd = cmd.with_session_state(Arc::new(session));
627+
}
628+
615629
if let Some(size) = target_size {
616630
cmd = cmd.with_target_size(size);
617631
}
@@ -654,7 +668,8 @@ impl RawDeltaTable {
654668
partition_filters = None,
655669
target_size = None,
656670
max_concurrent_tasks = None,
657-
max_spill_size = 20 * 1024 * 1024 * 1024,
671+
max_spill_size = None,
672+
max_temp_directory_size = None,
658673
min_commit_interval = None,
659674
writer_properties=None,
660675
commit_properties=None,
@@ -666,7 +681,8 @@ impl RawDeltaTable {
666681
partition_filters: Option<Vec<(PyBackedStr, PyBackedStr, PartitionFilterValue)>>,
667682
target_size: Option<u64>,
668683
max_concurrent_tasks: Option<usize>,
669-
max_spill_size: usize,
684+
max_spill_size: Option<usize>,
685+
max_temp_directory_size: Option<u64>,
670686
min_commit_interval: Option<u64>,
671687
writer_properties: Option<PyWriterProperties>,
672688
commit_properties: Option<PyCommitProperties>,
@@ -675,8 +691,15 @@ impl RawDeltaTable {
675691
let (table, metrics) = py.allow_threads(|| {
676692
let mut cmd = OptimizeBuilder::new(self.log_store()?, self.cloned_state()?)
677693
.with_max_concurrent_tasks(max_concurrent_tasks.unwrap_or_else(num_cpus::get))
678-
.with_max_spill_size(max_spill_size)
679694
.with_type(OptimizeType::ZOrder(z_order_columns));
695+
696+
// Configure session with custom spill settings if provided
697+
if max_spill_size.is_some() || max_temp_directory_size.is_some() {
698+
let session =
699+
create_session_state_for_optimize(max_spill_size, max_temp_directory_size);
700+
cmd = cmd.with_session_state(Arc::new(session));
701+
}
702+
680703
if let Some(size) = target_size {
681704
cmd = cmd.with_target_size(size);
682705
}

python/tests/test_optimize.py

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -266,3 +266,49 @@ def test_optimize_schema_evolved_3185(tmp_path):
266266
assert dt.version() == 2
267267
last_action = dt.history(1)[0]
268268
assert last_action["operation"] == "OPTIMIZE"
269+
270+
271+
def test_compact_with_spill_parameters(
272+
tmp_path: pathlib.Path,
273+
sample_table: Table,
274+
):
275+
write_deltalake(tmp_path, sample_table, mode="append")
276+
write_deltalake(tmp_path, sample_table, mode="append")
277+
write_deltalake(tmp_path, sample_table, mode="append")
278+
279+
dt = DeltaTable(tmp_path)
280+
old_version = dt.version()
281+
old_num_files = len(dt.file_uris())
282+
283+
dt.optimize.compact(
284+
max_spill_size=100 * 1024 * 1024 * 1024, # 100 GB
285+
max_temp_directory_size=500 * 1024 * 1024 * 1024, # 500 GB
286+
)
287+
288+
last_action = dt.history(1)[0]
289+
assert last_action["operation"] == "OPTIMIZE"
290+
assert dt.version() == old_version + 1
291+
assert len(dt.file_uris()) <= old_num_files
292+
293+
294+
def test_z_order_with_spill_parameters(
295+
tmp_path: pathlib.Path,
296+
sample_table: Table,
297+
):
298+
write_deltalake(tmp_path, sample_table, mode="append")
299+
write_deltalake(tmp_path, sample_table, mode="append")
300+
write_deltalake(tmp_path, sample_table, mode="append")
301+
302+
dt = DeltaTable(tmp_path)
303+
old_version = dt.version()
304+
305+
dt.optimize.z_order(
306+
columns=["sold", "price"],
307+
max_spill_size=100 * 1024 * 1024 * 1024, # 100 GB
308+
max_temp_directory_size=500 * 1024 * 1024 * 1024, # 500 GB
309+
)
310+
311+
last_action = dt.history(1)[0]
312+
assert last_action["operation"] == "OPTIMIZE"
313+
assert dt.version() == old_version + 1
314+
assert len(dt.file_uris()) == 1

0 commit comments

Comments
 (0)