@@ -1952,4 +1952,54 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
}
}
}

test("CTAS for managed data source table with a created default location throw an exception") {
withTable("t", "t1", "t2") {
val warehousePath = spark.sharedState.warehousePath.stripPrefix("file:")
val tFile = new File(warehousePath, "t")
Contributor Author: There are already many test cases for non-existent default locations, so here we only add cases where the default location already exists (see the sketch below).
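For contrast, a minimal sketch of what such a non-existent-location case looks like (illustrative only, not part of this diff; the actual test bodies in DDLSuite differ):

// Sketch: the default location does NOT exist yet, so CTAS should succeed.
test("CTAS for managed data source table with a non-existent default location") {
  withTable("t") {
    spark.sql("CREATE TABLE t USING parquet AS SELECT 1 AS a, 2 AS b")
    checkAnswer(spark.table("t"), Row(1, 2))
  }
}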

tFile.mkdirs()
assert(tFile.exists)

val e = intercept[AnalysisException] {
spark.sql(
s"""
|CREATE TABLE t
|USING parquet
|AS SELECT 3 as a, 4 as b, 1 as c, 2 as d
""".stripMargin)
}.getMessage
assert(e.contains(s"path file:${tFile.getAbsolutePath} already exists."))

// partitioned table (table path exists)
val tFile1 = new File(warehousePath, "t1")
tFile1.mkdirs()
assert(tFile1.exists)
val e1 = intercept[AnalysisException] {
spark.sql(
s"""
|CREATE TABLE t1
|USING parquet
|PARTITIONED BY(a, b)
|AS SELECT 3 as a, 4 as b, 1 as c, 2 as d
""".stripMargin)
}.getMessage
assert(e1.contains(s"path file:${tFile1.getAbsolutePath} already exists."))

// partitioned table (partition path exists)
val tFile2 = new File(warehousePath, "t2")
val tPartFile = new File(tFile2, "a=3/b=4")
tPartFile.mkdirs()
assert(tPartFile.exists)
val e2 = intercept[AnalysisException] {
spark.sql(
s"""
|CREATE TABLE t2
|USING parquet
|PARTITIONED BY(a, b)
|AS SELECT 3 as a, 4 as b, 1 as c, 2 as d
""".stripMargin)
}.getMessage
assert(e2.contains(s"path file:${tFile2.getAbsolutePath} already exists."))
}
}
}
@@ -19,9 +19,11 @@ package org.apache.spark.sql.hive.execution

import scala.util.control.NonFatal

import org.apache.hadoop.fs.Path

import org.apache.spark.sql.{AnalysisException, Row, SaveMode, SparkSession}
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType}
import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan}
import org.apache.spark.sql.execution.command.RunnableCommand

@@ -68,9 +70,22 @@ case class CreateHiveTableAsSelectCommand(
// add the relation into the catalog, just in case a failure occurs while
// the data is being processed.
assert(tableDesc.schema.isEmpty)
sparkSession.sessionState.catalog.createTable(
tableDesc.copy(schema = query.schema), ignoreIfExists = false)

// As discussed in SPARK-19583, in CTAS the default location of a managed
// table should not exist
if (mode == SaveMode.ErrorIfExists && tableDesc.tableType == CatalogTableType.MANAGED) {
val hadoopConf = sparkSession.sessionState.newHadoopConf()
val tblLocationPath =
new Path(sparkSession.sessionState.catalog.defaultTablePath(tableIdentifier))
val fs = tblLocationPath.getFileSystem(hadoopConf)
if (fs.exists(tblLocationPath)) {
throw new AnalysisException(s"the location('$tblLocationPath') of table" +
s"('$tableIdentifier') already exists.")
}
}

sparkSession.sessionState.catalog.createTable(
tableDesc.copy(schema = query.schema), ignoreIfExists = false)
try {
sparkSession.sessionState.executePlan(
InsertIntoTable(
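The guard above only fires for plain CTAS into a managed table (SaveMode.ErrorIfExists); CREATE TABLE IF NOT EXISTS and external tables are unaffected. As a standalone illustration of the same Hadoop FileSystem check, here is a hedged sketch (defaultLocationExists is a hypothetical helper, not part of this diff):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

// Hypothetical helper mirroring the check added above: resolve the
// location's scheme (file:, hdfs:, ...) and test whether the path exists.
def defaultLocationExists(location: String, hadoopConf: Configuration): Boolean = {
  val path = new Path(location)
  val fs = path.getFileSystem(hadoopConf)
  fs.exists(path)
}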
@@ -1587,4 +1587,107 @@ class HiveDDLSuite
}
}
}

test("CTAS for managed data source table with a created default location throw an exception") {
withTable("t", "t1", "t2") {
val warehousePath = spark.sharedState.warehousePath.stripPrefix("file:")
val tFile = new File(warehousePath, "t")
tFile.mkdirs()
assert(tFile.exists)

val e = intercept[AnalysisException] {
spark.sql(
s"""
|CREATE TABLE t
|USING parquet
|AS SELECT 3 as a, 4 as b, 1 as c, 2 as d
""".stripMargin)
}.getMessage
assert(e.contains(s"path file:${tFile.getAbsolutePath} already exists."))

// partitioned table (table path exists)
val tFile1 = new File(warehousePath, "t1")
tFile1.mkdirs()
assert(tFile1.exists)
val e1 = intercept[AnalysisException] {
spark.sql(
s"""
|CREATE TABLE t1
|USING parquet
|PARTITIONED BY(a, b)
|AS SELECT 3 as a, 4 as b, 1 as c, 2 as d
""".stripMargin)
}.getMessage
assert(e1.contains(s"path file:${tFile1.getAbsolutePath} already exists."))

// partitioned table (partition path exists)
val tFile2 = new File(warehousePath, "t2")
val tPartFile = new File(tFile2, "a=3/b=4")
tPartFile.mkdirs()
assert(tPartFile.exists)
val e2 = intercept[AnalysisException] {
spark.sql(
s"""
|CREATE TABLE t2
|USING parquet
|PARTITIONED BY(a, b)
|AS SELECT 3 as a, 4 as b, 1 as c, 2 as d
""".stripMargin)
}.getMessage
assert(e2.contains(s"path file:${tFile2.getAbsolutePath} already exists."))
}
}

test("CTAS for managed hive table with a created default location throw an exception") {
withTable("t", "t1", "t2") {
val warehousePath = spark.sharedState.warehousePath.stripPrefix("file:")
val tFile = new File(warehousePath, "t")
tFile.mkdirs()
assert(tFile.exists)

val e = intercept[AnalysisException] {
spark.sql(
s"""
|CREATE TABLE t
|USING hive
|AS SELECT 3 as a, 4 as b, 1 as c, 2 as d
""".stripMargin)
}.getMessage
assert(e.contains(s"the location('file:${tFile.getAbsolutePath}') of table" +
s"('`default`.`t`') already exists."))

// partitioned table (table path exists)
val tFile1 = new File(warehousePath, "t1")
tFile1.mkdirs()
assert(tFile1.exists)
val e1 = intercept[AnalysisException] {
spark.sql(
s"""
|CREATE TABLE t1
|USING hive
|PARTITIONED BY(a, b)
|AS SELECT 3 as a, 4 as b, 1 as c, 2 as d
""".stripMargin)
}.getMessage
assert(e1.contains(s"the location('file:${tFile1.getAbsolutePath}') of table" +
s"('`default`.`t1`') already exists."))

// partitioned table (partition path exists)
val tFile2 = new File(warehousePath, "t2")
val tPartFile = new File(tFile2, "a=3/b=4")
tPartFile.mkdirs()
assert(tPartFile.exists)
val e2 = intercept[AnalysisException] {
spark.sql(
s"""
|CREATE TABLE t2
|USING hive
|PARTITIONED BY(a, b)
|AS SELECT 3 as a, 4 as b, 1 as c, 2 as d
""".stripMargin)
}.getMessage
assert(e1.contains(s"the location('file:${tFile1.getAbsolutePath}') of table" +
s"('`default`.`t1`') already exists."))
}
}
}