Skip to content

Commit 1216110

Browse files
committed
add values list expression
1 parent e6657f0 commit 1216110

14 files changed

Lines changed: 568 additions & 19 deletions

File tree

ballista/rust/core/proto/ballista.proto

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -255,6 +255,7 @@ message LogicalPlanNode {
255255
WindowNode window = 13;
256256
AnalyzeNode analyze = 14;
257257
CrossJoinNode cross_join = 15;
258+
ValuesNode values = 16;
258259
}
259260
}
260261

@@ -316,12 +317,12 @@ message SelectionNode {
316317
LogicalExprNode expr = 2;
317318
}
318319

319-
message SortNode{
320+
message SortNode {
320321
LogicalPlanNode input = 1;
321322
repeated LogicalExprNode expr = 2;
322323
}
323324

324-
message RepartitionNode{
325+
message RepartitionNode {
325326
LogicalPlanNode input = 1;
326327
oneof partition_method {
327328
uint64 round_robin = 2;
@@ -334,19 +335,26 @@ message HashRepartition {
334335
uint64 partition_count = 2;
335336
}
336337

337-
message EmptyRelationNode{
338+
message EmptyRelationNode {
338339
bool produce_one_row = 1;
339340
}
340341

341-
message CreateExternalTableNode{
342+
message CreateExternalTableNode {
342343
string name = 1;
343344
string location = 2;
344345
FileType file_type = 3;
345346
bool has_header = 4;
346347
DfSchema schema = 5;
347348
}
348349

349-
enum FileType{
350+
// A node containing the data that defines a values list. Unlike in SQL, where it is two-dimensional, here
351+
// the list is flattened; the n_cols field allows it to be parsed and partitioned back into rows.
352+
message ValuesNode {
353+
uint64 n_cols = 1;
354+
repeated LogicalExprNode values_list = 2;
355+
}
356+
357+
enum FileType {
350358
NdJson = 0;
351359
Parquet = 1;
352360
CSV = 2;

ballista/rust/core/src/serde/logical_plan/from_proto.rs

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,31 @@ impl TryInto<LogicalPlan> for &protobuf::LogicalPlanNode {
6060
))
6161
})?;
6262
match plan {
63+
LogicalPlanType::Values(values) => {
64+
let n_cols = values.n_cols as usize;
65+
let values: Vec<Vec<Expr>> = if values.values_list.is_empty() {
66+
Ok(Vec::new())
67+
} else if values.values_list.len() % n_cols != 0 {
68+
Err(BallistaError::General(format!(
69+
"Invalid values list length, expect {} to be divisible by {}",
70+
values.values_list.len(),
71+
n_cols
72+
)))
73+
} else {
74+
values
75+
.values_list
76+
.chunks_exact(n_cols)
77+
.map(|r| {
78+
r.into_iter()
79+
.map(|v| v.try_into())
80+
.collect::<Result<Vec<_>, _>>()
81+
})
82+
.collect::<Result<Vec<_>, _>>()
83+
}?;
84+
LogicalPlanBuilder::values(values)?
85+
.build()
86+
.map_err(|e| e.into())
87+
}
6388
LogicalPlanType::Projection(projection) => {
6489
let input: LogicalPlan = convert_box_required!(projection.input)?;
6590
let x: Vec<Expr> = projection

ballista/rust/core/src/serde/logical_plan/to_proto.rs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -675,6 +675,26 @@ impl TryInto<protobuf::LogicalPlanNode> for &LogicalPlan {
675675
fn try_into(self) -> Result<protobuf::LogicalPlanNode, Self::Error> {
676676
use protobuf::logical_plan_node::LogicalPlanType;
677677
match self {
678+
LogicalPlan::Values { values, .. } => {
679+
let n_cols = if values.is_empty() {
680+
0
681+
} else {
682+
values[0].len()
683+
} as u64;
684+
let values_list = values
685+
.iter()
686+
.flatten()
687+
.map(|v| v.try_into())
688+
.collect::<Result<Vec<_>, _>>()?;
689+
Ok(protobuf::LogicalPlanNode {
690+
logical_plan_type: Some(LogicalPlanType::Values(
691+
protobuf::ValuesNode {
692+
n_cols,
693+
values_list,
694+
},
695+
)),
696+
})
697+
}
678698
LogicalPlan::TableScan {
679699
table_name,
680700
source,

datafusion/src/logical_plan/builder.rs

Lines changed: 84 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -17,16 +17,6 @@
1717

1818
//! This module provides a builder for creating LogicalPlans
1919
20-
use std::{
21-
collections::{HashMap, HashSet},
22-
sync::Arc,
23-
};
24-
25-
use arrow::{
26-
datatypes::{Schema, SchemaRef},
27-
record_batch::RecordBatch,
28-
};
29-
3020
use crate::datasource::{
3121
empty::EmptyTable,
3222
file_format::parquet::{ParquetFormat, DEFAULT_PARQUET_EXTENSION},
@@ -37,6 +27,16 @@ use crate::datasource::{
3727
use crate::error::{DataFusionError, Result};
3828
use crate::logical_plan::plan::ToStringifiedPlan;
3929
use crate::prelude::*;
30+
use crate::scalar::ScalarValue;
31+
use arrow::{
32+
datatypes::{DataType, Schema, SchemaRef},
33+
record_batch::RecordBatch,
34+
};
35+
use std::convert::TryFrom;
36+
use std::{
37+
collections::{HashMap, HashSet},
38+
sync::Arc,
39+
};
4040

4141
use super::dfschema::ToDFSchema;
4242
use super::{exprlist_to_fields, Expr, JoinConstraint, JoinType, LogicalPlan, PlanType};
@@ -111,6 +111,80 @@ impl LogicalPlanBuilder {
111111
})
112112
}
113113

114+
/// Create a values list based relation, and the schema is inferred from data. This will consume
115+
/// and mutate the given values vec.
116+
///
117+
/// By default, it assigns the names column1, column2, etc. to the columns of a VALUES table.
118+
/// The column names are not specified by the SQL standard and different database systems do it differently,
119+
/// so it's usually better to override the default names with a table alias list.
120+
pub fn values(mut values: Vec<Vec<Expr>>) -> Result<Self> {
121+
if values.is_empty() {
122+
return Err(DataFusionError::Plan("Values list cannot be empty".into()));
123+
}
124+
let n_cols = values[0].len();
125+
if n_cols == 0 {
126+
return Err(DataFusionError::Plan(
127+
"Values list cannot be zero length".into(),
128+
));
129+
}
130+
let empty_schema = DFSchema::empty();
131+
let mut field_types: Vec<Option<DataType>> = Vec::with_capacity(n_cols);
132+
for _ in 0..n_cols {
133+
field_types.push(None);
134+
}
135+
// hold all the null holes so that we can correct their data types later
136+
let mut nulls: Vec<(usize, usize)> = Vec::new();
137+
for (i, row) in values.iter().enumerate() {
138+
if row.len() != n_cols {
139+
return Err(DataFusionError::Plan(format!(
140+
"Inconsistent data length across values list: got {} values in row {} but expected {}",
141+
row.len(),
142+
i,
143+
n_cols
144+
)));
145+
}
146+
field_types = row
147+
.iter()
148+
.enumerate()
149+
.map(|(j, expr)| {
150+
if let Expr::Literal(ScalarValue::Utf8(None)) = expr {
151+
nulls.push((i, j));
152+
Ok(field_types[j].clone())
153+
} else {
154+
let data_type = expr.get_type(&empty_schema)?;
155+
if let Some(prev_data_type) = &field_types[j] {
156+
if prev_data_type != &data_type {
157+
return Err(DataFusionError::Plan(format!(
158+
"Inconsistent data type across values list at column {}",
159+
i
160+
)));
161+
}
162+
}
163+
Ok(Some(data_type))
164+
}
165+
})
166+
.collect::<Result<Vec<Option<DataType>>>>()?;
167+
}
168+
let fields = field_types
169+
.iter()
170+
.enumerate()
171+
.map(|(j, data_type)| {
172+
DFField::new(
173+
None,
174+
// naming follows the PostgreSQL convention: https://www.postgresql.org/docs/current/queries-values.html
175+
&format!("column{}", j + 1),
176+
data_type.clone().unwrap_or(DataType::Utf8),
177+
true,
178+
)
179+
})
180+
.collect::<Vec<_>>();
181+
for (i, j) in nulls {
182+
values[i][j] = Expr::Literal(ScalarValue::try_from(fields[j].data_type())?);
183+
}
184+
let schema = DFSchemaRef::new(DFSchema::new(fields)?);
185+
Ok(Self::from(LogicalPlan::Values { schema, values }))
186+
}
187+
114188
/// Scan a memory data source
115189
pub fn scan_memory(
116190
partitions: Vec<Vec<RecordBatch>>,

datafusion/src/logical_plan/plan.rs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -203,6 +203,13 @@ pub enum LogicalPlan {
203203
/// Whether the CSV file contains a header
204204
has_header: bool,
205205
},
206+
/// Values expression
207+
Values {
208+
/// The table schema
209+
schema: DFSchemaRef,
210+
/// Values
211+
values: Vec<Vec<Expr>>,
212+
},
206213
/// Produces a relation with string representations of
207214
/// various parts of the plan
208215
Explain {
@@ -237,6 +244,7 @@ impl LogicalPlan {
237244
pub fn schema(&self) -> &DFSchemaRef {
238245
match self {
239246
LogicalPlan::EmptyRelation { schema, .. } => schema,
247+
LogicalPlan::Values { schema, .. } => schema,
240248
LogicalPlan::TableScan {
241249
projected_schema, ..
242250
} => projected_schema,
@@ -263,6 +271,7 @@ impl LogicalPlan {
263271
LogicalPlan::TableScan {
264272
projected_schema, ..
265273
} => vec![projected_schema],
274+
LogicalPlan::Values { schema, .. } => vec![schema],
266275
LogicalPlan::Window { input, schema, .. }
267276
| LogicalPlan::Aggregate { input, schema, .. }
268277
| LogicalPlan::Projection { input, schema, .. } => {
@@ -315,6 +324,9 @@ impl LogicalPlan {
315324
pub fn expressions(self: &LogicalPlan) -> Vec<Expr> {
316325
match self {
317326
LogicalPlan::Projection { expr, .. } => expr.clone(),
327+
LogicalPlan::Values { values, .. } => {
328+
values.iter().flatten().cloned().collect()
329+
}
318330
LogicalPlan::Filter { predicate, .. } => vec![predicate.clone()],
319331
LogicalPlan::Repartition {
320332
partitioning_scheme,
@@ -369,6 +381,7 @@ impl LogicalPlan {
369381
// plans without inputs
370382
LogicalPlan::TableScan { .. }
371383
| LogicalPlan::EmptyRelation { .. }
384+
| LogicalPlan::Values { .. }
372385
| LogicalPlan::CreateExternalTable { .. } => vec![],
373386
}
374387
}
@@ -515,6 +528,7 @@ impl LogicalPlan {
515528
// plans without inputs
516529
LogicalPlan::TableScan { .. }
517530
| LogicalPlan::EmptyRelation { .. }
531+
| LogicalPlan::Values { .. }
518532
| LogicalPlan::CreateExternalTable { .. } => true,
519533
};
520534
if !recurse {
@@ -702,6 +716,9 @@ impl LogicalPlan {
702716
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
703717
match &*self.0 {
704718
LogicalPlan::EmptyRelation { .. } => write!(f, "EmptyRelation"),
719+
LogicalPlan::Values { ref values, .. } => {
720+
write!(f, "Values: {} rows", values.len())
721+
}
705722
LogicalPlan::TableScan {
706723
ref table_name,
707724
ref projection,

datafusion/src/optimizer/common_subexpr_eliminate.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -199,6 +199,7 @@ fn optimize(plan: &LogicalPlan, execution_props: &ExecutionProps) -> Result<Logi
199199
| LogicalPlan::Repartition { .. }
200200
| LogicalPlan::Union { .. }
201201
| LogicalPlan::TableScan { .. }
202+
| LogicalPlan::Values { .. }
202203
| LogicalPlan::EmptyRelation { .. }
203204
| LogicalPlan::Limit { .. }
204205
| LogicalPlan::CreateExternalTable { .. }

datafusion/src/optimizer/constant_folding.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ impl OptimizerRule for ConstantFolding {
7777
| LogicalPlan::Aggregate { .. }
7878
| LogicalPlan::Repartition { .. }
7979
| LogicalPlan::CreateExternalTable { .. }
80+
| LogicalPlan::Values { .. }
8081
| LogicalPlan::Extension { .. }
8182
| LogicalPlan::Sort { .. }
8283
| LogicalPlan::Explain { .. }

datafusion/src/optimizer/projection_push_down.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -431,6 +431,7 @@ fn optimize_plan(
431431
| LogicalPlan::Filter { .. }
432432
| LogicalPlan::Repartition { .. }
433433
| LogicalPlan::EmptyRelation { .. }
434+
| LogicalPlan::Values { .. }
434435
| LogicalPlan::Sort { .. }
435436
| LogicalPlan::CreateExternalTable { .. }
436437
| LogicalPlan::CrossJoin { .. }

datafusion/src/optimizer/utils.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,10 @@ pub fn from_plan(
124124
schema: schema.clone(),
125125
alias: alias.clone(),
126126
}),
127+
LogicalPlan::Values { schema, values } => Ok(LogicalPlan::Values {
128+
schema: schema.clone(),
129+
values: values.to_vec(),
130+
}),
127131
LogicalPlan::Filter { .. } => Ok(LogicalPlan::Filter {
128132
predicate: expr[0].clone(),
129133
input: Arc::new(inputs[0].clone()),

datafusion/src/physical_plan/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -645,5 +645,6 @@ pub mod udf;
645645
#[cfg(feature = "unicode_expressions")]
646646
pub mod unicode_expressions;
647647
pub mod union;
648+
pub mod values;
648649
pub mod window_functions;
649650
pub mod windows;

0 commit comments

Comments
 (0)