-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-18703] [SQL] Drop Staging Directories and Data Files After each Insertion/CTAS of Hive serde Tables #16134
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
8773c03
34340ee
365ea26
1ed228f
74a676c
16c9da3
5aca883
9da3951
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -22,6 +22,8 @@ import java.net.URI | |
| import java.text.SimpleDateFormat | ||
| import java.util.{Date, Locale, Random} | ||
|
|
||
| import scala.util.control.NonFatal | ||
|
|
||
| import org.apache.hadoop.conf.Configuration | ||
| import org.apache.hadoop.fs.{FileSystem, Path} | ||
| import org.apache.hadoop.hive.common.FileUtils | ||
|
|
@@ -85,6 +87,7 @@ case class InsertIntoHiveTable( | |
| def output: Seq[Attribute] = Seq.empty | ||
|
|
||
| val hadoopConf = sessionState.newHadoopConf() | ||
| val createdTempDir = new scala.collection.mutable.ArrayBuffer[Path] | ||
| val stagingDir = hadoopConf.get("hive.exec.stagingdir", ".hive-staging") | ||
|
|
||
| private def executionId: String = { | ||
|
|
@@ -111,6 +114,7 @@ case class InsertIntoHiveTable( | |
| if (!FileUtils.mkdir(fs, dir, true, hadoopConf)) { | ||
| throw new IllegalStateException("Cannot create staging directory '" + dir.toString + "'") | ||
| } | ||
| createdTempDir += dir | ||
| fs.deleteOnExit(dir) | ||
| } catch { | ||
| case e: IOException => | ||
|
|
@@ -328,6 +332,15 @@ case class InsertIntoHiveTable( | |
| holdDDLTime) | ||
| } | ||
|
|
||
| // Attempt to delete the staging directory and the inclusive files. If failed, the files are | ||
| // expected to be dropped at the normal termination of VM since deleteOnExit is used. | ||
| try { | ||
| createdTempDir.foreach { path => path.getFileSystem(hadoopConf).delete(path, true) } | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. shall we just delete the
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Here, through the call of getExternalTmpPath, we want to drop the whole staging directory. For example, at the initial fix, I used |
||
| } catch { | ||
| case NonFatal(e) => | ||
| logWarning(s"Unable to delete staging directory: $stagingDir.\n" + e) | ||
| } | ||
|
|
||
| // Invalidate the cache. | ||
| sqlContext.sharedState.cacheManager.invalidateCache(table) | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should we delete the staging files before or after the invalidateCache? Does it matter? Logically, we should invalidate the cache first, then remove the intermediate dataset so that the cache can be recovered from the files on disk. Am I right? Please clarify.
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this does not matter. We do not reuse the temporary files in our implementation. |
||
| sqlContext.sessionState.catalog.refreshTable(table.catalogTable.identifier) | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -166,6 +166,29 @@ class InsertIntoHiveTableSuite extends QueryTest with TestHiveSingleton with Bef | |
| sql("DROP TABLE tmp_table") | ||
| } | ||
|
|
||
| test("Delete the temporary staging directory and files after each insert") { | ||
|
||
| withTable("tab") { | ||
| val tmpDir = Utils.createTempDir() | ||
|
||
| sql( | ||
| s""" | ||
| |CREATE TABLE tab(c1 string) | ||
| |location '${tmpDir.toURI.toString}' | ||
| """.stripMargin) | ||
|
|
||
| (1 to 3).map { i => | ||
| sql(s"INSERT OVERWRITE TABLE tab SELECT '$i'") | ||
| } | ||
| def listFiles(path: File): List[String] = { | ||
| val dir = path.listFiles() | ||
| val folders = dir.filter(_.isDirectory).toList | ||
| val filePaths = dir.map(_.getName).toList | ||
| filePaths ::: folders.flatMap(listFiles) | ||
| } | ||
| val expectedFiles = ".part-00000.crc" :: "part-00000" :: Nil | ||
| assert(listFiles(tmpDir).sortBy(_.toString) == expectedFiles) | ||
| } | ||
| } | ||
|
|
||
| test("INSERT OVERWRITE - partition IF NOT EXISTS") { | ||
| withTempDir { tmpDir => | ||
| val table = "table_with_partition" | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use an Option? It looks to me that we will only create one temp dir during one insertion.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure.