Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions datafusion-examples/examples/composed_extension_codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ use datafusion::common::Result;
use datafusion::execution::TaskContext;
use datafusion::physical_plan::{DisplayAs, ExecutionPlan};
use datafusion::prelude::SessionContext;
use datafusion_proto::physical_plan::DecodeContext;
use datafusion_proto::physical_plan::{
AsExecutionPlan, ComposedPhysicalExtensionCodec, PhysicalExtensionCodec,
};
Expand Down Expand Up @@ -71,7 +72,7 @@ async fn main() {

// deserialize proto back to execution plan
let result_exec_plan: Arc<dyn ExecutionPlan> = proto
.try_into_physical_plan(&ctx.task_ctx(), &composed_codec)
.try_into_physical_plan(&DecodeContext::new(&ctx.task_ctx()), &composed_codec)
.expect("from proto");

// assert that the original and deserialized execution plans are equal
Expand Down Expand Up @@ -137,7 +138,7 @@ impl PhysicalExtensionCodec for ParentPhysicalExtensionCodec {
&self,
buf: &[u8],
inputs: &[Arc<dyn ExecutionPlan>],
_ctx: &TaskContext,
_ctx: &DecodeContext,
) -> Result<Arc<dyn ExecutionPlan>> {
if buf == "ParentExec".as_bytes() {
Ok(Arc::new(ParentExec {
Expand Down Expand Up @@ -213,7 +214,7 @@ impl PhysicalExtensionCodec for ChildPhysicalExtensionCodec {
&self,
buf: &[u8],
_inputs: &[Arc<dyn ExecutionPlan>],
_ctx: &TaskContext,
_ctx: &DecodeContext,
) -> Result<Arc<dyn ExecutionPlan>> {
if buf == "ChildExec".as_bytes() {
Ok(Arc::new(ChildExec {}))
Expand Down
7 changes: 4 additions & 3 deletions datafusion/ffi/src/plan_properties.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ use datafusion_proto::{
physical_plan::{
from_proto::{parse_physical_sort_exprs, parse_protobuf_partitioning},
to_proto::{serialize_partitioning, serialize_physical_sort_exprs},
DefaultPhysicalExtensionCodec,
DecodeContext, DefaultPhysicalExtensionCodec,
},
protobuf::{Partitioning, PhysicalSortExprNodeCollection},
};
Expand Down Expand Up @@ -183,6 +183,7 @@ impl TryFrom<FFI_PlanProperties> for PlanProperties {
let default_ctx = SessionContext::new();
let task_context = default_ctx.task_ctx();
let codex = DefaultPhysicalExtensionCodec {};
let decode_context = DecodeContext::new(&task_context);

let ffi_orderings = unsafe { (ffi_props.output_ordering)(&ffi_props) };

Expand All @@ -191,7 +192,7 @@ impl TryFrom<FFI_PlanProperties> for PlanProperties {
.map_err(|e| DataFusionError::External(Box::new(e)))?;
let sort_exprs = parse_physical_sort_exprs(
&proto_output_ordering.physical_sort_expr_nodes,
&task_context,
&decode_context,
&schema,
&codex,
)?;
Expand All @@ -203,7 +204,7 @@ impl TryFrom<FFI_PlanProperties> for PlanProperties {
.map_err(|e| DataFusionError::External(Box::new(e)))?;
let partitioning = parse_protobuf_partitioning(
Some(&proto_output_partitioning),
&task_context,
&decode_context,
&schema,
&codex,
)?
Expand Down
7 changes: 4 additions & 3 deletions datafusion/ffi/src/udaf/accumulator_args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use datafusion_proto::{
physical_plan::{
from_proto::{parse_physical_exprs, parse_physical_sort_exprs},
to_proto::{serialize_physical_exprs, serialize_physical_sort_exprs},
DefaultPhysicalExtensionCodec,
DecodeContext, DefaultPhysicalExtensionCodec,
},
protobuf::PhysicalAggregateExprNode,
};
Expand Down Expand Up @@ -121,16 +121,17 @@ impl TryFrom<FFI_AccumulatorArgs> for ForeignAccumulatorArgs {

let default_ctx = SessionContext::new();
let task_ctx = default_ctx.task_ctx();
let decode_ctx = DecodeContext::new(&task_ctx);
let codex = DefaultPhysicalExtensionCodec {};

let order_bys = parse_physical_sort_exprs(
&proto_def.ordering_req,
&task_ctx,
&decode_ctx,
&schema,
&codex,
)?;

let exprs = parse_physical_exprs(&proto_def.expr, &task_ctx, &schema, &codex)?;
let exprs = parse_physical_exprs(&proto_def.expr, &decode_ctx, &schema, &codex)?;

Ok(Self {
return_field,
Expand Down
8 changes: 4 additions & 4 deletions datafusion/ffi/src/udwf/partition_evaluator_args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use datafusion_common::exec_datafusion_err;
use datafusion_proto::{
physical_plan::{
from_proto::parse_physical_expr, to_proto::serialize_physical_exprs,
DefaultPhysicalExtensionCodec,
DecodeContext, DefaultPhysicalExtensionCodec,
},
protobuf::PhysicalExprNode,
};
Expand Down Expand Up @@ -137,6 +137,8 @@ impl TryFrom<FFI_PartitionEvaluatorArgs> for ForeignPartitionEvaluatorArgs {

fn try_from(value: FFI_PartitionEvaluatorArgs) -> Result<Self> {
let default_ctx = SessionContext::new();
let task_ctx = default_ctx.task_ctx();
let decode_ctx = DecodeContext::new(&task_ctx);
let codec = DefaultPhysicalExtensionCodec {};

let schema: SchemaRef = value.schema.into();
Expand All @@ -148,9 +150,7 @@ impl TryFrom<FFI_PartitionEvaluatorArgs> for ForeignPartitionEvaluatorArgs {
.collect::<std::result::Result<Vec<_>, prost::DecodeError>>()
.map_err(|e| exec_datafusion_err!("Failed to decode PhysicalExprNode: {e}"))?
.iter()
.map(|expr_node| {
parse_physical_expr(expr_node, &default_ctx.task_ctx(), &schema, &codec)
})
.map(|expr_node| parse_physical_expr(expr_node, &decode_ctx, &schema, &codec))
.collect::<Result<Vec<_>>>()?;

let input_fields = input_exprs
Expand Down
8 changes: 8 additions & 0 deletions datafusion/proto/proto/datafusion.proto
Original file line number Diff line number Diff line change
Expand Up @@ -865,6 +865,14 @@ message PhysicalExprNode {

UnknownColumn unknown_column = 20;
}

// Optional ID for caching during deserialization. This is used for deduplication,
// so PhysicalExprs with the same ID will be deserialized as Arcs pointing to the
// same address (instead of distinct addresses) on the deserializing machine.
//
// We use the Arc pointer address during serialization as the ID, as this by default
// indicates if a PhysicalExpr is identical to another on the serializing machine.
optional uint64 id = 21;
}

message PhysicalScalarUdfNode {
Expand Down
8 changes: 5 additions & 3 deletions datafusion/proto/src/bytes/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use crate::logical_plan::{
self, AsLogicalPlan, DefaultLogicalExtensionCodec, LogicalExtensionCodec,
};
use crate::physical_plan::{
AsExecutionPlan, DefaultPhysicalExtensionCodec, PhysicalExtensionCodec,
AsExecutionPlan, DecodeContext, DefaultPhysicalExtensionCodec, PhysicalExtensionCodec,
};
use crate::protobuf;
use datafusion_common::{plan_datafusion_err, Result};
Expand Down Expand Up @@ -313,7 +313,8 @@ pub fn physical_plan_from_json(
let back: protobuf::PhysicalPlanNode = serde_json::from_str(json)
.map_err(|e| plan_datafusion_err!("Error serializing plan: {e}"))?;
let extension_codec = DefaultPhysicalExtensionCodec {};
back.try_into_physical_plan(&ctx, &extension_codec)
let decode_ctx = DecodeContext::new(ctx);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should decode_ctx be a method parameter rather than being created here?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comment is added for consistency with other public methods expecting DecodeContext, but I'm not sure we should expose &DecodeContext in public methods; see the explanation in the following comment.

back.try_into_physical_plan(&decode_ctx, &extension_codec)
}

/// Deserialize a PhysicalPlan from bytes
Expand All @@ -333,5 +334,6 @@ pub fn physical_plan_from_bytes_with_extension_codec(
) -> Result<Arc<dyn ExecutionPlan>> {
let protobuf = protobuf::PhysicalPlanNode::decode(bytes)
.map_err(|e| plan_datafusion_err!("Error decoding expr as protobuf: {e}"))?;
protobuf.try_into_physical_plan(ctx, extension_codec)
let decode_ctx = DecodeContext::new(ctx);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should decode_ctx be a method parameter rather than being created here?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comment is added for consistency with other public methods expecting DecodeContext, but I'm not sure we should expose &DecodeContext in public methods; see the explanation in the following comment.

protobuf.try_into_physical_plan(&decode_ctx, extension_codec)
}
21 changes: 21 additions & 0 deletions datafusion/proto/src/generated/pbjson.rs

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions datafusion/proto/src/generated/prost.rs

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading