Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion benchmarks/src/imdb/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -534,7 +534,7 @@ mod tests {
let plan = ctx.sql(&query).await?;
let plan = plan.into_optimized_plan()?;
let bytes = logical_plan_to_bytes(&plan)?;
let plan2 = logical_plan_from_bytes(&bytes, &ctx)?;
let plan2 = logical_plan_from_bytes(&bytes, &ctx.task_ctx())?;
let plan_formatted = format!("{}", plan.display_indent());
let plan2_formatted = format!("{}", plan2.display_indent());
assert_eq!(plan_formatted, plan2_formatted);
Expand Down
2 changes: 1 addition & 1 deletion benchmarks/src/tpch/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,7 @@ mod tests {
let plan = ctx.sql(&query).await?;
let plan = plan.into_optimized_plan()?;
let bytes = logical_plan_to_bytes(&plan)?;
let plan2 = logical_plan_from_bytes(&bytes, &ctx)?;
let plan2 = logical_plan_from_bytes(&bytes, &ctx.task_ctx())?;
let plan_formatted = format!("{}", plan.display_indent());
let plan2_formatted = format!("{}", plan2.display_indent());
assert_eq!(plan_formatted, plan2_formatted);
Expand Down
22 changes: 19 additions & 3 deletions datafusion/proto/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,31 @@ name = "datafusion_proto"
[features]
default = ["parquet"]
json = ["pbjson", "serde", "serde_json", "datafusion-proto-common/json"]
parquet = ["datafusion/parquet", "datafusion-common/parquet"]
avro = ["datafusion/avro", "datafusion-common/avro"]
parquet = ["datafusion-datasource-parquet", "datafusion-common/parquet", "datafusion/parquet"]
avro = ["datafusion-datasource-avro", "datafusion-common/avro"]

# Note to developers: do *not* add `datafusion` as a dependency in
# this crate. See https://github.com/apache/datafusion/issues/17713
# for additional information.

[dependencies]
arrow = { workspace = true }
chrono = { workspace = true }
datafusion = { workspace = true, default-features = false }
datafusion-catalog = { workspace = true }
datafusion-catalog-listing = { workspace = true }
datafusion-common = { workspace = true }
datafusion-datasource = { workspace = true }
datafusion-datasource-arrow = { workspace = true }
datafusion-datasource-avro = { workspace = true, optional = true }
datafusion-datasource-csv = { workspace = true }
datafusion-datasource-json = { workspace = true }
datafusion-datasource-parquet = { workspace = true, optional = true }
datafusion-execution = { workspace = true }
datafusion-expr = { workspace = true }
datafusion-functions-table = { workspace = true }
datafusion-physical-expr = { workspace = true }
datafusion-physical-expr-common = { workspace = true }
datafusion-physical-plan = { workspace = true }
datafusion-proto-common = { workspace = true }
object_store = { workspace = true }
pbjson = { workspace = true, optional = true }
Expand Down
22 changes: 9 additions & 13 deletions datafusion/proto/src/bytes/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ use crate::physical_plan::{
AsExecutionPlan, DefaultPhysicalExtensionCodec, PhysicalExtensionCodec,
};
use crate::protobuf;
use datafusion::execution::TaskContext;
use datafusion_common::{plan_datafusion_err, Result};
use datafusion_execution::TaskContext;
use datafusion_expr::{
create_udaf, create_udf, create_udwf, AggregateUDF, Expr, LogicalPlan, Volatility,
WindowUDF,
Expand All @@ -37,10 +37,9 @@ use prost::{
use std::sync::Arc;

// Reexport Bytes which appears in the API
use datafusion::execution::registry::FunctionRegistry;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::prelude::SessionContext;
use datafusion_execution::registry::FunctionRegistry;
use datafusion_expr::planner::ExprPlanner;
use datafusion_physical_plan::ExecutionPlan;

mod registry;

Expand Down Expand Up @@ -240,24 +239,21 @@ pub fn logical_plan_to_json_with_extension_codec(

/// Deserialize a LogicalPlan from JSON
#[cfg(feature = "json")]
pub fn logical_plan_from_json(json: &str, ctx: &SessionContext) -> Result<LogicalPlan> {
pub fn logical_plan_from_json(json: &str, ctx: &TaskContext) -> Result<LogicalPlan> {
let extension_codec = DefaultLogicalExtensionCodec {};
logical_plan_from_json_with_extension_codec(json, ctx, &extension_codec)
}

/// Deserialize a LogicalPlan from bytes
pub fn logical_plan_from_bytes(
bytes: &[u8],
ctx: &SessionContext,
) -> Result<LogicalPlan> {
pub fn logical_plan_from_bytes(bytes: &[u8], ctx: &TaskContext) -> Result<LogicalPlan> {
let extension_codec = DefaultLogicalExtensionCodec {};
logical_plan_from_bytes_with_extension_codec(bytes, ctx, &extension_codec)
}

/// Deserialize a LogicalPlan from bytes
pub fn logical_plan_from_bytes_with_extension_codec(
bytes: &[u8],
ctx: &SessionContext,
ctx: &TaskContext,
extension_codec: &dyn LogicalExtensionCodec,
) -> Result<LogicalPlan> {
let protobuf = protobuf::LogicalPlanNode::decode(bytes)
Expand All @@ -269,7 +265,7 @@ pub fn logical_plan_from_bytes_with_extension_codec(
#[cfg(feature = "json")]
pub fn logical_plan_from_json_with_extension_codec(
json: &str,
ctx: &SessionContext,
ctx: &TaskContext,
extension_codec: &dyn LogicalExtensionCodec,
) -> Result<LogicalPlan> {
let back: protobuf::LogicalPlanNode = serde_json::from_str(json)
Expand Down Expand Up @@ -312,12 +308,12 @@ pub fn physical_plan_to_bytes_with_extension_codec(
#[cfg(feature = "json")]
pub fn physical_plan_from_json(
json: &str,
ctx: &SessionContext,
ctx: &TaskContext,
) -> Result<Arc<dyn ExecutionPlan>> {
let back: protobuf::PhysicalPlanNode = serde_json::from_str(json)
.map_err(|e| plan_datafusion_err!("Error serializing plan: {e}"))?;
let extension_codec = DefaultPhysicalExtensionCodec {};
back.try_into_physical_plan(&ctx.task_ctx(), &extension_codec)
back.try_into_physical_plan(&ctx, &extension_codec)
}

/// Deserialize a PhysicalPlan from bytes
Expand Down
2 changes: 1 addition & 1 deletion datafusion/proto/src/bytes/registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@

use std::{collections::HashSet, sync::Arc};

use datafusion::execution::registry::FunctionRegistry;
use datafusion_common::plan_err;
use datafusion_common::Result;
use datafusion_execution::registry::FunctionRegistry;
use datafusion_expr::planner::ExprPlanner;
use datafusion_expr::{AggregateUDF, ScalarUDF, WindowUDF};

Expand Down
6 changes: 3 additions & 3 deletions datafusion/proto/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@
//!
//! [`LogicalPlan`]: datafusion_expr::LogicalPlan
//! [`Expr`]: datafusion_expr::Expr
//! [`ExecutionPlan`]: datafusion::physical_plan::ExecutionPlan
//! [`PhysicalExpr`]: datafusion::physical_expr::PhysicalExpr
//! [`ExecutionPlan`]: datafusion_physical_plan::ExecutionPlan
//! [`PhysicalExpr`]: datafusion_physical_expr::PhysicalExpr
//!
//! Internally, this crate is implemented by converting the plans to [protocol
//! buffers] using [prost].
Expand Down Expand Up @@ -93,7 +93,7 @@
//! let bytes = logical_plan_to_bytes(&plan)?;
//!
//! // Decode bytes from somewhere (over network, etc.) back to LogicalPlan
//! let logical_round_trip = logical_plan_from_bytes(&bytes, &ctx)?;
//! let logical_round_trip = logical_plan_from_bytes(&bytes, &ctx.task_ctx())?;
//! assert_eq!(format!("{:?}", plan), format!("{:?}", logical_round_trip));
//! # Ok(())
//! # }
Expand Down
Loading
Loading