@@ -18,6 +18,7 @@
package org.apache.spark.sql.execution

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.sql.execution.command.{DataWritingCommand, DataWritingCommandExec}
Member

Do we need DataWritingCommand here, too?

Contributor Author

Will remove it
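For reference, a minimal sketch of the trimmed import, assuming only the physical node ends up being referenced in the match below (a hypothetical follow-up, not part of this diff):

```scala
// Only DataWritingCommandExec is pattern-matched in SparkPlanInfo.fromSparkPlan,
// so the logical DataWritingCommand import can be dropped.
import org.apache.spark.sql.execution.command.DataWritingCommandExec
```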

import org.apache.spark.sql.execution.exchange.ReusedExchangeExec
import org.apache.spark.sql.execution.metric.SQLMetricInfo

@@ -60,6 +61,7 @@ private[execution] object SparkPlanInfo {
// dump the file scan metadata (e.g file path) to event log
val metadata = plan match {
case fileScan: FileSourceScanExec => fileScan.metadata
case writing: DataWritingCommandExec => writing.metadata
case _ => Map[String, String]()
}
new SparkPlanInfo(plan.nodeName, plan.simpleString, children.map(fromSparkPlan),
@@ -18,6 +18,7 @@
package org.apache.spark.sql.execution.command

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.expressions.Attribute
@@ -48,6 +49,8 @@ trait DataWritingCommand extends Command {
def outputColumns: Seq[Attribute] =
DataWritingCommand.logicalPlanOutputWithNames(query, outputColumnNames)

def outputPath: Option[Path]

lazy val metrics: Map[String, SQLMetric] = BasicWriteJobStatsTracker.metrics

def basicWriteJobStatsTracker(hadoopConf: Configuration): BasicWriteJobStatsTracker = {
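To illustrate the new abstract member, here is a minimal sketch of how a concrete command could satisfy it; the command name and constructor are hypothetical and only show the shape of an implementation:

```scala
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.catalyst.catalog.CatalogTable

// Hypothetical command writing into a catalog table: the output path is derived
// from the table's optional storage location. A real implementation would also
// extend DataWritingCommand and mark this as `override def outputPath`.
case class ExampleWriteCommand(table: CatalogTable) {
  def outputPath: Option[Path] = table.storage.locationUri.map(new Path(_))
}
```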
@@ -121,6 +121,22 @@ case class DataWritingCommandExec(cmd: DataWritingCommand, child: SparkPlan)
protected override def doExecute(): RDD[InternalRow] = {
sqlContext.sparkContext.parallelize(sideEffectResult, 1)
}

// Metadata that describes more details of this writing.
lazy val metadata: Map[String, String] = {
def seqToString(seq: Seq[Any]) = seq.mkString("[", ", ", "]")
val outputPath = cmd.outputPath match {
case Some(path) if path != null => path.toString
case _ => ""
}
val columnNames = cmd.outputColumnNames
val metadata =
Map(
"OutputColumnNames" -> seqToString(columnNames),
"OutputPath" -> outputPath
)
metadata
}
}

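As a rough illustration of what this exposes (table, column and path below are made up), test code inside the org.apache.spark.sql.execution package — SparkPlanInfo is private[execution] — could observe something like:

```scala
// Hypothetical values; the exact path depends on the warehouse location.
val qe = spark.sql("INSERT OVERWRITE TABLE t SELECT 1").queryExecution
val info = SparkPlanInfo.fromSparkPlan(qe.sparkPlan)
// info.metadata would then contain entries such as:
//   "OutputColumnNames" -> "[col_I]"
//   "OutputPath"        -> "file:/user/hive/warehouse/t"
```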
@@ -19,9 +19,10 @@ package org.apache.spark.sql.execution.command

import java.net.URI

import org.apache.hadoop.fs.Path

import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.datasources._
@@ -221,4 +222,6 @@ case class CreateDataSourceTableAsSelectCommand(
throw ex
}
}

override def outputPath: Option[Path] = table.storage.locationUri.map(new Path(_))
}
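For context, the locationUri-to-Path mapping used here (and in the Hive commands below) relies on Hadoop's Path(URI) constructor; a small self-contained sketch with a made-up location:

```scala
import java.net.URI
import org.apache.hadoop.fs.Path

// storage.locationUri is an Option[URI]; mapping through new Path(_) keeps it
// optional, yielding None for tables that have no explicit location.
val locationUri: Option[URI] = Some(new URI("hdfs://nn:8020/warehouse/db.db/t"))
val outputPath: Option[Path] = locationUri.map(new Path(_))
// outputPath == Some(new Path("hdfs://nn:8020/warehouse/db.db/t"))
```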
@@ -440,7 +440,7 @@ case class DataSource(
// ordering of data.logicalPlan (partition columns are all moved after data column). This
// will be adjusted within InsertIntoHadoopFsRelation.
InsertIntoHadoopFsRelationCommand(
outputPath = outputPath,
outputFsPath = outputPath,
Member

Could you undo this redundant change, @LantaoJin?

Contributor Author

This field would override outputPath in DataWritingCommand, but the return type is different (Path vs Option[Path]), so I renamed it.
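Roughly the clash being described, in a simplified sketch (the trait here is a stand-in, not the real DataWritingCommand definition):

```scala
import org.apache.hadoop.fs.Path

trait WritesToPath {
  def outputPath: Option[Path] // the new trait member, simplified
}

// A constructor parameter named `outputPath: Path` would be taken as the
// implementation of the trait member, but with an incompatible type, so it
// does not compile:
// case class BadCommand(outputPath: Path) extends WritesToPath

// Renaming the parameter avoids the clash and lets the command implement the
// member explicitly:
case class GoodCommand(outputFsPath: Path) extends WritesToPath {
  override def outputPath: Option[Path] = Some(outputFsPath)
}
```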

staticPartitions = Map.empty,
ifPartitionNotExists = false,
partitionColumns = partitionColumns.map(UnresolvedAttribute.quoted),
@@ -45,7 +45,7 @@ import org.apache.spark.sql.util.SchemaUtils
* Only valid for static partitions.
*/
case class InsertIntoHadoopFsRelationCommand(
outputPath: Path,
outputFsPath: Path,
staticPartitions: TablePartitionSpec,
ifPartitionNotExists: Boolean,
partitionColumns: Seq[Attribute],
@@ -64,12 +64,12 @@ case class InsertIntoHadoopFsRelationCommand(
// Most formats don't do well with duplicate columns, so lets not allow that
SchemaUtils.checkColumnNameDuplication(
outputColumnNames,
s"when inserting into $outputPath",
s"when inserting into $outputFsPath",
sparkSession.sessionState.conf.caseSensitiveAnalysis)

val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(options)
val fs = outputPath.getFileSystem(hadoopConf)
val qualifiedOutputPath = outputPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
val fs = outputFsPath.getFileSystem(hadoopConf)
val qualifiedOutputPath = outputFsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)

val partitionsTrackedByCatalog = sparkSession.sessionState.conf.manageFilesourcePartitions &&
catalogTable.isDefined &&
@@ -106,7 +106,7 @@ case class InsertIntoHadoopFsRelationCommand(
val committer = FileCommitProtocol.instantiate(
sparkSession.sessionState.conf.fileCommitProtocolClass,
jobId = java.util.UUID.randomUUID().toString,
outputPath = outputPath.toString,
outputPath = outputFsPath.toString,
dynamicPartitionOverwrite = dynamicPartitionOverwrite)

val doInsertion = (mode, pathExists) match {
@@ -184,7 +184,7 @@ case class InsertIntoHadoopFsRelationCommand(
// refresh cached files in FileIndex
fileIndex.foreach(_.refresh())
// refresh data cache if table is cached
sparkSession.catalog.refreshByPath(outputPath.toString)
sparkSession.catalog.refreshByPath(outputFsPath.toString)

if (catalogTable.nonEmpty) {
CommandUtils.updateTableStats(sparkSession, catalogTable.get)
@@ -261,4 +261,6 @@ case class InsertIntoHadoopFsRelationCommand(
}
}.toMap
}

override def outputPath: Option[Path] = Some(this.outputFsPath)
}
@@ -58,4 +58,13 @@ class SparkPlanSuite extends QueryTest with SharedSQLContext {
assert(SparkPlanInfo.fromSparkPlan(f.queryExecution.sparkPlan).metadata.nonEmpty)
}
}

test("SPARK-25421 DataWritingCommandExec should contains 'OutputPath' metadata") {
withTable("t") {
sql("CREATE TABLE t(col_I int) USING PARQUET")
val f = sql("INSERT OVERWRITE TABLE t SELECT 1")
assert(SparkPlanInfo.fromSparkPlan(f.queryExecution.sparkPlan).metadata
.contains("OutputPath"))
}
}
}
@@ -17,11 +17,11 @@

package org.apache.spark.sql.hive.execution

import org.apache.hadoop.fs.Path
import scala.util.control.NonFatal

import org.apache.spark.sql.{AnalysisException, Row, SaveMode, SparkSession}
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.command.DataWritingCommand
@@ -100,4 +100,6 @@ case class CreateHiveTableAsSelectCommand(
s"TableName: ${tableDesc.identifier.table}, " +
s"InsertIntoHiveTable]"
}

override def outputPath: Option[Path] = tableDesc.storage.locationUri.map(new Path(_))
}
@@ -30,7 +30,6 @@ import org.apache.spark.SparkException
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable}
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.hive.client.HiveClientImpl
@@ -131,5 +130,7 @@ case class InsertIntoHiveDirCommand(

Seq.empty[Row]
}

override def outputPath: Option[Path] = storage.locationUri.map(new Path(_))
}

@@ -264,4 +264,6 @@ case class InsertIntoHiveTable(
isSrcLocal = false)
}
}

override def outputPath: Option[Path] = table.storage.locationUri.map(new Path(_))
}