13 changes: 13 additions & 0 deletions sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala
@@ -211,4 +211,17 @@ abstract class Catalog {
*/
def clearCache(): Unit

/**
* Invalidate and refresh all the cached metadata of the given table. For performance reasons,
* Spark SQL or the external data source library it uses might cache certain metadata about a
* table, such as the location of blocks. When those change outside of Spark SQL, users should
* call this function to invalidate the cache.
*
* If this table is cached as an InMemoryRelation, drop the original cached version and make the
* new version cached lazily.
*
* @since 2.0.0
*/
def refreshTable(tableName: String): Unit

}
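
For context, a minimal usage sketch of the new API, assuming a SparkSession named spark; the table name "my_table" is hypothetical:

// A table backed by files that were modified outside of Spark SQL.
spark.catalog.refreshTable("my_table")
// Subsequent queries see the refreshed metadata; if the table was cached,
// the cache is re-populated lazily on the next action.
spark.table("my_table").show()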
@@ -126,24 +126,9 @@ case class RefreshTable(tableIdent: TableIdentifier)
extends RunnableCommand {

override def run(sparkSession: SparkSession): Seq[Row] = {
// Refresh the given table's metadata first.
sparkSession.sessionState.catalog.refreshTable(tableIdent)

// If this table is cached as a InMemoryColumnarRelation, drop the original
// cached version and make the new version cached lazily.
val logicalPlan = sparkSession.sessionState.catalog.lookupRelation(tableIdent)
// Use lookupCachedData directly since RefreshTable also takes databaseName.
val isCached = sparkSession.cacheManager.lookupCachedData(logicalPlan).nonEmpty
if (isCached) {
// Create a data frame to represent the table.
// TODO: Use uncacheTable once it supports database name.
val df = Dataset.ofRows(sparkSession, logicalPlan)
// Uncache the logicalPlan.
sparkSession.cacheManager.tryUncacheQuery(df, blocking = true)
// Cache it again.
sparkSession.cacheManager.cacheQuery(df, Some(tableIdent.table))
}

// Refresh the given table's metadata. If this table is cached as an InMemoryRelation,
// drop the original cached version and make the new version cached lazily.
sparkSession.catalog.refreshTable(tableIdent.quotedString)
Seq.empty[Row]
}
}
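
A minimal sketch of the two entry points this change aligns, assuming a SparkSession named spark and an existing table "t" (the table name is hypothetical):

spark.sql("REFRESH TABLE t")       // the RefreshTable command shown above
spark.catalog.refreshTable("t")    // the Catalog API it now delegates to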
@@ -345,6 +345,32 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
sparkSession.cacheManager.lookupCachedData(qName).nonEmpty
}

/**
* Refresh the cache entry for a metastore table, if any.
*
* @group cachemgmt
* @since 2.0.0
*/
override def refreshTable(tableName: String): Unit = {
val tableIdent = sparkSession.sessionState.sqlParser.parseTableIdentifier(tableName)
sessionCatalog.refreshTable(tableIdent)

// If this table is cached as an InMemoryRelation, drop the original
// cached version and make the new version cached lazily.
val logicalPlan = sparkSession.sessionState.catalog.lookupRelation(tableIdent)
// Use lookupCachedData directly since RefreshTable also takes databaseName.
val isCached = sparkSession.cacheManager.lookupCachedData(logicalPlan).nonEmpty
if (isCached) {
// Create a data frame to represent the table.
// TODO: Use uncacheTable once it supports database name.
val df = Dataset.ofRows(sparkSession, logicalPlan)
// Uncache the logicalPlan.
sparkSession.cacheManager.tryUncacheQuery(df, blocking = true)
// Cache it again.
sparkSession.cacheManager.cacheQuery(df, Some(tableIdent.table))
}
}

}
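
A short sketch of the caching behavior this implements, assuming a SparkSession named spark; the table name "t" is hypothetical:

spark.catalog.cacheTable("t")
// ... the table's underlying files change outside of Spark SQL ...
spark.catalog.refreshTable("t")
spark.catalog.isCached("t")   // still true: the cache entry is kept
spark.table("t").count()      // the cache is only rebuilt lazily, on the next action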


@@ -163,6 +163,9 @@ private[sql] class SessionState(sparkSession: SparkSession) {
def executePlan(plan: LogicalPlan): QueryExecution = new QueryExecution(sparkSession, plan)

def refreshTable(tableName: String): Unit = {
// Different from SparkSession.catalog.refreshTable, this API only refreshes the metadata.
// It does not reload the cached data. That means, if this table is cached as
// an InMemoryRelation, we do not refresh the cached data.
Comment (Member Author): SharedState can refresh the cached table data. In SessionState, we can only refresh the metadata. Thus, this refreshTable API only refreshes the metadata.

Comment (Member Author): Let me know if we need to remove the refreshTable API in SessionState. So far, it is not used by any test case. Thanks!

Comment (Contributor, @andrewor14, May 19, 2016): @gatorsmile this is super confusing, the fact that spark.catalog.refreshTable and spark.sessionState.refreshTable do different things. Should we just rename this to invalidateTable, along with HiveMetastoreCatalog.refreshTable?

catalog.refreshTable(sqlParser.parseTableIdentifier(tableName))
}
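
To make the difference concrete, a sketch contrasting the two entry points; the table name "t" is hypothetical, and since SessionState is private[sql] the first call only works from code inside the org.apache.spark.sql package:

spark.sessionState.refreshTable("t")   // metadata only; any cached data is left untouched
spark.catalog.refreshTable("t")        // metadata, plus lazy re-caching of an InMemoryRelation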

@@ -622,7 +622,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
.mode(SaveMode.Append)
.saveAsTable("arrayInParquet")

sessionState.refreshTable("arrayInParquet")
sparkSession.catalog.refreshTable("arrayInParquet")
Comment (Member): As we don't call refreshTable through SessionState, do we still need to keep SessionState.refreshTable?

Comment (Member Author): Actually, I also want to remove invalidateTable, which duplicates refreshTable under a different name.

Comment (Contributor): Actually, invalidateTable and refreshTable do have different meanings. The current implementation of HiveMetastoreCatalog.refreshTable is HiveMetastoreCatalog.invalidateTable (and then we retrieve the new metadata lazily). But that does not mean refreshTable and invalidateTable have the same semantics. Whether we should remove invalidateTable or refreshTable should be discussed in a different thread.

Comment (Member Author): Got it. Thanks!


checkAnswer(
sql("SELECT a FROM arrayInParquet"),
@@ -681,7 +681,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
.mode(SaveMode.Append)
.saveAsTable("mapInParquet")

sessionState.refreshTable("mapInParquet")
sparkSession.catalog.refreshTable("mapInParquet")

checkAnswer(
sql("SELECT a FROM mapInParquet"),
@@ -217,7 +217,7 @@ class MultiDatabaseSuite extends QueryTest with SQLTestUtils with TestHiveSingle

df.write.parquet(s"$path/p=2")
sql("ALTER TABLE t ADD PARTITION (p=2)")
hiveContext.sessionState.refreshTable("t")
spark.catalog.refreshTable("t")
checkAnswer(
spark.table("t"),
df.withColumn("p", lit(1)).union(df.withColumn("p", lit(2))))
@@ -249,7 +249,7 @@ class MultiDatabaseSuite extends QueryTest with SQLTestUtils with TestHiveSingle

df.write.parquet(s"$path/p=2")
sql(s"ALTER TABLE $db.t ADD PARTITION (p=2)")
hiveContext.sessionState.refreshTable(s"$db.t")
spark.catalog.refreshTable(s"$db.t")
checkAnswer(
spark.table(s"$db.t"),
df.withColumn("p", lit(1)).union(df.withColumn("p", lit(2))))
7 changes: 7 additions & 0 deletions sql/hivecontext-compatibility/pom.xml
@@ -48,6 +48,13 @@
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.binary.version}</artifactId>
<type>test-jar</type>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
@@ -58,4 +58,16 @@ class HiveContext private[hive](
sparkSession.sharedState.asInstanceOf[HiveSharedState]
}

/**
* Invalidate and refresh all the cached metadata of the given table. For performance reasons,
* Spark SQL or the external data source library it uses might cache certain metadata about a
* table, such as the location of blocks. When those change outside of Spark SQL, users should
* call this function to invalidate the cache.
*
* @since 1.3.0
*/
def refreshTable(tableName: String): Unit = {
Comment (Contributor): If invalidateTable has a different meaning from refreshTable, should we also add it to HiveContext? cc @yhuai

Comment (Contributor): This class exists for compatibility purposes. Let's leave it as is.

Comment (Member): +1
sparkSession.catalog.refreshTable(tableName)
}

}
@@ -20,12 +20,17 @@ package org.apache.spark.sql.hive
import org.scalatest.BeforeAndAfterEach

import org.apache.spark.{SparkContext, SparkFunSuite}
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.test.SQLTestUtils
import org.apache.spark.util.Utils


class HiveContextCompatibilitySuite extends SparkFunSuite with BeforeAndAfterEach {
class HiveContextCompatibilitySuite extends SparkFunSuite with BeforeAndAfterEach
with SQLTestUtils {

private var sc: SparkContext = null
private var hc: HiveContext = null
protected var spark: SparkSession = _

override def beforeAll(): Unit = {
super.beforeAll()
@@ -34,6 +39,7 @@ class HiveContextCompatibilitySuite extends SparkFunSuite with BeforeAndAfterEac
sc.hadoopConfiguration.set(k, v)
}
hc = new HiveContext(sc)
spark = hc.sparkSession
}

override def afterEach(): Unit = {
@@ -99,4 +105,41 @@ class HiveContextCompatibilitySuite extends SparkFunSuite with BeforeAndAfterEac
assert(databases3.toSeq == Seq("default"))
}

test("check change after refresh") {
Comment (Contributor, @cloud-fan, May 18, 2016): Do we already have a test for refreshTable / the RefreshTable command?

Comment (Member Author, @gatorsmile, May 18, 2016): The test cases modified by this PR are used to verify the refreshTable APIs (in MultiDatabaseSuite.scala and MetastoreDataSourcesSuite.scala). We also have test cases that verify the corresponding SQL interface, which calls the RefreshTable command. For example:

test("REFRESH TABLE also needs to recache the data (data source tables)") {
val tempPath: File = Utils.createTempDir()
tempPath.delete()
table("src").write.mode(SaveMode.Overwrite).parquet(tempPath.toString)
sql("DROP TABLE IF EXISTS refreshTable")
sparkSession.catalog.createExternalTable("refreshTable", tempPath.toString, "parquet")
checkAnswer(
table("refreshTable"),
table("src").collect())
// Cache the table.
sql("CACHE TABLE refreshTable")
assertCached(table("refreshTable"))
// Append new data.
table("src").write.mode(SaveMode.Append).parquet(tempPath.toString)
// We are still using the old data.
assertCached(table("refreshTable"))
checkAnswer(
table("refreshTable"),
table("src").collect())
// Refresh the table.
sql("REFRESH TABLE refreshTable")
// We are using the new data.
assertCached(table("refreshTable"))
checkAnswer(
table("refreshTable"),
table("src").union(table("src")).collect())
// Drop the table and create it again.
sql("DROP TABLE refreshTable")
sparkSession.catalog.createExternalTable("refreshTable", tempPath.toString, "parquet")
// It is not cached.
assert(!isCached("refreshTable"), "refreshTable should not be cached.")
// Refresh the table. REFRESH TABLE command should not make a uncached
// table cached.
sql("REFRESH TABLE refreshTable")
checkAnswer(
table("refreshTable"),
table("src").union(table("src")).collect())
// It is not cached.
assert(!isCached("refreshTable"), "refreshTable should not be cached.")
sql("DROP TABLE refreshTable")
Utils.deleteRecursively(tempPath)
}

Now, to test the pure HiveContext, the only way we can do it is to add a test case in sql/hivecontext-compatibility.

Not sure if this answers your question. Let me know if you have any concerns.

Comment (Contributor): They share the same implementation, and I think we don't need to test all of them. cc @yhuai

Comment (Member Author): I see. The refreshTable API is in HiveContext. I think we can just do a dummy call to verify that the API still exists, without checking the functionality. Does that sound good to you? @cloud-fan @yhuai

Comment (Contributor): Looks like the RefreshTable command is actually doing more work. I think we need to make RefreshTable and sparkSession.catalog.refreshTable have the same behavior. Can you make that change?

Comment (Member Author): Sure, will do it soon. Then the new behavior will be different from how Spark 1.6 behaves. However, I think we should keep the two interfaces (SQL and API) consistent.

Comment (Member Author): Done. Please review the latest code changes. Thanks!

val _hc = hc
import _hc.implicits._

withTempPath { tempDir =>
Comment (Contributor): Do we still need this test?

Comment (Member Author): Sure, let me remove it. : )

withTable("jsonTable") {
(("a", "b") :: Nil).toDF().toJSON.rdd.saveAsTextFile(tempDir.getCanonicalPath)

hc.sql(
s"""
|CREATE TABLE jsonTable
|USING org.apache.spark.sql.json
|OPTIONS (
| path '${tempDir.getCanonicalPath}'
|)
""".stripMargin)

assert(
hc.sql("SELECT * FROM jsonTable").collect().toSeq == Row("a", "b") :: Nil)

Utils.deleteRecursively(tempDir)
(("a1", "b1", "c1") :: Nil).toDF().toJSON.rdd.saveAsTextFile(tempDir.getCanonicalPath)

// Schema is cached so the new column does not show. The updated values in existing columns
// will show.
assert(
hc.sql("SELECT * FROM jsonTable").collect().toSeq == Row("a1", "b1") :: Nil)

hc.refreshTable("jsonTable")

// Check that the refresh worked
assert(
hc.sql("SELECT * FROM jsonTable").collect().toSeq == Row("a1", "b1", "c1") :: Nil)
}
}
}

}