Skip to content

Commit 921092e

Browse files
committed
feat: towards lazy loading table state
Signed-off-by: Robert Pack <[email protected]>
1 parent f7688c4 commit 921092e

File tree

23 files changed

+267
-229
lines changed

23 files changed

+267
-229
lines changed

crates/core/src/delta_datafusion/cdf/scan.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,15 @@ pub struct DeltaCdfTableProvider {
2828
impl DeltaCdfTableProvider {
2929
/// Build a DeltaCDFTableProvider
3030
pub fn try_new(cdf_builder: CdfLoadBuilder) -> DeltaResult<Self> {
31-
let mut fields = cdf_builder.snapshot.input_schema().fields().to_vec();
31+
let mut fields = cdf_builder
32+
.snapshot
33+
.as_ref()
34+
.ok_or(DeltaTableError::generic(
35+
"expected initialized snapshot for DeltaCdfTableProvider",
36+
))?
37+
.input_schema()
38+
.fields()
39+
.to_vec();
3240
for f in ADD_PARTITION_SCHEMA.clone() {
3341
fields.push(f.into());
3442
}

crates/core/src/delta_datafusion/table_provider.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -626,7 +626,7 @@ impl<'a> DeltaScanBuilder<'a> {
626626
let mut pruned_batches = Vec::new();
627627
let mut mask_offset = 0;
628628

629-
for batch in &self.snapshot.files {
629+
for batch in self.snapshot.files()? {
630630
let batch_size = batch.num_rows();
631631
let batch_mask = &mask[mask_offset..mask_offset + batch_size];
632632
let batch_mask_array = BooleanArray::from(batch_mask.to_vec());

crates/core/src/kernel/snapshot/mod.rs

Lines changed: 48 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -463,7 +463,25 @@ impl Snapshot {
463463
pub struct EagerSnapshot {
464464
snapshot: Snapshot,
465465
// logical files in the snapshot
466-
pub(crate) files: Vec<RecordBatch>,
466+
files: Vec<RecordBatch>,
467+
}
468+
469+
pub(crate) async fn resolve_snapshot(
470+
log_store: &dyn LogStore,
471+
maybe_snapshot: Option<EagerSnapshot>,
472+
require_files: bool,
473+
) -> DeltaResult<EagerSnapshot> {
474+
if let Some(snapshot) = maybe_snapshot {
475+
if require_files {
476+
snapshot.with_files(log_store).await
477+
} else {
478+
Ok(snapshot)
479+
}
480+
} else {
481+
let mut config = DeltaTableConfig::default();
482+
config.require_files = require_files;
483+
EagerSnapshot::try_new(log_store, config, None).await
484+
}
467485
}
468486

469487
impl EagerSnapshot {
@@ -474,15 +492,36 @@ impl EagerSnapshot {
474492
version: Option<i64>,
475493
) -> DeltaResult<Self> {
476494
let snapshot = Snapshot::try_new(log_store, config.clone(), version).await?;
495+
Self::try_new_with_snapshot(log_store, snapshot).await
496+
}
477497

478-
let files = match config.require_files {
498+
pub(crate) async fn try_new_with_snapshot(
499+
log_store: &dyn LogStore,
500+
snapshot: Snapshot,
501+
) -> DeltaResult<Self> {
502+
let files = match snapshot.load_config().require_files {
479503
true => snapshot.files(log_store, None).try_collect().await?,
480504
false => vec![],
481505
};
482-
483506
Ok(Self { snapshot, files })
484507
}
485508

509+
pub(crate) async fn with_files(mut self, log_store: &dyn LogStore) -> DeltaResult<Self> {
510+
if self.snapshot.config.require_files {
511+
return Ok(self);
512+
}
513+
self.snapshot.config.require_files = true;
514+
Self::try_new_with_snapshot(log_store, self.snapshot).await
515+
}
516+
517+
pub(crate) fn files(&self) -> DeltaResult<&[RecordBatch]> {
518+
if self.snapshot.config.require_files {
519+
Ok(&self.files)
520+
} else {
521+
Err(DeltaTableError::NotInitializedWithFiles("files".into()))
522+
}
523+
}
524+
486525
/// Update the snapshot to the given version
487526
pub(crate) async fn update(
488527
&mut self,
@@ -588,6 +627,12 @@ impl EagerSnapshot {
588627
log_store: &dyn LogStore,
589628
predicate: Option<PredicateRef>,
590629
) -> BoxStream<'_, DeltaResult<LogicalFileView>> {
630+
if !self.snapshot.load_config().require_files {
631+
return Box::pin(once(ready(Err(DeltaTableError::NotInitializedWithFiles(
632+
"file_views".into(),
633+
)))));
634+
}
635+
591636
self.snapshot
592637
.files_from(
593638
log_store,

crates/core/src/operations/add_column.rs

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -9,15 +9,17 @@ use itertools::Itertools;
99
use super::{CustomExecuteHandler, Operation};
1010
use crate::kernel::schema::merge_delta_struct;
1111
use crate::kernel::transaction::{CommitBuilder, CommitProperties};
12-
use crate::kernel::{EagerSnapshot, MetadataExt, ProtocolExt as _, StructField, StructTypeExt};
12+
use crate::kernel::{
13+
resolve_snapshot, EagerSnapshot, MetadataExt, ProtocolExt as _, StructField, StructTypeExt,
14+
};
1315
use crate::logstore::LogStoreRef;
1416
use crate::protocol::DeltaOperation;
1517
use crate::{DeltaResult, DeltaTable, DeltaTableError};
1618

1719
/// Add new columns and/or nested fields to a table
1820
pub struct AddColumnBuilder {
1921
/// A snapshot of the table's state
20-
snapshot: EagerSnapshot,
22+
snapshot: Option<EagerSnapshot>,
2123
/// Fields to add/merge into schema
2224
fields: Option<Vec<StructField>>,
2325
/// Delta object store for handling data files
@@ -38,7 +40,7 @@ impl Operation<()> for AddColumnBuilder {
3840

3941
impl AddColumnBuilder {
4042
/// Create a new builder
41-
pub fn new(log_store: LogStoreRef, snapshot: EagerSnapshot) -> Self {
43+
pub fn new(log_store: LogStoreRef, snapshot: Option<EagerSnapshot>) -> Self {
4244
Self {
4345
snapshot,
4446
log_store,
@@ -75,7 +77,9 @@ impl std::future::IntoFuture for AddColumnBuilder {
7577
let this = self;
7678

7779
Box::pin(async move {
78-
let mut metadata = this.snapshot.metadata().clone();
80+
let snapshot = resolve_snapshot(&this.log_store, this.snapshot.clone(), false).await?;
81+
82+
let mut metadata = snapshot.metadata().clone();
7983
let fields = match this.fields.clone() {
8084
Some(v) => v,
8185
None => return Err(DeltaTableError::Generic("No fields provided".to_string())),
@@ -95,10 +99,10 @@ impl std::future::IntoFuture for AddColumnBuilder {
9599
));
96100
}
97101

98-
let table_schema = this.snapshot.schema();
102+
let table_schema = snapshot.schema();
99103
let new_table_schema = merge_delta_struct(table_schema.as_ref(), fields_right)?;
100104

101-
let current_protocol = this.snapshot.protocol();
105+
let current_protocol = snapshot.protocol();
102106

103107
let new_protocol = current_protocol
104108
.clone()
@@ -121,7 +125,7 @@ impl std::future::IntoFuture for AddColumnBuilder {
121125
.with_actions(actions)
122126
.with_operation_id(operation_id)
123127
.with_post_commit_hook_handler(this.get_custom_execute_handler())
124-
.build(Some(&this.snapshot), this.log_store.clone(), operation)
128+
.build(Some(&snapshot), this.log_store.clone(), operation)
125129
.await?;
126130

127131
this.post_execute(operation_id).await?;

crates/core/src/operations/add_feature.rs

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use itertools::Itertools;
88

99
use super::{CustomExecuteHandler, Operation};
1010
use crate::kernel::transaction::{CommitBuilder, CommitProperties};
11-
use crate::kernel::{EagerSnapshot, ProtocolExt as _, TableFeatures};
11+
use crate::kernel::{resolve_snapshot, EagerSnapshot, ProtocolExt as _, TableFeatures};
1212
use crate::logstore::LogStoreRef;
1313
use crate::protocol::DeltaOperation;
1414
use crate::DeltaTable;
@@ -17,7 +17,7 @@ use crate::{DeltaResult, DeltaTableError};
1717
/// Enable table features for a table
1818
pub struct AddTableFeatureBuilder {
1919
/// A snapshot of the table's state
20-
snapshot: EagerSnapshot,
20+
snapshot: Option<EagerSnapshot>,
2121
/// Name of the feature
2222
name: Vec<TableFeatures>,
2323
/// Allow protocol versions to be increased by setting features
@@ -40,7 +40,7 @@ impl super::Operation<()> for AddTableFeatureBuilder {
4040

4141
impl AddTableFeatureBuilder {
4242
/// Create a new builder
43-
pub fn new(log_store: LogStoreRef, snapshot: EagerSnapshot) -> Self {
43+
pub fn new(log_store: LogStoreRef, snapshot: Option<EagerSnapshot>) -> Self {
4444
Self {
4545
name: vec![],
4646
allow_protocol_versions_increase: false,
@@ -92,6 +92,8 @@ impl std::future::IntoFuture for AddTableFeatureBuilder {
9292
let this = self;
9393

9494
Box::pin(async move {
95+
let snapshot = resolve_snapshot(&this.log_store, this.snapshot.clone(), false).await?;
96+
9597
let name = if this.name.is_empty() {
9698
return Err(DeltaTableError::Generic("No features provided".to_string()));
9799
} else {
@@ -107,7 +109,7 @@ impl std::future::IntoFuture for AddTableFeatureBuilder {
107109
let reader_features = reader_features.into_iter().flatten().collect_vec();
108110
let writer_features = writer_features.into_iter().flatten().collect_vec();
109111

110-
let mut protocol = this.snapshot.protocol().clone();
112+
let mut protocol = snapshot.protocol().clone();
111113

112114
if !this.allow_protocol_versions_increase {
113115
if !reader_features.is_empty()
@@ -135,7 +137,7 @@ impl std::future::IntoFuture for AddTableFeatureBuilder {
135137
.with_actions(actions)
136138
.with_operation_id(operation_id)
137139
.with_post_commit_hook_handler(this.get_custom_execute_handler())
138-
.build(Some(&this.snapshot), this.log_store.clone(), operation)
140+
.build(Some(&snapshot), this.log_store.clone(), operation)
139141
.await?;
140142

141143
this.post_execute(operation_id).await?;

crates/core/src/operations/constraints.rs

Lines changed: 13 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,9 @@ use super::{CustomExecuteHandler, Operation};
1515
use crate::delta_datafusion::expr::fmt_expr_to_sql;
1616
use crate::delta_datafusion::{create_session, register_store, DeltaDataChecker, DeltaScanBuilder};
1717
use crate::kernel::transaction::{CommitBuilder, CommitProperties};
18-
use crate::kernel::{EagerSnapshot, MetadataExt, ProtocolExt as _, ProtocolInner};
18+
use crate::kernel::{
19+
resolve_snapshot, EagerSnapshot, MetadataExt, ProtocolExt as _, ProtocolInner,
20+
};
1921
use crate::logstore::LogStoreRef;
2022
use crate::operations::datafusion_utils::Expression;
2123
use crate::protocol::DeltaOperation;
@@ -25,7 +27,7 @@ use crate::{DeltaResult, DeltaTable, DeltaTableError};
2527
/// Build a constraint to add to a table
2628
pub struct ConstraintBuilder {
2729
/// A snapshot of the table's state
28-
snapshot: EagerSnapshot,
30+
snapshot: Option<EagerSnapshot>,
2931
/// Name of the constraint
3032
name: Option<String>,
3133
/// Constraint expression
@@ -50,7 +52,7 @@ impl super::Operation<()> for ConstraintBuilder {
5052

5153
impl ConstraintBuilder {
5254
/// Create a new builder
53-
pub fn new(log_store: LogStoreRef, snapshot: EagerSnapshot) -> Self {
55+
pub fn new(log_store: LogStoreRef, snapshot: Option<EagerSnapshot>) -> Self {
5456
Self {
5557
name: None,
5658
expr: None,
@@ -101,11 +103,8 @@ impl std::future::IntoFuture for ConstraintBuilder {
101103
let this = self;
102104

103105
Box::pin(async move {
104-
if !this.snapshot.load_config().require_files {
105-
return Err(DeltaTableError::NotInitializedWithFiles(
106-
"ADD CONSTRAINTS".into(),
107-
));
108-
}
106+
let snapshot = resolve_snapshot(&this.log_store, this.snapshot.clone(), true).await?;
107+
109108
let operation_id = this.get_operation_id();
110109
this.pre_execute(operation_id).await?;
111110

@@ -118,7 +117,7 @@ impl std::future::IntoFuture for ConstraintBuilder {
118117
.expr
119118
.ok_or_else(|| DeltaTableError::Generic("No Expression provided".to_string()))?;
120119

121-
let mut metadata = this.snapshot.metadata().clone();
120+
let mut metadata = snapshot.metadata().clone();
122121
let configuration_key = format!("delta.constraints.{name}");
123122

124123
if metadata.configuration().contains_key(&configuration_key) {
@@ -132,10 +131,9 @@ impl std::future::IntoFuture for ConstraintBuilder {
132131
.unwrap_or_else(|| Arc::new(create_session().into_inner().state()));
133132
register_store(this.log_store.clone(), session.runtime_env().as_ref());
134133

135-
let scan =
136-
DeltaScanBuilder::new(&this.snapshot, this.log_store.clone(), session.as_ref())
137-
.build()
138-
.await?;
134+
let scan = DeltaScanBuilder::new(&snapshot, this.log_store.clone(), session.as_ref())
135+
.build()
136+
.await?;
139137

140138
let schema = scan.schema().to_dfschema()?;
141139
let expr = into_expr(expr, &schema, session.as_ref())?;
@@ -175,7 +173,7 @@ impl std::future::IntoFuture for ConstraintBuilder {
175173
metadata =
176174
metadata.add_config_key(format!("delta.constraints.{name}"), expr_str.clone())?;
177175

178-
let old_protocol = this.snapshot.protocol();
176+
let old_protocol = snapshot.protocol();
179177
let protocol = ProtocolInner {
180178
min_reader_version: if old_protocol.min_reader_version() > 1 {
181179
old_protocol.min_reader_version()
@@ -213,7 +211,7 @@ impl std::future::IntoFuture for ConstraintBuilder {
213211
.with_actions(actions)
214212
.with_operation_id(operation_id)
215213
.with_post_commit_hook_handler(this.custom_execute_handler.clone())
216-
.build(Some(&this.snapshot), this.log_store.clone(), operation)
214+
.build(Some(&snapshot), this.log_store.clone(), operation)
217215
.await?;
218216

219217
if let Some(handler) = this.custom_execute_handler {

crates/core/src/operations/delete.rs

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ use crate::delta_datafusion::{
5252
};
5353
use crate::errors::DeltaResult;
5454
use crate::kernel::transaction::{CommitBuilder, CommitProperties, PROTOCOL};
55-
use crate::kernel::{Action, Add, EagerSnapshot, Remove};
55+
use crate::kernel::{resolve_snapshot, Action, Add, EagerSnapshot, Remove};
5656
use crate::logstore::LogStoreRef;
5757
use crate::operations::write::execution::{write_execution_plan, write_execution_plan_cdc};
5858
use crate::operations::write::WriterStatsConfig;
@@ -72,7 +72,7 @@ pub struct DeleteBuilder {
7272
/// Which records to delete
7373
predicate: Option<Expression>,
7474
/// A snapshot of the table's state
75-
snapshot: EagerSnapshot,
75+
snapshot: Option<EagerSnapshot>,
7676
/// Delta object store for handling data files
7777
log_store: LogStoreRef,
7878
/// Datafusion session state relevant for executing the input plan
@@ -125,7 +125,7 @@ impl super::Operation<()> for DeleteBuilder {
125125

126126
impl DeleteBuilder {
127127
/// Create a new [`DeleteBuilder`]
128-
pub fn new(log_store: LogStoreRef, snapshot: EagerSnapshot) -> Self {
128+
pub fn new(log_store: LogStoreRef, snapshot: Option<EagerSnapshot>) -> Self {
129129
Self {
130130
predicate: None,
131131
snapshot,
@@ -176,8 +176,9 @@ impl std::future::IntoFuture for DeleteBuilder {
176176
let this = self;
177177

178178
Box::pin(async move {
179-
PROTOCOL.check_append_only(&this.snapshot)?;
180-
PROTOCOL.can_write_to(&this.snapshot)?;
179+
let snapshot = resolve_snapshot(&this.log_store, this.snapshot.clone(), true).await?;
180+
PROTOCOL.check_append_only(&snapshot)?;
181+
PROTOCOL.can_write_to(&snapshot)?;
181182

182183
let operation_id = this.get_operation_id();
183184
this.pre_execute(operation_id).await?;
@@ -191,18 +192,17 @@ impl std::future::IntoFuture for DeleteBuilder {
191192
let predicate = match this.predicate {
192193
Some(predicate) => match predicate {
193194
Expression::DataFusion(expr) => Some(expr),
194-
Expression::String(s) => Some(
195-
this.snapshot
196-
.parse_predicate_expression(s, session.as_ref())?,
197-
),
195+
Expression::String(s) => {
196+
Some(snapshot.parse_predicate_expression(s, session.as_ref())?)
197+
}
198198
},
199199
None => None,
200200
};
201201

202202
let (new_snapshot, metrics) = execute(
203203
predicate,
204204
this.log_store.clone(),
205-
this.snapshot,
205+
snapshot,
206206
session.as_ref(),
207207
this.writer_properties,
208208
this.commit_properties,

0 commit comments

Comments
 (0)