[SPARK-33448][SQL] Support CACHE/UNCACHE TABLE commands for v2 tables #30403
sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
// CACHE TABLE ... AS SELECT creates a temp view with the input query.
// Thus, use the identifier in UnresolvedTableOrView directly.
case CacheTable(u: UnresolvedTableOrView, plan, isLazy, options) if plan.isDefined =>
  CacheTableCommand(u.multipartIdentifier.asTableIdentifier, plan, isLazy, options)
@cloud-fan Please let me know what you think about having UnresolvedTableOrView here to eagerly use the identifier if plan is defined. Another approach is to have a separate rule to handle CacheTable(u: UnresolvedTableOrView, ...).
This is more like CTAS, and the table should be just a Seq[String], not a LogicalPlan.
How about we have both CacheTable(table: LogicalPlan, ...) and CacheTableAsSelect(tempViewName: String, ...)?
Good idea
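A minimal Scala sketch of the split being discussed (the class shapes and field names here are illustrative assumptions, not the merged API):

```scala
import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan}

// Plain CACHE TABLE references an existing table/view, so the analyzer can
// resolve it; the AS SELECT variant is CTAS-like and only carries a name.
case class CacheTable(
    table: LogicalPlan,             // resolved by the analyzer
    isLazy: Boolean,
    options: Map[String, String]) extends Command

case class CacheTableAsSelect(
    tempViewName: String,           // the view does not exist yet
    plan: LogicalPlan,              // the input query to cache
    isLazy: Boolean,
    options: Map[String, String]) extends Command
```

Keeping the AS SELECT name unresolved mirrors CTAS, which avoids resolving a table that does not exist yet.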
Kubernetes integration test starting
Kubernetes integration test status success
Test build #131246 has finished for PR 30403 at commit
sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
Kubernetes integration test starting
Kubernetes integration test status success
 * The logical plan for the no-op command handling a non-existing table.
 */
-case class NoopDropTable(multipartIdentifier: Seq[String]) extends Command
+case class NoopCommand(multipartIdentifier: Seq[String]) extends Command
We should probably add a commandName: String property, which can be DROP TABLE, REFRESH TABLE, etc., so that we can see the original command name in the EXPLAIN result.
Added commandName. Now EXPLAIN EXTENDED DROP TABLE looks like the following:
== Parsed Logical Plan ==
'DropTable true, false
+- 'UnresolvedTableOrView [testcat, ns1, ns2, tbl], true
== Analyzed Logical Plan ==
NoopCommand DROP TABLE, [testcat, ns1, ns2, tbl]
== Optimized Logical Plan ==
NoopCommand DROP TABLE, [testcat, ns1, ns2, tbl]
== Physical Plan ==
LocalTableScan <empty>
Btw, do we want to introduce NoopCommandExec for physical plan as well?
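A sketch of NoopCommand with the suggested commandName field (illustrative; the merged code may differ):

```scala
import org.apache.spark.sql.catalyst.plans.logical.Command

// A no-op logical plan for commands on non-existing tables (e.g. with
// IF EXISTS). commandName ("DROP TABLE", "REFRESH TABLE", ...) makes the
// original command visible in EXPLAIN output.
case class NoopCommand(
    commandName: String,
    multipartIdentifier: Seq[String]) extends Command
```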
throw new AnalysisException("SHOW CREATE TABLE is not supported for v2 tables.")

case CacheTable(_: ResolvedTable, _, _, _) =>
  throw new AnalysisException("CACHE TABLE is not supported for v2 tables.")
We don't need new v2 APIs to support it. This command touches CacheManager which is Spark internal.
Ah OK. An existing bug, I guess? (it only supported temp view / v1 tables).
Does it make sense to match case CacheTable(_: ResolvedTable, _, _, _) in ResolveSessionCatalog (seems weird) or should we match it in DataSourceV2Strategy with a new CacheTableExec similar to CacheTableCommand?
We can add a CacheTableExec as a v2 version of CacheTableCommand.
Added v2 version.
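A hedged sketch of what such a CacheTableExec might look like; it assumes access to Spark-internal APIs (V2CommandExec, CacheManager, Dataset.ofRows), and the actual field list may differ:

```scala
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.storage.StorageLevel

case class CacheTableExec(
    relation: LogicalPlan,
    tableName: Option[String],
    isLazy: Boolean,
    storageLevel: StorageLevel) extends V2CommandExec {

  override def run(): Seq[InternalRow] = {
    val session = sqlContext.sparkSession
    val df = Dataset.ofRows(session, relation)
    session.sharedState.cacheManager.cacheQuery(df, tableName, storageLevel)
    if (!isLazy) {
      // Materialize the cache eagerly, matching CacheTableCommand's behavior.
      df.count()
    }
    Seq.empty
  }

  override def output: Seq[Attribute] = Nil
}
```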
  parseTempViewOrV1Table(tbl, "CACHE TABLE")
}
CacheTableCommand(name.asTableIdentifier, plan, isLazy, options)
// CACHE TABLE ... AS SELECT creates a temp view with the input query.
What's the behavior if the temp view already exists? Overwrite?
It would fail with:
org.apache.spark.sql.catalyst.analysis.TempTableAlreadyExistsException: Temporary view 't' already exists;
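For example, assuming a live SparkSession named spark:

```scala
spark.sql("CREATE TEMPORARY VIEW t AS SELECT 1 AS id")

// The CTAS-style cache tries to create temp view 't' again and fails:
spark.sql("CACHE TABLE t AS SELECT 2 AS id")
// org.apache.spark.sql.catalyst.analysis.TempTableAlreadyExistsException:
// Temporary view 't' already exists;
```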
Test build #131261 has finished for PR 30403 at commit
Kubernetes integration test starting
Kubernetes integration test status success
Test build #131460 has finished for PR 30403 at commit
Kubernetes integration test starting
Test build #131461 has finished for PR 30403 at commit
Kubernetes integration test status success
cc @sunchao
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
Test build #131793 has finished for PR 30403 at commit
sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
Test build #131799 has finished for PR 30403 at commit
Test build #131804 has finished for PR 30403 at commit
Test build #131818 has finished for PR 30403 at commit
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DropTableExec.scala
sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala
val multipartIdentifier =
  sparkSession.sessionState.sqlParser.parseMultipartIdentifier(tableName)
val cascade = (multipartIdentifier.length <= 2) &&
  !sessionCatalog.isTemporaryTable(multipartIdentifier.asTableIdentifier)
can we add an overload of isTemporaryTable that takes Seq[String]?
Looks like the overload already exists as isTempView. :)
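With that overload, the cascade computation above could drop the conversion (a sketch against the snippet shown earlier):

```scala
// isTempView accepts the multi-part name directly, so no
// asTableIdentifier conversion is needed:
val cascade = (multipartIdentifier.length <= 2) &&
  !sessionCatalog.isTempView(multipartIdentifier)
```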
Test build #131825 has finished for PR 30403 at commit
Test build #131828 has finished for PR 30403 at commit
Test build #131888 has finished for PR 30403 at commit
}

-private def testNotSupportedV2Command(sqlCommand: String, sqlParams: String): Unit = {
+private def testNotSupportedV2Command(
unnecessary change. This is minor and let's fix it in your next PR.
Ok, will fix.
thanks, merging to master!
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.MultipartIdentifierHelper
import org.apache.spark.storage.StorageLevel

case class CacheTableCommand(
The next thing we can do is to refactor it using the v2 framework (not adding a v2 version). The benefits are: 1. moving the logical plan to catalyst; 2. resolving the table in the analyzer. e.g.

CacheTable(UnresolvedRelation(...), ...)
...
case class CacheTableExec(relation: LogicalPlan) {
  def run() {
    val df = Dataset.ofRows(spark, relation)
    ...
  }
}
OK, will do.
One issue I am encountering by moving to the v2 framework (for v2 tables) is the following.
When CACHE TABLE testcat.tbl is run, tbl is changed from DataSourceV2Relation to DataSourceV2ScanRelation by the V2ScanRelationPushDown rule, now that the plan goes through the analyzer, optimizer, etc. But if I run spark.table("testcat.tbl"), the query execution has tbl as DataSourceV2Relation, so the cache is not applied.
ah, one solution is to follow InsertIntoStatement and not make the table a child. Then we resolve the UnresolvedRelation inside CacheTable manually in ResolveTempViews and the other resolution rules.
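A sketch of that leaf-style approach (names illustrative): the table plan is held as a field but not exposed as a child, so optimizer rules such as V2ScanRelationPushDown never rewrite it, and resolution rules must handle it explicitly:

```scala
import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan}

case class CacheTable(
    table: LogicalPlan,             // e.g. UnresolvedRelation, resolved manually
    isLazy: Boolean,
    options: Map[String, String]) extends Command {
  // Mirroring InsertIntoStatement: `table` is not a child, so generic
  // analyzer/optimizer transforms skip it.
  override def children: Seq[LogicalPlan] = Nil
}
```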
### What changes were proposed in this pull request?
This changes DSv2 refresh table semantics to also recache the target table itself.

### Why are the changes needed?
Currently "REFRESH TABLE" in DSv2 only invalidates all caches referencing the table. With #30403 merged, which adds support for caching a DSv2 table, we should also recache the target table itself to make the behavior consistent with DSv1.

### Does this PR introduce _any_ user-facing change?
Yes, refreshing a table in DSv2 now also recaches the target table itself.

### How was this patch tested?
Added coverage of this new behavior in the existing UT for the v2 refresh table command.

Closes #30742 from sunchao/SPARK-33653.
Authored-by: Chao Sun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
This is a backport of #30742 for branch-3.1.

### What changes were proposed in this pull request?
This changes DSv2 refresh table semantics to also recache the target table itself.

### Why are the changes needed?
Currently "REFRESH TABLE" in DSv2 only invalidates all caches referencing the table. With #30403 merged, which adds support for caching a DSv2 table, we should also recache the target table itself to make the behavior consistent with DSv1.

### Does this PR introduce _any_ user-facing change?
Yes, refreshing a table in DSv2 now also recaches the target table itself.

### How was this patch tested?
Added coverage of this new behavior in the existing UT for the v2 refresh table command.

Closes #30769 from sunchao/SPARK-33653-branch-3.1.
Authored-by: Chao Sun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
### What changes were proposed in this pull request?
This PR proposes to support CACHE/UNCACHE TABLE commands for v2 tables.

In addition, this PR proposes to migrate CACHE/UNCACHE TABLE to use UnresolvedTableOrView to resolve the table identifier. This allows consistent resolution rules (temp view first, etc.) to be applied for both v1/v2 commands. More info about the consistent resolution rule proposal can be found in the JIRA or the proposal doc.

### Why are the changes needed?
To support CACHE/UNCACHE TABLE commands for v2 tables.

Note that CACHE/UNCACHE TABLE for v1 tables/views goes through SparkSession.table to resolve the identifier, which resolves temp views first, so there is no change in behavior by moving to the new framework.

### Does this PR introduce any user-facing change?
Yes. Now the user can run CACHE/UNCACHE TABLE commands on v2 tables.

### How was this patch tested?
Added/updated existing tests.