Skip to content

Commit 0978ff7

Browse files
committed
[SPARK-33653][SQL] DSv2: REFRESH TABLE should recache the table itself
### What changes were proposed in this pull request? This changes DSv2 refresh table semantics to also recache the target table itself. ### Why are the changes needed? Currently "REFRESH TABLE" in DSv2 only invalidate all caches referencing the table. With apache#30403 merged which adds support for caching a DSv2 table, we should also recache the target table itself to make the behavior consistent with DSv1. ### Does this PR introduce _any_ user-facing change? Yes, now refreshing table in DSv2 also recache the target table itself. ### How was this patch tested? Added coverage of this new behavior in the existing UT for v2 refresh table command Closes apache#30742 from sunchao/SPARK-33653. Authored-by: Chao Sun <sunchao@apple.com> Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
1 parent 010aeca commit 0978ff7

3 files changed

Lines changed: 32 additions & 4 deletions

File tree

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.datasources.v2
1919

2020
import scala.collection.JavaConverters._
2121

22-
import org.apache.spark.sql.{AnalysisException, SparkSession, Strategy}
22+
import org.apache.spark.sql.{AnalysisException, Dataset, SparkSession, Strategy}
2323
import org.apache.spark.sql.catalyst.analysis.{ResolvedNamespace, ResolvedPartitionSpec, ResolvedTable}
2424
import org.apache.spark.sql.catalyst.expressions.{And, Expression, NamedExpression, PredicateHelper, SubqueryExpression}
2525
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
@@ -56,9 +56,19 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
5656
session.sharedState.cacheManager.recacheByPlan(session, r)
5757
}
5858

59-
private def invalidateCache(r: ResolvedTable)(): Unit = {
59+
private def invalidateCache(r: ResolvedTable, recacheTable: Boolean = false)(): Unit = {
6060
val v2Relation = DataSourceV2Relation.create(r.table, Some(r.catalog), Some(r.identifier))
61+
val cache = session.sharedState.cacheManager.lookupCachedData(v2Relation)
6162
session.sharedState.cacheManager.uncacheQuery(session, v2Relation, cascade = true)
63+
if (recacheTable && cache.isDefined) {
64+
// save the cache name and cache level for recreation
65+
val cacheName = cache.get.cachedRepresentation.cacheBuilder.tableName
66+
val cacheLevel = cache.get.cachedRepresentation.cacheBuilder.storageLevel
67+
68+
// recache with the same name and cache level.
69+
val ds = Dataset.ofRows(session, v2Relation)
70+
session.sharedState.cacheManager.cacheQuery(ds, cacheName, cacheLevel)
71+
}
6272
}
6373

6474
override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
@@ -137,7 +147,7 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
137147
}
138148

139149
case RefreshTable(r: ResolvedTable) =>
140-
RefreshTableExec(r.catalog, r.identifier, invalidateCache(r)) :: Nil
150+
RefreshTableExec(r.catalog, r.identifier, invalidateCache(r, recacheTable = true)) :: Nil
141151

142152
case ReplaceTable(catalog, ident, schema, parts, props, orCreate) =>
143153
val propsWithOwner = CatalogV2Util.withDefaultOwnership(props)

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/RefreshTableExec.scala

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@ case class RefreshTableExec(
2929
catalog.invalidateTable(ident)
3030

3131
// invalidate all caches referencing the given table
32-
// TODO(SPARK-33437): re-cache the table itself once we support caching a DSv2 table
3332
invalidateCache()
3433

3534
Seq.empty

sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1748,6 +1748,25 @@ class DataSourceV2SQLSuite
17481748
}
17491749
}
17501750

1751+
test("SPARK-33653: REFRESH TABLE should recache the target table itself") {
1752+
val tblName = "testcat.ns.t"
1753+
withTable(tblName) {
1754+
sql(s"CREATE TABLE $tblName (id bigint) USING foo")
1755+
1756+
// if the table is not cached, refreshing it should not recache it
1757+
assert(spark.sharedState.cacheManager.lookupCachedData(spark.table(tblName)).isEmpty)
1758+
sql(s"REFRESH TABLE $tblName")
1759+
assert(spark.sharedState.cacheManager.lookupCachedData(spark.table(tblName)).isEmpty)
1760+
1761+
sql(s"CACHE TABLE $tblName")
1762+
1763+
// after caching & refreshing the table should be recached
1764+
assert(spark.sharedState.cacheManager.lookupCachedData(spark.table(tblName)).isDefined)
1765+
sql(s"REFRESH TABLE $tblName")
1766+
assert(spark.sharedState.cacheManager.lookupCachedData(spark.table(tblName)).isDefined)
1767+
}
1768+
}
1769+
17511770
test("REPLACE TABLE: v1 table") {
17521771
val e = intercept[AnalysisException] {
17531772
sql(s"CREATE OR REPLACE TABLE tbl (a int) USING ${classOf[SimpleScanSource].getName}")

0 commit comments

Comments
 (0)