@@ -798,28 +798,48 @@ impl SessionContext {
798798 }
799799
800800 async fn create_function ( & self , stmt : CreateFunction ) -> Result < DataFrame > {
801- let function_factory = self . state . read ( ) . function_factory . clone ( ) ;
801+ let function = {
802+ let state = self . state . read ( ) . clone ( ) ;
803+ let function_factory = & state. function_factory ;
804+
805+ match function_factory {
806+ Some ( f) => f. create ( state. config ( ) , stmt) . await ?,
807+ _ => Err ( DataFusionError :: Configuration (
808+ "Function factory has not been configured" . into ( ) ,
809+ ) ) ?,
810+ }
811+ } ;
802812
803- match function_factory {
804- Some ( f) => f. create ( self . state . clone ( ) , stmt) . await ?,
805- None => Err ( DataFusionError :: Configuration (
806- "Function factory has not been configured" . into ( ) ,
807- ) ) ?,
813+ match function {
814+ RegisterFunction :: Scalar ( f) => {
815+ self . state . write ( ) . register_udf ( f) ?;
816+ }
817+ RegisterFunction :: Aggregate ( f) => {
818+ self . state . write ( ) . register_udaf ( f) ?;
819+ }
820+ RegisterFunction :: Window ( f) => {
821+ self . state . write ( ) . register_udwf ( f) ?;
822+ }
823+ RegisterFunction :: Table ( name, f) => self . register_udtf ( & name, f) ,
808824 } ;
809825
810826 self . return_empty_dataframe ( )
811827 }
812828
813829 async fn drop_function ( & self , stmt : DropFunction ) -> Result < DataFrame > {
814- let function_factory = self . state . read ( ) . function_factory . clone ( ) ;
815-
816- match function_factory {
817- Some ( f) => f. remove ( self . state . clone ( ) , stmt) . await ?,
818- None => Err ( DataFusionError :: Configuration (
819- "Function factory has not been configured" . into ( ) ,
820- ) ) ?,
830+ let _function = {
831+ let state = self . state . read ( ) . clone ( ) ;
832+ let function_factory = & state. function_factory ;
833+
834+ match function_factory {
835+ Some ( f) => f. remove ( state. config ( ) , stmt) . await ?,
836+ None => Err ( DataFusionError :: Configuration (
837+ "Function factory has not been configured" . into ( ) ,
838+ ) ) ?,
839+ }
821840 } ;
822841
842+ // TODO: Once we have unregister UDF we need to implement it here
823843 self . return_empty_dataframe ( )
824844 }
825845
@@ -1289,27 +1309,36 @@ impl QueryPlanner for DefaultQueryPlanner {
12891309/// ```
12901310#[ async_trait]
12911311pub trait FunctionFactory : Sync + Send {
1292- // TODO: I don't like having RwLock Leaking here, who ever implements it
1293- // has to depend ot `parking_lot`. I'f we expose &mut SessionState it
1294- // may keep lock of too long.
12951312 //
1296- // Not sure if there is better approach.
1313+ // This api holds a read lock for state
12971314 //
12981315
12991316 /// Handles creation of user defined function specified in [CreateFunction] statement
13001317 async fn create (
13011318 & self ,
1302- state : Arc < RwLock < SessionState > > ,
1319+ state : & SessionConfig ,
13031320 statement : CreateFunction ,
1304- ) -> Result < ( ) > ;
1321+ ) -> Result < RegisterFunction > ;
13051322
13061323 /// Drops user defined function from [SessionState]
13071324 // Naming it `drop`` would make more sense but its already occupied in rust
13081325 async fn remove (
13091326 & self ,
1310- state : Arc < RwLock < SessionState > > ,
1327+ state : & SessionConfig ,
13111328 statement : DropFunction ,
1312- ) -> Result < ( ) > ;
1329+ ) -> Result < RegisterFunction > ;
1330+ }
1331+
1332+ /// Type of function to create
1333+ pub enum RegisterFunction {
1334+ /// Scalar user defined function
1335+ Scalar ( Arc < ScalarUDF > ) ,
1336+ /// Aggregate user defined function
1337+ Aggregate ( Arc < AggregateUDF > ) ,
1338+ /// Window user defined function
1339+ Window ( Arc < WindowUDF > ) ,
1340+ /// Table user defined function
1341+ Table ( String , Arc < dyn TableFunctionImpl > ) ,
13131342}
13141343/// Execution context for registering data sources and executing queries.
13151344/// See [`SessionContext`] for a higher level API.
0 commit comments