[SPARK-15367] [SQL] Add refreshTable back #13156
The change to `HiveContext` (in the sql/hivecontext-compatibility module):

```diff
@@ -58,4 +58,16 @@ class HiveContext private[hive](
     sparkSession.sharedState.asInstanceOf[HiveSharedState]
   }
 
+  /**
+   * Invalidate and refresh all the cached metadata of the given table. For performance reasons,
+   * Spark SQL or the external data source library it uses might cache certain metadata about a
+   * table, such as the location of blocks. When those change outside of Spark SQL, users should
+   * call this function to invalidate the cache.
+   *
+   * @since 1.3.0
+   */
+  def refreshTable(tableName: String): Unit = {
+    sparkSession.catalog.refreshTable(tableName)
+  }
+
 }
```

**Contributor:** if

**Contributor:** This class is for compatibility purposes. Let's leave it as is.

**Member:** +1
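For orientation, here is a minimal sketch of how the restored API is meant to be used (the `hc` context and the table name `logs` are hypothetical, not from this PR):

```scala
// Assumes an existing HiveContext `hc` and an external table "logs"
// whose underlying files are modified by a process outside Spark SQL.

// Drop Spark SQL's cached metadata for the table...
hc.refreshTable("logs")

// ...so the next access re-reads fresh metadata (e.g. new file listings).
val rowCount = hc.table("logs").count()
```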
The corresponding changes to `HiveContextCompatibilitySuite` (the body of the new test is truncated in this excerpt):

```diff
@@ -20,12 +20,17 @@ package org.apache.spark.sql.hive
 import org.scalatest.BeforeAndAfterEach
 
 import org.apache.spark.{SparkContext, SparkFunSuite}
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.test.SQLTestUtils
+import org.apache.spark.util.Utils
 
-class HiveContextCompatibilitySuite extends SparkFunSuite with BeforeAndAfterEach {
+class HiveContextCompatibilitySuite extends SparkFunSuite with BeforeAndAfterEach
+  with SQLTestUtils {
 
   private var sc: SparkContext = null
   private var hc: HiveContext = null
+  protected var spark: SparkSession = _
 
   override def beforeAll(): Unit = {
     super.beforeAll()
@@ -34,6 +39,7 @@ class HiveContextCompatibilitySuite extends SparkFunSuite with BeforeAndAfterEac
       sc.hadoopConfiguration.set(k, v)
     }
     hc = new HiveContext(sc)
+    spark = hc.sparkSession
   }
 
   override def afterEach(): Unit = {
@@ -99,4 +105,41 @@ class HiveContextCompatibilitySuite extends SparkFunSuite with BeforeAndAfterEac
     assert(databases3.toSeq == Seq("default"))
   }
 
+  test("check change after refresh") {
```
| test("REFRESH TABLE also needs to recache the data (data source tables)") { | |
| val tempPath: File = Utils.createTempDir() | |
| tempPath.delete() | |
| table("src").write.mode(SaveMode.Overwrite).parquet(tempPath.toString) | |
| sql("DROP TABLE IF EXISTS refreshTable") | |
| sparkSession.catalog.createExternalTable("refreshTable", tempPath.toString, "parquet") | |
| checkAnswer( | |
| table("refreshTable"), | |
| table("src").collect()) | |
| // Cache the table. | |
| sql("CACHE TABLE refreshTable") | |
| assertCached(table("refreshTable")) | |
| // Append new data. | |
| table("src").write.mode(SaveMode.Append).parquet(tempPath.toString) | |
| // We are still using the old data. | |
| assertCached(table("refreshTable")) | |
| checkAnswer( | |
| table("refreshTable"), | |
| table("src").collect()) | |
| // Refresh the table. | |
| sql("REFRESH TABLE refreshTable") | |
| // We are using the new data. | |
| assertCached(table("refreshTable")) | |
| checkAnswer( | |
| table("refreshTable"), | |
| table("src").union(table("src")).collect()) | |
| // Drop the table and create it again. | |
| sql("DROP TABLE refreshTable") | |
| sparkSession.catalog.createExternalTable("refreshTable", tempPath.toString, "parquet") | |
| // It is not cached. | |
| assert(!isCached("refreshTable"), "refreshTable should not be cached.") | |
| // Refresh the table. REFRESH TABLE command should not make a uncached | |
| // table cached. | |
| sql("REFRESH TABLE refreshTable") | |
| checkAnswer( | |
| table("refreshTable"), | |
| table("src").union(table("src")).collect()) | |
| // It is not cached. | |
| assert(!isCached("refreshTable"), "refreshTable should not be cached.") | |
| sql("DROP TABLE refreshTable") | |
| Utils.deleteRecursively(tempPath) | |
| } |
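As background for the exchange that follows: the SQL command and the programmatic API are two entry points to the same refresh, which is why the reviewers push for consistent behavior between them. A minimal illustration (table name hypothetical):

```scala
// Two routes to refreshing cached metadata for the same table:
spark.sql("REFRESH TABLE refreshTable")     // the SQL command path
spark.catalog.refreshTable("refreshTable")  // the Catalog API path
```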
Now, to test the pure HiveContext, the only way we can do it is to add a test case in sql/hivecontext-compatibility.
Not sure if this answers your question. Let me know if you have any concerns.
They share the same implementation and I think we don't need to test all of them. cc @yhuai
I see. The `refreshTable` API is in HiveContext. I think we can just do a dummy call to verify that the API still exists, without checking its functionality. Does that sound good to you? @cloud-fan @yhuai
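A sketch of what such a dummy call might look like in the compatibility suite (the table name is hypothetical; `hc` is the suite's HiveContext):

```scala
  test("refreshTable API still exists on HiveContext") {
    // Smoke test only: the call should compile and not throw.
    // The actual refresh semantics are covered by other suites.
    hc.sql("CREATE TABLE IF NOT EXISTS smoke_tbl (key INT)")
    hc.refreshTable("smoke_tbl")
    hc.sql("DROP TABLE smoke_tbl")
  }
```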
Looks like the `RefreshTable` command is actually doing more work. I think we need to make `RefreshTable` and `sparkSession.catalog.refreshTable` have the same behavior. Can you make that change?
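One way to read this suggestion is that both entry points should funnel into the same catalog logic. A rough sketch of that shape (simplified; not the actual Spark source):

```scala
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.execution.command.RunnableCommand

// Sketch: the SQL REFRESH TABLE command delegates to the same public
// catalog method, so the two interfaces cannot drift apart.
case class RefreshTable(tableIdent: TableIdentifier) extends RunnableCommand {
  override def run(sparkSession: SparkSession): Seq[Row] = {
    sparkSession.catalog.refreshTable(tableIdent.quotedString)
    Seq.empty[Row]
  }
}
```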
Sure, will do it soon. The new behavior will then differ from how Spark 1.6 behaves. However, I think we should keep the two interfaces (SQL and API) consistent.
Done. Please review the latest code changes. Thanks!
Do we still need this test?
Sure, let me remove it. : )
As we don't call `refreshTable` through `SessionState`, do we still need to keep `SessionState.refreshTable`?
Actually, I also want to remove `invalidateTable`, which is a duplicate name of `refreshTable`.
Actually, `invalidateTable` and `refreshTable` do have different meanings. The current implementation of `HiveMetastoreCatalog.refreshTable` is `HiveMetastoreCatalog.invalidateTable` (and then we retrieve the new metadata lazily). But that does not mean `refreshTable` and `invalidateTable` have the same semantics. Whether we should remove either `invalidateTable` or `refreshTable` should be discussed in a different thread.
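To make the distinction concrete, a simplified sketch of the relationship being described (the cache and method bodies are illustrative assumptions, not the actual HiveMetastoreCatalog code):

```scala
import scala.collection.mutable

// Illustrative stand-in for the metastore-backed metadata cache.
val cachedTableMetadata = mutable.Map.empty[String, AnyRef]

// invalidateTable: only drops the cached metadata entry for the table.
def invalidateTable(tableName: String): Unit = {
  cachedTableMetadata.remove(tableName)
}

// refreshTable: currently *implemented* by invalidating the cache and
// letting the next access re-read metadata lazily, but its contract
// ("refresh") is broader than invalidateTable's ("drop from cache").
def refreshTable(tableName: String): Unit = {
  invalidateTable(tableName)
}
```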
Got it. Thanks!