@@ -53,8 +53,8 @@ class ColumnarRule {
* Provides a common executor to translate an [[RDD]] of [[ColumnarBatch]] into an [[RDD]] of
* [[InternalRow]]. This is inserted whenever such a transition is determined to be needed.
*
* The implementation is based off of similar implementations in [[ColumnarBatchScan]],
* [[org.apache.spark.sql.execution.python.ArrowEvalPythonExec]], and
* The implementation is based off of similar implementations in
* [[org.apache.spark.sql.execution.python.ArrowEvalPythonExec]] and
* [[MapPartitionsInRWithArrowExec]]. Eventually this should replace those implementations.
*/
case class ColumnarToRowExec(child: SparkPlan)
@@ -96,9 +96,6 @@ case class ColumnarToRowExec(child: SparkPlan)
/**
Member

ColumnarToRowExec's comment also mentions ColumnarBatchScan. If you would like to remove all references to ColumnarBatchScan...

Contributor Author

Great catch. I thought I got rid of all of them. Will grep through again.

* Generate [[ColumnVector]] expressions for our parent to consume as rows.
* This is called once per [[ColumnVector]] in the batch.
*
* This code came unchanged from [[ColumnarBatchScan]] and will hopefully replace it
* at some point.
*/
private def genCodeColumnVector(
ctx: CodegenContext,
@@ -130,9 +127,6 @@ case class ColumnarToRowExec(child: SparkPlan)
* Produce code to process the input iterator as [[ColumnarBatch]]es.
* This produces an [[org.apache.spark.sql.catalyst.expressions.UnsafeRow]] for each row in
* each batch.
*
* This code came almost completely unchanged from [[ColumnarBatchScan]] and will
* hopefully replace it at some point.
*/
override protected def doProduce(ctx: CodegenContext): String = {
// PhysicalRDD always just has one input

This file was deleted.

@@ -37,10 +37,11 @@ import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat =>
import org.apache.spark.sql.execution.metric.SQLMetrics
import org.apache.spark.sql.sources.{BaseRelation, Filter}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.vectorized.ColumnarBatch
import org.apache.spark.util.Utils
import org.apache.spark.util.collection.BitSet

trait DataSourceScanExec extends LeafExecNode with CodegenSupport {
trait DataSourceScanExec extends LeafExecNode {
val relation: BaseRelation
val tableIdentifier: Option[TableIdentifier]

@@ -69,6 +70,12 @@ trait DataSourceScanExec extends LeafExecNode with CodegenSupport {
private def redact(text: String): String = {
Utils.redact(sqlContext.sessionState.conf.stringRedactionPattern, text)
}

/**
* The data being read in. This is to provide input to the tests in a way compatible with
* [[InputRDDCodegen]] which all implementations used to extend.
*/
def inputRDDs(): Seq[RDD[InternalRow]]
}

/** Physical plan node for scanning data from a relation. */
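As a rough sketch of the kind of test-side use the inputRDDs() hook added above allows (this helper is not part of the PR; the name scanInputRddCount is made up), a test could pull the scan nodes out of a DataFrame's physical plan and inspect the RDDs they report:

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.execution.DataSourceScanExec

// Hypothetical test helper: collect the scan nodes from a DataFrame's physical plan
// and count the input RDDs they report through inputRDDs().
def scanInputRddCount(df: DataFrame): Int = {
  df.queryExecution.executedPlan
    .collect { case scan: DataSourceScanExec => scan }
    .map(_.inputRDDs().size)
    .sum
}
```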
@@ -141,11 +148,11 @@ case class FileSourceScanExec(
optionalBucketSet: Option[BitSet],
dataFilters: Seq[Expression],
override val tableIdentifier: Option[TableIdentifier])
extends DataSourceScanExec with ColumnarBatchScan {
extends DataSourceScanExec {
Member

So this change makes all DataSourceScanExec nodes not support codegen, right?

Contributor Author

Correct, but the codegen was only doing two things: either converting ColumnarBatch into UnsafeRows, or converting whatever other rows were being returned by the DataSourceScanExec into UnsafeRows. The ColumnarBatch conversion is now covered by ColumnarToRowExec. The row-to-row conversion is covered by UnsafeProjections that were either inserted as part of this patch or were already in the code, so we ended up doing a double conversion.
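To make the two conversions described in that answer concrete, here is a minimal hand-written sketch (not code from this patch) of both paths: ColumnarBatch.rowIterator() for the batch-to-row case that ColumnarToRowExec now owns, and UnsafeProjection for the row-to-UnsafeRow case. The helper names batchesToRows and rowsToUnsafe are illustrative only.

```scala
import scala.collection.JavaConverters._

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.UnsafeProjection
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.vectorized.ColumnarBatch

// Batch-to-row: conceptually what a columnar-to-row transition does.
// ColumnarBatch.rowIterator() yields InternalRows backed by the batch's column vectors.
def batchesToRows(batches: RDD[ColumnarBatch]): RDD[InternalRow] = {
  batches.mapPartitions(_.flatMap(_.rowIterator().asScala))
}

// Row-to-UnsafeRow: what the inserted UnsafeProjections cover, so the scan itself no
// longer needs generated code for the conversion.
def rowsToUnsafe(rows: RDD[InternalRow], schema: StructType): RDD[InternalRow] = {
  rows.mapPartitionsWithIndex { (index, iter) =>
    val proj = UnsafeProjection.create(schema)
    proj.initialize(index)
    iter.map(proj)
  }
}
```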


// Note that some vals referring the file-based relation are lazy intentionally
// so that this plan can be canonicalized on executor side too. See SPARK-23731.
override lazy val supportsBatch: Boolean = {
override lazy val supportsColumnar: Boolean = {
relation.fileFormat.supportBatch(relation.sparkSession, schema)
}

@@ -275,7 +282,7 @@ case class FileSourceScanExec(
Map(
"Format" -> relation.fileFormat.toString,
"ReadSchema" -> requiredSchema.catalogString,
"Batched" -> supportsBatch.toString,
"Batched" -> supportsColumnar.toString,
"PartitionFilters" -> seqToString(partitionFilters),
"PushedFilters" -> seqToString(pushedDownFilters),
"DataFilters" -> seqToString(dataFilters),
@@ -302,7 +309,7 @@ case class FileSourceScanExec(
withSelectedBucketsCount
}

private lazy val inputRDD: RDD[InternalRow] = {
lazy val inputRDD: RDD[InternalRow] = {
val readFile: (PartitionedFile) => Iterator[InternalRow] =
relation.fileFormat.buildReaderWithPartitionValues(
sparkSession = relation.sparkSession,
@@ -334,29 +341,30 @@ case class FileSourceScanExec(
"scanTime" -> SQLMetrics.createTimingMetric(sparkContext, "scan time"))

protected override def doExecute(): RDD[InternalRow] = {
-    if (supportsBatch) {
-      // in the case of fallback, this batched scan should never fail because of:
-      // 1) only primitive types are supported
-      // 2) the number of columns should be smaller than spark.sql.codegen.maxFields
-      WholeStageCodegenExec(this)(codegenStageId = 0).execute()
-    } else {
-      val numOutputRows = longMetric("numOutputRows")
-
-      if (needsUnsafeRowConversion) {
-        inputRDD.mapPartitionsWithIndexInternal { (index, iter) =>
-          val proj = UnsafeProjection.create(schema)
-          proj.initialize(index)
-          iter.map( r => {
-            numOutputRows += 1
-            proj(r)
-          })
-        }
-      } else {
-        inputRDD.map { r =>
-          numOutputRows += 1
-          r
-        }
-      }
-    }
+    val numOutputRows = longMetric("numOutputRows")
+
+    if (needsUnsafeRowConversion) {
+      inputRDD.mapPartitionsWithIndexInternal { (index, iter) =>
+        val proj = UnsafeProjection.create(schema)
+        proj.initialize(index)
+        iter.map( r => {
+          numOutputRows += 1
+          proj(r)
+        })
+      }
+    } else {
+      inputRDD.map { r =>
+        numOutputRows += 1
+        r
+      }
+    }
   }
+
+  protected override def doExecuteColumnar(): RDD[ColumnarBatch] = {
+    val numOutputRows = longMetric("numOutputRows")
+    inputRDD.asInstanceOf[RDD[ColumnarBatch]].map { batch =>
+      numOutputRows += batch.numRows()
+      batch
+    }
+  }

@@ -709,11 +709,7 @@ case class WholeStageCodegenExec(child: SparkPlan)(val codegenStageId: Int)
s"${sqlContext.conf.hugeMethodLimit}, and the whole-stage codegen was disabled " +
s"for this plan (id=$codegenStageId). To avoid this, you can raise the limit " +
s"`${SQLConf.WHOLESTAGE_HUGE_METHOD_LIMIT.key}`:\n$treeString")
child match {
// The fallback solution of batch file source scan still uses WholeStageCodegenExec
case f: FileSourceScanExec if f.supportsBatch => // do nothing
case _ => return child.execute()
}
return child.execute()
}

val references = ctx.references.toArray
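For reference, the limit named in that warning is the spark.sql.codegen.hugeMethodLimit SQL conf; a hedged one-liner for raising it, assuming spark is an active SparkSession:

```scala
// Raise the whole-stage codegen bytecode-size limit referred to in the warning above.
spark.conf.set("spark.sql.codegen.hugeMethodLimit", 65535L)
```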
@@ -84,6 +84,8 @@ case class AdaptiveSparkPlanExec(
// optimizations should be stage-independent.
@transient private val queryStageOptimizerRules: Seq[Rule[SparkPlan]] = Seq(
ReduceNumShufflePartitions(conf),
ApplyColumnarRulesAndInsertTransitions(session.sessionState.conf,
session.sessionState.columnarRules),
CollapseCodegenStages(conf)
)
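With this addition, ApplyColumnarRulesAndInsertTransitions also runs on adaptive query stages, so a ColumnarRule registered through the session extensions is honored there as well. Below is a hedged sketch of such a registration; the class names are invented, and it assumes the injectColumnar extension point introduced alongside this work.

```scala
import org.apache.spark.sql.SparkSessionExtensions
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.{ColumnarRule, SparkPlan}

// Hypothetical no-op columnar rule: returns the plan unchanged before the
// row/columnar transitions are inserted around any columnar operators.
class NoOpColumnarRule extends ColumnarRule {
  override def preColumnarTransitions: Rule[SparkPlan] = new Rule[SparkPlan] {
    override def apply(plan: SparkPlan): SparkPlan = plan
  }
}

// Hypothetical extensions class (registered via spark.sql.extensions) whose rule ends up
// in session.sessionState.columnarRules and is therefore picked up by the rule list above.
class NoOpColumnarExtensions extends (SparkSessionExtensions => Unit) {
  override def apply(extensions: SparkSessionExtensions): Unit = {
    extensions.injectColumnar(_ => new NoOpColumnarRule)
  }
}
```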
