116 changes: 53 additions & 63 deletions sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala
@@ -57,40 +57,38 @@ class ColumnarRule {
* [[org.apache.spark.sql.execution.python.ArrowEvalPythonExec]] and
* [[MapPartitionsInRWithArrowExec]]. Eventually this should replace those implementations.
*/
case class ColumnarToRowExec(child: SparkPlan)
extends UnaryExecNode with CodegenSupport {
case class ColumnarToRowExec(child: SparkPlan) extends UnaryExecNode with CodegenSupport {
assert(child.supportsColumnar)

override def output: Seq[Attribute] = child.output

override def outputPartitioning: Partitioning = child.outputPartitioning

override def outputOrdering: Seq[SortOrder] = child.outputOrdering

// `ColumnarToRowExec` processes the input RDD directly, which makes it effectively a leaf node
// in the codegen stage, so it needs to do the limit check.
protected override def canCheckLimitNotReached: Boolean = true

override lazy val metrics: Map[String, SQLMetric] = Map(
"numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
"numInputBatches" -> SQLMetrics.createMetric(sparkContext, "number of input batches"),
"scanTime" -> SQLMetrics.createTimingMetric(sparkContext, "scan time")
"numInputBatches" -> SQLMetrics.createMetric(sparkContext, "number of input batches")
)

override def doExecute(): RDD[InternalRow] = {
val numOutputRows = longMetric("numOutputRows")
val numInputBatches = longMetric("numInputBatches")
val scanTime = longMetric("scanTime")
// UnsafeProjection is not serializable so do it on the executor side, which is why it is lazy
@transient lazy val outputProject = UnsafeProjection.create(output, output)
val batches = child.executeColumnar()
batches.flatMap(batch => {
val batchStartNs = System.nanoTime()
numInputBatches += 1
// In order to match the numOutputRows metric in the generated code we update
// numOutputRows for each batch. This is less accurate than doing it at output
// because it will over count the number of rows output in the case of a limit,
// but it is more efficient.
numOutputRows += batch.numRows()
val ret = batch.rowIterator().asScala
scanTime += ((System.nanoTime() - batchStartNs) / (1000 * 1000))
ret.map(outputProject)
})
// This avoids calling `output` in the RDD closure, so that we don't need to include the entire
// plan (this) in the closure.
val localOutput = this.output
Contributor: +1

child.executeColumnar().mapPartitionsInternal { batches =>
val toUnsafe = UnsafeProjection.create(localOutput, localOutput)
batches.flatMap { batch =>
numInputBatches += 1
numOutputRows += batch.numRows()
batch.rowIterator().asScala.map(toUnsafe)
}
}
}

/**
@@ -136,9 +134,6 @@ case class ColumnarToRowExec(child: SparkPlan)
// metrics
val numOutputRows = metricTerm(ctx, "numOutputRows")
val numInputBatches = metricTerm(ctx, "numInputBatches")
val scanTimeMetric = metricTerm(ctx, "scanTime")
val scanTimeTotalNs =
ctx.addMutableState(CodeGenerator.JAVA_LONG, "scanTime") // init as scanTime = 0

val columnarBatchClz = classOf[ColumnarBatch].getName
val batch = ctx.addMutableState(columnarBatchClz, "batch")
@@ -156,15 +151,13 @@ case class ColumnarToRowExec(child: SparkPlan)
val nextBatchFuncName = ctx.addNewFunction(nextBatch,
s"""
|private void $nextBatch() throws java.io.IOException {
| long getBatchStart = System.nanoTime();
| if ($input.hasNext()) {
| $batch = ($columnarBatchClz)$input.next();
| $numInputBatches.add(1);
| $numOutputRows.add($batch.numRows());
| $idx = 0;
| ${columnAssigns.mkString("", "\n", "\n")}
| ${numInputBatches}.add(1);
| }
| $scanTimeTotalNs += System.nanoTime() - getBatchStart;
|}""".stripMargin)

ctx.currentVars = null
@@ -184,7 +177,7 @@ case class ColumnarToRowExec(child: SparkPlan)
|if ($batch == null) {
| $nextBatchFuncName();
|}
|while ($batch != null) {
|while ($limitNotReachedCond $batch != null) {
| int $numRows = $batch.numRows();
| int $localEnd = $numRows - $idx;
| for (int $localIdx = 0; $localIdx < $localEnd; $localIdx++) {
@@ -196,13 +189,11 @@ case class ColumnarToRowExec(child: SparkPlan)
| $batch = null;
| $nextBatchFuncName();
|}
|$scanTimeMetric.add($scanTimeTotalNs / (1000 * 1000));
|$scanTimeTotalNs = 0;
""".stripMargin
}

override def inputRDDs(): Seq[RDD[InternalRow]] = {
child.asInstanceOf[CodegenSupport].inputRDDs()
Seq(child.executeColumnar().asInstanceOf[RDD[InternalRow]]) // Hack because of type erasure
}
}
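The `inputRDDs()` override above only compiles because of JVM type erasure: at runtime an `RDD[ColumnarBatch]` and an `RDD[InternalRow]` are the same class, so the cast merely changes the static type, and the generated `$nextBatch()` code casts each element back to `ColumnarBatch` before reading it. A minimal standalone sketch of that idea, using a hypothetical `Box` container in place of `RDD` (illustrative names only, not Spark or PR code):

final case class Box[T](items: Seq[T])

object ErasureSketch {
  def main(args: Array[String]): Unit = {
    // The "columnar" data: element type known to the producer.
    val batches: Box[Array[Int]] = Box(Seq(Array(1, 2, 3)))
    // Same runtime object viewed with a different static element type; legal because
    // the element type parameter is erased and never checked here.
    val rows: Box[AnyRef] = batches.asInstanceOf[Box[AnyRef]]
    // The consumer that knows the real element type casts each element back,
    // just as the generated code does with ($columnarBatchClz) input.next().
    val recovered = rows.items.map(_.asInstanceOf[Array[Int]])
    println(recovered.map(_.sum)) // List(6)
  }
}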

@@ -439,47 +430,46 @@ case class RowToColumnarExec(child: SparkPlan) extends UnaryExecNode {
// Instead of creating a new config we are reusing columnBatchSize. If we later combine this
// with some of the Arrow conversion tools, we will need to unify some of the configs.
val numRows = conf.columnBatchSize
val converters = new RowToColumnConverter(schema)
val rowBased = child.execute()
rowBased.mapPartitions(rowIterator => {
new Iterator[ColumnarBatch] {
var cb: ColumnarBatch = null

TaskContext.get().addTaskCompletionListener[Unit] { _ =>
if (cb != null) {
cb.close()
cb = null
// This avoids calling `schema` in the RDD closure, so that we don't need to include the entire
// plan (this) in the closure.
val localSchema = this.schema
child.execute().mapPartitionsInternal { rowIterator =>
if (rowIterator.hasNext) {
new Iterator[ColumnarBatch] {
private val converters = new RowToColumnConverter(localSchema)
private val vectors: Seq[WritableColumnVector] = if (enableOffHeapColumnVector) {
OffHeapColumnVector.allocateColumns(numRows, localSchema)
} else {
OnHeapColumnVector.allocateColumns(numRows, localSchema)
}
}

override def hasNext: Boolean = {
rowIterator.hasNext
}
private val cb: ColumnarBatch = new ColumnarBatch(vectors.toArray)

override def next(): ColumnarBatch = {
if (cb != null) {
TaskContext.get().addTaskCompletionListener[Unit] { _ =>
cb.close()
cb = null
}
val columnVectors : Array[WritableColumnVector] =
if (enableOffHeapColumnVector) {
OffHeapColumnVector.allocateColumns(numRows, schema).toArray
} else {
OnHeapColumnVector.allocateColumns(numRows, schema).toArray

override def hasNext: Boolean = {
rowIterator.hasNext
}

override def next(): ColumnarBatch = {
cb.setNumRows(0)
var rowCount = 0
while (rowCount < numRows && rowIterator.hasNext) {
val row = rowIterator.next()
converters.convert(row, vectors.toArray)
rowCount += 1
}
var rowCount = 0
while (rowCount < numRows && rowIterator.hasNext) {
val row = rowIterator.next()
converters.convert(row, columnVectors)
rowCount += 1
cb.setNumRows(rowCount)
numInputRows += rowCount
numOutputBatches += 1
cb
}
cb = new ColumnarBatch(columnVectors.toArray, rowCount)
numInputRows += rowCount
numOutputBatches += 1
cb
}
} else {
Iterator.empty
}
})
}
}
}
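Both `ColumnarToRowExec.doExecute` and `RowToColumnarExec.doExecute` above copy a member (`output`, `schema`) into a local val before building the partition function, so the closure captures only that value rather than the whole plan node. A minimal standalone sketch of the pattern (hypothetical `FakePlan` and helpers, not Spark code), which serializes the closure the way Spark would when shipping it to executors:

import java.io.{ByteArrayOutputStream, ObjectOutputStream}

// Deliberately NOT Serializable, like a plan node holding non-serializable state.
class FakePlan(val output: Seq[String]) {
  // Referencing `output` directly inside the lambda would capture `this` and make
  // the closure unserializable:
  //   def badFn: String => Boolean = s => output.contains(s)

  // Capturing only the value we need keeps `this` out of the closure.
  def goodFn: String => Boolean = {
    val localOutput = this.output
    s => localOutput.contains(s)
  }
}

object ClosureSketch {
  private def roundTrip(obj: AnyRef): Unit = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    out.writeObject(obj) // would throw NotSerializableException if FakePlan leaked in
    out.close()
  }

  def main(args: Array[String]): Unit = {
    val plan = new FakePlan(Seq("a", "b"))
    roundTrip(plan.goodFn) // succeeds: the closure holds only the Seq
    println("closure serialized without capturing the plan")
  }
}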

@@ -34,7 +34,7 @@ import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partition
import org.apache.spark.sql.catalyst.util.truncatedString
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat => ParquetSource}
import org.apache.spark.sql.execution.metric.SQLMetrics
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
import org.apache.spark.sql.sources.{BaseRelation, Filter}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.vectorized.ColumnarBatch
@@ -334,37 +334,61 @@ case class FileSourceScanExec(
inputRDD :: Nil
}

override lazy val metrics =
Map("numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
"numFiles" -> SQLMetrics.createMetric(sparkContext, "number of files read"),
"metadataTime" -> SQLMetrics.createTimingMetric(sparkContext, "metadata time"),
"scanTime" -> SQLMetrics.createTimingMetric(sparkContext, "scan time"))
override lazy val metrics = Map(
"numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
"numFiles" -> SQLMetrics.createMetric(sparkContext, "number of files read"),
"metadataTime" -> SQLMetrics.createTimingMetric(sparkContext, "metadata time")
) ++ {
// Tracking scan time has overhead; we can't afford to do it for each row, and can only do
// it for each batch.
if (supportsColumnar) {
Some("scanTime" -> SQLMetrics.createTimingMetric(sparkContext, "scan time"))
} else {
None
}
}

protected override def doExecute(): RDD[InternalRow] = {
val numOutputRows = longMetric("numOutputRows")

if (needsUnsafeRowConversion) {
inputRDD.mapPartitionsWithIndexInternal { (index, iter) =>
val proj = UnsafeProjection.create(schema)
proj.initialize(index)
iter.map( r => {
val toUnsafe = UnsafeProjection.create(schema)
toUnsafe.initialize(index)
iter.map { row =>
numOutputRows += 1
proj(r)
})
toUnsafe(row)
}
}
} else {
inputRDD.map { r =>
numOutputRows += 1
r
inputRDD.mapPartitionsInternal { iter =>
iter.map { row =>
numOutputRows += 1
row
}
}
}
}

protected override def doExecuteColumnar(): RDD[ColumnarBatch] = {
val numOutputRows = longMetric("numOutputRows")
inputRDD.asInstanceOf[RDD[ColumnarBatch]].map { batch =>
numOutputRows += batch.numRows()
batch
val scanTime = longMetric("scanTime")
inputRDD.asInstanceOf[RDD[ColumnarBatch]].mapPartitionsInternal { batches =>
new Iterator[ColumnarBatch] {

override def hasNext: Boolean = {
// The `FileScanRDD` returns an iterator which scans the file during the `hasNext` call.
val startNs = System.nanoTime()
val res = batches.hasNext
scanTime += NANOSECONDS.toMillis(System.nanoTime() - startNs)
res
}

override def next(): ColumnarBatch = {
val batch = batches.next()
numOutputRows += batch.numRows()
batch
}
}
}
}
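The columnar path above accumulates the scan-time metric by timing `hasNext`, because `FileScanRDD`'s iterator does the actual file reading there rather than in `next`. A minimal standalone sketch of the wrapping pattern (hypothetical `TimedIterator`, not a Spark class):

import java.util.concurrent.TimeUnit.NANOSECONDS

// Wraps an iterator and reports the time spent in `hasNext` to a metric callback.
class TimedIterator[T](underlying: Iterator[T], addMillis: Long => Unit) extends Iterator[T] {
  override def hasNext: Boolean = {
    val startNs = System.nanoTime()
    val res = underlying.hasNext // the expensive part for a file scan
    addMillis(NANOSECONDS.toMillis(System.nanoTime() - startNs))
    res
  }
  override def next(): T = underlying.next()
}

object TimedIteratorSketch {
  def main(args: Array[String]): Unit = {
    var scanTimeMs = 0L
    val it = new TimedIterator[Int](Iterator(1, 2, 3), scanTimeMs += _)
    println(it.sum)                                // 6
    println(s"approximate scan time: $scanTimeMs ms")
  }
}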

@@ -17,7 +17,6 @@

package org.apache.spark.sql.execution

import java.io.Writer
import java.util.Locale
import java.util.concurrent.atomic.AtomicInteger

@@ -491,12 +490,8 @@ trait InputRDDCodegen extends CodegenSupport {
*
* This is the leaf node of a tree with WholeStageCodegen that is used to generate code
* that consumes an RDD iterator of InternalRow.
*
* @param isChildColumnar true if the inputRDD is really columnar data hidden by type erasure,
* false if inputRDD is really an RDD[InternalRow]
*/
case class InputAdapter(child: SparkPlan, isChildColumnar: Boolean)
extends UnaryExecNode with InputRDDCodegen {
case class InputAdapter(child: SparkPlan) extends UnaryExecNode with InputRDDCodegen {

override def output: Seq[Attribute] = child.output

@@ -522,13 +517,10 @@ case class InputAdapter(child: SparkPlan, isChildColumnar: Boolean)
child.executeColumnar()
Member: InputAdapter doesn't support columnar execution now? Seems we can change supportsColumnar and remove doExecuteColumnar?

}

override def inputRDD: RDD[InternalRow] = {
if (isChildColumnar) {
child.executeColumnar().asInstanceOf[RDD[InternalRow]] // Hack because of type erasure
} else {
child.execute()
}
}
// `InputAdapter` can only generate code to process the rows from its child. If the child produces
// columnar batches, there must be a `ColumnarToRowExec` above `InputAdapter` to handle it by
// overriding `inputRDDs` and calling `InputAdapter#executeColumnar` directly.
override def inputRDD: RDD[InternalRow] = child.execute()

// This is a leaf node so the node can produce limit not reached checks.
override protected def canCheckLimitNotReached: Boolean = true
@@ -870,59 +862,45 @@ case class CollapseCodegenStages(
/**
* Inserts an InputAdapter on top of those that do not support codegen.
*/
private def insertInputAdapter(plan: SparkPlan, isColumnarInput: Boolean): SparkPlan = {
val isColumnar = adjustColumnar(plan, isColumnarInput)
private def insertInputAdapter(plan: SparkPlan): SparkPlan = {
plan match {
case p if !supportCodegen(p) =>
// collapse them recursively
InputAdapter(insertWholeStageCodegen(p, isColumnar), isColumnar)
InputAdapter(insertWholeStageCodegen(p))
case j: SortMergeJoinExec =>
// The children of SortMergeJoin should do codegen separately.
j.withNewChildren(j.children.map(
child => InputAdapter(insertWholeStageCodegen(child, isColumnar), isColumnar)))
case p =>
p.withNewChildren(p.children.map(insertInputAdapter(_, isColumnar)))
child => InputAdapter(insertWholeStageCodegen(child))))
case p => p.withNewChildren(p.children.map(insertInputAdapter))
}
}

/**
* Inserts a WholeStageCodegen on top of those that support codegen.
*/
private def insertWholeStageCodegen(plan: SparkPlan, isColumnarInput: Boolean): SparkPlan = {
val isColumnar = adjustColumnar(plan, isColumnarInput)
private def insertWholeStageCodegen(plan: SparkPlan): SparkPlan = {
plan match {
// For operators that will output domain object, do not insert WholeStageCodegen for it as
// domain object can not be written into unsafe row.
case plan if plan.output.length == 1 && plan.output.head.dataType.isInstanceOf[ObjectType] =>
plan.withNewChildren(plan.children.map(insertWholeStageCodegen(_, isColumnar)))
plan.withNewChildren(plan.children.map(insertWholeStageCodegen))
case plan: LocalTableScanExec =>
// Do not make LogicalTableScanExec the root of WholeStageCodegen
// to support the fast driver-local collect/take paths.
plan
case plan: CodegenSupport if supportCodegen(plan) =>
WholeStageCodegenExec(
insertInputAdapter(plan, isColumnar))(codegenStageCounter.incrementAndGet())
// The whole-stage-codegen framework is row-based. If a plan supports columnar execution,
// it can't support whole-stage-codegen at the same time.
assert(!plan.supportsColumnar)
WholeStageCodegenExec(insertInputAdapter(plan))(codegenStageCounter.incrementAndGet())
case other =>
other.withNewChildren(other.children.map(insertWholeStageCodegen(_, isColumnar)))
other.withNewChildren(other.children.map(insertWholeStageCodegen))
}
}

/**
* Depending on the stage in the plan and if we currently are columnar or not
* return if we are still columnar or not.
*/
private def adjustColumnar(plan: SparkPlan, isColumnar: Boolean): Boolean =
// We are walking up the plan, so columnar starts when we transition to rows
// and ends when we transition to columns
plan match {
case c2r: ColumnarToRowExec => true
case r2c: RowToColumnarExec => false
case _ => isColumnar
}

def apply(plan: SparkPlan): SparkPlan = {
if (conf.wholeStageEnabled) {
insertWholeStageCodegen(plan, false)
insertWholeStageCodegen(plan)
} else {
plan
}