Skip to content

Commit e029662

Browse files
Merge branch 'main' into abhi/inconsistent-session-state
2 parents a31556a + 52e6fa2 commit e029662

File tree

27 files changed

+199
-167
lines changed

27 files changed

+199
-167
lines changed

crates/core/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ validator = { version = "0.19", features = ["derive"] }
9090
[dev-dependencies]
9191
criterion = "0.5"
9292
ctor = "0"
93-
datatest-stable = "0.2"
93+
datatest-stable = "0.3"
9494
deltalake-test = { path = "../test" }
9595
dotenvy = "0"
9696
fs_extra = "1.2.0"

crates/core/src/delta_datafusion/cdf/scan.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ pub struct DeltaCdfTableProvider {
2828
impl DeltaCdfTableProvider {
2929
/// Build a DeltaCDFTableProvider
3030
pub fn try_new(cdf_builder: CdfLoadBuilder) -> DeltaResult<Self> {
31-
let mut fields = cdf_builder.snapshot.input_schema()?.fields().to_vec();
31+
let mut fields = cdf_builder.snapshot.input_schema().fields().to_vec();
3232
for f in ADD_PARTITION_SCHEMA.clone() {
3333
fields.push(f.into());
3434
}

crates/core/src/delta_datafusion/expr.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -825,7 +825,6 @@ mod test {
825825
.unwrap()
826826
.snapshot()
827827
.input_schema()
828-
.unwrap()
829828
.as_ref()
830829
.to_owned()
831830
.to_dfschema()

crates/core/src/delta_datafusion/mod.rs

Lines changed: 57 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -117,10 +117,10 @@ impl From<DataFusionError> for DeltaTableError {
117117
/// Convenience trait for calling common methods on snapshot hierarchies
118118
pub trait DataFusionMixins {
119119
/// The physical datafusion schema of a table
120-
fn arrow_schema(&self) -> DeltaResult<ArrowSchemaRef>;
120+
fn read_schema(&self) -> ArrowSchemaRef;
121121

122122
/// Get the table schema as an [`ArrowSchemaRef`]
123-
fn input_schema(&self) -> DeltaResult<ArrowSchemaRef>;
123+
fn input_schema(&self) -> ArrowSchemaRef;
124124

125125
/// Parse an expression string into a datafusion [`Expr`]
126126
fn parse_predicate_expression(
@@ -131,49 +131,77 @@ pub trait DataFusionMixins {
131131
}
132132

133133
impl DataFusionMixins for Snapshot {
134-
fn arrow_schema(&self) -> DeltaResult<ArrowSchemaRef> {
135-
_arrow_schema(self.table_configuration(), true)
134+
fn read_schema(&self) -> ArrowSchemaRef {
135+
_arrow_schema(
136+
self.arrow_schema(),
137+
self.metadata().partition_columns(),
138+
true,
139+
)
136140
}
137141

138-
fn input_schema(&self) -> DeltaResult<ArrowSchemaRef> {
139-
_arrow_schema(self.table_configuration(), false)
142+
fn input_schema(&self) -> ArrowSchemaRef {
143+
_arrow_schema(
144+
self.arrow_schema(),
145+
self.metadata().partition_columns(),
146+
false,
147+
)
140148
}
141149

142150
fn parse_predicate_expression(
143151
&self,
144152
expr: impl AsRef<str>,
145153
session: &impl Session,
146154
) -> DeltaResult<Expr> {
147-
let schema = DFSchema::try_from(self.arrow_schema()?.as_ref().to_owned())?;
155+
let schema = DFSchema::try_from(self.read_schema().as_ref().to_owned())?;
148156
parse_predicate_expression(&schema, expr, session)
149157
}
150158
}
151159

152160
impl DataFusionMixins for LogDataHandler<'_> {
153-
fn arrow_schema(&self) -> DeltaResult<ArrowSchemaRef> {
154-
_arrow_schema(self.table_configuration(), true)
161+
fn read_schema(&self) -> ArrowSchemaRef {
162+
_arrow_schema(
163+
Arc::new(
164+
self.table_configuration()
165+
.schema()
166+
.as_ref()
167+
.try_into_arrow()
168+
.unwrap(),
169+
),
170+
self.table_configuration().metadata().partition_columns(),
171+
true,
172+
)
155173
}
156174

157-
fn input_schema(&self) -> DeltaResult<ArrowSchemaRef> {
158-
_arrow_schema(self.table_configuration(), false)
175+
fn input_schema(&self) -> ArrowSchemaRef {
176+
_arrow_schema(
177+
Arc::new(
178+
self.table_configuration()
179+
.schema()
180+
.as_ref()
181+
.try_into_arrow()
182+
.unwrap(),
183+
),
184+
self.table_configuration().metadata().partition_columns(),
185+
false,
186+
)
159187
}
160188

161189
fn parse_predicate_expression(
162190
&self,
163191
expr: impl AsRef<str>,
164192
session: &impl Session,
165193
) -> DeltaResult<Expr> {
166-
let schema = DFSchema::try_from(self.arrow_schema()?.as_ref().to_owned())?;
194+
let schema = DFSchema::try_from(self.read_schema().as_ref().to_owned())?;
167195
parse_predicate_expression(&schema, expr, session)
168196
}
169197
}
170198

171199
impl DataFusionMixins for EagerSnapshot {
172-
fn arrow_schema(&self) -> DeltaResult<ArrowSchemaRef> {
173-
self.snapshot().arrow_schema()
200+
fn read_schema(&self) -> ArrowSchemaRef {
201+
self.snapshot().read_schema()
174202
}
175203

176-
fn input_schema(&self) -> DeltaResult<ArrowSchemaRef> {
204+
fn input_schema(&self) -> ArrowSchemaRef {
177205
self.snapshot().input_schema()
178206
}
179207

@@ -187,22 +215,20 @@ impl DataFusionMixins for EagerSnapshot {
187215
}
188216

189217
fn _arrow_schema(
190-
snapshot: &TableConfiguration,
218+
schema: SchemaRef,
219+
partition_columns: &[String],
191220
wrap_partitions: bool,
192-
) -> DeltaResult<ArrowSchemaRef> {
193-
let meta = snapshot.metadata();
194-
let schema = snapshot.schema();
195-
221+
) -> ArrowSchemaRef {
196222
let fields = schema
197223
.fields()
198-
.filter(|f| !meta.partition_columns().contains(&f.name().to_string()))
199-
.map(|f| f.try_into_arrow())
224+
.into_iter()
225+
.filter(|f| !partition_columns.contains(&f.name().to_string()))
226+
.cloned()
200227
.chain(
201228
// We need stable order between logical and physical schemas, but the order of
202229
// partitioning columns is not always the same in the json schema and the array
203-
meta.partition_columns().iter().map(|partition_col| {
204-
let f = schema.field(partition_col).unwrap();
205-
let field: Field = f.try_into_arrow()?;
230+
partition_columns.iter().map(|partition_col| {
231+
let field = schema.field_with_name(partition_col).unwrap();
206232
let corrected = if wrap_partitions {
207233
match field.data_type() {
208234
// Only dictionary-encode types that may be large
@@ -218,12 +244,11 @@ fn _arrow_schema(
218244
} else {
219245
field.data_type().clone()
220246
};
221-
Ok(field.with_data_type(corrected))
247+
Arc::new(field.clone().with_data_type(corrected))
222248
}),
223249
)
224-
.collect::<Result<Vec<Field>, _>>()?;
225-
226-
Ok(Arc::new(ArrowSchema::new(fields)))
250+
.collect::<Vec<_>>();
251+
Arc::new(ArrowSchema::new(fields))
227252
}
228253

229254
pub(crate) fn files_matching_predicate<'a>(
@@ -234,8 +259,8 @@ pub(crate) fn files_matching_predicate<'a>(
234259
(!filters.is_empty()).then_some(conjunction(filters.iter().cloned()))
235260
{
236261
let expr = SessionContext::new()
237-
.create_physical_expr(predicate, &log_data.arrow_schema()?.to_dfschema()?)?;
238-
let pruning_predicate = PruningPredicate::try_new(expr, log_data.arrow_schema()?)?;
262+
.create_physical_expr(predicate, &log_data.read_schema().to_dfschema()?)?;
263+
let pruning_predicate = PruningPredicate::try_new(expr, log_data.read_schema())?;
239264
let mask = pruning_predicate.prune(&log_data)?;
240265

241266
Ok(Either::Left(log_data.into_iter().zip(mask).filter_map(
@@ -294,7 +319,7 @@ pub(crate) fn df_logical_schema(
294319
) -> DeltaResult<SchemaRef> {
295320
let input_schema = match schema {
296321
Some(schema) => schema,
297-
None => snapshot.input_schema()?,
322+
None => snapshot.input_schema(),
298323
};
299324
let table_partition_cols = snapshot.metadata().partition_columns();
300325

crates/core/src/delta_datafusion/table_provider.rs

Lines changed: 13 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -89,22 +89,14 @@ pub struct DeltaDataSink {
8989
/// transaction log access, snapshot state, and session configuration.
9090
impl DeltaDataSink {
9191
/// Create a new `DeltaDataSink`
92-
pub fn new(
93-
log_store: LogStoreRef,
94-
snapshot: EagerSnapshot,
95-
save_mode: SaveMode,
96-
) -> datafusion::common::Result<Self> {
97-
let schema = snapshot
98-
.arrow_schema()
99-
.map_err(|e| DataFusionError::External(Box::new(e)))?;
100-
101-
Ok(Self {
92+
pub fn new(log_store: LogStoreRef, snapshot: EagerSnapshot, save_mode: SaveMode) -> Self {
93+
Self {
10294
log_store,
95+
schema: snapshot.read_schema(),
10396
snapshot,
10497
save_mode,
105-
schema,
10698
metrics: ExecutionPlanMetricsSet::new(),
107-
})
99+
}
108100
}
109101

110102
/// Create a streaming transformed version of the input that converts dictionary columns
@@ -156,20 +148,15 @@ impl DataSink for DeltaDataSink {
156148
data: SendableRecordBatchStream,
157149
_context: &Arc<TaskContext>,
158150
) -> datafusion::common::Result<u64> {
159-
let target_schema = self
160-
.snapshot
161-
.input_schema()
162-
.map_err(|e| DataFusionError::External(Box::new(e)))?;
151+
let target_schema = self.snapshot.input_schema();
163152

164153
let mut stream = self.create_converted_stream(data, target_schema.clone());
165154
let partition_columns = self.snapshot.metadata().partition_columns();
166155
let object_store = self.log_store.object_store(None);
167156
let total_rows_metric = MetricBuilder::new(&self.metrics).counter("total_rows", 0);
168157
let stats_config = WriterStatsConfig::new(DataSkippingNumIndexedCols::AllColumns, None);
169158
let config = WriterConfig::new(
170-
self.snapshot
171-
.arrow_schema()
172-
.map_err(|e| DataFusionError::External(Box::new(e)))?,
159+
self.snapshot.read_schema(),
173160
partition_columns.clone(),
174161
None,
175162
None,
@@ -322,7 +309,7 @@ impl DeltaScanConfigBuilder {
322309
/// Build a DeltaScanConfig and ensure no column name conflicts occur during downstream processing
323310
pub fn build(&self, snapshot: &EagerSnapshot) -> DeltaResult<DeltaScanConfig> {
324311
let file_column_name = if self.include_file_column {
325-
let input_schema = snapshot.input_schema()?;
312+
let input_schema = snapshot.input_schema();
326313
let mut column_names: HashSet<&String> = HashSet::new();
327314
for field in input_schema.fields.iter() {
328315
column_names.insert(field.name());
@@ -439,9 +426,9 @@ impl<'a> DeltaScanBuilder<'a> {
439426
};
440427

441428
let schema = match config.schema.clone() {
442-
Some(value) => Ok(value),
443-
None => self.snapshot.arrow_schema(),
444-
}?;
429+
Some(value) => value,
430+
None => self.snapshot.read_schema(),
431+
};
445432

446433
let logical_schema = df_logical_schema(
447434
self.snapshot,
@@ -720,7 +707,7 @@ impl TableProvider for DeltaTable {
720707
}
721708

722709
fn schema(&self) -> Arc<Schema> {
723-
self.snapshot().unwrap().snapshot().arrow_schema().unwrap()
710+
self.snapshot().unwrap().snapshot().read_schema()
724711
}
725712

726713
fn table_type(&self) -> TableType {
@@ -881,7 +868,7 @@ impl TableProvider for DeltaTableProvider {
881868
};
882869

883870
let data_sink =
884-
DeltaDataSink::new(self.log_store.clone(), self.snapshot.clone(), save_mode)?;
871+
DeltaDataSink::new(self.log_store.clone(), self.snapshot.clone(), save_mode);
885872

886873
Ok(Arc::new(DataSinkExec::new(
887874
input,
@@ -1005,7 +992,7 @@ fn df_logical_schema(
1005992
) -> DeltaResult<SchemaRef> {
1006993
let input_schema = match schema {
1007994
Some(schema) => schema,
1008-
None => snapshot.input_schema()?,
995+
None => snapshot.input_schema(),
1009996
};
1010997
let table_partition_cols = snapshot.metadata().partition_columns();
1011998

crates/core/src/kernel/snapshot/iterators/scan_row.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ where
5151
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
5252
let this = self.project();
5353
match this.stream.poll_next(cx) {
54-
Poll::Ready(Some(Ok(batch))) => match parse_stats_column(&this.snapshot, &batch) {
54+
Poll::Ready(Some(Ok(batch))) => match parse_stats_column(this.snapshot, &batch) {
5555
Ok(batch) => Poll::Ready(Some(Ok(batch))),
5656
Err(err) => Poll::Ready(Some(Err(err))),
5757
},

0 commit comments

Comments (0)