@@ -370,22 +370,6 @@ trait CheckAnalysis extends PredicateHelper {
|Conflicting attributes: ${conflictingAttributes.mkString(",")}
""".stripMargin)

case s: SimpleCatalogRelation =>
failAnalysis(
s"""
|Hive support is required to select over the following tables:
|${s.catalogTable.identifier}
""".stripMargin)

// TODO: We need to consolidate this kind of checks for InsertIntoTable
// with the rule of PreWriteCheck defined in extendedCheckRules.
case InsertIntoTable(s: SimpleCatalogRelation, _, _, _, _) =>
failAnalysis(
s"""
|Hive support is required to insert into the following tables:
|${s.catalogTable.identifier}
""".stripMargin)

case InsertIntoTable(t, _, _, _, _)
if !t.isInstanceOf[LeafNode] ||
t.isInstanceOf[Range] ||
@@ -186,6 +186,11 @@ class InMemoryCatalog(
val db = tableDefinition.identifier.database.get
requireDbExists(db)
val table = tableDefinition.identifier.table
if (tableDefinition.provider.isDefined && tableDefinition.provider.get.toLowerCase == "hive") {
Contributor: shall we put this in HiveOnlyCheck?

throw new AnalysisException(
s"Hive support is required for creating a Hive data source table `$table`; or create " +
"a file-based data source table instead")
}
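
Responding to the comment above — a minimal sketch of what a HiveOnlyCheck-style analysis rule could look like if this check moved out of InMemoryCatalog. The rule name, the CreateTable match, and registration via extendedCheckRules are assumptions, not part of this diff:

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.datasources.CreateTable

// Hypothetical analyzer check: reject CREATE TABLE ... USING hive when the session
// is not Hive-backed, instead of failing inside InMemoryCatalog.createTable.
object HiveOnlyCheck extends (LogicalPlan => Unit) {
  def apply(plan: LogicalPlan): Unit = plan.foreach {
    case CreateTable(tableDesc, _, _)
        if tableDesc.provider.exists(_.equalsIgnoreCase("hive")) =>
      throw new AnalysisException(
        s"Hive support is required for creating a Hive data source table " +
          s"`${tableDesc.identifier.table}`; or create a file-based data source table instead")
    case _ => // other plans need no Hive support for this check
  }
}

Such a rule would be registered in the session's extendedCheckRules so the error surfaces at analysis time rather than when the catalog call is made.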
if (tableExists(db, table)) {
if (!ignoreIfExists) {
throw new TableAlreadyExistsException(db = db, table = table)
@@ -156,8 +156,7 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac

test("the table type of an external table should be EXTERNAL_TABLE") {
val catalog = newBasicCatalog()
val table =
newTable("external_table1", "db2").copy(tableType = CatalogTableType.EXTERNAL)
val table = newTable("external_table1", "db2").copy(tableType = CatalogTableType.EXTERNAL)
catalog.createTable(table, ignoreIfExists = false)
val actual = catalog.getTable("db2", "external_table1")
assert(actual.tableType === CatalogTableType.EXTERNAL)
@@ -278,7 +277,7 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
schema = new StructType()
.add("HelLo", "int", nullable = false)
.add("WoRLd", "int", nullable = true),
provider = Some("hive"),
provider = Some("parquet"),
Contributor: shall we also use defaultProvider here?

partitionColumnNames = Seq("WoRLd"),
bucketSpec = Some(BucketSpec(4, Seq("HelLo"), Nil)))
catalog.createTable(tbl, ignoreIfExists = false)
@@ -330,7 +329,7 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
.add("col2", "string")
.add("partCol1", "int")
.add("partCol2", "string"),
provider = Some("hive"),
provider = Some("parquet"),
partitionColumnNames = Seq("partCol1", "partCol2"))
catalog.createTable(table, ignoreIfExists = false)

@@ -357,7 +356,7 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
.add("col2", "string")
.add("partCol1", "int")
.add("partCol2", "string"),
provider = Some("hive"),
provider = Some("parquet"),
partitionColumnNames = Seq("partCol1", "partCol2"))
catalog.createTable(table, ignoreIfExists = false)

@@ -505,7 +504,7 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
.add("col2", "string")
.add("partCol1", "int")
.add("partCol2", "string"),
provider = Some("hive"),
provider = Some("parquet"),
partitionColumnNames = Seq("partCol1", "partCol2"))
catalog.createTable(table, ignoreIfExists = false)

@@ -726,7 +725,7 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
tableType = CatalogTableType.MANAGED,
storage = CatalogStorageFormat.empty,
schema = new StructType().add("a", "int").add("b", "string"),
provider = Some("hive")
provider = Some("parquet")
)

catalog.createTable(table, ignoreIfExists = false)
@@ -746,7 +745,7 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
Some(Utils.createTempDir().getAbsolutePath),
None, None, None, false, Map.empty),
schema = new StructType().add("a", "int").add("b", "string"),
provider = Some("hive")
provider = Some("parquet")
)
catalog.createTable(externalTable, ignoreIfExists = false)
assert(!exists(db.locationUri, "external_table"))
@@ -763,7 +762,7 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
.add("col2", "string")
.add("partCol1", "int")
.add("partCol2", "string"),
provider = Some("hive"),
provider = Some("parquet"),
partitionColumnNames = Seq("partCol1", "partCol2"))
catalog.createTable(table, ignoreIfExists = false)

@@ -829,6 +828,7 @@ abstract class CatalogTestUtils {
// Unimplemented methods
val tableInputFormat: String
val tableOutputFormat: String
val defaultProvider: String
def newEmptyCatalog(): ExternalCatalog

// These fields must be lazy because they rely on fields that are not implemented yet
@@ -899,7 +899,7 @@ abstract class CatalogTestUtils {
.add("col2", "string")
.add("a", "int")
.add("b", "string"),
provider = Some("hive"),
provider = Some(defaultProvider),
partitionColumnNames = Seq("a", "b"),
bucketSpec = Some(BucketSpec(4, Seq("col1"), Nil)))
}
@@ -24,6 +24,7 @@ class InMemoryCatalogSuite extends ExternalCatalogSuite {
protected override val utils: CatalogTestUtils = new CatalogTestUtils {
override val tableInputFormat: String = "org.apache.park.SequenceFileInputFormat"
override val tableOutputFormat: String = "org.apache.park.SequenceFileOutputFormat"
override val defaultProvider: String = "parquet"
override def newEmptyCatalog(): ExternalCatalog = new InMemoryCatalog
}

@@ -37,6 +37,7 @@ class SessionCatalogSuite extends PlanTest {
private val utils = new CatalogTestUtils {
override val tableInputFormat: String = "com.fruit.eyephone.CameraInputFormat"
override val tableOutputFormat: String = "com.fruit.eyephone.CameraOutputFormat"
override val defaultProvider: String = "parquet"
override def newEmptyCatalog(): ExternalCatalog = new InMemoryCatalog
}

@@ -60,7 +60,7 @@ case class CreateDataSourceTableCommand(table: CatalogTable, ignoreIfExists: Boo
identifier = table.identifier.copy(
database = Some(
table.identifier.database.getOrElse(sessionState.catalog.getCurrentDatabase))),
tracksPartitionsInCatalog = sparkSession.sessionState.conf.manageFilesourcePartitions)
tracksPartitionsInCatalog = sessionState.conf.manageFilesourcePartitions)
val dataSource: BaseRelation =
DataSource(
sparkSession = sparkSession,
@@ -98,7 +98,7 @@ case class CreateDataSourceTableCommand(table: CatalogTable, ignoreIfExists: Boo
// partition provider hive, but no partitions in the metastore. The user has to call
// `msck repair table` to populate the table partitions.
tracksPartitionsInCatalog = partitionColumnNames.nonEmpty &&
sparkSession.sessionState.conf.manageFilesourcePartitions)
sessionState.conf.manageFilesourcePartitions)
// We will return Nil or throw exception at the beginning if the table already exists, so when
// we reach here, the table should not exist and we should set `ignoreIfExists` to false.
sessionState.catalog.createTable(newTable, ignoreIfExists = false)
@@ -172,8 +172,7 @@ case class CreateDataSourceTableAsSelectCommand(
case fs: HadoopFsRelation if table.partitionColumnNames.nonEmpty &&
sparkSession.sqlContext.conf.manageFilesourcePartitions =>
// Need to recover partitions into the metastore so our saved data is visible.
sparkSession.sessionState.executePlan(
AlterTableRecoverPartitionsCommand(table.identifier)).toRdd
sessionState.executePlan(AlterTableRecoverPartitionsCommand(table.identifier)).toRdd
case _ =>
}
}
@@ -96,7 +96,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
.add("col2", "string")
.add("a", "int")
.add("b", "int"),
provider = Some("hive"),
provider = Some("parquet"),
partitionColumnNames = Seq("a", "b"),
createTime = 0L,
tracksPartitionsInCatalog = true)
@@ -758,15 +758,17 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
testUnsetProperties(isDatasourceTable = true)
}

test("alter table: set serde") {
// TODO: move this test to HiveDDLSuite.scala
ignore("alter table: set serde") {
testSetSerde(isDatasourceTable = false)
}

test("alter table: set serde (datasource table)") {
testSetSerde(isDatasourceTable = true)
}

test("alter table: set serde partition") {
// TODO: move this test to HiveDDLSuite.scala
ignore("alter table: set serde partition") {
Author: These two test cases should be moved to HiveDDLSuite. However, it would be ugly to duplicate the code, so the next plan is to improve DDLSuite and HiveDDLSuite by introducing a shared abstract class.

testSetSerdePartition(isDatasourceTable = false)
}
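
A hedged sketch of the abstract-base-suite refactoring described in the author's comment above — the class names, members, and module split are hypothetical, not part of this patch:

import org.apache.spark.SparkFunSuite

// Hypothetical shape of the planned refactoring: shared DDL test bodies live in an
// abstract base suite, and each concrete suite declares its catalog flavor.
abstract class DDLSuiteBase extends SparkFunSuite {
  protected def defaultProvider: String        // e.g. "parquet" or "hive"
  protected def hiveSupportAvailable: Boolean  // gates serde-related tests

  protected def testSetSerde(isDatasourceTable: Boolean): Unit = {
    // shared test body (omitted in this sketch)
  }

  test("alter table: set serde") {
    // Only runs for real against a Hive-backed catalog; skipped otherwise.
    if (hiveSupportAvailable) testSetSerde(isDatasourceTable = false)
  }
}

// sql/core variant, backed by the in-memory catalog.
class InMemoryCatalogDDLSuite extends DDLSuiteBase {
  override protected def defaultProvider: String = "parquet"
  override protected def hiveSupportAvailable: Boolean = false
}

// sql/hive variant (would live next to HiveDDLSuite), backed by the Hive metastore.
class HiveCatalogDDLSuite extends DDLSuiteBase {
  override protected def defaultProvider: String = "hive"
  override protected def hiveSupportAvailable: Boolean = true
}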

@@ -1479,49 +1481,33 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
)
}

test("select/insert into the managed table") {
test("create a managed Hive source table") {
assume(spark.sparkContext.conf.get(CATALOG_IMPLEMENTATION) == "in-memory")
val tabName = "tbl"
withTable(tabName) {
sql(s"CREATE TABLE $tabName (i INT, j STRING)")
val catalogTable =
spark.sessionState.catalog.getTableMetadata(TableIdentifier(tabName, Some("default")))
assert(catalogTable.tableType == CatalogTableType.MANAGED)

var message = intercept[AnalysisException] {
sql(s"INSERT OVERWRITE TABLE $tabName SELECT 1, 'a'")
}.getMessage
assert(message.contains("Hive support is required to insert into the following tables"))
message = intercept[AnalysisException] {
sql(s"SELECT * FROM $tabName")
val e = intercept[AnalysisException] {
sql(s"CREATE TABLE $tabName (i INT, j STRING)")
}.getMessage
assert(message.contains("Hive support is required to select over the following tables"))
assert(e.contains("Hive support is required for creating a Hive data source table " +
"`tbl`; or create a file-based data source table instead"))
}
}

test("select/insert into external table") {
test("create an external Hive source table") {
assume(spark.sparkContext.conf.get(CATALOG_IMPLEMENTATION) == "in-memory")
withTempDir { tempDir =>
val tabName = "tbl"
withTable(tabName) {
sql(
s"""
|CREATE EXTERNAL TABLE $tabName (i INT, j STRING)
|ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
|LOCATION '$tempDir'
val e = intercept[AnalysisException] {
sql(
s"""
|CREATE EXTERNAL TABLE $tabName (i INT, j STRING)
|ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
|LOCATION '$tempDir'
""".stripMargin)
val catalogTable =
spark.sessionState.catalog.getTableMetadata(TableIdentifier(tabName, Some("default")))
assert(catalogTable.tableType == CatalogTableType.EXTERNAL)

var message = intercept[AnalysisException] {
sql(s"INSERT OVERWRITE TABLE $tabName SELECT 1, 'a'")
}.getMessage
assert(message.contains("Hive support is required to insert into the following tables"))
message = intercept[AnalysisException] {
sql(s"SELECT * FROM $tabName")
}.getMessage
assert(message.contains("Hive support is required to select over the following tables"))
assert(e.contains("Hive support is required for creating a Hive data source table " +
"`tbl`; or create a file-based data source table instead"))
}
}
}
@@ -1692,20 +1678,27 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {

test("truncate table - external table, temporary table, view (not allowed)") {
import testImplicits._
val path = Utils.createTempDir().getAbsolutePath
(1 to 10).map { i => (i, i) }.toDF("a", "b").createTempView("my_temp_tab")
sql(s"CREATE EXTERNAL TABLE my_ext_tab LOCATION '$path'")
sql(s"CREATE VIEW my_view AS SELECT 1")
intercept[NoSuchTableException] {
sql("TRUNCATE TABLE my_temp_tab")
withTempPath { tempDir =>
withTable("my_ext_tab") {
(("a", "b") :: Nil).toDF().write.parquet(tempDir.getCanonicalPath)
(1 to 10).map { i => (i, i) }.toDF("a", "b").createTempView("my_temp_tab")
sql(s"CREATE TABLE my_ext_tab using parquet LOCATION '$tempDir'")
@HyukjinKwon (Member), Jan 16, 2017:
Ah, this is why you asked me #16586 (comment). I just ran a test on Windows to help.

 - truncate table - external table, temporary table, view (not allowed) *** FAILED *** (188 milliseconds)
   org.apache.spark.sql.AnalysisException: Path does not exist: file:/C:projectsspark	arget	mpspark-9e70280d-56dc-4063-8f40-8e62fec18394;
   at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$14.apply(DataSource.scala:382)
   at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$14.apply(DataSource.scala:370)
   at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
   at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)

Maybe, it'd be okay to just use toURI if this test is not supposed to test path variants.

sql(s"CREATE VIEW my_view AS SELECT 1")
intercept[NoSuchTableException] {
sql("TRUNCATE TABLE my_temp_tab")
}
assertUnsupported("TRUNCATE TABLE my_ext_tab")
assertUnsupported("TRUNCATE TABLE my_view")
}
}
assertUnsupported("TRUNCATE TABLE my_ext_tab")
assertUnsupported("TRUNCATE TABLE my_view")
}
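
Following up on the Windows-path comment above — a small sketch of the toURI variant, assuming the same DDLSuite helpers (withTempPath, withTable, sql, testImplicits) are in scope; only the LOCATION string changes:

withTempPath { tempDir =>
  withTable("my_ext_tab") {
    (("a", "b") :: Nil).toDF().write.parquet(tempDir.getCanonicalPath)
    // tempDir.toURI renders as file:/C:/... with forward slashes, so backslashes in a
    // Windows path are not lost when the SQL string is parsed.
    sql(s"CREATE TABLE my_ext_tab USING parquet LOCATION '${tempDir.toURI}'")
  }
}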

test("truncate table - non-partitioned table (not allowed)") {
sql("CREATE TABLE my_tab (age INT, name STRING)")
assertUnsupported("TRUNCATE TABLE my_tab PARTITION (age=10)")
withTable("my_tab") {
sql("CREATE TABLE my_tab (age INT, name STRING) using parquet")
sql("INSERT INTO my_tab values (10, 'a')")
assertUnsupported("TRUNCATE TABLE my_tab PARTITION (age=10)")
}
}

test("SPARK-16034 Partition columns should match when appending to existing data source tables") {
@@ -43,6 +43,7 @@ class CatalogSuite
private val utils = new CatalogTestUtils {
override val tableInputFormat: String = "com.fruit.eyephone.CameraInputFormat"
override val tableOutputFormat: String = "com.fruit.eyephone.CameraOutputFormat"
override val defaultProvider: String = "parquet"
override def newEmptyCatalog(): ExternalCatalog = spark.sharedState.externalCatalog
}

@@ -41,6 +41,7 @@ class HiveExternalCatalogSuite extends ExternalCatalogSuite {
override val tableInputFormat: String = "org.apache.hadoop.mapred.SequenceFileInputFormat"
override val tableOutputFormat: String = "org.apache.hadoop.mapred.SequenceFileOutputFormat"
override def newEmptyCatalog(): ExternalCatalog = externalCatalog
override val defaultProvider: String = "hive"
}

protected override def resetState(): Unit = {