diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala index df8d03b86c53..aca68375e47c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql.catalyst.analysis import scala.collection.mutable import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Subquery} +import org.apache.spark.sql.types.StructType /** * An interface for looking up relations by name. Used by an [[Analyzer]]. @@ -40,6 +41,12 @@ trait Catalog { def unregisterAllTables(): Unit + def createDataSourceTable( + tableName: String, + userSpecifiedSchema: Option[StructType], + provider: String, + options: Map[String, String]): Unit + protected def processTableIdentifier(tableIdentifier: Seq[String]): Seq[String] = { if (!caseSensitive) { tableIdentifier.map(_.toLowerCase) @@ -81,6 +88,14 @@ class SimpleCatalog(val caseSensitive: Boolean) extends Catalog { tables.clear() } + override def createDataSourceTable( + tableName: String, + userSpecifiedSchema: Option[StructType], + provider: String, + options: Map[String, String]) = { + sys.error("Tables created with SQLContext must be TEMPORARY. Use a HiveContext instead.") + } + override def tableExists(tableIdentifier: Seq[String]): Boolean = { val tableIdent = processTableIdentifier(tableIdentifier) tables.get(getDbTableName(tableIdent)) match { @@ -180,5 +195,13 @@ object EmptyCatalog extends Catalog { throw new UnsupportedOperationException } + def createDataSourceTable( + tableName: String, + userSpecifiedSchema: Option[StructType], + provider: String, + options: Map[String, String]): Unit = { + throw new UnsupportedOperationException + } + override def unregisterAllTables(): Unit = {} } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index d9f3b3a53f58..2f2c5b8479ef 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -355,7 +355,6 @@ class SQLContext(@transient val sparkContext: SparkContext) def strategies: Seq[Strategy] = extraStrategies ++ ( DataSourceStrategy :: - DDLStrategy :: TakeOrdered :: HashAggregation :: LeftSemiJoin :: diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index 0cc9d049c964..c6b655deb3fd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -312,16 +312,4 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { } } - object DDLStrategy extends Strategy { - def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { - case CreateTableUsing(tableName, userSpecifiedSchema, provider, true, options) => - ExecutedCommand( - CreateTempTableUsing(tableName, userSpecifiedSchema, provider, options)) :: Nil - - case CreateTableUsing(tableName, userSpecifiedSchema, provider, false, options) => - sys.error("Tables created with SQLContext must be TEMPORARY. Use a HiveContext instead.") - - case _ => Nil - } - } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala index 4cc9641c4d9e..f06bfc6a2d2d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala @@ -106,7 +106,11 @@ private[sql] class DDLParser extends StandardTokenParsers with PackratParsers wi ~ (tableCols).? ~ (USING ~> className) ~ (OPTIONS ~> options) ^^ { case temp ~ tableName ~ columns ~ provider ~ opts => val userSpecifiedSchema = columns.flatMap(fields => Some(StructType(fields))) - CreateTableUsing(tableName, userSpecifiedSchema, provider, temp.isDefined, opts) + if (temp.isDefined) { + CreateTempTableUsing(tableName, userSpecifiedSchema, provider, opts) + } else { + CreateTableUsing(tableName, userSpecifiedSchema, provider, opts) + } } ) @@ -223,8 +227,13 @@ private[sql] case class CreateTableUsing( tableName: String, userSpecifiedSchema: Option[StructType], provider: String, - temporary: Boolean, - options: Map[String, String]) extends Command + options: Map[String, String]) extends RunnableCommand { + + def run(sqlContext: SQLContext) = { + sqlContext.catalog.createDataSourceTable(tableName, userSpecifiedSchema, provider, options) + Seq.empty + } +} private [sql] case class CreateTempTableUsing( tableName: String, diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala index bf56e60cf995..3e9ac00ec21c 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala @@ -351,8 +351,6 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) { override def strategies: Seq[Strategy] = extraStrategies ++ Seq( DataSourceStrategy, HiveCommandStrategy(self), - HiveDDLStrategy, - DDLStrategy, TakeOrdered, ParquetOperations, InMemoryScans, diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index d40f9936fd3b..c5321cacd273 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -99,7 +99,7 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with val caseSensitive: Boolean = false - def createDataSourceTable( + override def createDataSourceTable( tableName: String, userSpecifiedSchema: Option[StructType], provider: String, diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala index 6952b126cf89..ab8a188ffe06 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala @@ -210,16 +210,6 @@ private[hive] trait HiveStrategies { } } - object HiveDDLStrategy extends Strategy { - def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { - case CreateTableUsing(tableName, userSpecifiedSchema, provider, false, options) => - ExecutedCommand( - CreateMetastoreDataSource(tableName, userSpecifiedSchema, provider, options)) :: Nil - - case _ => Nil - } - } - case class HiveCommandStrategy(context: HiveContext) extends Strategy { def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { case describe: DescribeCommand =>