Skip to content

Commit bf50d73

Browse files
committed
When appending data, we use the schema of the existing table instead of the schema of the new data.
1 parent 0a703e7 commit bf50d73

2 files changed

Lines changed: 19 additions & 21 deletions

File tree

sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala

Lines changed: 5 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -19,7 +19,7 @@ package org.apache.spark.sql.sources
1919
import org.apache.spark.sql.{DataFrame, SQLContext}
2020
import org.apache.spark.sql.catalyst.expressions._
2121
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
22-
import org.apache.spark.sql.execution.{LogicalRDD, RunnableCommand}
22+
import org.apache.spark.sql.execution.RunnableCommand
2323

2424
private[sql] case class InsertIntoDataSource(
2525
logicalRelation: LogicalRelation,
@@ -29,7 +29,10 @@ private[sql] case class InsertIntoDataSource(
2929

3030
override def run(sqlContext: SQLContext) = {
3131
val relation = logicalRelation.relation.asInstanceOf[InsertableRelation]
32-
relation.insert(DataFrame(sqlContext, query), overwrite)
32+
val data = DataFrame(sqlContext, query)
33+
// Apply the schema of the existing table to the new data.
34+
val df = sqlContext.createDataFrame(data.queryExecution.toRdd, logicalRelation.schema)
35+
relation.insert(df, overwrite)
3336

3437
// Invalidate the cache.
3538
sqlContext.cacheManager.invalidateCache(logicalRelation)

sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala

Lines changed: 14 additions & 19 deletions
Original file line number | Diff line number | Diff line change
@@ -169,6 +169,7 @@ case class CreateMetastoreDataSourceAsSelect(
169169
options
170170
}
171171

172+
var existingSchema = None: Option[StructType]
172173
if (sqlContext.catalog.tableExists(Seq(tableName))) {
173174
// Check if we need to throw an exception or just return.
174175
mode match {
@@ -188,22 +189,7 @@ case class CreateMetastoreDataSourceAsSelect(
188189
val createdRelation = LogicalRelation(resolved.relation)
189190
EliminateSubQueries(sqlContext.table(tableName).logicalPlan) match {
190191
case l @ LogicalRelation(i: InsertableRelation) =>
191-
if (l.schema != createdRelation.schema) {
192-
val errorDescription =
193-
s"Cannot append to table $tableName because the schema of this " +
194-
s"DataFrame does not match the schema of table $tableName."
195-
val errorMessage =
196-
s"""
197-
|$errorDescription
198-
|== Schemas ==
199-
|${sideBySide(
200-
s"== Expected Schema ==" +:
201-
l.schema.treeString.split("\\\n"),
202-
s"== Actual Schema ==" +:
203-
createdRelation.schema.treeString.split("\\\n")).mkString("\n")}
204-
""".stripMargin
205-
throw new AnalysisException(errorMessage)
206-
} else if (i != createdRelation.relation) {
192+
if (i != createdRelation.relation) {
207193
val errorDescription =
208194
s"Cannot append to table $tableName because the resolved relation does not " +
209195
s"match the existing relation of $tableName. " +
@@ -221,6 +207,7 @@ case class CreateMetastoreDataSourceAsSelect(
221207
""".stripMargin
222208
throw new AnalysisException(errorMessage)
223209
}
210+
existingSchema = Some(l.schema)
224211
case o =>
225212
throw new AnalysisException(s"Saving data in ${o.toString} is not supported.")
226213
}
@@ -234,15 +221,23 @@ case class CreateMetastoreDataSourceAsSelect(
234221
createMetastoreTable = true
235222
}
236223

237-
val df = DataFrame(hiveContext, query)
224+
val data = DataFrame(hiveContext, query)
225+
val df = existingSchema match {
226+
// If we are inserting into an existing table, just use the existing schema.
227+
case Some(schema) => sqlContext.createDataFrame(data.queryExecution.toRdd, schema)
228+
case None => data
229+
}
238230

239231
// Create the relation based on the data of df.
240-
ResolvedDataSource(sqlContext, provider, mode, optionsWithPath, df)
232+
val resolved = ResolvedDataSource(sqlContext, provider, mode, optionsWithPath, df)
241233

242234
if (createMetastoreTable) {
235+
// We will use the schema of resolved.relation as the schema of the table (instead of
236+
// the schema of df). It is important since the nullability may be changed by the relation
237+
// provider (for example, see org.apache.spark.sql.parquet.DefaultSource).
243238
hiveContext.catalog.createDataSourceTable(
244239
tableName,
245-
Some(df.schema),
240+
Some(resolved.relation.schema),
246241
provider,
247242
optionsWithPath,
248243
isExternal)

0 commit comments

Comments (0)