diff --git a/ballista/rust/core/src/serde/logical_plan/from_proto.rs b/ballista/rust/core/src/serde/logical_plan/from_proto.rs index 389c1bdb722e..10cc5e156889 100644 --- a/ballista/rust/core/src/serde/logical_plan/from_proto.rs +++ b/ballista/rust/core/src/serde/logical_plan/from_proto.rs @@ -1102,6 +1102,7 @@ impl TryInto for &protobuf::LogicalExprNode { } } ExprType::ScalarUdfProtoExpr(expr) => { + println!("from_proto to logical plan-----------------"); if let Some(udf_plugin_manager) = get_udf_plugin_manager("") { let fun = udf_plugin_manager .scalar_udfs @@ -1112,6 +1113,7 @@ impl TryInto for &protobuf::LogicalExprNode { expr.fun_name.to_string() )) })?; + println!("found udf-------------------"); let fun_arc = fun.clone(); let fun_args = &expr.args; let args: Vec = fun_args diff --git a/ballista/rust/core/src/serde/physical_plan/from_proto.rs b/ballista/rust/core/src/serde/physical_plan/from_proto.rs index 38c04ef29dd3..1e3052ff1adb 100644 --- a/ballista/rust/core/src/serde/physical_plan/from_proto.rs +++ b/ballista/rust/core/src/serde/physical_plan/from_proto.rs @@ -581,6 +581,7 @@ impl TryFrom<&protobuf::PhysicalExprNode> for Arc { } // argo engine add. ExprType::ScalarUdfProtoExpr(e) => { + println!("from proto to physical plan----------------"); if let Some(udf_plugin_manager) = get_udf_plugin_manager("") { let fun = udf_plugin_manager .scalar_udfs @@ -591,8 +592,8 @@ impl TryFrom<&protobuf::PhysicalExprNode> for Arc { &e.fun_name.to_owned() )) })?; - let scalar_udf = &*fun.clone(); + println!("found udf-------------------"); let args = e .expr .iter() diff --git a/ballista/rust/scheduler/src/lib.rs b/ballista/rust/scheduler/src/lib.rs index 61da0d9cbf3e..9346f7df37a1 100644 --- a/ballista/rust/scheduler/src/lib.rs +++ b/ballista/rust/scheduler/src/lib.rs @@ -328,6 +328,7 @@ impl SchedulerGrpc for SchedulerServer { &self, request: Request, ) -> std::result::Result, tonic::Status> { + println!("execute query in scheduler--------------"); if let ExecuteQueryParams { query: Some(query), settings, @@ -347,6 +348,7 @@ impl SchedulerGrpc for SchedulerServer { let plan = match query { Query::LogicalPlan(logical_plan) => { // parse protobuf + println!("in ballista logical_plan to proto"); (&logical_plan).try_into().map_err(|e| { let msg = format!("Could not parse logical plan protobuf: {}", e); error!("{}", msg); diff --git a/datafusion/src/execution/context.rs b/datafusion/src/execution/context.rs index 5e4e3ef6b711..066cc1b5809b 100644 --- a/datafusion/src/execution/context.rs +++ b/datafusion/src/execution/context.rs @@ -197,6 +197,7 @@ impl ExecutionContext { }; // register udf + println!("register udf to context---------------"); if let Some(udf_plugin_manager) = get_udf_plugin_manager(config.plugin_dir.as_str()) { @@ -212,6 +213,7 @@ impl ExecutionContext { context.register_udaf((**aggregate_udf).clone()) }); } + println!("register udf to context--------success-------"); context } diff --git a/datafusion/src/plugin/udf.rs b/datafusion/src/plugin/udf.rs index a906554956e5..eef32b231149 100644 --- a/datafusion/src/plugin/udf.rs +++ b/datafusion/src/plugin/udf.rs @@ -104,6 +104,7 @@ impl PluginRegistrar for UDFPluginManager { Ok(()) } })?; + self.libraries.push(library); Ok(()) } @@ -136,6 +137,7 @@ macro_rules! declare_udf_plugin { /// get a Option of Immutable UDFPluginManager pub fn get_udf_plugin_manager(path: &str) -> Option { + println!("get_udf_plugin_manager:{}", path); let udf_plugin_manager_opt = { let gpm = global_plugin_manager(path).lock().unwrap(); let plugin_registrar_opt = gpm.plugin_managers.get(&PluginEnum::UDF);