Skip to content

Commit 8a4d2b2

Browse files
committed
address comments
1 parent 5725309 commit 8a4d2b2

4 files changed

Lines changed: 14 additions & 8 deletions

File tree

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -375,10 +375,6 @@ trait CheckAnalysis extends PredicateHelper {
375375
s"$numStaticPartitions partition column(s) having constant value(s).")
376376
}
377377

378-
case c if c.getClass.getName ==
379-
"org.apache.spark.sql.execution.command.CreateHiveTableAsSelectLogicalPlan" =>
380-
failAnalysis("Hive support is required to use CREATE Hive TABLE AS SELECT")
381-
382378
case o if !o.resolved =>
383379
failAnalysis(
384380
s"unresolved operator ${operator.simpleString}")

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ import java.util.regex.Pattern
2121

2222
import scala.util.control.NonFatal
2323

24+
import org.apache.spark.SparkConf
25+
import org.apache.spark.internal.config._
2426
import org.apache.spark.sql.{AnalysisException, SaveMode, SparkSession}
2527
import org.apache.spark.sql.catalyst.TableIdentifier
2628
import org.apache.spark.sql.catalyst.analysis._
@@ -275,7 +277,10 @@ private[sql] case class PreprocessTableInsertion(conf: SQLConf) extends Rule[Log
275277
/**
276278
* A rule to do various checks before inserting into or writing to a data source table.
277279
*/
278-
private[sql] case class PreWriteCheck(conf: SQLConf, catalog: SessionCatalog)
280+
private[sql] case class PreWriteCheck(
281+
sqlConf: SQLConf,
282+
catalog: SessionCatalog,
283+
sparkConf: SparkConf)
279284
extends (LogicalPlan => Unit) {
280285

281286
def failAnalysis(msg: String): Unit = { throw new AnalysisException(msg) }
@@ -285,6 +290,9 @@ private[sql] case class PreWriteCheck(conf: SQLConf, catalog: SessionCatalog)
285290

286291
def apply(plan: LogicalPlan): Unit = {
287292
plan.foreach {
293+
case CreateTable(tableDesc, _, Some(_))
294+
if tableDesc.provider.get == "hive" && sparkConf.get(CATALOG_IMPLEMENTATION) != "hive" =>
295+
failAnalysis("Hive support is required to use CREATE Hive TABLE AS SELECT")
288296
case c @ CreateTable(tableDesc, mode, query) if c.resolved =>
289297
// Since we are saving table metadata to metastore, we should make sure the table name
290298
// and database name don't break some common restrictions, e.g. special chars except
@@ -334,7 +342,7 @@ private[sql] case class PreWriteCheck(conf: SQLConf, catalog: SessionCatalog)
334342
}
335343

336344
PartitioningUtils.validatePartitionColumn(
337-
r.schema, part.keySet.toSeq, conf.caseSensitiveAnalysis)
345+
r.schema, part.keySet.toSeq, sqlConf.caseSensitiveAnalysis)
338346

339347
// Get all input data source relations of the query.
340348
val srcRelations = query.collect {

sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,8 @@ private[sql] class SessionState(sparkSession: SparkSession) {
117117
DataSourceAnalysis(conf) ::
118118
(if (conf.runSQLonFile) new ResolveDataSource(sparkSession) :: Nil else Nil)
119119

120-
override val extendedCheckRules = Seq(datasources.PreWriteCheck(conf, catalog))
120+
override val extendedCheckRules =
121+
Seq(PreWriteCheck(conf, catalog, sparkSession.sparkContext.conf))
121122
}
122123
}
123124

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,8 @@ private[hive] class HiveSessionState(sparkSession: SparkSession)
7070
DataSourceAnalysis(conf) ::
7171
(if (conf.runSQLonFile) new ResolveDataSource(sparkSession) :: Nil else Nil)
7272

73-
override val extendedCheckRules = Seq(PreWriteCheck(conf, catalog))
73+
override val extendedCheckRules =
74+
Seq(PreWriteCheck(conf, catalog, sparkSession.sparkContext.conf))
7475
}
7576
}
7677

0 commit comments

Comments (0)