Skip to content
Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,8 @@ abstract class ExternalCatalog {
table: String,
loadPath: String,
isOverwrite: Boolean,
holdDDLTime: Boolean): Unit
holdDDLTime: Boolean,
isSrcLocal: Boolean): Unit
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does `isSrcLocal` mean? Can you document it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It means the source data comes from a "LOAD DATA LOCAL" query.

I can add a partial scaladoc to these methods, but I don't really know the meaning of some of the other arguments, so I can't write a complete one.


def loadPartition(
db: String,
Expand All @@ -128,7 +129,8 @@ abstract class ExternalCatalog {
partition: TablePartitionSpec,
isOverwrite: Boolean,
holdDDLTime: Boolean,
inheritTableSpecs: Boolean): Unit
inheritTableSpecs: Boolean,
isSrcLocal: Boolean): Unit

def loadDynamicPartitions(
db: String,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,8 @@ class InMemoryCatalog(
table: String,
loadPath: String,
isOverwrite: Boolean,
holdDDLTime: Boolean): Unit = {
holdDDLTime: Boolean,
isSrcLocal: Boolean): Unit = {
throw new UnsupportedOperationException("loadTable is not implemented")
}

Expand All @@ -323,7 +324,8 @@ class InMemoryCatalog(
partition: TablePartitionSpec,
isOverwrite: Boolean,
holdDDLTime: Boolean,
inheritTableSpecs: Boolean): Unit = {
inheritTableSpecs: Boolean,
isSrcLocal: Boolean): Unit = {
throw new UnsupportedOperationException("loadPartition is not implemented.")
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -311,12 +311,13 @@ class SessionCatalog(
name: TableIdentifier,
loadPath: String,
isOverwrite: Boolean,
holdDDLTime: Boolean): Unit = {
holdDDLTime: Boolean,
isSrcLocal: Boolean): Unit = {
val db = formatDatabaseName(name.database.getOrElse(getCurrentDatabase))
val table = formatTableName(name.table)
requireDbExists(db)
requireTableExists(TableIdentifier(table, Some(db)))
externalCatalog.loadTable(db, table, loadPath, isOverwrite, holdDDLTime)
externalCatalog.loadTable(db, table, loadPath, isOverwrite, holdDDLTime, isSrcLocal)
}

/**
Expand All @@ -330,13 +331,14 @@ class SessionCatalog(
partition: TablePartitionSpec,
isOverwrite: Boolean,
holdDDLTime: Boolean,
inheritTableSpecs: Boolean): Unit = {
inheritTableSpecs: Boolean,
isSrcLocal: Boolean): Unit = {
val db = formatDatabaseName(name.database.getOrElse(getCurrentDatabase))
val table = formatTableName(name.table)
requireDbExists(db)
requireTableExists(TableIdentifier(table, Some(db)))
externalCatalog.loadPartition(
db, table, loadPath, partition, isOverwrite, holdDDLTime, inheritTableSpecs)
db, table, loadPath, partition, isOverwrite, holdDDLTime, inheritTableSpecs, isSrcLocal)
}

def defaultTablePath(tableIdent: TableIdentifier): String = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -297,13 +297,15 @@ case class LoadDataCommand(
partition.get,
isOverwrite,
holdDDLTime = false,
inheritTableSpecs = true)
inheritTableSpecs = true,
isSrcLocal = isLocal)
} else {
catalog.loadTable(
targetTable.identifier,
loadPath.toString,
isOverwrite,
holdDDLTime = false)
holdDDLTime = false,
isSrcLocal = isLocal)
}
Seq.empty[Row]
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -736,13 +736,15 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
table: String,
loadPath: String,
isOverwrite: Boolean,
holdDDLTime: Boolean): Unit = withClient {
holdDDLTime: Boolean,
isSrcLocal: Boolean): Unit = withClient {
requireTableExists(db, table)
client.loadTable(
loadPath,
s"$db.$table",
isOverwrite,
holdDDLTime)
holdDDLTime,
isSrcLocal)
}

override def loadPartition(
Expand All @@ -752,7 +754,8 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
partition: TablePartitionSpec,
isOverwrite: Boolean,
holdDDLTime: Boolean,
inheritTableSpecs: Boolean): Unit = withClient {
inheritTableSpecs: Boolean,
isSrcLocal: Boolean): Unit = withClient {
requireTableExists(db, table)

val orderedPartitionSpec = new util.LinkedHashMap[String, String]()
Expand All @@ -771,7 +774,8 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
orderedPartitionSpec,
isOverwrite,
holdDDLTime,
inheritTableSpecs)
inheritTableSpecs,
isSrcLocal)
}

override def loadDynamicPartitions(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,14 +211,16 @@ private[hive] trait HiveClient {
partSpec: java.util.LinkedHashMap[String, String], // Hive relies on LinkedHashMap ordering
replace: Boolean,
holdDDLTime: Boolean,
inheritTableSpecs: Boolean): Unit
inheritTableSpecs: Boolean,
isSrcLocal: Boolean): Unit

/** Loads data into an existing table. */
def loadTable(
loadPath: String, // TODO URI
tableName: String,
replace: Boolean,
holdDDLTime: Boolean): Unit
holdDDLTime: Boolean,
isSrcLocal: Boolean): Unit

/** Loads new dynamic partitions into an existing table. */
def loadDynamicPartitions(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -651,7 +651,8 @@ private[hive] class HiveClientImpl(
partSpec: java.util.LinkedHashMap[String, String],
replace: Boolean,
holdDDLTime: Boolean,
inheritTableSpecs: Boolean): Unit = withHiveState {
inheritTableSpecs: Boolean,
isSrcLocal: Boolean): Unit = withHiveState {
val hiveTable = client.getTable(dbName, tableName, true /* throw exception */)
shim.loadPartition(
client,
Expand All @@ -661,20 +662,23 @@ private[hive] class HiveClientImpl(
replace,
holdDDLTime,
inheritTableSpecs,
isSkewedStoreAsSubdir = hiveTable.isStoredAsSubDirectories)
isSkewedStoreAsSubdir = hiveTable.isStoredAsSubDirectories,
isSrcLocal = isSrcLocal)
}

def loadTable(
loadPath: String, // TODO URI
tableName: String,
replace: Boolean,
holdDDLTime: Boolean): Unit = withHiveState {
holdDDLTime: Boolean,
isSrcLocal: Boolean): Unit = withHiveState {
shim.loadTable(
client,
new Path(loadPath),
tableName,
replace,
holdDDLTime)
holdDDLTime,
isSrcLocal)
}

def loadDynamicPartitions(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,14 +98,16 @@ private[client] sealed abstract class Shim {
replace: Boolean,
holdDDLTime: Boolean,
inheritTableSpecs: Boolean,
isSkewedStoreAsSubdir: Boolean): Unit
isSkewedStoreAsSubdir: Boolean,
isSrcLocal: Boolean): Unit

def loadTable(
hive: Hive,
loadPath: Path,
tableName: String,
replace: Boolean,
holdDDLTime: Boolean): Unit
holdDDLTime: Boolean,
isSrcLocal: Boolean): Unit

def loadDynamicPartitions(
hive: Hive,
Expand Down Expand Up @@ -332,7 +334,8 @@ private[client] class Shim_v0_12 extends Shim with Logging {
replace: Boolean,
holdDDLTime: Boolean,
inheritTableSpecs: Boolean,
isSkewedStoreAsSubdir: Boolean): Unit = {
isSkewedStoreAsSubdir: Boolean,
isSrcLocal: Boolean): Unit = {
loadPartitionMethod.invoke(hive, loadPath, tableName, partSpec, replace: JBoolean,
holdDDLTime: JBoolean, inheritTableSpecs: JBoolean, isSkewedStoreAsSubdir: JBoolean)
}
Expand All @@ -342,7 +345,8 @@ private[client] class Shim_v0_12 extends Shim with Logging {
loadPath: Path,
tableName: String,
replace: Boolean,
holdDDLTime: Boolean): Unit = {
holdDDLTime: Boolean,
isSrcLocal: Boolean): Unit = {
loadTableMethod.invoke(hive, loadPath, tableName, replace: JBoolean, holdDDLTime: JBoolean)
}

Expand Down Expand Up @@ -698,20 +702,22 @@ private[client] class Shim_v0_14 extends Shim_v0_13 {
replace: Boolean,
holdDDLTime: Boolean,
inheritTableSpecs: Boolean,
isSkewedStoreAsSubdir: Boolean): Unit = {
isSkewedStoreAsSubdir: Boolean,
isSrcLocal: Boolean): Unit = {
loadPartitionMethod.invoke(hive, loadPath, tableName, partSpec, replace: JBoolean,
holdDDLTime: JBoolean, inheritTableSpecs: JBoolean, isSkewedStoreAsSubdir: JBoolean,
isSrcLocal(loadPath, hive.getConf()): JBoolean, JBoolean.FALSE)
isSrcLocal: JBoolean, JBoolean.FALSE)
}

override def loadTable(
hive: Hive,
loadPath: Path,
tableName: String,
replace: Boolean,
holdDDLTime: Boolean): Unit = {
holdDDLTime: Boolean,
isSrcLocal: Boolean): Unit = {
loadTableMethod.invoke(hive, loadPath, tableName, replace: JBoolean, holdDDLTime: JBoolean,
isSrcLocal(loadPath, hive.getConf()): JBoolean, JBoolean.FALSE, JBoolean.FALSE)
isSrcLocal: JBoolean, JBoolean.FALSE, JBoolean.FALSE)
}

override def loadDynamicPartitions(
Expand Down Expand Up @@ -749,12 +755,6 @@ private[client] class Shim_v0_14 extends Shim_v0_13 {
TimeUnit.MILLISECONDS).asInstanceOf[Long]
}

protected def isSrcLocal(path: Path, conf: HiveConf): Boolean = {
  // The source is treated as local when the file system resolved for `path`
  // reports the same URI as the local file system obtained from `conf`.
  val localUri = FileSystem.getLocal(conf).getUri()
  val sourceUri = FileSystem.get(path.toUri(), conf).getUri()
  localUri == sourceUri
}

}

private[client] class Shim_v1_0 extends Shim_v0_14 {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,8 @@ case class InsertIntoHiveTable(
partitionSpec,
isOverwrite = doHiveOverwrite,
holdDDLTime = holdDDLTime,
inheritTableSpecs = inheritTableSpecs)
inheritTableSpecs = inheritTableSpecs,
isSrcLocal = false)
}
}
} else {
Expand All @@ -325,7 +326,8 @@ case class InsertIntoHiveTable(
table.catalogTable.identifier.table,
outputPath.toString, // TODO: URI
overwrite,
holdDDLTime)
holdDDLTime,
isSrcLocal = false)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Then how can we be sure the source is never on the local file system (e.g., as you said above, when the warehouse directory is itself on the local file system)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't need to. "isSrcLocal" comes from the user query.

"LOAD DATA LOCAL" -> "isSrcLocal" = true
anything else -> "isSrcLocal" = false

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see the reason why we can set it to false. The files are created by us. We can set it to false and let Hive move it instead of copying it.

}

// Invalidate the cache.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,8 @@ class VersionsSuite extends SparkFunSuite with Logging {
emptyDir,
tableName = "src",
replace = false,
holdDDLTime = false)
holdDDLTime = false,
isSrcLocal = false)
}

test(s"$version: tableExists") {
Expand Down Expand Up @@ -310,7 +311,8 @@ class VersionsSuite extends SparkFunSuite with Logging {
partSpec,
replace = false,
holdDDLTime = false,
inheritTableSpecs = false)
inheritTableSpecs = false,
isSrcLocal = false)
}

test(s"$version: loadDynamicPartitions") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@

package org.apache.spark.sql.hive.execution

import java.io.File

import com.google.common.io.Files

import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SaveMode}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
Expand Down Expand Up @@ -232,31 +236,40 @@ class HiveCommandSuite extends QueryTest with SQLTestUtils with TestHiveSingleto
sql("""LOAD DATA LOCAL INPATH "/non-existing/data.txt" INTO TABLE non_part_table""")
}

val testData = hiveContext.getHiveFile("data/files/employee.dat").getCanonicalPath
val testData = hiveContext.getHiveFile("data/files/employee.dat")

// Non-local inpath: without URI Scheme and Authority
sql(s"""LOAD DATA INPATH "$testData" INTO TABLE non_part_table""")
withCopy(testData) { tmp =>
sql(s"""LOAD DATA INPATH "${tmp.getCanonicalPath()}" INTO TABLE non_part_table""")
}

checkAnswer(
sql("SELECT * FROM non_part_table WHERE employeeID = 16"),
Row(16, "john") :: Nil)

// Use URI as LOCAL inpath:
// file:/path/to/data/files/employee.dat
val uri = "file:" + testData
val uri = "file:" + testData.getCanonicalPath()
sql(s"""LOAD DATA LOCAL INPATH "$uri" INTO TABLE non_part_table""")

checkAnswer(
sql("SELECT * FROM non_part_table WHERE employeeID = 16"),
Row(16, "john") :: Row(16, "john") :: Nil)

// Use URI as non-LOCAL inpath
sql(s"""LOAD DATA INPATH "$uri" INTO TABLE non_part_table""")
withCopy(testData) { tmp =>
val tmpUri = "file:" + tmp.getCanonicalPath()
sql(s"""LOAD DATA INPATH "$tmpUri" INTO TABLE non_part_table""")
}

checkAnswer(
sql("SELECT * FROM non_part_table WHERE employeeID = 16"),
Row(16, "john") :: Row(16, "john") :: Row(16, "john") :: Nil)

sql(s"""LOAD DATA INPATH "$uri" OVERWRITE INTO TABLE non_part_table""")
withCopy(testData) { tmp =>
val tmpUri = "file:" + tmp.getCanonicalPath()
sql(s"""LOAD DATA INPATH "$tmpUri" OVERWRITE INTO TABLE non_part_table""")
}

checkAnswer(
sql("SELECT * FROM non_part_table WHERE employeeID = 16"),
Expand Down Expand Up @@ -418,4 +431,19 @@ class HiveCommandSuite extends QueryTest with SQLTestUtils with TestHiveSingleto
assert(sql("SHOW PARTITIONS part_datasrc").count() == 3)
}
}

/**
* Run a function with a copy of the input file. Use this for tests that use "LOAD DATA"
* (instead of "LOAD DATA LOCAL") since, according to Hive's semantics, files are moved
Copy link
Contributor

@cloud-fan cloud-fan Dec 10, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The semantic change happened in Hive 2.1, so it looks like we don't need to update the tests for now?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, the tests need to be updated because now loadTable is being called with "isSrcLocal = false". That makes the source file be moved instead of copied, and that makes subsequent unit tests fail. (That's the cause of the initial test failures in this PR.)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Then can we test LOAD DATA and LOAD DATA LOCAL separately? We can add comments to explain the semantic difference between them and why we need to copy the file.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, I can move each of them into separate tests.

* into the target location in that case, and we need the original file to be preserved.
*/
private def withCopy(source: File)(fn: File => Unit): Unit = {
  val tmp = File.createTempFile(source.getName(), ".tmp")
  try {
    // Copy inside the try block so the temp file is also cleaned up when the copy itself fails.
    Files.copy(source, tmp)
    fn(tmp)
  } finally {
    // Best-effort cleanup: `fn` may already have moved the file away (non-LOCAL "LOAD DATA"
    // moves its source), so the result of delete() is deliberately ignored.
    tmp.delete()
  }
}
}