Skip to content
Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -97,12 +97,23 @@ case class InsertIntoHiveTable(
val inputPathUri: URI = inputPath.toUri
val inputPathName: String = inputPathUri.getPath
val fs: FileSystem = inputPath.getFileSystem(hadoopConf)
val stagingPathName: String =
var stagingPathName: String =
if (inputPathName.indexOf(stagingDir) == -1) {
new Path(inputPathName, stagingDir).toString
} else {
inputPathName.substring(0, inputPathName.indexOf(stagingDir) + stagingDir.length)
}

// SPARK-20594: The staging directory should be a child directory starts with "." to avoid
// being deleted if we set hive.exec.stagingdir under the table directory.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about?

SPARK-20594: This is a walk-around fix to resolve a Hive bug. Hive requires that the staging directory needs to avoid being deleted when users set hive.exec.stagingdir under the table directory.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, i will update it . Thanks!

if (FileUtils.isSubDir(new Path(stagingPathName), inputPath, fs)
&& !stagingPathName.stripPrefix(inputPathName).startsWith(".")) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is just to hide the issue and make the test cases passed, right?

We need to drop the created staging directory no matter what is the value users set.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry i do not follow your logic. Correct me if I'm wrong, but isn't the logic of dropping the created staging directory was already there before with fs.deleteOnExit(dir)?
As @cloud-fan said this patch seems a valid workaround in Spark SQL for this case.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fs.deleteOnExit(dir) deletes dir, but the parent directory is still there.

logDebug(s"The staging dir '$stagingPathName' should be a child directory starts " +
s"with '.' to avoid being deleted if we set hive.exec.stagingdir under the table " +
s"directory.")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: please remove the above TWO string Interpolation s

stagingPathName = new Path(inputPathName, ".hive-staging").toString
}

val dir: Path =
fs.makeQualified(
new Path(stagingPathName + "_" + executionId + "-" + TaskRunner.getTaskRunnerID))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -494,4 +494,45 @@ class InsertIntoHiveTableSuite extends QueryTest with TestHiveSingleton with Bef
spark.table("t").write.insertInto(tableName)
}
}

private def dropTables(tableNames: String*): Unit = {
tableNames.foreach { name =>
sql(s"DROP TABLE IF EXISTS $name")
}
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not needed. You can call withTable.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok


test(
"""SPARK-20594: The staging directory should be appended with ".hive-staging"
|to avoid being deleted if we set hive.exec.stagingdir under the table directory
|without start with "."""".stripMargin) {

dropTables("test_table", "test_table1")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

 withTable("test_table", "test_table1") {
    ...
  }

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes you are right. :)


sql("CREATE TABLE test_table (key int, value string)")

// Add some data.
testData.write.mode(SaveMode.Append).insertInto("test_table")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can simplify the above two lines by spark.range(1).write.saveAsTable("test_table")

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no, as i tested we must create table rather than simplify the above two lines by spark.range(1).write.saveAsTable("test_table").


// Make sure the table has also been updated.
checkAnswer(
sql("SELECT * FROM test_table"),
testData.collect().toSeq
)

sql("CREATE TABLE test_table1 (key int, value string)")

// Set hive.exec.stagingdir under the table directory without start with ".".
sql("set hive.exec.stagingdir=./test")

// Now overwrite.
sql("INSERT OVERWRITE TABLE test_table1 SELECT * FROM test_table")

// Make sure the table has also been updated.
checkAnswer(
sql("SELECT * FROM test_table1"),
testData.collect().toSeq
)

dropTables("test_table", "test_table1")
}
}