-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-31999][SQL] Add REFRESH FUNCTION command #28840
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 45 commits
69a47a1
a95dcb6
3fc807e
b282348
f677a4a
a6c5d8b
de54470
9e09875
63695c0
9e9d5ce
c434821
35fd44b
f83fd8b
e444943
afd510b
1241bde
93f5d71
dc684b5
0ea7dd6
643969c
6cb2edd
cffc207
4b6408d
5d5fe71
4ba345b
6765395
dc86b82
a38d656
cdea55b
5e227d7
703ad47
a79f72b
a4d144a
3bd8d23
60ac2a0
b36b760
c5937a2
56ec5ea
c129a54
a956144
711656d
5d4c152
94fa132
fc4789f
e83194f
b18437c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,60 @@ | ||
| --- | ||
| layout: global | ||
| title: REFRESH FUNCTION | ||
| displayTitle: REFRESH FUNCTION | ||
| license: | | ||
| Licensed to the Apache Software Foundation (ASF) under one or more | ||
| contributor license agreements. See the NOTICE file distributed with | ||
| this work for additional information regarding copyright ownership. | ||
| The ASF licenses this file to You under the Apache License, Version 2.0 | ||
| (the "License"); you may not use this file except in compliance with | ||
| the License. You may obtain a copy of the License at | ||
|
|
||
| http://www.apache.org/licenses/LICENSE-2.0 | ||
|
|
||
| Unless required by applicable law or agreed to in writing, software | ||
| distributed under the License is distributed on an "AS IS" BASIS, | ||
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| See the License for the specific language governing permissions and | ||
| limitations under the License. | ||
| --- | ||
|
|
||
| ### Description | ||
|
|
||
| `REFRESH FUNCTION` statement invalidates the cached function entry, which includes a class name | ||
| and resource location of the given function. The invalidated cache is populated right away. | ||
| Note that `REFRESH FUNCTION` only works for permanent functions. Refreshing native functions or temporary functions will cause an exception. | ||
|
|
||
| ### Syntax | ||
|
|
||
| ```sql | ||
| REFRESH FUNCTION function_identifier | ||
| ``` | ||
|
|
||
| ### Parameters | ||
|
|
||
| * **function_identifier** | ||
|
|
||
| Specifies a function name, which is either a qualified or unqualified name. If no database identifier is provided, uses the current database. | ||
|
|
||
| **Syntax:** `[ database_name. ] function_name` | ||
|
|
||
| ### Examples | ||
|
|
||
| ```sql | ||
| -- The cached entry of the function will be refreshed | ||
| -- The function is resolved from the current database as the function name is unqualified. | ||
| REFRESH FUNCTION func1; | ||
|
|
||
| -- The cached entry of the function will be refreshed | ||
| -- The function is resolved from tempDB database as the function name is qualified. | ||
| REFRESH FUNCTION db1.func1; | ||
| ``` | ||
|
|
||
| ### Related Statements | ||
|
|
||
| * [CACHE TABLE](sql-ref-syntax-aux-cache-cache-table.html) | ||
| * [CLEAR CACHE](sql-ref-syntax-aux-cache-clear-cache.html) | ||
| * [UNCACHE TABLE](sql-ref-syntax-aux-cache-uncache-table.html) | ||
| * [REFRESH TABLE](sql-ref-syntax-aux-cache-refresh-table.html) | ||
| * [REFRESH](sql-ref-syntax-aux-cache-refresh.html) |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -19,7 +19,7 @@ package org.apache.spark.sql.connector.catalog | |
|
|
||
| import org.apache.spark.internal.Logging | ||
| import org.apache.spark.sql.AnalysisException | ||
| import org.apache.spark.sql.catalyst.TableIdentifier | ||
| import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier} | ||
| import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf} | ||
|
|
||
| /** | ||
|
|
@@ -155,4 +155,31 @@ private[sql] trait LookupCatalog extends Logging { | |
| None | ||
| } | ||
| } | ||
|
|
||
| // TODO: move function related v2 statements to the new framework. | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @imback82 do you have time to work on this TODO?
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yea, I will get to it once this PR is merged.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Created #29198 |
||
| def parseSessionCatalogFunctionIdentifier( | ||
| nameParts: Seq[String], | ||
| sql: String): FunctionIdentifier = { | ||
| if (nameParts.length == 1 && catalogManager.v1SessionCatalog.isTempFunction(nameParts.head)) { | ||
| return FunctionIdentifier(nameParts.head) | ||
| } | ||
|
|
||
| nameParts match { | ||
| case SessionCatalogAndIdentifier(_, ident) => | ||
| if (nameParts.length == 1) { | ||
| // If there is only one name part, it means the current catalog is the session catalog. | ||
| // Here we don't fill the default database, to keep the error message unchanged for | ||
| // v1 commands. | ||
| FunctionIdentifier(nameParts.head, None) | ||
| } else { | ||
| ident.namespace match { | ||
| case Array(db) => FunctionIdentifier(ident.name, Some(db)) | ||
| case _ => | ||
| throw new AnalysisException(s"Unsupported function name '$ident'") | ||
| } | ||
| } | ||
|
|
||
| case _ => throw new AnalysisException(s"$sql is only supported in v1 catalog") | ||
| } | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -611,33 +611,11 @@ class ResolveSessionCatalog( | |
| CreateFunctionCommand(database, function, className, resources, isTemp, ignoreIfExists, | ||
| replace) | ||
| } | ||
| } | ||
|
|
||
| // TODO: move function related v2 statements to the new framework. | ||
| private def parseSessionCatalogFunctionIdentifier( | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Move this method to
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This PR needs the change? |
||
| nameParts: Seq[String], | ||
| sql: String): FunctionIdentifier = { | ||
| if (nameParts.length == 1 && isTempFunction(nameParts.head)) { | ||
| return FunctionIdentifier(nameParts.head) | ||
| } | ||
|
|
||
| nameParts match { | ||
| case SessionCatalogAndIdentifier(_, ident) => | ||
| if (nameParts.length == 1) { | ||
| // If there is only one name part, it means the current catalog is the session catalog. | ||
| // Here we don't fill the default database, to keep the error message unchanged for | ||
| // v1 commands. | ||
| FunctionIdentifier(nameParts.head, None) | ||
| } else { | ||
| ident.namespace match { | ||
| case Array(db) => FunctionIdentifier(ident.name, Some(db)) | ||
| case _ => | ||
| throw new AnalysisException(s"Unsupported function name '$ident'") | ||
| } | ||
| } | ||
|
|
||
| case _ => throw new AnalysisException(s"$sql is only supported in v1 catalog") | ||
| } | ||
| case RefreshFunction(ResolvedFunc(identifier)) => | ||
| // Fallback to v1 command | ||
| val funcIdentifier = identifier.asFunctionIdentifier | ||
| RefreshFunctionCommand(funcIdentifier.database, funcIdentifier.funcName) | ||
| } | ||
|
|
||
| private def parseV1Table(tableName: Seq[String], sql: String): Seq[String] = tableName match { | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -236,6 +236,46 @@ case class ShowFunctionsCommand( | |
| } | ||
| } | ||
|
|
||
|
|
||
| /** | ||
| * A command for users to refresh the persistent function. | ||
| * The syntax of using this command in SQL is: | ||
| * {{{ | ||
| * REFRESH FUNCTION functionName | ||
| * }}} | ||
| */ | ||
| case class RefreshFunctionCommand( | ||
| databaseName: Option[String], | ||
| functionName: String) | ||
| extends RunnableCommand { | ||
|
|
||
| override def run(sparkSession: SparkSession): Seq[Row] = { | ||
| val catalog = sparkSession.sessionState.catalog | ||
| if (FunctionRegistry.builtin.functionExists(FunctionIdentifier(functionName))) { | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We still can create persistent function with the same name as the built-in function. For example, CREATE FUNCTION rand AS 'org.apache.spark.sql.catalyst.expressions.Abs'
DESC function default.randI think we should still allow this case.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It seems no meaning to refresh a persistent function whose name is same as a built-in function. Yes, we can create a persistent function with the same name as the built-in function, but just create in metastore. The actual function we used is the built-in function. The reason is built-in functions are pre-cached in registry and we lookup cached function first. e.g., BTW, maybe it's the reason why we create function and load it lazy that just be a Hive client, otherwise we can't create such function like
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. how about I think this is similar to table and temp views. Spark will try to look up temp view first, so if the name conflicts, temp view is preferred. But users can still use a qualified table name to read the table explicitly.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You are right. Missed qualified name case, I will fix this in followup. |
||
| throw new AnalysisException(s"Cannot refresh builtin function $functionName") | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nit: built-in
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. get it. |
||
| } | ||
| if (catalog.isTemporaryFunction(FunctionIdentifier(functionName, databaseName))) { | ||
| throw new AnalysisException(s"Cannot refresh temporary function $functionName") | ||
| } | ||
|
|
||
| val identifier = FunctionIdentifier( | ||
| functionName, Some(databaseName.getOrElse(catalog.getCurrentDatabase))) | ||
| // we only refresh the permanent function. | ||
| if (catalog.isPersistentFunction(identifier)) { | ||
| // register overwrite function. | ||
| val func = catalog.getFunctionMetadata(identifier) | ||
| catalog.registerFunction(func, true) | ||
cloud-fan marked this conversation as resolved.
Show resolved
Hide resolved
cloud-fan marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| } else { | ||
| // clear cached function, if not exists throw exception | ||
| if (!catalog.unregisterFunction(identifier)) { | ||
| throw new NoSuchFunctionException(identifier.database.get, identifier.funcName) | ||
|
||
| } | ||
| } | ||
|
|
||
| Seq.empty[Row] | ||
| } | ||
| } | ||
|
|
||
| object FunctionsCommand { | ||
| // operators that do not have corresponding functions. | ||
| // They should be handled `DescribeFunctionCommand`, `ShowFunctionsCommand` | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.