Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
144 changes: 143 additions & 1 deletion datafusion/substrait/src/logical_plan/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ use datafusion::logical_expr::{
ExprSchemable, LogicalPlan, Operator, Projection, SortExpr, Values,
};
use substrait::proto::expression::subquery::set_predicate::PredicateOp;
use substrait::proto::expression_reference::ExprType;
use url::Url;

use crate::extensions::Extensions;
Expand Down Expand Up @@ -96,7 +97,7 @@ use substrait::proto::{
sort_field::{SortDirection, SortKind::*},
AggregateFunction, Expression, NamedStruct, Plan, Rel, Type,
};
use substrait::proto::{FunctionArgument, SortField};
use substrait::proto::{ExtendedExpression, FunctionArgument, SortField};

// Substrait PrecisionTimestampTz indicates that the timestamp is relative to UTC, which
// is the same as the expectation for any non-empty timezone in DF, so any non-empty timezone
Expand Down Expand Up @@ -251,6 +252,147 @@ pub async fn from_substrait_plan(
}
}

/// A collection of expressions that all share one input schema.
///
/// Every expression is paired with a [`Field`] describing its output. The field's data type
/// and nullability are computed from the expression and the input schema, whereas the names
/// of the field (and of any nested fields) are taken from the Substrait message.
#[derive(Debug, Clone)]
pub struct ExprContainer {
    /// The input schema against which every expression in `exprs` is resolved
    pub input_schema: DFSchemaRef,
    /// The expressions
    ///
    /// Each item holds an expression together with the field that defines the expected
    /// nullability and name of that expression's output
    pub exprs: Vec<(Expr, Field)>,
}

// Substrait fields (DataType, nullable) typically don't have names. In the few cases they do
// those names are provided as Vec<String>. This method attaches names to a (DataType, nullable)
// pair to create a Field
fn field_from_type_and_names<S: Into<String>, T: Iterator<Item = S>>(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This feels similar to from_substrait_type, could we reuse that somehow?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or actually maybe make_renamed_schema is what you want?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch. I found the existing method in the producer but missed it in the consumer. I've updated the PR to use make_renamed_schema's helper method (which I've renamed as rename_field)

names: &mut T,
data_type: DataType,
nullable: bool,
self_name: Option<&'static str>, // E.g. "item" for list fields
// which don't have names in Substrait
) -> Result<Field> {
let self_name =
self_name
.map(|s| Ok(s.to_string()))
.unwrap_or_else(|| match names.next() {
Some(name) => Ok(name.into()),
None => {
plan_err!(
"Substrait message did not have enough field names for data type"
)
}
})?;
let data_type: Result<DataType> = match data_type {
DataType::Struct(fields) => {
let fields = fields
.iter()
.map(|f| {
field_from_type_and_names(
names,
f.data_type().clone(),
f.is_nullable(),
None,
)
})
.collect::<Result<_>>()?;
Ok(DataType::Struct(fields))
}
DataType::List(inner) => {
let inner = field_from_type_and_names(
names,
inner.data_type().clone(),
inner.is_nullable(),
Some("item"),
)?;
Ok(DataType::List(Arc::new(inner)))
}
DataType::LargeList(inner) => {
let inner = field_from_type_and_names(
names,
inner.data_type().clone(),
inner.is_nullable(),
Some("item"),
)?;
Ok(DataType::List(Arc::new(inner)))
}
DataType::Map(map_struct, sorted_keys) => {
let map_struct = field_from_type_and_names(
names,
map_struct.data_type().clone(),
map_struct.is_nullable(),
None,
)?;
Ok(DataType::Map(Arc::new(map_struct), sorted_keys))
}
_ => Ok(data_type),
};
Ok(Field::new(&self_name, data_type?, nullable))
}

/// Convert a Substrait ExtendedExpression into an [`ExprContainer`]
///
/// An ExtendedExpression is a top-level Substrait message carrying an input schema plus one
/// or more expressions, each with names for its output. All of these pieces end up in the
/// returned container.
///
/// Because it carries expressions rather than whole plans, this message is handy for
/// exchanging expressions between systems, e.g. when pushing filter expressions down to a
/// remote system.
///
/// # Errors
///
/// Returns an error when the message declares type variation extensions, when `base_schema`
/// or an expression's `expr_type` is missing, when an expression is a measure, when an
/// expression cannot be converted, or when too few output names are supplied for an
/// expression's output type.
pub async fn from_substrait_extended_expr(
    ctx: &SessionContext,
    extended_expr: &ExtendedExpression,
) -> Result<ExprContainer> {
    // Collect the extensions declared by the message
    let extensions = Extensions::try_from(&extended_expr.extensions)?;
    if !extensions.type_variations.is_empty() {
        return not_impl_err!("Type variation extensions are not supported");
    }

    // The schema all expressions are resolved against
    let named_struct = match extended_expr.base_schema.as_ref() {
        Some(named_struct) => named_struct,
        None => {
            return plan_err!("required property `base_schema` missing from Substrait ExtendedExpression message")
        }
    };
    let input_schema =
        DFSchemaRef::new(from_substrait_named_struct(named_struct, &extensions)?);

    // Convert each referenced expression and derive its named output field
    let mut exprs = Vec::with_capacity(extended_expr.referred_expr.len());
    for referred in &extended_expr.referred_expr {
        let rex = match referred.expr_type.as_ref() {
            Some(ExprType::Expression(rex)) => rex,
            Some(ExprType::Measure(_)) => {
                return not_impl_err!("Measure expressions are not yet supported")
            }
            None => {
                return plan_err!("required property `expr_type` missing from Substrait ExpressionReference message")
            }
        };

        let expr = from_substrait_rex(ctx, rex, &input_schema, &extensions).await?;

        // Type and nullability come from the expression; names come from the message
        let (data_type, is_nullable) = expr.data_type_and_nullable(&input_schema)?;
        let mut remaining_names = referred.output_names.iter().map(String::as_str);
        let field = field_from_type_and_names(
            &mut remaining_names,
            data_type,
            is_nullable,
            None,
        )?;

        exprs.push((expr, field));
    }

    Ok(ExprContainer {
        input_schema,
        exprs,
    })
}

/// parse projection
pub fn extract_projection(
t: LogicalPlan,
Expand Down
Loading