@Jefffrey Jefffrey commented Oct 20, 2025

Which issue does this PR close?

Rationale for this change

Be able to serialize/deserialize logical plans containing scalar subqueries & outer column references in proto.

What changes are included in this PR?

  1. Added ScalarSubquery and OuterReferenceColumn variants to LogicalExprNode in datafusion.proto
  2. Changed parse_expr() to accept a &TaskContext instead of a &dyn FunctionRegistry, making accompanying plumbing changes
  3. Renamed Serializeable::from_bytes_with_registry trait method to Serializeable::from_bytes_with_ctx since the &dyn FunctionRegistry parameter was replaced with &TaskContext like above
  4. Implemented conversion to and from proto for Expr::ScalarSubquery and Expr::OuterReferenceColumn

Are these changes tested?

Added roundtrip test for a query with a scalar subquery and an outer reference column.

Are there any user-facing changes?

Yes, see points 2 & 3 in the above section

@github-actions bot added the proto (Related to proto crate) and ffi (Changes to the ffi crate) labels Oct 20, 2025
@Jefffrey added the api change (Changes the API exposed to users of the crate) label Oct 20, 2025
Comment on lines 258 to 262
  pub fn parse_expr(
      proto: &protobuf::LogicalExprNode,
-     registry: &dyn FunctionRegistry,
+     ctx: &TaskContext,
      codec: &dyn LogicalExtensionCodec,
  ) -> Result<Expr, Error> {
Contributor Author

Since Expr::ScalarSubquery essentially contains a LogicalPlan, parsing it requires LogicalPlanNode::try_into_logical_plan, which needs a TaskContext. So I had to expand this API to require a TaskContext instead of simply a dyn FunctionRegistry (which TaskContext implements). This change propagates to many other places.
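
The dependency described in this comment can be modelled with a minimal, self-contained sketch. Every type here (`MiniRegistry`, `MiniContext`, `ProtoExpr`, `ProtoPlan`, `MiniExpr`, `MiniPlan`) is a hypothetical stand-in rather than DataFusion's real definition, and the "known tables" state is chosen purely for illustration. The only point is that once an expression can contain a plan, expression parsing has to be handed whatever plan parsing needs.

```rust
use std::collections::HashSet;

/// Hypothetical stand-in for a function registry: it can only resolve function names.
trait MiniRegistry {
    fn has_udf(&self, name: &str) -> bool;
}

/// Hypothetical stand-in for a task context: a registry plus extra state
/// (here, a set of known table names) that plan parsing needs.
#[derive(Default)]
struct MiniContext {
    udfs: HashSet<String>,
    tables: HashSet<String>,
}

impl MiniRegistry for MiniContext {
    fn has_udf(&self, name: &str) -> bool {
        self.udfs.contains(name)
    }
}

/// Serialized forms (assumed shapes, not the real protobuf messages).
enum ProtoExpr {
    Column(String),
    Call(String, Vec<ProtoExpr>),
    ScalarSubquery(Box<ProtoPlan>),
}

enum ProtoPlan {
    Scan(String),
    Filter(Box<ProtoPlan>, ProtoExpr),
}

#[derive(Debug, PartialEq)]
enum MiniExpr {
    Column(String),
    Call(String, Vec<MiniExpr>),
    ScalarSubquery(Box<MiniPlan>),
}

#[derive(Debug, PartialEq)]
enum MiniPlan {
    Scan(String),
    Filter(Box<MiniPlan>, MiniExpr),
}

/// Plan parsing needs the full context, not just function lookup.
fn parse_plan(proto: &ProtoPlan, ctx: &MiniContext) -> Result<MiniPlan, String> {
    match proto {
        ProtoPlan::Scan(table) => {
            if ctx.tables.contains(table) {
                Ok(MiniPlan::Scan(table.clone()))
            } else {
                Err(format!("unknown table {}", table))
            }
        }
        ProtoPlan::Filter(input, predicate) => Ok(MiniPlan::Filter(
            Box::new(parse_plan(input, ctx)?),
            parse_expr(predicate, ctx)?,
        )),
    }
}

/// Because of the ScalarSubquery arm, a bare `&dyn MiniRegistry` is no longer
/// enough here: the nested plan must be parsed with the full context.
fn parse_expr(proto: &ProtoExpr, ctx: &MiniContext) -> Result<MiniExpr, String> {
    match proto {
        ProtoExpr::Column(name) => Ok(MiniExpr::Column(name.clone())),
        ProtoExpr::Call(name, args) => {
            if !ctx.has_udf(name) {
                return Err(format!("unknown function {}", name));
            }
            let args = args
                .iter()
                .map(|arg| parse_expr(arg, ctx))
                .collect::<Result<Vec<_>, _>>()?;
            Ok(MiniExpr::Call(name.clone(), args))
        }
        ProtoExpr::ScalarSubquery(plan) => {
            Ok(MiniExpr::ScalarSubquery(Box::new(parse_plan(plan, ctx)?)))
        }
    }
}
```

In this sketch, parsing a `ProtoExpr::ScalarSubquery` over a `Scan` succeeds only when the context knows the scanned table, which is information a plain function registry would not carry.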

Comment on lines +638 to +657
ExprType::ScalarSubquery(scalar_subquery) => {
    let subquery = scalar_subquery
        .subquery
        .as_ref()
        .ok_or_else(|| Error::required("subquery"))?;
    Ok(Expr::ScalarSubquery(Subquery {
        subquery: Arc::new(subquery.try_into_logical_plan(ctx, codec)?),
        outer_ref_columns: parse_exprs(
            &scalar_subquery.outer_ref_columns,
            ctx,
            codec,
        )?,
        spans: Spans::new(),
    }))
}
ExprType::OuterReferenceColumn(OuterReferenceColumn { field, column }) => {
    let column = column.to_owned().ok_or_else(|| Error::required("column"))?;
    let field = field.as_ref().required("field")?;
    Ok(Expr::OuterReferenceColumn(Arc::new(field), column.into()))
}
Contributor Author

Actual implementation for deserializing

Comment on lines +580 to +603
Expr::ScalarSubquery(Subquery {
    subquery,
    outer_ref_columns,
    spans: _,
}) => protobuf::LogicalExprNode {
    expr_type: Some(ExprType::ScalarSubquery(Box::new(
        protobuf::ScalarSubquery {
            subquery: Some(Box::new(
                protobuf::LogicalPlanNode::try_from_logical_plan(subquery, codec)
                    .map_err(|e| Error::General(format!("Proto serialization error: Failed to serialize Scalar Subquery: {e}")))?,
            )),
            outer_ref_columns: serialize_exprs(outer_ref_columns, codec)?,
        },
    ))),
},
Expr::OuterReferenceColumn(field, column) => {
    protobuf::LogicalExprNode {
        expr_type: Some(ExprType::OuterReferenceColumn(OuterReferenceColumn {
            field: Some(field.as_ref().try_into()?),
            column: Some(column.into()),
        }))
    }
}
Expr::InSubquery(_) | Expr::Exists { .. } => {
    return Err(Error::NotImplemented("Proto serialization error: Expr::InSubquery(_) | Expr::Exists { .. } not supported".to_string()));
Contributor Author

Actual implementation for serializing

  protobuf::SelectionNode {
      input: Some(Box::new(input)),
-     expr: Some(serialize_expr(
+     expr: Some(Box::new(serialize_expr(
Contributor Author

My changes to datafusion.proto also seem to affect some other nodes in the generated proto code, probably because the enum became too big, so it now boxes seemingly unrelated nodes.
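
Another plausible reading (an assumption; this thread does not confirm the exact trigger in the code generator) is that once an expression node can hold a plan node, expression and plan messages form a recursive cycle, and Rust needs indirection on such a cycle for the types to have a finite size. The sketch below uses hypothetical stand-in types, not the generated code, and boxes every field on the cycle, mirroring the extra `Box::new` that appears in the `SelectionNode` diff.

```rust
/// Hypothetical stand-in for a generated expression message.
#[derive(Debug)]
enum ExprNode {
    Column(String),
    /// New variant: an expression can now hold a whole plan.
    ScalarSubquery(Box<PlanNode>),
}

/// Hypothetical stand-in for a generated plan message.
#[derive(Debug)]
enum PlanNode {
    Scan(String),
    Selection(Box<SelectionNode>),
}

/// The `expr` field lies on the cycle
/// ExprNode -> PlanNode -> SelectionNode -> ExprNode,
/// so in this sketch it is boxed like every other field on that cycle.
#[derive(Debug)]
struct SelectionNode {
    input: Option<Box<PlanNode>>,
    expr: Option<Box<ExprNode>>,
}

/// Counts how many scalar subqueries are nested along the deepest path.
fn expr_depth(expr: &ExprNode) -> usize {
    match expr {
        ExprNode::Column(_) => 0,
        ExprNode::ScalarSubquery(plan) => 1 + plan_depth(plan),
    }
}

fn plan_depth(plan: &PlanNode) -> usize {
    match plan {
        PlanNode::Scan(_) => 0,
        PlanNode::Selection(selection) => {
            let from_input = selection.input.as_deref().map_or(0, plan_depth);
            let from_expr = selection.expr.as_deref().map_or(0, expr_depth);
            from_input.max(from_expr)
        }
    }
}
```

Building a `SelectionNode` in this model requires wrapping the predicate in `Box::new`, which is the same kind of call-site change the diff shows for a node that was otherwise untouched.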

-     bytes: &[u8],
-     registry: &dyn FunctionRegistry,
- ) -> Result<Self>;
+ fn from_bytes_with_ctx(bytes: &[u8], ctx: &TaskContext) -> Result<Self>;
Contributor Author

Breaking change to API here since we need a TaskContext because of that parse_expr change; I figured it'd make sense to also rename the method to be more accurate.

  /// [`from_bytes_with_registry`]: Self::from_bytes_with_registry
  fn from_bytes(bytes: &[u8]) -> Result<Self> {
-     Self::from_bytes_with_registry(bytes, &registry::NoRegistry {})
+     Self::from_bytes_with_ctx(bytes, &TaskContext::default())
Contributor Author

Default TaskContext will have an empty function registry just like NoRegistry so this is equivalent; I also deleted NoRegistry entirely as it wasn't used anywhere else after it was removed here.
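
The equivalence claimed here can be illustrated with a small sketch. `MiniRegistry`, `NoRegistry`, and `MiniContext` below are hypothetical stand-ins written for this example (the `NoRegistry` here only borrows the name of the removed type); the sketch assumes that a registry which always fails and a default context with no registered functions are indistinguishable to a caller doing lookups.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for a function registry.
trait MiniRegistry {
    /// Returns a description of the function, or an error if it is unknown.
    fn udf(&self, name: &str) -> Result<String, String>;
}

/// Stand-in for a registry that never resolves anything.
struct NoRegistry;

impl MiniRegistry for NoRegistry {
    fn udf(&self, name: &str) -> Result<String, String> {
        Err(format!("cannot resolve function {}", name))
    }
}

/// Stand-in for a context whose default value has no functions registered.
#[derive(Default)]
struct MiniContext {
    udfs: HashMap<String, String>,
}

impl MiniRegistry for MiniContext {
    fn udf(&self, name: &str) -> Result<String, String> {
        self.udfs
            .get(name)
            .cloned()
            .ok_or_else(|| format!("cannot resolve function {}", name))
    }
}

/// A caller that only sees the trait cannot tell the two apart.
fn can_resolve(registry: &dyn MiniRegistry, name: &str) -> bool {
    registry.udf(name).is_ok()
}
```

Both `can_resolve(&NoRegistry, ...)` and `can_resolve(&MiniContext::default(), ...)` fail for every name in this model, which is why dropping the dedicated empty registry changes nothing for `from_bytes` callers.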

Comment on lines +173 to +188
// Copied from from_bytes_with_ctx below, but with a placeholder extension
// codec instead of the default one.
{
    let bytes: &[u8] = &bytes;
    let protobuf = protobuf::LogicalExprNode::decode(bytes).map_err(|e| {
        plan_datafusion_err!("Error decoding expr as protobuf: {e}")
    })?;

    let extension_codec = PlaceholderLogicalExtensionCodec {};
    logical_plan::from_proto::parse_expr(
        &protobuf,
        &TaskContext::default(),
        &extension_codec,
    )
    .map_err(|e| plan_datafusion_err!("Error parsing protobuf into Expr: {e}"))?;
}
Contributor Author

@Jefffrey Jefffrey Oct 20, 2025

This is a bit ugly, but since we now require a TaskContext instead of a dyn FunctionRegistry, there was no easy way within TaskContext to pass through arbitrary UDFs/UDAFs/UDWFs the way PlaceHolderRegistry was meant to. Instead, we achieve the same behaviour via the codec, so I'm copying the from_bytes_with_ctx code here to customize the codec we pass in.
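
A minimal sketch of the "pass-through via the codec" idea follows. All names (`MiniUdf`, `MiniContext`, `MiniCodec`, `DefaultCodec`, `PlaceholderCodec`, `resolve_udf`) are hypothetical, and the lookup order (context first, codec as fallback for unknown names) is an assumption made for the example rather than something this thread states.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for a user-defined function handle.
#[derive(Debug, Clone, PartialEq)]
struct MiniUdf {
    name: String,
    is_placeholder: bool,
}

/// Hypothetical stand-in for a context holding registered functions.
#[derive(Default)]
struct MiniContext {
    udfs: HashMap<String, MiniUdf>,
}

/// Hypothetical stand-in for an extension codec.
trait MiniCodec {
    fn try_decode_udf(&self, name: &str) -> Result<MiniUdf, String>;
}

/// Refuses every function it is asked to decode.
struct DefaultCodec;

impl MiniCodec for DefaultCodec {
    fn try_decode_udf(&self, name: &str) -> Result<MiniUdf, String> {
        Err(format!("codec cannot decode function {}", name))
    }
}

/// Accepts any name and fabricates a stub, so decoding never fails on
/// functions that were not registered.
struct PlaceholderCodec;

impl MiniCodec for PlaceholderCodec {
    fn try_decode_udf(&self, name: &str) -> Result<MiniUdf, String> {
        Ok(MiniUdf {
            name: name.to_string(),
            is_placeholder: true,
        })
    }
}

/// Assumed lookup order: registered functions win, the codec is the fallback.
fn resolve_udf(
    name: &str,
    ctx: &MiniContext,
    codec: &dyn MiniCodec,
) -> Result<MiniUdf, String> {
    match ctx.udfs.get(name) {
        Some(udf) => Ok(udf.clone()),
        None => codec.try_decode_udf(name),
    }
}
```

With an empty context, `resolve_udf` fails under `DefaultCodec` but returns a stub under `PlaceholderCodec`, so the placeholder behaviour moves from the registry into the codec without the context needing to know about it.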

@Jefffrey Jefffrey marked this pull request as ready for review October 20, 2025 10:56
@adriangb adriangb self-assigned this Oct 25, 2025
@adriangb
Contributor

I plan to review this, hopefully this weekend

@Jefffrey
Contributor Author

I'll close this PR as I don't think it's important enough to warrant reviewer capacity.

  • I picked up this issue because I was reviewing the backlog and it seemed like something we might be able to achieve now (compared to back then)

I think the main point highlighted by this PR is the inability of the current logical serde proto API to handle nested plans; otherwise the changes are rather elementary. Perhaps we can revisit this PR more easily after addressing #18477

@adriangb
Contributor

adriangb commented Dec 8, 2025

Thanks for the exploration @Jefffrey and sorry I didn't give a good review. I agree with your conclusion that the takeaway here is that this seems like it should be easy but ends up being hard.

Development

Successfully merging this pull request may close these issues.

Add subquery support in datafusion-proto
