Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion python/pyspark/sql/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -492,7 +492,7 @@ def _createForTesting(cls, sparkContext):
confusing error messages.
"""
jsc = sparkContext._jsc.sc()
jtestHive = sparkContext._jvm.org.apache.spark.sql.hive.test.TestHiveContext(jsc)
jtestHive = sparkContext._jvm.org.apache.spark.sql.hive.test.TestHiveContext(jsc, False)
return cls(sparkContext, jtestHive)

def refreshTable(self, tableName):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ object TestHive
.set("spark.sql.hive.metastore.barrierPrefixes",
"org.apache.spark.sql.hive.execution.PairSerDe")
// SPARK-8910
.set("spark.ui.enabled", "false")))
.set("spark.ui.enabled", "false")),
createTestTables = true)


/**
Expand All @@ -70,15 +71,21 @@ object TestHive
* test cases that rely on TestHive must be serialized.
*/
class TestHiveContext(
@transient override val sparkSession: TestHiveSparkSession)
@transient override val sparkSession: TestHiveSparkSession,
private val createTestTables: Boolean)
extends SQLContext(sparkSession) {

def this(sc: SparkContext, createTestTables: Boolean) {
this(new TestHiveSparkSession(HiveUtils.withHiveExternalCatalog(sc), createTestTables),
createTestTables)
}

def this(sc: SparkContext) {
this(new TestHiveSparkSession(HiveUtils.withHiveExternalCatalog(sc)))
this(sc, createTestTables = true)
}

override def newSession(): TestHiveContext = {
new TestHiveContext(sparkSession.newSession())
new TestHiveContext(sparkSession.newSession(), createTestTables)
}

override def sharedState: TestHiveSharedState = sparkSession.sharedState
Expand Down Expand Up @@ -109,7 +116,8 @@ private[hive] class TestHiveSparkSession(
val warehousePath: File,
scratchDirPath: File,
metastoreTemporaryConf: Map[String, String],
@transient private val existingSharedState: Option[TestHiveSharedState])
@transient private val existingSharedState: Option[TestHiveSharedState],
private val createTestTables: Boolean)
extends SparkSession(sc) with Logging { self =>

// TODO: We need to set the temp warehouse path to sc's conf.
Expand All @@ -118,13 +126,14 @@ private[hive] class TestHiveSparkSession(
// when we creating metadataHive. This flow is not easy to follow and can introduce
// confusion when a developer is debugging an issue. We need to refactor this part
// to just set the temp warehouse path in sc's conf.
def this(sc: SparkContext) {
def this(sc: SparkContext, createTestTables: Boolean) {
this(
sc,
Utils.createTempDir(namePrefix = "warehouse"),
TestHiveContext.makeScratchDir(),
HiveUtils.newTemporaryConfiguration(useInMemoryDerby = false),
None)
None,
createTestTables)
}

assume(sc.conf.get(CATALOG_IMPLEMENTATION) == "hive")
Expand All @@ -144,7 +153,8 @@ private[hive] class TestHiveSparkSession(

override def newSession(): TestHiveSparkSession = {
new TestHiveSparkSession(
sc, warehousePath, scratchDirPath, metastoreTemporaryConf, Some(sharedState))
sc, warehousePath, scratchDirPath, metastoreTemporaryConf, Some(sharedState),
createTestTables)
}

private var cacheTables: Boolean = false
Expand Down Expand Up @@ -179,8 +189,25 @@ private[hive] class TestHiveSparkSession(
hiveFilesTemp.mkdir()
ShutdownHookManager.registerShutdownDeleteDir(hiveFilesTemp)

lazy val inRepoTests = {
if (System.getProperty("user.dir").endsWith("sql" + File.separator + "hive")) {
new File("src" + File.separator + "test" + File.separator + "resources" + File.separator)
} else {
new File("sql" + File.separator + "hive" + File.separator + "src" + File.separator + "test" +
File.separator + "resources")
}
}

def getHiveFile(path: String): File = {
new File(Thread.currentThread().getContextClassLoader.getResource(path).getFile)
// Attempt to load from class loader, fall back to old system property based for Python tests.
val resourcePath = Option(Thread.currentThread().getContextClassLoader.getResource(path))
resourcePath.map(rp => new File(rp.getFile)).getOrElse{
val stripped = path.replaceAll("""\.\.\/""", "").replace('/', File.separatorChar)
hiveDevHome
.map(new File(_, stripped))
.filter(_.exists)
.getOrElse(new File(inRepoTests, stripped))
}
}

val describedTable = "DESCRIBE (\\w+)".r
Expand Down Expand Up @@ -208,7 +235,7 @@ private[hive] class TestHiveSparkSession(
// /itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java
// https://github.com/apache/hive/blob/branch-0.13/data/scripts/q_test_init.sql
@transient
val hiveQTestUtilTables = Seq(
lazy val hiveQTestUtilTables = Seq(
TestTable("src",
"CREATE TABLE src (key INT, value STRING)".cmd,
s"LOAD DATA LOCAL INPATH '${getHiveFile("data/files/kv1.txt")}' INTO TABLE src".cmd),
Expand Down Expand Up @@ -362,7 +389,9 @@ private[hive] class TestHiveSparkSession(
s"LOAD DATA LOCAL INPATH '${getHiveFile("data/files/json.txt")}' INTO TABLE src_json".cmd)
)

hiveQTestUtilTables.foreach(registerTestTable)
if (createTestTables) {
hiveQTestUtilTables.foreach(registerTestTable)
}

private val loadedTables = new collection.mutable.HashSet[String]

Expand Down