Skip to content

Commit e58b407

Browse files
gatorsmileJackey Lee
authored andcommitted
[SPARK-25674][FOLLOW-UP] Update the stats for each ColumnarBatch
## What changes were proposed in this pull request? This PR is a follow-up of apache#22594 . This alternative can avoid the unneeded computation in the hot code path. - For row-based scan, we keep the original way. - For the columnar scan, we just need to update the stats after each batch. ## How was this patch tested? N/A Closes apache#22731 from gatorsmile/udpateStatsFileScanRDD. Authored-by: gatorsmile <[email protected]> Signed-off-by: Wenchen Fan <[email protected]>
1 parent 0146b4f commit e58b407

1 file changed

Lines changed: 8 additions & 7 deletions

File tree

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ class FileScanRDD(
8585
// If we do a coalesce, however, we are likely to compute multiple partitions in the same
8686
// task and in the same thread, in which case we need to avoid override values written by
8787
// previous partitions (SPARK-13071).
88-
private def updateBytesRead(): Unit = {
88+
private def incTaskInputMetricsBytesRead(): Unit = {
8989
inputMetrics.setBytesRead(existingBytesRead + getBytesReadCallback())
9090
}
9191

@@ -106,15 +106,16 @@ class FileScanRDD(
106106
// don't need to run this `if` for every record.
107107
val preNumRecordsRead = inputMetrics.recordsRead
108108
if (nextElement.isInstanceOf[ColumnarBatch]) {
109+
incTaskInputMetricsBytesRead()
109110
inputMetrics.incRecordsRead(nextElement.asInstanceOf[ColumnarBatch].numRows())
110111
} else {
112+
// too costly to update every record
113+
if (inputMetrics.recordsRead %
114+
SparkHadoopUtil.UPDATE_INPUT_METRICS_INTERVAL_RECORDS == 0) {
115+
incTaskInputMetricsBytesRead()
116+
}
111117
inputMetrics.incRecordsRead(1)
112118
}
113-
// The records may be incremented by more than 1 at a time.
114-
if (preNumRecordsRead / SparkHadoopUtil.UPDATE_INPUT_METRICS_INTERVAL_RECORDS !=
115-
inputMetrics.recordsRead / SparkHadoopUtil.UPDATE_INPUT_METRICS_INTERVAL_RECORDS) {
116-
updateBytesRead()
117-
}
118119
nextElement
119120
}
120121

@@ -201,7 +202,7 @@ class FileScanRDD(
201202
}
202203

203204
override def close(): Unit = {
204-
updateBytesRead()
205+
incTaskInputMetricsBytesRead()
205206
InputFileBlockHolder.unset()
206207
}
207208
}

0 commit comments

Comments
 (0)