InsertIntoHiveTable.scala

@@ -22,7 +22,6 @@ import java.net.URI
 import java.text.SimpleDateFormat
 import java.util.{Date, Locale, Random}
 
-import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.hive.common.FileUtils
 import org.apache.hadoop.hive.ql.exec.TaskRunner
@@ -86,14 +85,15 @@ case class InsertIntoHiveTable(
 
   val hadoopConf = sessionState.newHadoopConf()
   val stagingDir = hadoopConf.get("hive.exec.stagingdir", ".hive-staging")
+  val scratchDir = hadoopConf.get("hive.exec.scratchdir", "/tmp/hive")
[Member] The default value of hive.exec.scratchdir also depends on the Hive version:

    Default Value: /tmp/${user.name} in Hive 0.2.0 through 0.8.0; /tmp/hive-${user.name} in Hive 0.8.1 through 0.14.0; or /tmp/hive in Hive 0.14.0 and later

[Contributor Author] Yeah, that's true, but I don't think the default value here is a big deal to worry about.
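As an aside for readers: a minimal sketch of how the version-dependent default could be derived, if it ever mattered here. ScratchDirDefaultSketch and defaultScratchDir are hypothetical names, and the version-prefix boundaries are approximations of the docs quoted above, not code from this PR.

    import org.apache.hadoop.conf.Configuration

    object ScratchDirDefaultSketch {
      // Hypothetical helper, NOT in the PR: approximates the defaults from the
      // Hive docs quoted above (the quoted ranges themselves overlap at 0.14.0).
      def defaultScratchDir(hiveVersion: String, user: String): String = {
        val pre081 = Seq("0.2.", "0.3.", "0.4.", "0.5.", "0.6.", "0.7.", "0.8.0")
          .exists(hiveVersion.startsWith)
        val pre0140 = Seq("0.8.", "0.9.", "0.10.", "0.11.", "0.12.", "0.13.")
          .exists(hiveVersion.startsWith)
        if (pre081) s"/tmp/$user"            // Hive 0.2.0 through 0.8.0
        else if (pre0140) s"/tmp/hive-$user" // Hive 0.8.1 through 0.14.0
        else "/tmp/hive"                     // Hive 0.14.0 and later
      }

      def main(args: Array[String]): Unit = {
        val hadoopConf = new Configuration()
        // An explicit hive.exec.scratchdir setting always wins; the PR simply
        // hardcodes the post-0.14 default as the fallback.
        val scratchDir = hadoopConf.get("hive.exec.scratchdir",
          defaultScratchDir("1.2.1", System.getProperty("user.name")))
        println(scratchDir)
      }
    }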


   private def executionId: String = {
     val rand: Random = new Random
     val format = new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss_SSS", Locale.US)
     "hive_" + format.format(new Date) + "_" + Math.abs(rand.nextLong)
   }
 
-  private def getStagingDir(inputPath: Path, hadoopConf: Configuration): Path = {
+  private def getStagingDir(inputPath: Path): Path = {
     val inputPathUri: URI = inputPath.toUri
     val inputPathName: String = inputPathUri.getPath
     val fs: FileSystem = inputPath.getFileSystem(hadoopConf)
@@ -121,21 +121,61 @@ case class InsertIntoHiveTable(
     return dir
   }
 
-  private def getExternalScratchDir(extURI: URI, hadoopConf: Configuration): Path = {
-    getStagingDir(new Path(extURI.getScheme, extURI.getAuthority, extURI.getPath), hadoopConf)
+  private def getExternalScratchDir(extURI: URI): Path = {
+    getStagingDir(new Path(extURI.getScheme, extURI.getAuthority, extURI.getPath))
   }
 
-  def getExternalTmpPath(path: Path, hadoopConf: Configuration): Path = {
+  def getExternalTmpPath(path: Path): Path = {
+    val hiveVersion = externalCatalog.asInstanceOf[HiveExternalCatalog].client.version.fullVersion
+    if (hiveVersion.startsWith("0.12") ||
+        hiveVersion.startsWith("0.13") ||
+        hiveVersion.startsWith("0.14") ||
+        hiveVersion.startsWith("1.0")) {
+      oldStyleExternalTempPath(path)
+    } else if (hiveVersion.startsWith("1.1") || hiveVersion.startsWith("1.2")) {
+      newStyleExternalTempPath(path)
+    } else {
+      throw new IllegalStateException("Unsupported hive version: " + hiveVersion)
[Member] Do we have to throw an exception here? It sounds like this is the only place we block it. Do you think users might be able to use a higher version, even though it might have a few issues?

[Contributor Author] We will fail in other places anyway, e.g. IsolatedClientLoader.hiveVersion.

[Member] Ah, I see. Thanks!
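For context, fullVersion is matched by prefix in the new getExternalTmpPath, so e.g. 1.2.1 takes the new-style path and anything outside the listed prefixes throws. A minimal standalone sketch of that dispatch, with the path logic stubbed out (TempPathDispatchDemo and the sample version strings are illustrative assumptions, not PR code):

    object TempPathDispatchDemo {
      // Mirrors the prefix dispatch in getExternalTmpPath above, with the
      // actual path construction replaced by labels so it runs standalone.
      def style(hiveVersion: String): String = {
        val oldStyle = Seq("0.12", "0.13", "0.14", "1.0")
        val newStyle = Seq("1.1", "1.2")
        if (oldStyle.exists(hiveVersion.startsWith)) "old-style scratch dir"
        else if (newStyle.exists(hiveVersion.startsWith)) "new-style staging dir"
        else throw new IllegalStateException("Unsupported hive version: " + hiveVersion)
      }

      def main(args: Array[String]): Unit = {
        println(style("0.13.1")) // old-style scratch dir
        println(style("1.2.1"))  // new-style staging dir
        // style("2.1.0") would throw IllegalStateException here
      }
    }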

+    }
+  }
+
+  // Mostly copied from Context.java#getExternalTmpPath of Hive 0.13
+  def oldStyleExternalTempPath(path: Path): Path = {
+    val extURI: URI = path.toUri
+    val scratchPath = new Path(scratchDir, executionId)
+    var dirPath = new Path(
+      extURI.getScheme,
+      extURI.getAuthority,
+      scratchPath.toUri.getPath + "-" + TaskRunner.getTaskRunnerID())
+
+    try {
+      val fs: FileSystem = dirPath.getFileSystem(hadoopConf)
+      dirPath = new Path(fs.makeQualified(dirPath).toString())
+
+      if (!FileUtils.mkdir(fs, dirPath, true, hadoopConf)) {
[Member] We create the directory, but we never drop it? I checked all the related Hive versions; they have the same context-clear function to remove these temporary directories.

[Member] We drop it after normal termination of the VM by calling fs.deleteOnExit.

[Member] Keeping these useless directories and files around might not make sense, right? Many, many files could accumulate.

[Member] My above concern is not directly related to this PR. I just submitted a PR to resolve the existing issue: #16134. I think it is a very serious bug.
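For context on this exchange: FileSystem.deleteOnExit only removes registered paths when the FileSystem is closed, which for the cached default filesystem normally happens at JVM shutdown, so a long-running driver keeps staging directories on disk until then; that is the concern #16134 addresses. A minimal sketch, assuming a local filesystem and an arbitrary /tmp path:

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}

    object DeleteOnExitDemo {
      def main(args: Array[String]): Unit = {
        val fs = FileSystem.get(new Configuration())
        val staging = new Path("/tmp/hive-demo-staging") // arbitrary example path
        if (fs.mkdirs(staging)) {
          // Registers the path for deletion when this FileSystem is closed,
          // which for the cached default FileSystem happens at JVM shutdown.
          fs.deleteOnExit(staging)
        }
        // Until shutdown, the directory stays on disk; nothing reclaims it early.
      }
    }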

+        throw new IllegalStateException("Cannot create staging directory: " + dirPath.toString)
+      }
+      fs.deleteOnExit(dirPath)
+    } catch {
+      case e: IOException =>
+        throw new RuntimeException("Cannot create staging directory: " + dirPath.toString, e)
+    }
+    dirPath
+  }
+
+  // Mostly copied from Context.java#getExternalTmpPath of Hive 1.2
+  def newStyleExternalTempPath(path: Path): Path = {
+    val extURI: URI = path.toUri
     if (extURI.getScheme == "viewfs") {
-      getExtTmpPathRelTo(path.getParent, hadoopConf)
+      getExtTmpPathRelTo(path.getParent)
     } else {
-      new Path(getExternalScratchDir(extURI, hadoopConf), "-ext-10000")
+      new Path(getExternalScratchDir(extURI), "-ext-10000")
     }
   }
 
-  def getExtTmpPathRelTo(path: Path, hadoopConf: Configuration): Path = {
-    new Path(getStagingDir(path, hadoopConf), "-ext-10000") // Hive uses 10000
+  def getExtTmpPathRelTo(path: Path): Path = {
+    new Path(getStagingDir(path), "-ext-10000") // Hive uses 10000
   }
 
   private def saveAsHiveFile(
@@ -172,7 +212,7 @@ case class InsertIntoHiveTable(
     // instances within the closure, since Serializer is not serializable while TableDesc is.
     val tableDesc = table.tableDesc
     val tableLocation = table.hiveQlTable.getDataLocation
-    val tmpLocation = getExternalTmpPath(tableLocation, hadoopConf)
+    val tmpLocation = getExternalTmpPath(tableLocation)
     val fileSinkConf = new FileSinkDesc(tmpLocation.toString, tableDesc, false)
     val isCompressed = hadoopConf.get("hive.exec.compress.output", "false").toBoolean

VersionsSuite.scala

@@ -26,13 +26,15 @@ import org.apache.hadoop.mapred.TextInputFormat

 import org.apache.spark.SparkFunSuite
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.{AnalysisException, Row}
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis.NoSuchPermanentFunctionException
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Literal}
 import org.apache.spark.sql.catalyst.util.quietly
 import org.apache.spark.sql.hive.HiveUtils
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.test.SQLTestUtils
 import org.apache.spark.sql.types.IntegerType
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.tags.ExtendedHiveTest
@@ -45,7 +47,7 @@ import org.apache.spark.util.{MutableURLClassLoader, Utils}
  * is not fully tested.
  */
 @ExtendedHiveTest
-class VersionsSuite extends SparkFunSuite with Logging {
+class VersionsSuite extends SparkFunSuite with SQLTestUtils with TestHiveSingleton with Logging {

   private val clientBuilder = new HiveClientBuilder
   import clientBuilder.buildClient
@@ -525,5 +527,18 @@ class VersionsSuite extends SparkFunSuite with Logging {
       client.reset()
       assert(client.listTables("default").isEmpty)
     }

+    ///////////////////////////////////////////////////////////////////////////
+    // End-To-End tests
+    ///////////////////////////////////////////////////////////////////////////
+
+    test(s"$version: CREATE TABLE AS SELECT") {
+      withTable("tbl") {
+        spark.sql("CREATE TABLE tbl AS SELECT 1 AS a")
+        assert(spark.table("tbl").collect().toSeq == Seq(Row(1)))
+      }
+    }
+
+    // TODO: add more tests.
   }
 }
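The "TODO: add more tests." above invites follow-ups. As a sketch only (not part of this PR; the INSERT statement, table name, and expected row are illustrative assumptions), one more end-to-end test in the same style could look like:

    test(s"$version: INSERT INTO") {
      withTable("t") {
        spark.sql("CREATE TABLE t (a INT)")
        spark.sql("INSERT INTO t SELECT 1")
        assert(spark.table("t").collect().toSeq == Seq(Row(1)))
      }
    }

Like the CREATE TABLE AS SELECT test, this relies on the SQLTestUtils and TestHiveSingleton helpers (withTable, spark) that this change mixes into the suite.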