Skip to content

Commit ca83f1e

Browse files
chenghao-intel authored and marmbrus committed
[SPARK-2917] [SQL] Avoid table creation in logical plan analyzing for CTAS
Author: Cheng Hao <hao.cheng@intel.com>. Closes #1846 from chenghao-intel/ctas and squashes the following commits: 56a0578 [Cheng Hao] remove the unused imports; 9a57abc [Cheng Hao] Avoid table creation in logical plan analyzing.
1 parent 1ef656e commit ca83f1e

8 files changed

Lines changed: 104 additions & 17 deletions

File tree

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,11 +114,12 @@ case class InsertIntoTable(
114114
}
115115
}
116116

117-
case class InsertIntoCreatedTable(
117+
case class CreateTableAsSelect(
118118
databaseName: Option[String],
119119
tableName: String,
120120
child: LogicalPlan) extends UnaryNode {
121121
override def output = child.output
122+
override lazy val resolved = (databaseName != None && childrenResolved)
122123
}
123124

124125
case class WriteToFile(

sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ private[sql] trait SchemaRDDLike {
5454
@transient protected[spark] val logicalPlan: LogicalPlan = baseLogicalPlan match {
5555
// For various commands (like DDL) and queries with side effects, we force query optimization to
5656
// happen right away to let these side effects take place eagerly.
57-
case _: Command | _: InsertIntoTable | _: InsertIntoCreatedTable | _: WriteToFile =>
57+
case _: Command | _: InsertIntoTable | _: CreateTableAsSelect |_: WriteToFile =>
5858
queryExecution.toRdd
5959
SparkLogicalPlan(queryExecution.executedPlan)(sqlContext)
6060
case _ =>
@@ -124,7 +124,7 @@ private[sql] trait SchemaRDDLike {
124124
*/
125125
@Experimental
126126
def saveAsTable(tableName: String): Unit =
127-
sqlContext.executePlan(InsertIntoCreatedTable(None, tableName, logicalPlan)).toRdd
127+
sqlContext.executePlan(CreateTableAsSelect(None, tableName, logicalPlan)).toRdd
128128

129129
/** Returns the schema as a string in the tree format.
130130
*

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -54,8 +54,8 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
5454
db: Option[String],
5555
tableName: String,
5656
alias: Option[String]): LogicalPlan = synchronized {
57-
val (dbName, tblName) = processDatabaseAndTableName(db, tableName)
58-
val databaseName = dbName.getOrElse(hive.sessionState.getCurrentDatabase)
57+
val (databaseName, tblName) = processDatabaseAndTableName(
58+
db.getOrElse(hive.sessionState.getCurrentDatabase), tableName)
5959
val table = client.getTable(databaseName, tblName)
6060
val partitions: Seq[Partition] =
6161
if (table.isPartitioned) {
@@ -112,17 +112,11 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
112112
// Wait until children are resolved.
113113
case p: LogicalPlan if !p.childrenResolved => p
114114

115-
case InsertIntoCreatedTable(db, tableName, child) =>
115+
case CreateTableAsSelect(db, tableName, child) =>
116116
val (dbName, tblName) = processDatabaseAndTableName(db, tableName)
117117
val databaseName = dbName.getOrElse(hive.sessionState.getCurrentDatabase)
118118

119-
createTable(databaseName, tblName, child.output)
120-
121-
InsertIntoTable(
122-
lookupRelation(Some(databaseName), tblName, None),
123-
Map.empty,
124-
child,
125-
overwrite = false)
119+
CreateTableAsSelect(Some(databaseName), tableName, child)
126120
}
127121
}
128122

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -489,7 +489,7 @@ private[hive] object HiveQl {
489489

490490
val (db, tableName) = extractDbNameTableName(tableNameParts)
491491

492-
InsertIntoCreatedTable(db, tableName, nodeToPlan(query))
492+
CreateTableAsSelect(db, tableName, nodeToPlan(query))
493493

494494
// If its not a "CREATE TABLE AS" like above then just pass it back to hive as a native command.
495495
case Token("TOK_CREATETABLE", _) => NativePlaceholder

sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,16 @@ private[hive] trait HiveStrategies {
165165
InMemoryRelation(_, _, _,
166166
HiveTableScan(_, table, _)), partition, child, overwrite) =>
167167
InsertIntoHiveTable(table, partition, planLater(child), overwrite)(hiveContext) :: Nil
168+
case logical.CreateTableAsSelect(database, tableName, child) =>
169+
val query = planLater(child)
170+
CreateTableAsSelect(
171+
database.get,
172+
tableName,
173+
query,
174+
InsertIntoHiveTable(_: MetastoreRelation,
175+
Map(),
176+
query,
177+
true)(hiveContext)) :: Nil
168178
case _ => Nil
169179
}
170180
}
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql.hive.execution
19+
20+
import org.apache.spark.annotation.Experimental
21+
import org.apache.spark.rdd.RDD
22+
import org.apache.spark.sql.catalyst.expressions.Row
23+
import org.apache.spark.sql.catalyst.expressions.Attribute
24+
import org.apache.spark.sql.catalyst.plans.logical.LowerCaseSchema
25+
import org.apache.spark.sql.execution.{SparkPlan, Command, LeafNode}
26+
import org.apache.spark.sql.hive.HiveContext
27+
import org.apache.spark.sql.hive.MetastoreRelation
28+
29+
/**
30+
* :: Experimental ::
31+
* Create table and insert the query result into it.
32+
* @param database the database name of the new relation
33+
* @param tableName the table name of the new relation
34+
* @param insertIntoRelation function of creating the `InsertIntoHiveTable`
35+
* by specifying the `MetaStoreRelation`, the data will be inserted into that table.
36+
* TODO Add more table creating properties, e.g. SerDe, StorageHandler, in-memory cache etc.
37+
*/
38+
@Experimental
39+
case class CreateTableAsSelect(
40+
database: String,
41+
tableName: String,
42+
query: SparkPlan,
43+
insertIntoRelation: MetastoreRelation => InsertIntoHiveTable)
44+
extends LeafNode with Command {
45+
46+
def output = Seq.empty
47+
48+
// A lazy computing of the metastoreRelation
49+
private[this] lazy val metastoreRelation: MetastoreRelation = {
50+
// Create the table
51+
val sc = sqlContext.asInstanceOf[HiveContext]
52+
sc.catalog.createTable(database, tableName, query.output, false)
53+
// Get the Metastore Relation
54+
sc.catalog.lookupRelation(Some(database), tableName, None) match {
55+
case LowerCaseSchema(r: MetastoreRelation) => r
56+
case o: MetastoreRelation => o
57+
}
58+
}
59+
60+
override protected[sql] lazy val sideEffectResult: Seq[Row] = {
61+
insertIntoRelation(metastoreRelation).execute
62+
Seq.empty[Row]
63+
}
64+
65+
override def execute(): RDD[Row] = {
66+
sideEffectResult
67+
sparkContext.emptyRDD[Row]
68+
}
69+
70+
override def argString: String = {
71+
s"[Database:$database, TableName: $tableName, InsertIntoHiveTable]\n" + query.toString
72+
}
73+
}

sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -53,9 +53,9 @@ case class InsertIntoHiveTable(
5353
(@transient sc: HiveContext)
5454
extends UnaryNode {
5555

56-
val outputClass = newSerializer(table.tableDesc).getSerializedClass
57-
@transient private val hiveContext = new Context(sc.hiveconf)
58-
@transient private val db = Hive.get(sc.hiveconf)
56+
@transient lazy val outputClass = newSerializer(table.tableDesc).getSerializedClass
57+
@transient private lazy val hiveContext = new Context(sc.hiveconf)
58+
@transient private lazy val db = Hive.get(sc.hiveconf)
5959

6060
private def newSerializer(tableDesc: TableDesc): Serializer = {
6161
val serializer = tableDesc.getDeserializerClass.newInstance().asInstanceOf[Serializer]

sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
package org.apache.spark.sql.hive.execution
1919

2020
import org.apache.spark.sql.QueryTest
21+
22+
import org.apache.spark.sql.Row
2123
import org.apache.spark.sql.hive.test.TestHive._
2224

2325
case class Nested1(f1: Nested2)
@@ -54,4 +56,11 @@ class SQLQuerySuite extends QueryTest {
5456
sql("SELECT f1.f2.f3 FROM nested"),
5557
1)
5658
}
59+
60+
test("test CTAS") {
61+
checkAnswer(sql("CREATE TABLE test_ctas_123 AS SELECT key, value FROM src"), Seq.empty[Row])
62+
checkAnswer(
63+
sql("SELECT key, value FROM test_ctas_123 ORDER BY key"),
64+
sql("SELECT key, value FROM src ORDER BY key").collect().toSeq)
65+
}
5766
}

0 commit comments

Comments
 (0)