Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions kernel/src/scan/log_replay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -418,7 +418,8 @@ mod tests {
add_batch_simple, add_batch_with_partition_col, add_batch_with_remove,
run_with_validate_callback,
};
use crate::scan::{get_state_info, Scan};
use crate::scan::{Scan, StateInfo};
use crate::table_features::ColumnMappingMode;
use crate::Expression as Expr;
use crate::{
engine::sync::SyncEngine,
Expand Down Expand Up @@ -503,7 +504,8 @@ mod tests {
StructField::new("date", DataType::DATE, true),
]));
let partition_cols = ["date".to_string()];
let state_info = get_state_info(schema.as_ref(), &partition_cols).unwrap();
let state_info =
StateInfo::try_new(schema.as_ref(), &partition_cols, ColumnMappingMode::None).unwrap();
let static_transform = Some(Arc::new(Scan::get_static_transform(&state_info.all_fields)));
let batch = vec![add_batch_with_partition_col()];
let iter = scan_action_iter(
Expand Down
73 changes: 40 additions & 33 deletions kernel/src/scan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,9 +112,10 @@ impl ScanBuilder {
pub fn build(self) -> DeltaResult<Scan> {
// if no schema is provided, use snapshot's entire schema (e.g. SELECT *)
let logical_schema = self.schema.unwrap_or_else(|| self.snapshot.schema());
let state_info = get_state_info(
let state_info = StateInfo::try_new(
logical_schema.as_ref(),
&self.snapshot.metadata().partition_columns,
self.snapshot.table_configuration().column_mapping_mode(),
)?;

let physical_predicate = match self.predicate {
Expand Down Expand Up @@ -818,39 +819,45 @@ struct StateInfo {
have_partition_cols: bool,
}

/// Get the state needed to process a scan, see [`StateInfo`] for details.
fn get_state_info(logical_schema: &Schema, partition_columns: &[String]) -> DeltaResult<StateInfo> {
let mut have_partition_cols = false;
let mut read_fields = Vec::with_capacity(logical_schema.fields.len());
// Loop over all selected fields and note if they are columns that will be read from the
// parquet file ([`ColumnType::Selected`]) or if they are partition columns and will need to
// be filled in by evaluating an expression ([`ColumnType::Partition`])
let all_fields = logical_schema
.fields()
.enumerate()
.map(|(index, logical_field)| -> DeltaResult<_> {
if partition_columns.contains(logical_field.name()) {
// Store the index into the schema for this field. When we turn it into an
// expression in the inner loop, we will index into the schema and get the name and
// data type, which we need to properly materialize the column.
have_partition_cols = true;
Ok(ColumnType::Partition(index))
} else {
// Add to read schema, store field so we can build a `Column` expression later
// if needed (i.e. if we have partition columns)
let physical_field = logical_field.make_physical();
debug!("\n\n{logical_field:#?}\nAfter mapping: {physical_field:#?}\n\n");
let physical_name = physical_field.name.clone();
read_fields.push(physical_field);
Ok(ColumnType::Selected(physical_name))
}
impl StateInfo {
/// Get the state needed to process a scan.
fn try_new(
logical_schema: &Schema,
partition_columns: &[String],
column_mapping_mode: ColumnMappingMode,
) -> DeltaResult<Self> {
let mut have_partition_cols = false;
let mut read_fields = Vec::with_capacity(logical_schema.fields.len());
// Loop over all selected fields and note if they are columns that will be read from the
// parquet file ([`ColumnType::Selected`]) or if they are partition columns and will need to
// be filled in by evaluating an expression ([`ColumnType::Partition`])
let all_fields = logical_schema
.fields()
.enumerate()
.map(|(index, logical_field)| -> DeltaResult<_> {
if partition_columns.contains(logical_field.name()) {
// Store the index into the schema for this field. When we turn it into an
// expression in the inner loop, we will index into the schema and get the name and
// data type, which we need to properly materialize the column.
have_partition_cols = true;
Ok(ColumnType::Partition(index))
} else {
// Add to read schema, store field so we can build a `Column` expression later
// if needed (i.e. if we have partition columns)
let physical_field = logical_field.make_physical(column_mapping_mode);
debug!("\n\n{logical_field:#?}\nAfter mapping: {physical_field:#?}\n\n");
let physical_name = physical_field.name.clone();
read_fields.push(physical_field);
Ok(ColumnType::Selected(physical_name))
}
})
.try_collect()?;
Ok(StateInfo {
all_fields,
read_fields,
have_partition_cols,
})
.try_collect()?;
Ok(StateInfo {
all_fields,
read_fields,
have_partition_cols,
})
}
}

pub fn selection_vector(
Expand Down
Loading
Loading