Skip to content

Commit 304ae31

Browse files
committed
remove the overwrite parameter
1 parent 416ea37 commit 304ae31

4 files changed

Lines changed: 20 additions & 36 deletions

File tree

sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -140,8 +140,8 @@ case class CreateDataSourceTableAsSelectCommand(
140140
return Seq.empty
141141
}
142142

143-
saveDataIntoTable(sparkSession, table, table.storage.locationUri, query, mode,
144-
overwrite = false, tableExists = true)
143+
saveDataIntoTable(
144+
sparkSession, table, table.storage.locationUri, query, SaveMode.Append, tableExists = true)
145145
} else {
146146
assert(table.schema.isEmpty)
147147

@@ -151,7 +151,7 @@ case class CreateDataSourceTableAsSelectCommand(
151151
table.storage.locationUri
152152
}
153153
val result = saveDataIntoTable(
154-
sparkSession, table, tableLocation, query, mode, overwrite = true, tableExists = false)
154+
sparkSession, table, tableLocation, query, SaveMode.Overwrite, tableExists = false)
155155
val newTable = table.copy(
156156
storage = table.storage.copy(locationUri = tableLocation),
157157
// We will use the schema of resolved.relation as the schema of the table (instead of
@@ -178,7 +178,6 @@ case class CreateDataSourceTableAsSelectCommand(
178178
tableLocation: Option[String],
179179
data: LogicalPlan,
180180
mode: SaveMode,
181-
overwrite: Boolean,
182181
tableExists: Boolean): BaseRelation = {
183182
// Create the relation based on the input logical plan: `data`.
184183
val pathOption = tableLocation.map("path" -> _)
@@ -191,7 +190,7 @@ case class CreateDataSourceTableAsSelectCommand(
191190
catalogTable = if (tableExists) Some(table) else None)
192191

193192
try {
194-
dataSource.writeAndRead(mode, Dataset.ofRows(session, query), Some(overwrite))
193+
dataSource.writeAndRead(mode, Dataset.ofRows(session, query))
195194
} catch {
196195
case ex: AnalysisException =>
197196
logError(s"Failed to write to table ${table.identifier.unquotedString}", ex)

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala

Lines changed: 15 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -406,11 +406,7 @@ case class DataSource(
406406
/**
407407
* Writes the given [[DataFrame]] out in this [[FileFormat]].
408408
*/
409-
private def writeInFileFormat(
410-
format: FileFormat,
411-
mode: SaveMode,
412-
data: DataFrame,
413-
overwrite: Option[Boolean]): Unit = {
409+
private def writeInFileFormat(format: FileFormat, mode: SaveMode, data: DataFrame): Unit = {
414410
// Don't glob path for the write path. The contracts here are:
415411
// 1. Only one output path can be specified on the write path;
416412
// 2. Output path must be a legal HDFS style file system path;
@@ -426,26 +422,18 @@ case class DataSource(
426422
s"got: ${allPaths.mkString(", ")}")
427423
}
428424

429-
val isOverWrite = overwrite match {
430-
case Some(ow) => ow
431-
case _ =>
432-
if (pathExists) {
433-
if (mode == SaveMode.ErrorIfExists) {
434-
throw new AnalysisException(s"path $outputPath already exists.")
435-
}
436-
if (mode == SaveMode.Ignore) {
437-
// Since the path already exists and the save mode is Ignore, we will just return.
438-
return
439-
}
440-
441-
if (mode == SaveMode.Append) false
442-
else if (mode == SaveMode.Overwrite) true
443-
else {
444-
throw new IllegalStateException(s"unsupported save mode $mode")
445-
}
446-
} else true
425+
if (pathExists) {
426+
if (mode == SaveMode.ErrorIfExists) {
427+
throw new AnalysisException(s"path $outputPath already exists.")
428+
}
429+
if (mode == SaveMode.Ignore) {
430+
// Since the path already exists and the save mode is Ignore, we will just return.
431+
return
432+
}
447433
}
448434

435+
// At this point ErrorIfExists/Ignore imply the path does not exist (otherwise we threw or returned above), so every non-Overwrite mode is treated as Append.
436+
val transformedMode = if (mode != SaveMode.Overwrite) SaveMode.Append else mode
449437
val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
450438
PartitioningUtils.validatePartitionColumn(data.schema, partitionColumns, caseSensitive)
451439

@@ -476,8 +464,7 @@ case class DataSource(
476464
fileFormat = format,
477465
options = options,
478466
query = data.logicalPlan,
479-
mode = mode,
480-
isOverWrite,
467+
mode = transformedMode,
481468
catalogTable = catalogTable,
482469
fileIndex = fileIndex)
483470
sparkSession.sessionState.executePlan(plan).toRdd
@@ -487,7 +474,7 @@ case class DataSource(
487474
* Writes the given [[DataFrame]] out to this [[DataSource]] and returns a [[BaseRelation]] for
488475
* the following reading.
489476
*/
490-
def writeAndRead(mode: SaveMode, data: DataFrame, overwrite: Option[Boolean]): BaseRelation = {
477+
def writeAndRead(mode: SaveMode, data: DataFrame): BaseRelation = {
491478
if (data.schema.map(_.dataType).exists(_.isInstanceOf[CalendarIntervalType])) {
492479
throw new AnalysisException("Cannot save interval data type into external storage.")
493480
}
@@ -496,7 +483,7 @@ case class DataSource(
496483
case dataSource: CreatableRelationProvider =>
497484
dataSource.createRelation(sparkSession.sqlContext, mode, caseInsensitiveOptions, data)
498485
case format: FileFormat =>
499-
writeInFileFormat(format, mode, data, overwrite)
486+
writeInFileFormat(format, mode, data)
500487
// Replace the schema with that of the DataFrame we just wrote out to avoid re-inferring
501488
copy(userSpecifiedSchema = Some(data.schema.asNullable)).resolveRelation()
502489
case _ =>
@@ -516,7 +503,7 @@ case class DataSource(
516503
case dataSource: CreatableRelationProvider =>
517504
dataSource.createRelation(sparkSession.sqlContext, mode, caseInsensitiveOptions, data)
518505
case format: FileFormat =>
519-
writeInFileFormat(format, mode, data, None)
506+
writeInFileFormat(format, mode, data)
520507
case _ =>
521508
sys.error(s"${providingClass.getCanonicalName} does not allow create table as select.")
522509
}

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -201,7 +201,6 @@ case class DataSourceAnalysis(conf: CatalystConf) extends Rule[LogicalPlan] {
201201
t.options,
202202
actualQuery,
203203
mode,
204-
overwrite,
205204
table,
206205
Some(t.location))
207206
}

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,6 @@ case class InsertIntoHadoopFsRelationCommand(
4747
options: Map[String, String],
4848
query: LogicalPlan,
4949
mode: SaveMode,
50-
overwrite: Boolean,
5150
catalogTable: Option[CatalogTable],
5251
fileIndex: Option[FileIndex])
5352
extends RunnableCommand {
@@ -98,7 +97,7 @@ case class InsertIntoHadoopFsRelationCommand(
9897
outputPath = outputPath.toString,
9998
isAppend = isAppend)
10099

101-
if (overwrite) {
100+
if (mode == SaveMode.Overwrite) {
102101
deleteMatchingPartitions(fs, qualifiedOutputPath, customPartitionLocations, committer)
103102
}
104103

0 commit comments

Comments
 (0)