Commit 9bd1b8e
refactors: move find_files into dedicated mod (#3815)
# Description

Just moving some code around in the datafusion module. Specifically, the `find_files` function and its dependencies now live in a dedicated module. This code still makes the heaviest use of `Add` actions and contains significant complexity, so I wanted to isolate it for upcoming refactors.

---------

Signed-off-by: Robert Pack <[email protected]>
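As a hedged usage sketch of the relocated function (crate-internal, since `find_files` stays `pub(crate)`): the `DeltaTable` accessors and names below are assumptions for illustration, not part of this diff, though the `snapshot().unwrap().snapshot()` chaining is also exercised by the test updates in this commit.

use datafusion::prelude::{col, lit, SessionContext};

// Illustrative only: `table` accessors follow the surrounding delta-rs APIs.
async fn candidate_files(table: &crate::DeltaTable) -> crate::DeltaResult<()> {
    let state = SessionContext::new().state();
    let result = find_files(
        table.snapshot()?.snapshot(), // &EagerSnapshot
        table.log_store(),            // LogStoreRef
        &state,
        Some(col("country").eq(lit("DE"))), // optional predicate
    )
    .await?;
    println!(
        "{} candidates; partition-only scan: {}",
        result.candidates.len(),
        result.partition_scan
    );
    Ok(())
}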
1 parent 3939373 commit 9bd1b8e

File tree

7 files changed (+345, -346 lines)

crates/core/src/delta_datafusion/expr.rs

Lines changed: 2 additions & 0 deletions

@@ -816,6 +816,7 @@ mod test {
             &table
                 .snapshot()
                 .unwrap()
+                .snapshot()
                 .input_schema()
                 .unwrap()
                 .as_ref()
@@ -926,6 +927,7 @@ mod test {
         let actual_expr = table
             .snapshot()
             .unwrap()
+            .snapshot()
             .parse_predicate_expression(actual, &session.state())
             .unwrap();

Lines changed: 301 additions & 0 deletions

@@ -0,0 +1,301 @@
use std::collections::HashMap;
use std::fmt::Debug;
use std::sync::Arc;

use arrow_array::{Array, RecordBatch, StringArray};
use arrow_schema::{ArrowError, DataType as ArrowDataType, Field, Schema as ArrowSchema};
use datafusion::common::tree_node::{TreeNode, TreeNodeRecursion, TreeNodeVisitor};
use datafusion::datasource::MemTable;
use datafusion::execution::context::{SessionContext, SessionState, TaskContext};
use datafusion::logical_expr::{col, Expr, Volatility};
use datafusion::physical_plan::filter::FilterExec;
use datafusion::physical_plan::limit::LocalLimitExec;
use datafusion::physical_plan::ExecutionPlan;
use itertools::Itertools;

use crate::delta_datafusion::{
    df_logical_schema, get_path_column, DeltaScanBuilder, DeltaScanConfigBuilder, PATH_COLUMN,
};
use crate::errors::{DeltaResult, DeltaTableError};
use crate::kernel::{Add, EagerSnapshot};
use crate::logstore::LogStoreRef;

#[derive(Debug, Hash, Eq, PartialEq)]
/// Representing the result of the [find_files] function.
pub(crate) struct FindFiles {
    /// A list of `Add` objects that match the given predicate
    pub candidates: Vec<Add>,
    /// Was a physical read to the datastore required to determine the candidates
    pub partition_scan: bool,
}

/// Finds files in a snapshot that match the provided predicate.
pub(crate) async fn find_files(
    snapshot: &EagerSnapshot,
    log_store: LogStoreRef,
    state: &SessionState,
    predicate: Option<Expr>,
) -> DeltaResult<FindFiles> {
    let current_metadata = snapshot.metadata();

    match &predicate {
        Some(predicate) => {
            // Validate the Predicate and determine if it only contains partition columns
            let mut expr_properties = FindFilesExprProperties {
                partition_only: true,
                partition_columns: current_metadata.partition_columns().clone(),
                result: Ok(()),
            };

            TreeNode::visit(predicate, &mut expr_properties)?;
            expr_properties.result?;

            if expr_properties.partition_only {
                let candidates = scan_memory_table(snapshot, predicate).await?;
                Ok(FindFiles {
                    candidates,
                    partition_scan: true,
                })
            } else {
                let candidates =
                    find_files_scan(snapshot, log_store, state, predicate.to_owned()).await?;

                Ok(FindFiles {
                    candidates,
                    partition_scan: false,
                })
            }
        }
        None => Ok(FindFiles {
            candidates: snapshot.log_data().iter().map(|f| f.add_action()).collect(),
            partition_scan: true,
        }),
    }
}

struct FindFilesExprProperties {
    pub partition_columns: Vec<String>,

    pub partition_only: bool,
    pub result: DeltaResult<()>,
}

/// Ensure only expressions that make sense are accepted, check for
/// non-deterministic functions, and determine if the expression only contains
/// partition columns
impl TreeNodeVisitor<'_> for FindFilesExprProperties {
    type Node = Expr;

    fn f_down(&mut self, expr: &Self::Node) -> datafusion::common::Result<TreeNodeRecursion> {
        // TODO: We can likely relax the volatility to STABLE. Would require further
        // research to confirm the same value is generated during the scan and
        // rewrite phases.

        match expr {
            Expr::Column(c) => {
                if !self.partition_columns.contains(&c.name) {
                    self.partition_only = false;
                }
            }
            Expr::ScalarVariable(_, _)
            | Expr::Literal(_, _)
            | Expr::Alias(_)
            | Expr::BinaryExpr(_)
            | Expr::Like(_)
            | Expr::SimilarTo(_)
            | Expr::Not(_)
            | Expr::IsNotNull(_)
            | Expr::IsNull(_)
            | Expr::IsTrue(_)
            | Expr::IsFalse(_)
            | Expr::IsUnknown(_)
            | Expr::IsNotTrue(_)
            | Expr::IsNotFalse(_)
            | Expr::IsNotUnknown(_)
            | Expr::Negative(_)
            | Expr::InList { .. }
            | Expr::Between(_)
            | Expr::Case(_)
            | Expr::Cast(_)
            | Expr::TryCast(_) => (),
            Expr::ScalarFunction(scalar_function) => {
                match scalar_function.func.signature().volatility {
                    Volatility::Immutable => (),
                    _ => {
                        self.result = Err(DeltaTableError::Generic(format!(
                            "Find files predicate contains nondeterministic function {}",
                            scalar_function.func.name()
                        )));
                        return Ok(TreeNodeRecursion::Stop);
                    }
                }
            }
            _ => {
                self.result = Err(DeltaTableError::Generic(format!(
                    "Find files predicate contains unsupported expression {expr}"
                )));
                return Ok(TreeNodeRecursion::Stop);
            }
        }

        Ok(TreeNodeRecursion::Continue)
    }
}

fn join_batches_with_add_actions(
    batches: Vec<RecordBatch>,
    mut actions: HashMap<String, Add>,
    path_column: &str,
    dict_array: bool,
) -> DeltaResult<Vec<Add>> {
    // Given RecordBatches that contains `__delta_rs_path` perform a hash join
    // with actions to obtain original add actions

    let mut files = Vec::with_capacity(batches.iter().map(|batch| batch.num_rows()).sum());
    for batch in batches {
        let err = || DeltaTableError::Generic("Unable to obtain Delta-rs path column".to_string());

        let iter: Box<dyn Iterator<Item = Option<&str>>> = if dict_array {
            let array = get_path_column(&batch, path_column)?;
            Box::new(array.into_iter())
        } else {
            let array = batch
                .column_by_name(path_column)
                .ok_or_else(err)?
                .as_any()
                .downcast_ref::<StringArray>()
                .ok_or_else(err)?;
            Box::new(array.into_iter())
        };

        for path in iter {
            let path = path.ok_or(DeltaTableError::Generic(format!(
                "{path_column} cannot be null"
            )))?;

            match actions.remove(path) {
                Some(action) => files.push(action),
                None => {
                    return Err(DeltaTableError::Generic(
                        "Unable to map __delta_rs_path to action.".to_owned(),
                    ))
                }
            }
        }
    }
    Ok(files)
}

/// Determine which files contain a record that satisfies the predicate
async fn find_files_scan(
    snapshot: &EagerSnapshot,
    log_store: LogStoreRef,
    state: &SessionState,
    expression: Expr,
) -> DeltaResult<Vec<Add>> {
    let candidate_map: HashMap<String, Add> = snapshot
        .log_data()
        .iter()
        .map(|f| f.add_action())
        .map(|add| (add.path.clone(), add.to_owned()))
        .collect();

    let scan_config = DeltaScanConfigBuilder::default()
        .with_file_column(true)
        .build(snapshot)?;

    let logical_schema = df_logical_schema(snapshot, &scan_config.file_column_name, None)?;

    // Identify which columns we need to project
    let mut used_columns = expression
        .column_refs()
        .into_iter()
        .map(|column| logical_schema.index_of(&column.name))
        .collect::<Result<Vec<usize>, ArrowError>>()?;
    // Add path column
    used_columns.push(logical_schema.index_of(scan_config.file_column_name.as_ref().unwrap())?);

    let scan = DeltaScanBuilder::new(snapshot, log_store, state)
        .with_filter(Some(expression.clone()))
        .with_projection(Some(&used_columns))
        .with_scan_config(scan_config)
        .build()
        .await?;
    let scan = Arc::new(scan);

    let config = &scan.config;
    let input_schema = scan.logical_schema.as_ref().to_owned();
    let input_dfschema = input_schema.clone().try_into()?;

    let predicate_expr =
        state.create_physical_expr(Expr::IsTrue(Box::new(expression.clone())), &input_dfschema)?;

    let filter: Arc<dyn ExecutionPlan> =
        Arc::new(FilterExec::try_new(predicate_expr, scan.clone())?);
    let limit: Arc<dyn ExecutionPlan> = Arc::new(LocalLimitExec::new(filter, 1));

    let task_ctx = Arc::new(TaskContext::from(state));
    let path_batches = datafusion::physical_plan::collect(limit, task_ctx).await?;

    join_batches_with_add_actions(
        path_batches,
        candidate_map,
        config.file_column_name.as_ref().unwrap(),
        true,
    )
}

async fn scan_memory_table(snapshot: &EagerSnapshot, predicate: &Expr) -> DeltaResult<Vec<Add>> {
    let actions = snapshot
        .log_data()
        .iter()
        .map(|f| f.add_action())
        .collect_vec();

    let batch = snapshot.add_actions_table(true)?;
    let mut arrays = Vec::new();
    let mut fields = Vec::new();

    let schema = batch.schema();

    arrays.push(
        batch
            .column_by_name("path")
            .ok_or(DeltaTableError::Generic(
                "Column with name `path` does not exist".to_owned(),
            ))?
            .to_owned(),
    );
    fields.push(Field::new(PATH_COLUMN, ArrowDataType::Utf8, false));

    for field in schema.fields() {
        if field.name().starts_with("partition.") {
            let name = field.name().strip_prefix("partition.").unwrap();

            arrays.push(batch.column_by_name(field.name()).unwrap().to_owned());
            fields.push(Field::new(
                name,
                field.data_type().to_owned(),
                field.is_nullable(),
            ));
        }
    }

    let schema = Arc::new(ArrowSchema::new(fields));
    let batch = RecordBatch::try_new(schema, arrays)?;
    let mem_table = MemTable::try_new(batch.schema(), vec![vec![batch]])?;

    let ctx = SessionContext::new();
    let mut df = ctx.read_table(Arc::new(mem_table))?;
    df = df
        .filter(predicate.to_owned())?
        .select(vec![col(PATH_COLUMN)])?;
    let batches = df.collect().await?;

    let map = actions
        .into_iter()
        .map(|action| (action.path.clone(), action))
        .collect::<HashMap<String, Add>>();

    join_batches_with_add_actions(batches, map, PATH_COLUMN, false)
}
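A hedged illustration of the partition-only classification the visitor above performs: a predicate that references only partition columns keeps `partition_only` true and can be answered from log metadata alone, while any other column reference forces a physical file scan. Only the `Expr::Column` arm is reproduced here in miniature; all names in this sketch are illustrative, not part of the commit.

use datafusion::common::tree_node::{TreeNode, TreeNodeRecursion, TreeNodeVisitor};
use datafusion::logical_expr::{col, lit, Expr};

// Miniature FindFilesExprProperties: tracks only whether every column
// reference is a partition column; other expressions pass through unchecked.
struct PartitionOnly<'a> {
    partition_columns: &'a [String],
    partition_only: bool,
}

impl TreeNodeVisitor<'_> for PartitionOnly<'_> {
    type Node = Expr;

    fn f_down(&mut self, expr: &Self::Node) -> datafusion::common::Result<TreeNodeRecursion> {
        if let Expr::Column(c) = expr {
            if !self.partition_columns.contains(&c.name) {
                self.partition_only = false;
            }
        }
        Ok(TreeNodeRecursion::Continue)
    }
}

fn main() -> datafusion::common::Result<()> {
    let partitions = vec!["year".to_string()];
    // (predicate, expected partition_only)
    for (expr, expected) in [
        (col("year").eq(lit(2024)), true), // memory-table scan suffices
        (col("value").gt(lit(10)), false), // requires reading data files
    ] {
        let mut visitor = PartitionOnly {
            partition_columns: &partitions,
            partition_only: true,
        };
        TreeNode::visit(&expr, &mut visitor)?;
        assert_eq!(visitor.partition_only, expected);
    }
    Ok(())
}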

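And a minimal sketch of the hash join performed by `join_batches_with_add_actions`, with the kernel `Add` struct stubbed as a plain `String` payload; everything here besides the `__delta_rs_path` column name is illustrative.

use std::collections::HashMap;
use std::sync::Arc;

use arrow_array::{Array, RecordBatch, StringArray};
use arrow_schema::{DataType, Field, Schema};

fn main() {
    // Pretend these Add actions came from the snapshot's log data.
    let mut actions: HashMap<String, String> = HashMap::from([
        ("part-0.parquet".to_string(), "Add(part-0)".to_string()),
        ("part-1.parquet".to_string(), "Add(part-1)".to_string()),
    ]);

    // Pretend this batch is the collected `__delta_rs_path` scan output.
    let schema = Arc::new(Schema::new(vec![Field::new(
        "__delta_rs_path",
        DataType::Utf8,
        false,
    )]));
    let paths = StringArray::from(vec!["part-1.parquet"]);
    let batch = RecordBatch::try_new(schema, vec![Arc::new(paths)]).unwrap();

    // Hash join: remove each matched path so an action is consumed at most once.
    let column = batch
        .column(0)
        .as_any()
        .downcast_ref::<StringArray>()
        .unwrap();
    let files: Vec<String> = column
        .iter()
        .filter_map(|path| path.and_then(|p| actions.remove(p)))
        .collect();

    assert_eq!(files, vec!["Add(part-1)".to_string()]);
}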