
Commit a31556a

Merge branch 'main' into abhi/inconsistent-session-state
2 parents 841646f + 4301422

File tree

5 files changed: +270 −507 lines changed


crates/core/src/kernel/arrow/engine_ext.rs

Lines changed: 3 additions & 57 deletions
@@ -12,17 +12,14 @@ use delta_kernel::arrow::record_batch::RecordBatch;
 use delta_kernel::engine::arrow_conversion::TryIntoArrow;
 use delta_kernel::engine::arrow_data::ArrowEngineData;
 use delta_kernel::expressions::{ColumnName, Scalar, StructData};
-use delta_kernel::scan::{Scan, ScanMetadata};
+use delta_kernel::scan::ScanMetadata;
 use delta_kernel::schema::{
     ArrayType, DataType, MapType, PrimitiveType, Schema, SchemaRef, SchemaTransform, StructField,
     StructType,
 };
 use delta_kernel::snapshot::Snapshot;
 use delta_kernel::table_properties::{DataSkippingNumIndexedCols, TableProperties};
-use delta_kernel::{
-    DeltaResult, Engine, EngineData, ExpressionEvaluator, ExpressionRef, PredicateRef, Version,
-};
-use itertools::Itertools;
+use delta_kernel::{DeltaResult, ExpressionEvaluator, ExpressionRef};
 
 use crate::errors::{DeltaResult as DeltaResultLocal, DeltaTableError};
 use crate::kernel::SCAN_ROW_ARROW_SCHEMA;
@@ -47,57 +44,6 @@ pub(crate) struct ScanMetadataArrow {
     pub scan_file_transforms: Vec<Option<ExpressionRef>>,
 }
 
-/// Internal extension trait to streamline working with Kernel scan objects.
-///
-/// THe trait mainly handles conversion between arrow `RecordBatch` and `ArrowEngineData`.
-/// The exposed methods are arrow-variants of methods already exposed on the kernel scan.
-pub(crate) trait ScanExt {
-    /// Get the metadata for a table scan.
-    ///
-    /// This method handles translation between `EngineData` and `RecordBatch`
-    /// and will already apply any selection vectors to the data.
-    /// See [`Scan::scan_metadata`] for details.
-    fn scan_metadata_arrow(
-        &self,
-        engine: &dyn Engine,
-    ) -> DeltaResult<impl Iterator<Item = DeltaResult<ScanMetadataArrow>>>;
-
-    fn scan_metadata_from_arrow(
-        &self,
-        engine: &dyn Engine,
-        existing_version: Version,
-        existing_data: Box<dyn Iterator<Item = RecordBatch>>,
-        existing_predicate: Option<PredicateRef>,
-    ) -> DeltaResult<impl Iterator<Item = DeltaResult<ScanMetadataArrow>>>;
-}
-
-impl ScanExt for Scan {
-    fn scan_metadata_arrow(
-        &self,
-        engine: &dyn Engine,
-    ) -> DeltaResult<impl Iterator<Item = DeltaResult<ScanMetadataArrow>>> {
-        Ok(self
-            .scan_metadata(engine)?
-            .map_ok(kernel_to_arrow)
-            .flatten())
-    }
-
-    fn scan_metadata_from_arrow(
-        &self,
-        engine: &dyn Engine,
-        existing_version: Version,
-        existing_data: Box<dyn Iterator<Item = RecordBatch>>,
-        existing_predicate: Option<PredicateRef>,
-    ) -> DeltaResult<impl Iterator<Item = DeltaResult<ScanMetadataArrow>>> {
-        let engine_iter =
-            existing_data.map(|batch| Box::new(ArrowEngineData::new(batch)) as Box<dyn EngineData>);
-        Ok(self
-            .scan_metadata_from(engine, existing_version, engine_iter, existing_predicate)?
-            .map_ok(kernel_to_arrow)
-            .flatten())
-    }
-}
-
 /// Internal extension traits to the Kernel Snapshot.
 ///
 /// These traits provide additional convenience functionality for working with Kernel snapshots.
@@ -439,7 +385,7 @@ fn is_skipping_eligeble_datatype(data_type: &PrimitiveType) -> bool {
     )
 }
 
-fn kernel_to_arrow(metadata: ScanMetadata) -> DeltaResult<ScanMetadataArrow> {
+pub(crate) fn kernel_to_arrow(metadata: ScanMetadata) -> DeltaResult<ScanMetadataArrow> {
     let scan_file_transforms = metadata
         .scan_file_transforms
        .into_iter()
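
With `ScanExt` gone, the conversion the trait used to wrap is a single call away. A minimal sketch of the new call pattern against the kernel `Scan` API, assuming crate-internal access (`collect_scan_files_arrow` is a hypothetical helper; the signatures of `Scan::scan_metadata` and `kernel_to_arrow` are as shown in this diff):

```rust
use delta_kernel::scan::Scan;
use delta_kernel::{DeltaResult, Engine};

use crate::kernel::arrow::engine_ext::{kernel_to_arrow, ScanMetadataArrow};

// Hypothetical helper: drive a kernel scan and convert each ScanMetadata
// batch into its arrow counterpart, as the snapshot code now does inline.
fn collect_scan_files_arrow(
    scan: &Scan,
    engine: &dyn Engine,
) -> DeltaResult<Vec<ScanMetadataArrow>> {
    scan.scan_metadata(engine)?
        // each item is a DeltaResult<ScanMetadata>; `?` propagates per-item errors
        .map(|meta| kernel_to_arrow(meta?))
        .collect()
}
```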

crates/core/src/kernel/snapshot/mod.rs

Lines changed: 75 additions & 112 deletions
@@ -39,9 +39,8 @@ use object_store::ObjectStore;
 use tokio::task::spawn_blocking;
 
 use super::{Action, CommitInfo, Metadata, Protocol};
-use crate::kernel::arrow::engine_ext::{ExpressionEvaluatorExt, ScanExt};
-#[cfg(test)]
-use crate::kernel::transaction::CommitData;
+use crate::kernel::arrow::engine_ext::{kernel_to_arrow, ExpressionEvaluatorExt};
+use crate::kernel::snapshot::scan::ScanBuilder;
 use crate::kernel::{StructType, ARROW_HANDLER};
 use crate::logstore::{LogStore, LogStoreExt};
 use crate::{to_kernel_predicate, DeltaResult, DeltaTableConfig, DeltaTableError, PartitionFilter};
@@ -52,6 +51,7 @@ pub use stream::*;
 
 mod iterators;
 mod log_data;
+mod scan;
 mod serde;
 mod stream;
 
@@ -117,41 +117,12 @@ impl Snapshot {
         })
     }
 
-    #[cfg(test)]
-    pub async fn new_test<'a>(
-        commits: impl IntoIterator<Item = &'a CommitData>,
-    ) -> DeltaResult<(Self, Arc<dyn LogStore>)> {
-        use crate::logstore::{commit_uri_from_version, default_logstore};
-        use object_store::memory::InMemory;
-        let store = Arc::new(InMemory::new());
-
-        for (idx, commit) in commits.into_iter().enumerate() {
-            let uri = commit_uri_from_version(idx as i64);
-            let data = commit.get_bytes()?;
-            store.put(&uri, data.into()).await?;
-        }
-
-        let table_url = url::Url::parse("memory:///").unwrap();
-
-        let log_store = default_logstore(
-            store.clone(),
-            store.clone(),
-            &table_url,
-            &Default::default(),
-        );
-
-        let engine = log_store.engine(None);
-        let snapshot = KernelSnapshot::builder_for(table_url.clone()).build(engine.as_ref())?;
-        let schema = snapshot.table_configuration().schema();
+    pub fn scan_builder(&self) -> ScanBuilder {
+        ScanBuilder::new(self.inner.clone())
+    }
 
-        Ok((
-            Self {
-                inner: snapshot,
-                config: Default::default(),
-                schema,
-            },
-            log_store,
-        ))
+    pub fn into_scan_builder(self) -> ScanBuilder {
+        ScanBuilder::new(self.inner)
     }
 
     /// Update the snapshot to the given version
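
The removed `#[cfg(test)]` constructor reappears under `mod tests` at the bottom of this file; in its place, `Snapshot` gains two accessors for the crate-local `ScanBuilder` from the newly added `scan` module. A hedged usage sketch (crate-internal; the concrete type returned by `build()` lives in the new `scan` module, which this page does not show):

```rust
use delta_kernel::PredicateRef;

use crate::kernel::snapshot::Snapshot;
use crate::DeltaResult;

// Sketch: build scans with an optional predicate via the new accessors.
fn build_scans(snapshot: Snapshot, predicate: Option<PredicateRef>) -> DeltaResult<()> {
    // Borrowing form: `snapshot` stays usable (the inner kernel snapshot is cloned).
    let _scan = snapshot
        .scan_builder()
        .with_predicate(predicate.clone())
        .build()?;

    // Consuming form: moves the inner snapshot and avoids the clone.
    let _scan = snapshot
        .into_scan_builder()
        .with_predicate(predicate)
        .build()?;
    Ok(())
}
```
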
@@ -249,34 +220,18 @@ impl Snapshot {
         log_store: &dyn LogStore,
         predicate: Option<PredicateRef>,
     ) -> SendableRBStream {
-        let scan = match self
-            .inner
-            .clone()
-            .scan_builder()
-            .with_predicate(predicate)
-            .build()
-        {
+        let scan = match self.scan_builder().with_predicate(predicate).build() {
             Ok(scan) => scan,
-            Err(err) => return Box::pin(once(ready(Err(DeltaTableError::KernelError(err))))),
+            Err(err) => return Box::pin(once(ready(Err(err)))),
         };
 
-        // TODO: which capacity to choose?
-        let mut builder = RecordBatchReceiverStreamBuilder::new(100);
-        let snapshot = scan.snapshot().clone();
-
         // TODO: bundle operation id with log store ...
         let engine = log_store.engine(None);
-        let tx = builder.tx();
-        builder.spawn_blocking(move || {
-            for res in scan.scan_metadata_arrow(engine.as_ref())? {
-                if tx.blocking_send(Ok(res?.scan_files)).is_err() {
-                    break;
-                }
-            }
-            Ok(())
-        });
+        let stream = scan
+            .scan_metadata(engine)
+            .map(|d| Ok(kernel_to_arrow(d?)?.scan_files));
 
-        ScanRowOutStream::new(snapshot, builder.build()).boxed()
+        ScanRowOutStream::new(self.inner.clone(), stream).boxed()
     }
 
     pub(crate) fn files_from<T: Iterator<Item = RecordBatch> + Send + 'static>(
@@ -287,50 +242,17 @@ impl Snapshot {
         existing_data: Box<T>,
         existing_predicate: Option<PredicateRef>,
     ) -> SendableRBStream {
-        let scan = match self
-            .inner
-            .clone()
-            .scan_builder()
-            .with_predicate(predicate)
-            .build()
-        {
+        let scan = match self.scan_builder().with_predicate(predicate).build() {
             Ok(scan) => scan,
-            Err(err) => return Box::pin(once(ready(Err(DeltaTableError::KernelError(err))))),
-        };
-
-        let snapshot = scan.snapshot().clone();
-
-        // process our stored / caed data to conform to the expected input for log replay
-        let evaluator = match scan_row_in_eval(&snapshot) {
-            Ok(scan_row_in_eval) => scan_row_in_eval,
             Err(err) => return Box::pin(once(ready(Err(err)))),
         };
-        let scan_row_iter = existing_data.map(move |b| {
-            evaluator
-                .evaluate_arrow(b)
-                .expect("Illegal stored log data")
-        });
 
-        // TODO: which capacity to choose?
-        let mut builder = RecordBatchReceiverStreamBuilder::new(100);
-        // TODO: bundle operation id with log store ...
         let engine = log_store.engine(None);
-        let tx = builder.tx();
-        builder.spawn_blocking(move || {
-            for res in scan.scan_metadata_from_arrow(
-                engine.as_ref(),
-                existing_version,
-                Box::new(scan_row_iter),
-                existing_predicate,
-            )? {
-                if tx.blocking_send(Ok(res?.scan_files)).is_err() {
-                    break;
-                }
-            }
-            Ok(())
-        });
+        let stream = scan
+            .scan_metadata_from(engine, existing_version, existing_data, existing_predicate)
+            .map(|d| Ok(kernel_to_arrow(d?)?.scan_files));
 
-        ScanRowOutStream::new(snapshot, builder.build()).boxed()
+        ScanRowOutStream::new(self.inner.clone(), stream).boxed()
     }
 
     /// Get the commit infos in the snapshot
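
Both `files` and `files_from` shed the channel plumbing here: instead of a `RecordBatchReceiverStreamBuilder` fed by a `spawn_blocking` producer (with its `// TODO: which capacity to choose?`), the scan-metadata stream is mapped in place and handed to `ScanRowOutStream`, so conversion happens as the consumer polls and no channel capacity needs choosing. A self-contained toy illustration of that shape (illustrative names and types only, using the `futures` crate):

```rust
use futures::stream::{self, Stream, StreamExt};

// Toy stand-in for kernel_to_arrow: convert one metadata item, fallibly.
fn to_arrow_toy(meta: u64) -> Result<u64, String> {
    Ok(meta * 2)
}

// Map a fallible iterator of metadata straight into a stream of converted
// items, mirroring `scan.scan_metadata(engine).map(|d| Ok(kernel_to_arrow(d?)?.scan_files))`.
fn mapped_stream<I>(iter: I) -> impl Stream<Item = Result<u64, String>>
where
    I: Iterator<Item = Result<u64, String>>,
{
    stream::iter(iter).map(|d| to_arrow_toy(d?))
}
```

Because the mapped stream is pull-based, backpressure falls out of stream polling rather than a bounded channel.
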
@@ -446,8 +368,7 @@
 
         builder.spawn_blocking(move || {
             for res in remove_data {
-                let batch: RecordBatch =
-                    ArrowEngineData::try_from_engine_data(res?.actions)?.into();
+                let batch = ArrowEngineData::try_from_engine_data(res?.actions)?.into();
                 if tx.blocking_send(Ok(batch)).is_err() {
                     break;
                 }
@@ -532,18 +453,6 @@ impl EagerSnapshot {
         Ok(Self { snapshot, files })
     }
 
-    #[cfg(test)]
-    pub async fn new_test<'a>(
-        commits: impl IntoIterator<Item = &'a CommitData>,
-    ) -> DeltaResult<Self> {
-        let (snapshot, log_store) = Snapshot::new_test(commits).await?;
-        let files: Vec<_> = snapshot
-            .files(log_store.as_ref(), None)
-            .try_collect()
-            .await?;
-        Ok(Self { snapshot, files })
-    }
-
     /// Update the snapshot to the given version
     pub(crate) async fn update(
         &mut self,
@@ -726,7 +635,61 @@ mod tests {
     // use super::log_segment::tests::{concurrent_checkpoint};
     // use super::replay::tests::test_log_replay;
     use super::*;
-    use crate::test_utils::{assert_batches_sorted_eq, TestResult, TestTables};
+    use crate::{
+        kernel::transaction::CommitData,
+        test_utils::{assert_batches_sorted_eq, TestResult, TestTables},
+    };
+
+    impl Snapshot {
+        pub async fn new_test<'a>(
+            commits: impl IntoIterator<Item = &'a CommitData>,
+        ) -> DeltaResult<(Self, Arc<dyn LogStore>)> {
+            use crate::logstore::{commit_uri_from_version, default_logstore};
+            use object_store::memory::InMemory;
+            let store = Arc::new(InMemory::new());
+
+            for (idx, commit) in commits.into_iter().enumerate() {
+                let uri = commit_uri_from_version(idx as i64);
+                let data = commit.get_bytes()?;
+                store.put(&uri, data.into()).await?;
+            }
+
+            let table_url = url::Url::parse("memory:///").unwrap();
+
+            let log_store = default_logstore(
+                store.clone(),
+                store.clone(),
+                &table_url,
+                &Default::default(),
+            );
+
+            let engine = log_store.engine(None);
+            let snapshot = KernelSnapshot::builder_for(table_url.clone()).build(engine.as_ref())?;
+            let schema = snapshot.table_configuration().schema();
+
+            Ok((
+                Self {
+                    inner: snapshot,
+                    config: Default::default(),
+                    schema,
+                },
+                log_store,
+            ))
+        }
+    }
+
+    impl EagerSnapshot {
+        pub async fn new_test<'a>(
+            commits: impl IntoIterator<Item = &'a CommitData>,
+        ) -> DeltaResult<Self> {
+            let (snapshot, log_store) = Snapshot::new_test(commits).await?;
+            let files: Vec<_> = snapshot
+                .files(log_store.as_ref(), None)
+                .try_collect()
+                .await?;
+            Ok(Self { snapshot, files })
+        }
+    }
 
     #[tokio::test]
     async fn test_snapshots() -> TestResult {
