[SPARK-28213][SQL][followup] code cleanup and bug fix for columnar execution framework #25264
@@ -57,40 +57,38 @@ class ColumnarRule {
  * [[org.apache.spark.sql.execution.python.ArrowEvalPythonExec]] and
  * [[MapPartitionsInRWithArrowExec]]. Eventually this should replace those implementations.
  */
-case class ColumnarToRowExec(child: SparkPlan)
-  extends UnaryExecNode with CodegenSupport {
+case class ColumnarToRowExec(child: SparkPlan) extends UnaryExecNode with CodegenSupport {
   assert(child.supportsColumnar)
 
   override def output: Seq[Attribute] = child.output
 
   override def outputPartitioning: Partitioning = child.outputPartitioning
 
   override def outputOrdering: Seq[SortOrder] = child.outputOrdering
 
+  // `ColumnarToRowExec` processes the input RDD directly, which is kind of a leaf node in the
+  // codegen stage and needs to do the limit check.
+  protected override def canCheckLimitNotReached: Boolean = true
 
   override lazy val metrics: Map[String, SQLMetric] = Map(
     "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
-    "numInputBatches" -> SQLMetrics.createMetric(sparkContext, "number of input batches"),
-    "scanTime" -> SQLMetrics.createTimingMetric(sparkContext, "scan time")
+    "numInputBatches" -> SQLMetrics.createMetric(sparkContext, "number of input batches")
   )
 
   override def doExecute(): RDD[InternalRow] = {
     val numOutputRows = longMetric("numOutputRows")
     val numInputBatches = longMetric("numInputBatches")
-    val scanTime = longMetric("scanTime")
-    // UnsafeProjection is not serializable so do it on the executor side, which is why it is lazy
-    @transient lazy val outputProject = UnsafeProjection.create(output, output)
-    val batches = child.executeColumnar()
-    batches.flatMap(batch => {
-      val batchStartNs = System.nanoTime()
-      numInputBatches += 1
-      // In order to match the numOutputRows metric in the generated code we update
-      // numOutputRows for each batch. This is less accurate than doing it at output
-      // because it will over count the number of rows output in the case of a limit,
-      // but it is more efficient.
-      numOutputRows += batch.numRows()
-      val ret = batch.rowIterator().asScala
-      scanTime += ((System.nanoTime() - batchStartNs) / (1000 * 1000))
-      ret.map(outputProject)
-    })
+    // This avoids calling `output` in the RDD closure, so that we don't need to include the entire
+    // plan (this) in the closure.
+    val localOutput = this.output
Review comment (Contributor) on the line above: +1
+    child.executeColumnar().mapPartitionsInternal { batches =>
+      val outputProject = UnsafeProjection.create(localOutput, localOutput)
+      batches.flatMap { batch =>
+        numInputBatches += 1
+        numOutputRows += batch.numRows()
+        batch.rowIterator().asScala.map(outputProject)
+      }
+    }
   }
 
 /**
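The `localOutput` change above is the standard trick for keeping a whole operator out of a task closure: referencing a field such as `output` inside `mapPartitions` captures `this` (the entire plan) for serialization, while copying the field into a local `val` first captures only that value. A minimal sketch of the idea, using hypothetical names and the plain RDD API rather than the PR's internals:

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical operator used only to illustrate closure capture; not part of the PR.
class MyOperator(spark: SparkSession) {
  val columnNames: Seq[String] = Seq("a", "b", "c")

  // Bad: the lambda reads the field `columnNames`, so it captures `this`,
  // and the whole MyOperator instance must be serialized with the task.
  def bad() = spark.sparkContext.parallelize(0 until 3).map(i => columnNames(i))

  // Good: copy the field into a local val first; only the Seq is captured.
  def good() = {
    val localNames = columnNames
    spark.sparkContext.parallelize(0 until 3).map(i => localNames(i))
  }
}
```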
@@ -136,9 +134,6 @@ case class ColumnarToRowExec(child: SparkPlan)
     // metrics
     val numOutputRows = metricTerm(ctx, "numOutputRows")
     val numInputBatches = metricTerm(ctx, "numInputBatches")
-    val scanTimeMetric = metricTerm(ctx, "scanTime")
-    val scanTimeTotalNs =
-      ctx.addMutableState(CodeGenerator.JAVA_LONG, "scanTime") // init as scanTime = 0
 
     val columnarBatchClz = classOf[ColumnarBatch].getName
     val batch = ctx.addMutableState(columnarBatchClz, "batch")
@@ -156,15 +151,13 @@ case class ColumnarToRowExec(child: SparkPlan)
     val nextBatchFuncName = ctx.addNewFunction(nextBatch,
       s"""
          |private void $nextBatch() throws java.io.IOException {
-         |  long getBatchStart = System.nanoTime();
          |  if ($input.hasNext()) {
          |    $batch = ($columnarBatchClz)$input.next();
+         |    $numInputBatches.add(1);
+         |    $numOutputRows.add($batch.numRows());
          |    $idx = 0;
          |    ${columnAssigns.mkString("", "\n", "\n")}
-         |    ${numInputBatches}.add(1);
          |  }
-         |  $scanTimeTotalNs += System.nanoTime() - getBatchStart;
          |}""".stripMargin)
 
     ctx.currentVars = null
@@ -184,7 +177,7 @@ case class ColumnarToRowExec(child: SparkPlan)
        |if ($batch == null) {
        |  $nextBatchFuncName();
        |}
-       |while ($batch != null) {
+       |while ($limitNotReachedCond $batch != null) {
        |  int $numRows = $batch.numRows();
        |  int $localEnd = $numRows - $idx;
        |  for (int $localIdx = 0; $localIdx < $localEnd; $localIdx++) {
@@ -196,13 +189,11 @@ case class ColumnarToRowExec(child: SparkPlan)
        |  $batch = null;
        |  $nextBatchFuncName();
        |}
-       |$scanTimeMetric.add($scanTimeTotalNs / (1000 * 1000));
-       |$scanTimeTotalNs = 0;
      """.stripMargin
   }
 
   override def inputRDDs(): Seq[RDD[InternalRow]] = {
-    child.asInstanceOf[CodegenSupport].inputRDDs()
+    Seq(child.executeColumnar().asInstanceOf[RDD[InternalRow]]) // Hack because of type erasure
   }
 }
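The `asInstanceOf[RDD[InternalRow]]` cast above compiles and succeeds only because the RDD's element type is erased at runtime; the generated code later treats each element as a `ColumnarBatch` again. A tiny illustration of the same erasure behaviour with ordinary collections (illustrative only, not the PR's code):

```scala
val ints: List[Int] = List(1, 2, 3)

// The element type parameter is erased, so this cast does not fail here...
val pretendStrings = ints.asInstanceOf[List[String]]

// ...a ClassCastException would only be thrown if an element were actually used as a String:
// pretendStrings.head.length
```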
@@ -439,47 +430,46 @@ case class RowToColumnarExec(child: SparkPlan) extends UnaryExecNode {
     // Instead of creating a new config we are reusing columnBatchSize. In the future if we do
     // combine with some of the Arrow conversion tools we will need to unify some of the configs.
     val numRows = conf.columnBatchSize
-    val converters = new RowToColumnConverter(schema)
-    val rowBased = child.execute()
-    rowBased.mapPartitions(rowIterator => {
-      new Iterator[ColumnarBatch] {
-        var cb: ColumnarBatch = null
-
-        TaskContext.get().addTaskCompletionListener[Unit] { _ =>
-          if (cb != null) {
-            cb.close()
-            cb = null
-          }
-        }
-
-        override def hasNext: Boolean = {
-          rowIterator.hasNext
-        }
-
-        override def next(): ColumnarBatch = {
-          if (cb != null) {
-            cb.close()
-            cb = null
-          }
-          val columnVectors : Array[WritableColumnVector] =
-            if (enableOffHeapColumnVector) {
-              OffHeapColumnVector.allocateColumns(numRows, schema).toArray
-            } else {
-              OnHeapColumnVector.allocateColumns(numRows, schema).toArray
-            }
-          var rowCount = 0
-          while (rowCount < numRows && rowIterator.hasNext) {
-            val row = rowIterator.next()
-            converters.convert(row, columnVectors)
-            rowCount += 1
-          }
-          cb = new ColumnarBatch(columnVectors.toArray, rowCount)
-          numInputRows += rowCount
-          numOutputBatches += 1
-          cb
-        }
-      }
-    })
+    // This avoids calling `schema` in the RDD closure, so that we don't need to include the entire
+    // plan (this) in the closure.
+    val localSchema = this.schema
+    child.execute().mapPartitionsInternal { rowIterator =>
+      if (rowIterator.hasNext) {
+        new Iterator[ColumnarBatch] {
+          private val converters = new RowToColumnConverter(localSchema)
+          private val vectors: Seq[WritableColumnVector] = if (enableOffHeapColumnVector) {
+            OffHeapColumnVector.allocateColumns(numRows, localSchema)
+          } else {
+            OnHeapColumnVector.allocateColumns(numRows, localSchema)
+          }
+          private val cb: ColumnarBatch = new ColumnarBatch(vectors.toArray)
+
+          TaskContext.get().addTaskCompletionListener[Unit] { _ =>
+            cb.close()
+          }
+
+          override def hasNext: Boolean = {
+            rowIterator.hasNext
+          }
+
+          override def next(): ColumnarBatch = {
+            cb.setNumRows(0)
+            var rowCount = 0
+            while (rowCount < numRows && rowIterator.hasNext) {
+              val row = rowIterator.next()
+              converters.convert(row, vectors.toArray)
+              rowCount += 1
+            }
+            cb.setNumRows(rowCount)
+            numInputRows += rowCount
+            numOutputBatches += 1
+            cb
+          }
+        }
+      } else {
+        Iterator.empty
+      }
+    }
   }
 }
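Two points in the rewritten `RowToColumnarExec` iterator are worth calling out: the column vectors and the `ColumnarBatch` are now allocated once per partition and reset via `setNumRows` for every batch, and empty partitions return `Iterator.empty` without allocating anything. A rough, self-contained sketch of that shape (plain Scala with hypothetical names, no Spark column vectors):

```scala
// Groups rows into fixed-size "batches", reusing one buffer per partition.
def toBatches(rows: Iterator[Int], batchSize: Int): Iterator[Seq[Int]] = {
  if (!rows.hasNext) {
    Iterator.empty                                    // empty partition: allocate nothing
  } else {
    new Iterator[Seq[Int]] {
      private val buffer = new Array[Int](batchSize)  // allocated once, reused for every batch

      override def hasNext: Boolean = rows.hasNext

      override def next(): Seq[Int] = {
        var n = 0                                     // like setNumRows(0): start refilling the shared buffer
        while (n < batchSize && rows.hasNext) {
          buffer(n) = rows.next()
          n += 1
        }
        buffer.take(n)                                // the real code instead reports n via setNumRows(n)
      }
    }
  }
}

// toBatches(Iterator(1, 2, 3, 4, 5), 2).toList == List(Seq(1, 2), Seq(3, 4), Seq(5))
```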
@@ -34,7 +34,7 @@ import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partition
 import org.apache.spark.sql.catalyst.util.truncatedString
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat => ParquetSource}
-import org.apache.spark.sql.execution.metric.SQLMetrics
+import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
 import org.apache.spark.sql.sources.{BaseRelation, Filter}
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.vectorized.ColumnarBatch
@@ -334,37 +334,63 @@ case class FileSourceScanExec(
     inputRDD :: Nil
   }
 
-  override lazy val metrics =
-    Map("numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
-      "numFiles" -> SQLMetrics.createMetric(sparkContext, "number of files read"),
-      "metadataTime" -> SQLMetrics.createTimingMetric(sparkContext, "metadata time"),
-      "scanTime" -> SQLMetrics.createTimingMetric(sparkContext, "scan time"))
+  override lazy val metrics = Map(
+    "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
+    "numFiles" -> SQLMetrics.createMetric(sparkContext, "number of files read"),
+    "metadataTime" -> SQLMetrics.createTimingMetric(sparkContext, "metadata time"),
+    "scanTime" -> SQLMetrics.createTimingMetric(sparkContext, "scan time"))
+
+  private abstract class ScanTimeTrackingIterator[T](
+      fileScanIterator: Iterator[T],
+      scanTimeMetrics: SQLMetric) extends Iterator[T] {
+
+    override def hasNext: Boolean = {
+      // The `FileScanRDD` returns an iterator which scans the file during the `hasNext` call.
+      val startNs = System.nanoTime()
+      val re = fileScanIterator.hasNext
+      scanTimeMetrics += ((System.nanoTime() - startNs) / (1000 * 1000))
+      re
+    }
+  }
 
   protected override def doExecute(): RDD[InternalRow] = {
     val numOutputRows = longMetric("numOutputRows")
+    val scanTime = longMetric("scanTime")
     if (needsUnsafeRowConversion) {
       inputRDD.mapPartitionsWithIndexInternal { (index, iter) =>
         val proj = UnsafeProjection.create(schema)
         proj.initialize(index)
-        iter.map( r => {
-          numOutputRows += 1
-          proj(r)
-        })
+        new ScanTimeTrackingIterator[InternalRow](iter, scanTime) {
+          override def next(): InternalRow = {
+            numOutputRows += 1
+            proj(iter.next())
+          }
+        }
       }
     } else {
-      inputRDD.map { r =>
-        numOutputRows += 1
-        r
+      inputRDD.mapPartitionsInternal { iter =>
+        new ScanTimeTrackingIterator[InternalRow](iter, scanTime) {
+          override def next(): InternalRow = {
+            numOutputRows += 1
+            iter.next()
+          }
+        }
       }
     }
   }
 
   protected override def doExecuteColumnar(): RDD[ColumnarBatch] = {
     val numOutputRows = longMetric("numOutputRows")
-    inputRDD.asInstanceOf[RDD[ColumnarBatch]].map { batch =>
-      numOutputRows += batch.numRows()
-      batch
+    val scanTime = longMetric("scanTime")
+    inputRDD.asInstanceOf[RDD[ColumnarBatch]].mapPartitionsInternal { batches =>
+      new ScanTimeTrackingIterator[ColumnarBatch](batches, scanTime) {
+        override def next(): ColumnarBatch = {
+          val batch = batches.next()
+          numOutputRows += batch.numRows()
+          batch
+        }
+      }
     }
   }
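The comment in `ScanTimeTrackingIterator.hasNext` is the key subtlety: `FileScanRDD` does the actual file reading inside `hasNext`, so that is where scan time has to be measured. A generic sketch of the same wrapper idea (a hypothetical class, not the Spark-internal one), converting nanoseconds to the milliseconds a timing metric expects:

```scala
// Wraps an iterator and charges the time spent in hasNext() to a caller-supplied counter.
class TimedIterator[T](underlying: Iterator[T], addMillis: Long => Unit) extends Iterator[T] {
  override def hasNext: Boolean = {
    val startNs = System.nanoTime()
    val result = underlying.hasNext   // expensive work (e.g. reading the next file chunk) happens here
    addMillis((System.nanoTime() - startNs) / (1000 * 1000))
    result
  }

  override def next(): T = underlying.next()
}

// Usage sketch: accumulate the time for one partition.
var scanTimeMs = 0L
val timed = new TimedIterator(Iterator(1, 2, 3), delta => scanTimeMs += delta)
timed.foreach(_ => ())                // consuming the iterator updates scanTimeMs
```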
@@ -17,7 +17,6 @@
 
 package org.apache.spark.sql.execution
 
 import java.io.Writer
 import java.util.Locale
 import java.util.concurrent.atomic.AtomicInteger
@@ -491,12 +490,8 @@ trait InputRDDCodegen extends CodegenSupport {
  *
  * This is the leaf node of a tree with WholeStageCodegen that is used to generate code
  * that consumes an RDD iterator of InternalRow.
- *
- * @param isChildColumnar true if the inputRDD is really columnar data hidden by type erasure,
- *                        false if inputRDD is really an RDD[InternalRow]
  */
-case class InputAdapter(child: SparkPlan, isChildColumnar: Boolean)
-  extends UnaryExecNode with InputRDDCodegen {
+case class InputAdapter(child: SparkPlan) extends UnaryExecNode with InputRDDCodegen {
 
   override def output: Seq[Attribute] = child.output
 
@@ -522,13 +517,10 @@ case class InputAdapter(child: SparkPlan, isChildColumnar: Boolean)
     child.executeColumnar()
   }
 
-  override def inputRDD: RDD[InternalRow] = {
-    if (isChildColumnar) {
-      child.executeColumnar().asInstanceOf[RDD[InternalRow]] // Hack because of type erasure
-    } else {
-      child.execute()
-    }
-  }
+  // `InputAdapter` can only generate code to process the rows from its child. If the child produces
+  // columnar batches, there must be a `ColumnarToRowExec` above `InputAdapter` to handle it by
+  // overriding `inputRDD`.
+  override def inputRDD: RDD[InternalRow] = child.execute()
 
   // This is a leaf node so the node can produce limit not reached checks.
   override protected def canCheckLimitNotReached: Boolean = true
@@ -870,59 +862,45 @@ case class CollapseCodegenStages(
   /**
    * Inserts an InputAdapter on top of those that do not support codegen.
    */
-  private def insertInputAdapter(plan: SparkPlan, isColumnarInput: Boolean): SparkPlan = {
-    val isColumnar = adjustColumnar(plan, isColumnarInput)
+  private def insertInputAdapter(plan: SparkPlan): SparkPlan = {
     plan match {
       case p if !supportCodegen(p) =>
         // collapse them recursively
-        InputAdapter(insertWholeStageCodegen(p, isColumnar), isColumnar)
+        InputAdapter(insertWholeStageCodegen(p))
       case j: SortMergeJoinExec =>
         // The children of SortMergeJoin should do codegen separately.
         j.withNewChildren(j.children.map(
-          child => InputAdapter(insertWholeStageCodegen(child, isColumnar), isColumnar)))
-      case p =>
-        p.withNewChildren(p.children.map(insertInputAdapter(_, isColumnar)))
+          child => InputAdapter(insertWholeStageCodegen(child))))
+      case p => p.withNewChildren(p.children.map(insertInputAdapter))
     }
   }
 
   /**
    * Inserts a WholeStageCodegen on top of those that support codegen.
    */
-  private def insertWholeStageCodegen(plan: SparkPlan, isColumnarInput: Boolean): SparkPlan = {
-    val isColumnar = adjustColumnar(plan, isColumnarInput)
+  private def insertWholeStageCodegen(plan: SparkPlan): SparkPlan = {
     plan match {
       // For operators that will output domain object, do not insert WholeStageCodegen for it as
       // domain object can not be written into unsafe row.
       case plan if plan.output.length == 1 && plan.output.head.dataType.isInstanceOf[ObjectType] =>
-        plan.withNewChildren(plan.children.map(insertWholeStageCodegen(_, isColumnar)))
+        plan.withNewChildren(plan.children.map(insertWholeStageCodegen))
       case plan: LocalTableScanExec =>
         // Do not make LogicalTableScanExec the root of WholeStageCodegen
         // to support the fast driver-local collect/take paths.
         plan
       case plan: CodegenSupport if supportCodegen(plan) =>
-        WholeStageCodegenExec(
-          insertInputAdapter(plan, isColumnar))(codegenStageCounter.incrementAndGet())
+        // The whole-stage-codegen framework is row-based. If a plan supports columnar execution,
+        // it can't support whole-stage-codegen at the same time.
+        assert(!plan.supportsColumnar)
+        WholeStageCodegenExec(insertInputAdapter(plan))(codegenStageCounter.incrementAndGet())
       case other =>
-        other.withNewChildren(other.children.map(insertWholeStageCodegen(_, isColumnar)))
+        other.withNewChildren(other.children.map(insertWholeStageCodegen))
     }
   }
 
-  /**
-   * Depending on the stage in the plan and if we currently are columnar or not
-   * return if we are still columnar or not.
-   */
-  private def adjustColumnar(plan: SparkPlan, isColumnar: Boolean): Boolean =
-    // We are walking up the plan, so columnar starts when we transition to rows
-    // and ends when we transition to columns
-    plan match {
-      case c2r: ColumnarToRowExec => true
-      case r2c: RowToColumnarExec => false
-      case _ => isColumnar
-    }
-
   def apply(plan: SparkPlan): SparkPlan = {
     if (conf.wholeStageEnabled) {
-      insertWholeStageCodegen(plan, false)
+      insertWholeStageCodegen(plan)
     } else {
       plan
     }
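With `adjustColumnar` gone, `insertWholeStageCodegen` and `insertInputAdapter` are back to a pair of mutually recursive passes: one wraps codegen-capable subtrees in a `WholeStageCodegenExec`, the other puts an `InputAdapter` under each stage boundary. A toy model of that recursion on a made-up tree type (illustrative only; real Spark plans and the rule's extra cases are omitted):

```scala
sealed trait Node {
  def children: Seq[Node]
  def withChildren(newChildren: Seq[Node]): Node
}

case class Op(name: String, codegen: Boolean, children: Seq[Node] = Nil) extends Node {
  override def withChildren(newChildren: Seq[Node]): Node = copy(children = newChildren)
}
case class Stage(child: Node) extends Node {
  override def children: Seq[Node] = Seq(child)
  override def withChildren(newChildren: Seq[Node]): Node = Stage(newChildren.head)
}
case class Adapter(child: Node) extends Node {
  override def children: Seq[Node] = Seq(child)
  override def withChildren(newChildren: Seq[Node]): Node = Adapter(newChildren.head)
}

// Mirror of insertInputAdapter: a non-codegen operator ends the current stage,
// so it gets an Adapter on top and stage insertion restarts below it.
def insertAdapter(plan: Node): Node = plan match {
  case p: Op if !p.codegen => Adapter(insertStage(p))
  case p                   => p.withChildren(p.children.map(insertAdapter))
}

// Mirror of insertWholeStageCodegen: a codegen-capable operator starts a new stage.
def insertStage(plan: Node): Node = plan match {
  case p: Op if p.codegen => Stage(insertAdapter(p))
  case p                  => p.withChildren(p.children.map(insertStage))
}

// Example: scan (no codegen) under filter and project (both codegen-capable).
val plan = Op("project", codegen = true,
  Seq(Op("filter", codegen = true, Seq(Op("scan", codegen = false)))))
// insertStage(plan) ==
//   Stage(Op("project", true, Seq(Op("filter", true, Seq(Adapter(Op("scan", false)))))))
```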
Review comment:
I find num output rows to be useful because ColumnarToRowExec can happen at other times too; I am working on getting it to happen after pandas UDF operations. Plus, the performance impact is only on the order of the number of batches, not on the order of the number of rows, so it should have minimal impact.

Reply:
Maybe I was too conservative. I'll add it back, and revisit this when I benchmark Spark 3.0.