Skip to content

Commit 5c83b44

Browse files
committed
feat(optimize): integrate disk manager configuration with DeltaSessionContext
Signed-off-by: Florian Valeye <[email protected]>
1 parent d0ae849 commit 5c83b44

File tree

8 files changed

+186
-42
lines changed

8 files changed

+186
-42
lines changed

crates/core/src/delta_datafusion/mod.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,11 @@ use crate::table::state::DeltaTableState;
7070
use crate::table::{Constraint, GeneratedColumn};
7171
use crate::{open_table, open_table_with_storage_options, DeltaTable};
7272

73-
pub use self::session::*;
73+
pub(crate) use self::session::session_state_from_session;
74+
pub use self::session::{
75+
create_session, DeltaParserOptions, DeltaRuntimeEnvBuilder, DeltaSessionConfig,
76+
DeltaSessionContext,
77+
};
7478
pub(crate) use find_files::*;
7579

7680
pub(crate) const PATH_COLUMN: &str = "__delta_rs_path";

crates/core/src/delta_datafusion/session.rs

Lines changed: 64 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,14 @@
1+
use std::sync::Arc;
2+
13
use datafusion::{
24
catalog::Session,
35
common::{exec_datafusion_err, Result as DataFusionResult},
4-
execution::{SessionState, SessionStateBuilder},
6+
execution::{
7+
disk_manager::DiskManagerBuilder,
8+
memory_pool::FairSpillPool,
9+
runtime_env::{RuntimeEnv, RuntimeEnvBuilder},
10+
SessionState, SessionStateBuilder,
11+
},
512
prelude::{SessionConfig, SessionContext},
613
sql::planner::ParserOptions,
714
};
@@ -67,25 +74,78 @@ impl From<DeltaSessionConfig> for SessionConfig {
6774
}
6875
}
6976

70-
/// A wrapper for Deltafusion's SessionContext to capture sane default table defaults
77+
/// A builder for configuring DataFusion RuntimeEnv with Delta-specific defaults
78+
#[derive(Default)]
79+
pub struct DeltaRuntimeEnvBuilder {
80+
inner: RuntimeEnvBuilder,
81+
}
82+
83+
impl DeltaRuntimeEnvBuilder {
84+
pub fn new() -> Self {
85+
Self {
86+
inner: RuntimeEnvBuilder::new(),
87+
}
88+
}
89+
90+
pub fn with_max_spill_size(mut self, size: usize) -> Self {
91+
let memory_pool = FairSpillPool::new(size);
92+
self.inner = self.inner.with_memory_pool(Arc::new(memory_pool));
93+
self
94+
}
95+
96+
pub fn with_max_temp_directory_size(mut self, size: u64) -> Self {
97+
let disk_manager = DiskManagerBuilder::default().with_max_temp_directory_size(size);
98+
self.inner = self.inner.with_disk_manager_builder(disk_manager);
99+
self
100+
}
101+
102+
pub fn build(self) -> Arc<RuntimeEnv> {
103+
self.inner.build_arc().unwrap()
104+
}
105+
}
106+
107+
/// A wrapper for DataFusion's SessionContext with Delta-specific defaults
108+
///
109+
/// This provides a way of creating DataFusion sessions with consistent
110+
/// Delta Lake configuration (case-sensitive identifiers, Delta planner, etc.)
71111
pub struct DeltaSessionContext {
72112
inner: SessionContext,
73113
}
74114

75115
impl DeltaSessionContext {
116+
/// Create a new DeltaSessionContext with default configuration
76117
pub fn new() -> Self {
77-
let ctx = SessionContext::new_with_config(DeltaSessionConfig::default().into());
118+
let config = DeltaSessionConfig::default().into();
119+
let runtime_env = RuntimeEnvBuilder::new().build_arc().unwrap();
120+
Self::new_with_config_and_runtime(config, runtime_env)
121+
}
122+
123+
/// Create a DeltaSessionContext with a custom RuntimeEnv
124+
pub fn with_runtime_env(runtime_env: Arc<RuntimeEnv>) -> Self {
125+
let config = DeltaSessionConfig::default().into();
126+
Self::new_with_config_and_runtime(config, runtime_env)
127+
}
128+
129+
fn new_with_config_and_runtime(config: SessionConfig, runtime_env: Arc<RuntimeEnv>) -> Self {
78130
let planner = DeltaPlanner::new();
79-
let state = SessionStateBuilder::new_from_existing(ctx.state())
131+
let state = SessionStateBuilder::new()
132+
.with_default_features()
133+
.with_config(config)
134+
.with_runtime_env(runtime_env)
80135
.with_query_planner(planner)
81136
.build();
137+
82138
let inner = SessionContext::new_with_state(state);
83139
Self { inner }
84140
}
85141

86142
pub fn into_inner(self) -> SessionContext {
87143
self.inner
88144
}
145+
146+
pub fn state(&self) -> SessionState {
147+
self.inner.state()
148+
}
89149
}
90150

91151
impl Default for DeltaSessionContext {

crates/core/src/operations/optimize.rs

Lines changed: 29 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,6 @@ use arrow::array::RecordBatch;
2929
use arrow::datatypes::SchemaRef;
3030
use datafusion::catalog::Session;
3131
use datafusion::execution::context::SessionState;
32-
use datafusion::execution::memory_pool::FairSpillPool;
33-
use datafusion::execution::runtime_env::RuntimeEnvBuilder;
34-
use datafusion::execution::SessionStateBuilder;
3532
use delta_kernel::engine::arrow_conversion::TryIntoArrow as _;
3633
use delta_kernel::expressions::Scalar;
3734
use delta_kernel::table_properties::DataSkippingNumIndexedCols;
@@ -51,7 +48,7 @@ use uuid::Uuid;
5148

5249
use super::write::writer::{PartitionWriter, PartitionWriterConfig};
5350
use super::{CustomExecuteHandler, Operation};
54-
use crate::delta_datafusion::DeltaTableProvider;
51+
use crate::delta_datafusion::{DeltaRuntimeEnvBuilder, DeltaSessionContext, DeltaTableProvider};
5552
use crate::errors::{DeltaResult, DeltaTableError};
5653
use crate::kernel::transaction::{CommitBuilder, CommitProperties, DEFAULT_RETRIES, PROTOCOL};
5754
use crate::kernel::EagerSnapshot;
@@ -215,8 +212,6 @@ pub struct OptimizeBuilder<'a> {
215212
preserve_insertion_order: bool,
216213
/// Maximum number of concurrent tasks (default is number of cpus)
217214
max_concurrent_tasks: usize,
218-
/// Maximum number of bytes allowed in memory before spilling to disk
219-
max_spill_size: usize,
220215
/// Optimize type
221216
optimize_type: OptimizeType,
222217
/// Datafusion session state relevant for executing the input plan
@@ -234,6 +229,33 @@ impl super::Operation<()> for OptimizeBuilder<'_> {
234229
}
235230
}
236231

232+
/// Create a SessionState configured for optimize operations with custom spill settings.
233+
///
234+
/// This is the recommended way to configure memory and disk limits for optimize operations.
235+
/// The created SessionState should be passed to [`OptimizeBuilder`] via [`with_session_state`](OptimizeBuilder::with_session_state).
236+
///
237+
/// # Arguments
238+
/// * `max_spill_size` - Maximum bytes in memory before spilling to disk. If `None`, uses DataFusion's default memory pool.
239+
/// * `max_temp_directory_size` - Maximum disk space for temporary spill files. If `None`, uses DataFusion's default disk manager.
240+
pub fn create_session_state_for_optimize(
241+
max_spill_size: Option<usize>,
242+
max_temp_directory_size: Option<u64>,
243+
) -> SessionState {
244+
if max_spill_size.is_none() && max_temp_directory_size.is_none() {
245+
return DeltaSessionContext::new().state();
246+
}
247+
248+
let mut builder = DeltaRuntimeEnvBuilder::new();
249+
if let Some(spill_size) = max_spill_size {
250+
builder = builder.with_max_spill_size(spill_size);
251+
}
252+
if let Some(directory_size) = max_temp_directory_size {
253+
builder = builder.with_max_temp_directory_size(directory_size);
254+
}
255+
256+
DeltaSessionContext::with_runtime_env(builder.build()).state()
257+
}
258+
237259
impl<'a> OptimizeBuilder<'a> {
238260
/// Create a new [`OptimizeBuilder`]
239261
pub fn new(log_store: LogStoreRef, snapshot: EagerSnapshot) -> Self {
@@ -246,7 +268,6 @@ impl<'a> OptimizeBuilder<'a> {
246268
commit_properties: CommitProperties::default(),
247269
preserve_insertion_order: false,
248270
max_concurrent_tasks: num_cpus::get(),
249-
max_spill_size: 20 * 1024 * 1024 * 1024, // 20 GB.
250271
optimize_type: OptimizeType::Compact,
251272
min_commit_interval: None,
252273
session: None,
@@ -296,16 +317,6 @@ impl<'a> OptimizeBuilder<'a> {
296317
self
297318
}
298319

299-
/// Max spill size
300-
#[deprecated(
301-
since = "0.29.0",
302-
note = "Pass in a `SessionState` configured with a `RuntimeEnv` and a `FairSpillPool`"
303-
)]
304-
pub fn with_max_spill_size(mut self, max_spill_size: usize) -> Self {
305-
self.max_spill_size = max_spill_size;
306-
self
307-
}
308-
309320
/// Min commit interval
310321
pub fn with_min_commit_interval(mut self, min_commit_interval: Duration) -> Self {
311322
self.min_commit_interval = Some(min_commit_interval);
@@ -349,17 +360,7 @@ impl<'a> std::future::IntoFuture for OptimizeBuilder<'a> {
349360
let session = this
350361
.session
351362
.and_then(|session| session.as_any().downcast_ref::<SessionState>().cloned())
352-
.unwrap_or_else(|| {
353-
let memory_pool = FairSpillPool::new(this.max_spill_size);
354-
let runtime = RuntimeEnvBuilder::new()
355-
.with_memory_pool(Arc::new(memory_pool))
356-
.build_arc()
357-
.unwrap();
358-
SessionStateBuilder::new()
359-
.with_default_features()
360-
.with_runtime_env(runtime)
361-
.build()
362-
});
363+
.unwrap_or_else(|| create_session_state_for_optimize(None, None));
363364
let plan = create_merge_plan(
364365
&this.log_store,
365366
this.optimize_type,

python/deltalake/_internal.pyi

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,8 @@ class RawDeltaTable:
117117
partition_filters: PartitionFilterType | None,
118118
target_size: int | None,
119119
max_concurrent_tasks: int | None,
120+
max_spill_size: int | None,
121+
max_temp_directory_size: int | None,
120122
min_commit_interval: int | None,
121123
writer_properties: WriterProperties | None,
122124
commit_properties: CommitProperties | None,
@@ -129,6 +131,7 @@ class RawDeltaTable:
129131
target_size: int | None,
130132
max_concurrent_tasks: int | None,
131133
max_spill_size: int | None,
134+
max_temp_directory_size: int | None,
132135
min_commit_interval: int | None,
133136
writer_properties: WriterProperties | None,
134137
commit_properties: CommitProperties | None,

python/deltalake/table.py

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1961,6 +1961,8 @@ def compact(
19611961
partition_filters: FilterConjunctionType | None = None,
19621962
target_size: int | None = None,
19631963
max_concurrent_tasks: int | None = None,
1964+
max_spill_size: int | None = None,
1965+
max_temp_directory_size: int | None = None,
19641966
min_commit_interval: int | timedelta | None = None,
19651967
writer_properties: WriterProperties | None = None,
19661968
post_commithook_properties: PostCommitHookProperties | None = None,
@@ -1983,6 +1985,8 @@ def compact(
19831985
max_concurrent_tasks: the maximum number of concurrent tasks to use for
19841986
file compaction. Defaults to number of CPUs. More concurrent tasks can make compaction
19851987
faster, but will also use more memory.
1988+
max_spill_size: the maximum number of bytes allowed in memory before spilling to disk. If not specified, uses DataFusion's default.
1989+
max_temp_directory_size: the maximum disk space for temporary spill files. If not specified, uses DataFusion's default.
19861990
min_commit_interval: minimum interval in seconds or as timedeltas before a new commit is
19871991
created. Interval is useful for long running executions. Set to 0 or timedelta(0), if you
19881992
want a commit per partition.
@@ -2016,6 +2020,8 @@ def compact(
20162020
self.table._stringify_partition_values(partition_filters),
20172021
target_size,
20182022
max_concurrent_tasks,
2023+
max_spill_size,
2024+
max_temp_directory_size,
20192025
min_commit_interval,
20202026
writer_properties,
20212027
commit_properties,
@@ -2030,7 +2036,8 @@ def z_order(
20302036
partition_filters: FilterConjunctionType | None = None,
20312037
target_size: int | None = None,
20322038
max_concurrent_tasks: int | None = None,
2033-
max_spill_size: int = 20 * 1024 * 1024 * 1024,
2039+
max_spill_size: int | None = None,
2040+
max_temp_directory_size: int | None = None,
20342041
min_commit_interval: int | timedelta | None = None,
20352042
writer_properties: WriterProperties | None = None,
20362043
post_commithook_properties: PostCommitHookProperties | None = None,
@@ -2050,7 +2057,8 @@ def z_order(
20502057
max_concurrent_tasks: the maximum number of concurrent tasks to use for
20512058
file compaction. Defaults to number of CPUs. More concurrent tasks can make compaction
20522059
faster, but will also use more memory.
2053-
max_spill_size: the maximum number of bytes allowed in memory before spilling to disk. Defaults to 20GB.
2060+
max_spill_size: the maximum number of bytes allowed in memory before spilling to disk. If not specified, uses DataFusion's default.
2061+
max_temp_directory_size: the maximum disk space for temporary spill files. If not specified, uses DataFusion's default.
20542062
min_commit_interval: minimum interval in seconds or as timedeltas before a new commit is
20552063
created. Interval is useful for long running executions. Set to 0 or timedelta(0), if you
20562064
want a commit per partition.
@@ -2086,6 +2094,7 @@ def z_order(
20862094
target_size,
20872095
max_concurrent_tasks,
20882096
max_spill_size,
2097+
max_temp_directory_size,
20892098
min_commit_interval,
20902099
writer_properties,
20912100
commit_properties,

python/src/lib.rs

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,9 @@ use deltalake::operations::delete::DeleteBuilder;
4141
use deltalake::operations::drop_constraints::DropConstraintBuilder;
4242
use deltalake::operations::filesystem_check::FileSystemCheckBuilder;
4343
use deltalake::operations::load_cdf::CdfLoadBuilder;
44-
use deltalake::operations::optimize::{OptimizeBuilder, OptimizeType};
44+
use deltalake::operations::optimize::{
45+
create_session_state_for_optimize, OptimizeBuilder, OptimizeType,
46+
};
4547
use deltalake::operations::restore::RestoreBuilder;
4648
use deltalake::operations::set_tbl_properties::SetTablePropertiesBuilder;
4749
use deltalake::operations::update::UpdateBuilder;
@@ -601,6 +603,8 @@ impl RawDeltaTable {
601603
partition_filters = None,
602604
target_size = None,
603605
max_concurrent_tasks = None,
606+
max_spill_size = None,
607+
max_temp_directory_size = None,
604608
min_commit_interval = None,
605609
writer_properties=None,
606610
commit_properties=None,
@@ -613,6 +617,8 @@ impl RawDeltaTable {
613617
partition_filters: Option<Vec<(PyBackedStr, PyBackedStr, PartitionFilterValue)>>,
614618
target_size: Option<u64>,
615619
max_concurrent_tasks: Option<usize>,
620+
max_spill_size: Option<usize>,
621+
max_temp_directory_size: Option<u64>,
616622
min_commit_interval: Option<u64>,
617623
writer_properties: Option<PyWriterProperties>,
618624
commit_properties: Option<PyCommitProperties>,
@@ -621,6 +627,13 @@ impl RawDeltaTable {
621627
let (table, metrics) = py.allow_threads(|| {
622628
let mut cmd = OptimizeBuilder::new(self.log_store()?, self.cloned_state()?)
623629
.with_max_concurrent_tasks(max_concurrent_tasks.unwrap_or_else(num_cpus::get));
630+
631+
if max_spill_size.is_some() || max_temp_directory_size.is_some() {
632+
let session =
633+
create_session_state_for_optimize(max_spill_size, max_temp_directory_size);
634+
cmd = cmd.with_session_state(Arc::new(session));
635+
}
636+
624637
if let Some(size) = target_size {
625638
cmd = cmd.with_target_size(size);
626639
}
@@ -663,7 +676,8 @@ impl RawDeltaTable {
663676
partition_filters = None,
664677
target_size = None,
665678
max_concurrent_tasks = None,
666-
max_spill_size = 20 * 1024 * 1024 * 1024,
679+
max_spill_size = None,
680+
max_temp_directory_size = None,
667681
min_commit_interval = None,
668682
writer_properties=None,
669683
commit_properties=None,
@@ -676,7 +690,8 @@ impl RawDeltaTable {
676690
partition_filters: Option<Vec<(PyBackedStr, PyBackedStr, PartitionFilterValue)>>,
677691
target_size: Option<u64>,
678692
max_concurrent_tasks: Option<usize>,
679-
max_spill_size: usize,
693+
max_spill_size: Option<usize>,
694+
max_temp_directory_size: Option<u64>,
680695
min_commit_interval: Option<u64>,
681696
writer_properties: Option<PyWriterProperties>,
682697
commit_properties: Option<PyCommitProperties>,
@@ -685,8 +700,14 @@ impl RawDeltaTable {
685700
let (table, metrics) = py.allow_threads(|| {
686701
let mut cmd = OptimizeBuilder::new(self.log_store()?, self.cloned_state()?)
687702
.with_max_concurrent_tasks(max_concurrent_tasks.unwrap_or_else(num_cpus::get))
688-
.with_max_spill_size(max_spill_size)
689703
.with_type(OptimizeType::ZOrder(z_order_columns));
704+
705+
if max_spill_size.is_some() || max_temp_directory_size.is_some() {
706+
let session =
707+
create_session_state_for_optimize(max_spill_size, max_temp_directory_size);
708+
cmd = cmd.with_session_state(Arc::new(session));
709+
}
710+
690711
if let Some(size) = target_size {
691712
cmd = cmd.with_target_size(size);
692713
}

python/src/query.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use std::sync::Arc;
33
use deltalake::{
44
datafusion::prelude::SessionContext,
55
delta_datafusion::{
6-
DataFusionMixins, DeltaScanConfigBuilder, DeltaSessionConfig, DeltaTableProvider,
6+
DataFusionMixins, DeltaScanConfigBuilder, DeltaSessionContext, DeltaTableProvider,
77
},
88
};
99
use pyo3::prelude::*;
@@ -25,8 +25,8 @@ pub(crate) struct PyQueryBuilder {
2525
impl PyQueryBuilder {
2626
#[new]
2727
pub fn new() -> Self {
28-
let config = DeltaSessionConfig::default().into();
29-
let ctx = SessionContext::new_with_config(config);
28+
let delta_ctx = DeltaSessionContext::new();
29+
let ctx = delta_ctx.into_inner();
3030

3131
PyQueryBuilder { ctx }
3232
}

0 commit comments

Comments
 (0)