Skip to content

Commit 5cb6e4e

Browse files
committed
Try new structure
1 parent 9eb8457 commit 5cb6e4e

File tree

7 files changed

+52
-335
lines changed

7 files changed

+52
-335
lines changed

src/llm/plan.rs renamed to src/llm/exec.rs

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ use std::future::Future;
2121
use std::pin::Pin;
2222
use std::sync::{Arc, Mutex};
2323
use std::task::{Context, Poll};
24-
use crate::llm::expr::LLMExpr;
2524

2625
/// This structure evaluates a set of expressions on a record
2726
/// batch producing a new record batch
@@ -33,17 +32,16 @@ pub struct LLMExec {
3332
/// Cache holding plan properties like equivalences, output partitioning etc.
3433
cache: PlanProperties,
3534
metrics: ExecutionPlanMetricsSet,
36-
exprs: Vec<LLMExpr>,
35+
// which expressions to evaluate
3736
}
3837

3938
impl LLMExec {
40-
pub fn new(input: Arc<dyn ExecutionPlan>, metrics: ExecutionPlanMetricsSet, exprs: Vec<LLMExpr>) -> Self {
39+
pub fn new(input: Arc<dyn ExecutionPlan>, metrics: ExecutionPlanMetricsSet) -> Self {
4140
let cache = LLMExec::compute_properties(&input, input.schema().clone()).unwrap();
4241
Self {
4342
input,
4443
cache,
4544
metrics,
46-
exprs
4745
}
4846
}
4947

@@ -171,7 +169,6 @@ impl LLMStream {
171169
}
172170
}
173171

174-
175172
// TODO:
176173
// pretty batch and concat with promotion
177174
// as the input of the LLM function

src/llm/expr.rs

Lines changed: 0 additions & 29 deletions
This file was deleted.

src/llm/logical.rs

Lines changed: 0 additions & 215 deletions
This file was deleted.

src/llm/mod.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,3 @@
1+
pub mod exec;
12
pub mod functions;
2-
pub mod logical;
3-
pub mod physical_planner;
4-
pub mod plan;
5-
mod expr;
3+
pub mod physical_optimizer;

src/llm/physical_optimizer.rs

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
use datafusion::config::ConfigOptions;
2+
use datafusion::physical_optimizer::PhysicalOptimizerRule;
3+
use datafusion::physical_plan::projection::ProjectionExec;
4+
use datafusion::physical_plan::ExecutionPlan;
5+
use std::sync::Arc;
6+
7+
#[derive(Debug)]
8+
pub struct LLMRule {}
9+
10+
impl PhysicalOptimizerRule for LLMRule {
11+
fn optimize(
12+
&self,
13+
plan: Arc<dyn ExecutionPlan>,
14+
config: &ConfigOptions,
15+
) -> datafusion::common::Result<Arc<dyn ExecutionPlan>> {
16+
/// Insert a LLMExec node in front of this projection if there are any llm functions in it
17+
///
18+
/// For example, if the projection is:
19+
/// ```text
20+
/// ProjectionExec(["A", "B", llm_func('foo', "C") + 1])
21+
/// ```
22+
/// Rewrite to
23+
/// ProjectionExec(["A", "B", "llm_func('foo', "C")" + 1]) <-- note here that the llm function is not evaluated and instead is a new column
24+
/// LLMExec(["A", "B", llm_func('foo', "C")])
25+
///
26+
/// This is similar to the rewrite done by the CommonSubexprEliminationRule
27+
28+
/// replace ProjectionExec with LLMExec if there are any llm functions
29+
let Some(proj_exec) = plan.as_any().downcast_ref::<ProjectionExec>() else {
30+
return Ok(plan);
31+
};
32+
33+
/// find llm functions in the expressions
34+
//proj_exec.expr()
35+
todo!()
36+
}
37+
38+
fn name(&self) -> &str {
39+
"llm"
40+
}
41+
42+
/// verify the schema has not changed
43+
fn schema_check(&self) -> bool {
44+
true
45+
}
46+
}

0 commit comments

Comments
 (0)