Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions datafusion/ffi/src/table_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ fn supports_filters_pushdown_internal(
let proto_filters = LogicalExprList::decode(filters_serialized)
.map_err(|e| DataFusionError::Plan(e.to_string()))?;

parse_exprs(proto_filters.expr.iter(), &default_ctx, &codec)?
parse_exprs(proto_filters.expr.iter(), &default_ctx.task_ctx(), &codec)?
}
};
let filters_borrowed: Vec<&Expr> = filters.iter().collect();
Expand Down Expand Up @@ -252,7 +252,7 @@ unsafe extern "C" fn scan_fn_wrapper(

rresult_return!(parse_exprs(
proto_filters.expr.iter(),
&default_ctx,
&default_ctx.task_ctx(),
&codec
))
}
Expand Down
7 changes: 5 additions & 2 deletions datafusion/ffi/src/udtf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,11 @@ unsafe extern "C" fn call_fn_wrapper(

let proto_filters = rresult_return!(LogicalExprList::decode(args.as_ref()));

let args =
rresult_return!(parse_exprs(proto_filters.expr.iter(), &default_ctx, &codec));
let args = rresult_return!(parse_exprs(
proto_filters.expr.iter(),
&default_ctx.task_ctx(),
&codec
));

let table_provider = rresult_return!(udtf.call(&args));
RResult::ROk(FFI_TableProvider::new(table_provider, false, runtime))
Expand Down
12 changes: 12 additions & 0 deletions datafusion/proto/proto/datafusion.proto
Original file line number Diff line number Diff line change
Expand Up @@ -406,9 +406,21 @@ message LogicalExprNode {

Unnest unnest = 35;

ScalarSubquery scalar_subquery = 36;
OuterReferenceColumn outer_reference_column = 37;
}
}

// Payload of the `scalar_subquery` variant (field 36) of `LogicalExprNode`.
message ScalarSubquery {
// The subquery itself, encoded as a logical plan.
LogicalPlanNode subquery = 1;
// Expressions attached to the subquery; presumably the outer-query columns it
// references (going by the field name) -- confirm against the to/from proto code.
repeated LogicalExprNode outer_ref_columns = 2;
}

// Payload of the `outer_reference_column` variant (field 37) of `LogicalExprNode`.
message OuterReferenceColumn {
// Field description (a `datafusion_common.Field`) carried with the reference;
// presumably the type/nullability of the referenced column -- confirm.
datafusion_common.Field field = 1;
// The column being referenced.
datafusion_common.Column column = 2;
}

// Wildcard expression node.
message Wildcard {
// Table reference attached to the wildcard; presumably restricts it to that
// table (i.e. `table.*`) -- confirm against the expression conversion code.
TableReference qualifier = 1;
}
Expand Down
147 changes: 75 additions & 72 deletions datafusion/proto/src/bytes/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,25 +24,22 @@ use crate::physical_plan::{
AsExecutionPlan, DefaultPhysicalExtensionCodec, PhysicalExtensionCodec,
};
use crate::protobuf;
use datafusion_common::{plan_datafusion_err, Result};
use arrow::datatypes::{DataType, SchemaRef};
use datafusion_catalog::TableProvider;
use datafusion_common::{plan_datafusion_err, Result, TableReference};
use datafusion_execution::TaskContext;
use datafusion_expr::{
create_udaf, create_udf, create_udwf, AggregateUDF, Expr, LogicalPlan, Volatility,
WindowUDF,
create_udaf, create_udf, create_udwf, AggregateUDF, Expr, Extension, LogicalPlan,
ScalarUDF, Volatility, WindowUDF,
};
use datafusion_physical_plan::ExecutionPlan;
// Reexport Bytes which appears in the API
use prost::{
bytes::{Bytes, BytesMut},
Message,
};
use std::sync::Arc;

// Reexport Bytes which appears in the API
use datafusion_execution::registry::FunctionRegistry;
use datafusion_expr::planner::ExprPlanner;
use datafusion_physical_plan::ExecutionPlan;

mod registry;

/// Encodes something (such as [`Expr`]) to/from a stream of
/// bytes.
///
Expand All @@ -66,24 +63,19 @@ pub trait Serializeable: Sized {

/// Convert `bytes` (the output of [`to_bytes`]) back into an
/// object. This will error if the serialized bytes contain any
/// user defined functions, in which case use
/// [`from_bytes_with_registry`]
/// user defined functions, in which case use [`from_bytes_with_ctx`].
///
/// [`to_bytes`]: Self::to_bytes
/// [`from_bytes_with_registry`]: Self::from_bytes_with_registry
/// [`from_bytes_with_ctx`]: Self::from_bytes_with_ctx
fn from_bytes(bytes: &[u8]) -> Result<Self> {
Self::from_bytes_with_registry(bytes, &registry::NoRegistry {})
Self::from_bytes_with_ctx(bytes, &TaskContext::default())
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A default TaskContext has an empty function registry, just like NoRegistry, so this is equivalent. I also deleted NoRegistry entirely, since nothing else used it once it was removed here.

}

/// Convert `bytes` (the output of [`to_bytes`]) back into an
/// object resolving user defined functions with the specified
/// `registry`
/// Convert `bytes` (the output of [`to_bytes`]) back into an object
/// resolving user defined functions with the specified `ctx`.
///
/// [`to_bytes`]: Self::to_bytes
fn from_bytes_with_registry(
bytes: &[u8],
registry: &dyn FunctionRegistry,
) -> Result<Self>;
fn from_bytes_with_ctx(bytes: &[u8], ctx: &TaskContext) -> Result<Self>;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a breaking API change: parse_expr now requires a TaskContext, so this method has to take one as well. I figured it would make sense to also rename the method so that its name is accurate.

}

impl Serializeable for Expr {
Expand All @@ -104,95 +96,106 @@ impl Serializeable for Expr {
// Until the underlying prost issue ( https://github.com/tokio-rs/prost/issues/736 ) is fixed, we try to
// deserialize the data here and check for errors.
//
// Need to provide some placeholder registry because the stream may contain UDFs
struct PlaceHolderRegistry;

impl FunctionRegistry for PlaceHolderRegistry {
fn udfs(&self) -> std::collections::HashSet<String> {
std::collections::HashSet::default()
}

fn udf(&self, name: &str) -> Result<Arc<datafusion_expr::ScalarUDF>> {
// Need to provide some placeholder codec because the stream may contain UDFs
// (using codec since with TaskContext we can't pass through unknown udfs; it
// requires registering them beforehand)
#[derive(Debug)]
struct PlaceholderLogicalExtensionCodec {}
impl LogicalExtensionCodec for PlaceholderLogicalExtensionCodec {
fn try_decode_udf(&self, name: &str, _buf: &[u8]) -> Result<Arc<ScalarUDF>> {
Ok(Arc::new(create_udf(
name,
vec![],
arrow::datatypes::DataType::Null,
DataType::Null,
Volatility::Immutable,
Arc::new(|_| unimplemented!()),
)))
}

fn udaf(&self, name: &str) -> Result<Arc<AggregateUDF>> {
fn try_decode_udaf(
&self,
name: &str,
_buf: &[u8],
) -> Result<Arc<AggregateUDF>> {
Ok(Arc::new(create_udaf(
name,
vec![arrow::datatypes::DataType::Null],
Arc::new(arrow::datatypes::DataType::Null),
vec![DataType::Null],
Arc::new(DataType::Null),
Volatility::Immutable,
Arc::new(|_| unimplemented!()),
Arc::new(vec![]),
)))
}

fn udwf(&self, name: &str) -> Result<Arc<WindowUDF>> {
fn try_decode_udwf(&self, name: &str, _buf: &[u8]) -> Result<Arc<WindowUDF>> {
Ok(Arc::new(create_udwf(
name,
arrow::datatypes::DataType::Null,
Arc::new(arrow::datatypes::DataType::Null),
DataType::Null,
Arc::new(DataType::Null),
Volatility::Immutable,
Arc::new(|| unimplemented!()),
)))
}
fn register_udaf(
&mut self,
_udaf: Arc<AggregateUDF>,
) -> Result<Option<Arc<AggregateUDF>>> {
datafusion_common::internal_err!(
"register_udaf called in Placeholder Registry!"
)
}
fn register_udf(
&mut self,
_udf: Arc<datafusion_expr::ScalarUDF>,
) -> Result<Option<Arc<datafusion_expr::ScalarUDF>>> {
datafusion_common::internal_err!(
"register_udf called in Placeholder Registry!"
)
}
fn register_udwf(
&mut self,
_udaf: Arc<WindowUDF>,
) -> Result<Option<Arc<WindowUDF>>> {
datafusion_common::internal_err!(
"register_udwf called in Placeholder Registry!"
)

fn try_decode(
&self,
_buf: &[u8],
_inputs: &[LogicalPlan],
_ctx: &TaskContext,
) -> Result<Extension> {
unimplemented!()
}

fn expr_planners(&self) -> Vec<Arc<dyn ExprPlanner>> {
vec![]
fn try_encode(&self, _node: &Extension, _buf: &mut Vec<u8>) -> Result<()> {
unimplemented!()
}

fn udafs(&self) -> std::collections::HashSet<String> {
std::collections::HashSet::default()
fn try_decode_table_provider(
&self,
_buf: &[u8],
_table_ref: &TableReference,
_schema: SchemaRef,
_ctx: &TaskContext,
) -> Result<Arc<dyn TableProvider>> {
unimplemented!()
}

fn udwfs(&self) -> std::collections::HashSet<String> {
std::collections::HashSet::default()
fn try_encode_table_provider(
&self,
_table_ref: &TableReference,
_node: Arc<dyn TableProvider>,
_buf: &mut Vec<u8>,
) -> Result<()> {
unimplemented!()
}
}
Expr::from_bytes_with_registry(&bytes, &PlaceHolderRegistry)?;

// Copied from from_bytes_with_ctx below, but using the placeholder codec instead of
// the default one.
{
let bytes: &[u8] = &bytes;
let protobuf = protobuf::LogicalExprNode::decode(bytes).map_err(|e| {
plan_datafusion_err!("Error decoding expr as protobuf: {e}")
})?;

let extension_codec = PlaceholderLogicalExtensionCodec {};
logical_plan::from_proto::parse_expr(
&protobuf,
&TaskContext::default(),
&extension_codec,
)
.map_err(|e| plan_datafusion_err!("Error parsing protobuf into Expr: {e}"))?;
}
Comment on lines +173 to +188
Copy link
Contributor Author

@Jefffrey Jefffrey Oct 20, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a bit ugly, but it is necessary: now that we require TaskContext instead of dyn FunctionRegistry, there is no easy way within TaskContext to pass through arbitrary UDFs/UDAFs/UDWFs the way PlaceHolderRegistry was meant to. Instead, we achieve the same behaviour via the codec, so I'm copying the from_bytes_with_ctx code into here to customize the codec we pass in.


Ok(bytes)
}

fn from_bytes_with_registry(
bytes: &[u8],
registry: &dyn FunctionRegistry,
) -> Result<Self> {
fn from_bytes_with_ctx(bytes: &[u8], ctx: &TaskContext) -> Result<Self> {
let protobuf = protobuf::LogicalExprNode::decode(bytes)
.map_err(|e| plan_datafusion_err!("Error decoding expr as protobuf: {e}"))?;

let extension_codec = DefaultLogicalExtensionCodec {};
logical_plan::from_proto::parse_expr(&protobuf, registry, &extension_codec)
logical_plan::from_proto::parse_expr(&protobuf, ctx, &extension_codec)
.map_err(|e| plan_datafusion_err!("Error parsing protobuf into Expr: {e}"))
}
}
Expand Down
70 changes: 0 additions & 70 deletions datafusion/proto/src/bytes/registry.rs

This file was deleted.

Loading