Skip to content

Commit 39fd469

Browse files
gatorsmile authored and cloud-fan committed
[SPARK-15367][SQL] Add refreshTable back
#### What changes were proposed in this pull request?

`refreshTable` was a method in `HiveContext`. It was deleted accidentally while we were migrating the APIs. This PR is to add it back to `HiveContext`. In addition, in `SparkSession`, we put it under the catalog namespace (`SparkSession.catalog.refreshTable`).

#### How was this patch tested?

Changed the existing test cases to use the function `refreshTable`. Also added a test case for `refreshTable` in `hivecontext-compatibility`.

Author: gatorsmile <[email protected]>

Closes #13156 from gatorsmile/refreshTable.
1 parent c94b34e commit 39fd469

7 files changed

Lines changed: 59 additions & 26 deletions

File tree

sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -211,4 +211,17 @@ abstract class Catalog {
211211
*/
212212
def clearCache(): Unit
213213

214+
/**
215+
* Invalidate and refresh all the cached metadata of the given table. For performance reasons,
216+
* Spark SQL or the external data source library it uses might cache certain metadata about a
217+
* table, such as the location of blocks. When those change outside of Spark SQL, users should
218+
* call this function to invalidate the cache.
219+
*
220+
* If this table is cached as an InMemoryRelation, drop the original cached version and make the
221+
* new version cached lazily.
222+
*
223+
* @since 2.0.0
224+
*/
225+
def refreshTable(tableName: String): Unit
226+
214227
}

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala

Lines changed: 3 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -126,24 +126,9 @@ case class RefreshTable(tableIdent: TableIdentifier)
126126
extends RunnableCommand {
127127

128128
override def run(sparkSession: SparkSession): Seq[Row] = {
129-
// Refresh the given table's metadata first.
130-
sparkSession.sessionState.catalog.refreshTable(tableIdent)
131-
132-
// If this table is cached as a InMemoryColumnarRelation, drop the original
133-
// cached version and make the new version cached lazily.
134-
val logicalPlan = sparkSession.sessionState.catalog.lookupRelation(tableIdent)
135-
// Use lookupCachedData directly since RefreshTable also takes databaseName.
136-
val isCached = sparkSession.cacheManager.lookupCachedData(logicalPlan).nonEmpty
137-
if (isCached) {
138-
// Create a data frame to represent the table.
139-
// TODO: Use uncacheTable once it supports database name.
140-
val df = Dataset.ofRows(sparkSession, logicalPlan)
141-
// Uncache the logicalPlan.
142-
sparkSession.cacheManager.tryUncacheQuery(df, blocking = true)
143-
// Cache it again.
144-
sparkSession.cacheManager.cacheQuery(df, Some(tableIdent.table))
145-
}
146-
129+
// Refresh the given table's metadata. If this table is cached as an InMemoryRelation,
130+
// drop the original cached version and make the new version cached lazily.
131+
sparkSession.catalog.refreshTable(tableIdent.quotedString)
147132
Seq.empty[Row]
148133
}
149134
}

sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -345,6 +345,33 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
345345
sparkSession.cacheManager.lookupCachedData(qName).nonEmpty
346346
}
347347

348+
/**
349+
* Refresh the cache entry for a table, if any. For Hive metastore table, the metadata
350+
* is refreshed.
351+
*
352+
* @group cachemgmt
353+
* @since 2.0.0
354+
*/
355+
override def refreshTable(tableName: String): Unit = {
356+
val tableIdent = sparkSession.sessionState.sqlParser.parseTableIdentifier(tableName)
357+
sessionCatalog.refreshTable(tableIdent)
358+
359+
// If this table is cached as an InMemoryRelation, drop the original
360+
// cached version and make the new version cached lazily.
361+
val logicalPlan = sparkSession.sessionState.catalog.lookupRelation(tableIdent)
362+
// Use lookupCachedData directly since RefreshTable also takes databaseName.
363+
val isCached = sparkSession.cacheManager.lookupCachedData(logicalPlan).nonEmpty
364+
if (isCached) {
365+
// Create a data frame to represent the table.
366+
// TODO: Use uncacheTable once it supports database name.
367+
val df = Dataset.ofRows(sparkSession, logicalPlan)
368+
// Uncache the logicalPlan.
369+
sparkSession.cacheManager.tryUncacheQuery(df, blocking = true)
370+
// Cache it again.
371+
sparkSession.cacheManager.cacheQuery(df, Some(tableIdent.table))
372+
}
373+
}
374+
348375
}
349376

350377

sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -162,10 +162,6 @@ private[sql] class SessionState(sparkSession: SparkSession) {
162162

163163
def executePlan(plan: LogicalPlan): QueryExecution = new QueryExecution(sparkSession, plan)
164164

165-
def refreshTable(tableName: String): Unit = {
166-
catalog.refreshTable(sqlParser.parseTableIdentifier(tableName))
167-
}
168-
169165
def invalidateTable(tableName: String): Unit = {
170166
catalog.invalidateTable(sqlParser.parseTableIdentifier(tableName))
171167
}

sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -622,7 +622,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
622622
.mode(SaveMode.Append)
623623
.saveAsTable("arrayInParquet")
624624

625-
sessionState.refreshTable("arrayInParquet")
625+
sparkSession.catalog.refreshTable("arrayInParquet")
626626

627627
checkAnswer(
628628
sql("SELECT a FROM arrayInParquet"),
@@ -681,7 +681,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
681681
.mode(SaveMode.Append)
682682
.saveAsTable("mapInParquet")
683683

684-
sessionState.refreshTable("mapInParquet")
684+
sparkSession.catalog.refreshTable("mapInParquet")
685685

686686
checkAnswer(
687687
sql("SELECT a FROM mapInParquet"),

sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -217,7 +217,7 @@ class MultiDatabaseSuite extends QueryTest with SQLTestUtils with TestHiveSingle
217217

218218
df.write.parquet(s"$path/p=2")
219219
sql("ALTER TABLE t ADD PARTITION (p=2)")
220-
hiveContext.sessionState.refreshTable("t")
220+
spark.catalog.refreshTable("t")
221221
checkAnswer(
222222
spark.table("t"),
223223
df.withColumn("p", lit(1)).union(df.withColumn("p", lit(2))))
@@ -249,7 +249,7 @@ class MultiDatabaseSuite extends QueryTest with SQLTestUtils with TestHiveSingle
249249

250250
df.write.parquet(s"$path/p=2")
251251
sql(s"ALTER TABLE $db.t ADD PARTITION (p=2)")
252-
hiveContext.sessionState.refreshTable(s"$db.t")
252+
spark.catalog.refreshTable(s"$db.t")
253253
checkAnswer(
254254
spark.table(s"$db.t"),
255255
df.withColumn("p", lit(1)).union(df.withColumn("p", lit(2))))

sql/hivecontext-compatibility/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,4 +58,16 @@ class HiveContext private[hive](
5858
sparkSession.sharedState.asInstanceOf[HiveSharedState]
5959
}
6060

61+
/**
62+
* Invalidate and refresh all the cached metadata of the given table. For performance reasons,
63+
* Spark SQL or the external data source library it uses might cache certain metadata about a
64+
* table, such as the location of blocks. When those change outside of Spark SQL, users should
65+
* call this function to invalidate the cache.
66+
*
67+
* @since 1.3.0
68+
*/
69+
def refreshTable(tableName: String): Unit = {
70+
sparkSession.catalog.refreshTable(tableName)
71+
}
72+
6173
}

0 commit comments

Comments
 (0)