Commit 8d7dbde

mgaido91 authored and cloud-fan committed
[SPARK-26003] Improve SQLAppStatusListener.aggregateMetrics performance
## What changes were proposed in this pull request?

In `SQLAppStatusListener.aggregateMetrics`, we use the `metricIds` only to filter the relevant metrics. And this is a Seq which is also sorted. When there are many metrics involved, this can be pretty inefficient. The PR proposes to use a Set for it.

## How was this patch tested?

NA

Closes #23002 from mgaido91/SPARK-26003.

Authored-by: Marco Gaido <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
1 parent c491934 commit 8d7dbde

1 file changed: +2 -3 lines changed


sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala

Lines changed: 2 additions & 3 deletions
@@ -159,18 +159,17 @@ class SQLAppStatusListener(
   }

   private def aggregateMetrics(exec: LiveExecutionData): Map[Long, String] = {
-    val metricIds = exec.metrics.map(_.accumulatorId).sorted
     val metricTypes = exec.metrics.map { m => (m.accumulatorId, m.metricType) }.toMap
     val metrics = exec.stages.toSeq
       .flatMap { stageId => Option(stageMetrics.get(stageId)) }
       .flatMap(_.taskMetrics.values().asScala)
       .flatMap { metrics => metrics.ids.zip(metrics.values) }

     val aggregatedMetrics = (metrics ++ exec.driverAccumUpdates.toSeq)
-      .filter { case (id, _) => metricIds.contains(id) }
+      .filter { case (id, _) => metricTypes.contains(id) }
       .groupBy(_._1)
       .map { case (id, values) =>
-        id -> SQLMetrics.stringValue(metricTypes(id), values.map(_._2).toSeq)
+        id -> SQLMetrics.stringValue(metricTypes(id), values.map(_._2))
       }

     // Check the execution again for whether the aggregated metrics data has been calculated.
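
The effect of the change can be sketched outside Spark. Note that the diff does not build a separate Set. It reuses the existing `metricTypes` map and tests membership against its keys. In the minimal sketch below, `MetricInfo`, `MembershipSketch`, `filterWithSeq`, and `filterWithMap` are hypothetical names used only for illustration and are not part of Spark. The sketch assumes the metric metadata is a plain `Seq`, where `contains` scans linearly, while `contains` on an immutable `Map` is a hash lookup for all but very small maps.

```scala
// Hypothetical stand-in for the accumulator metadata held in exec.metrics; not a Spark class.
final case class MetricInfo(accumulatorId: Long, metricType: String)

object MembershipSketch {
  // Shape before the commit: a sorted Seq of ids. Sorting does not help here,
  // because Seq.contains still scans the sequence for every update.
  def filterWithSeq(infos: Seq[MetricInfo], updates: Seq[(Long, Long)]): Seq[(Long, Long)] = {
    val metricIds = infos.map(_.accumulatorId).sorted
    updates.filter { case (id, _) => metricIds.contains(id) }
  }

  // Shape after the commit: reuse the id -> type Map and test membership on its keys.
  def filterWithMap(infos: Seq[MetricInfo], updates: Seq[(Long, Long)]): Seq[(Long, Long)] = {
    val metricTypes = infos.map(m => (m.accumulatorId, m.metricType)).toMap
    updates.filter { case (id, _) => metricTypes.contains(id) }
  }
}
```

Both functions keep the same updates in the same order, so only the cost of each membership test differs. With M registered metrics and N accumulator updates, the Seq version does on the order of N × M comparisons, while the Map version does roughly N lookups.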
