Skip to content

Commit 1f927a2

Browse files
committed
avoid to traverse all entities too frequently
1 parent ee53708 commit 1f927a2

1 file changed

Lines changed: 19 additions & 12 deletions

File tree

core/src/main/scala/org/apache/spark/status/AppStatusListener.scala

Lines changed: 19 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,9 @@ private[spark] class AppStatusListener(
7676
// around liveExecutors.
7777
@volatile private var activeExecutorCount = 0
7878

79+
/** The last time when flushing `LiveEntity`s. This is to avoid flushing too frequently. */
80+
private var lastFlushTimeNs = System.nanoTime()
81+
7982
kvstore.addTrigger(classOf[ExecutorSummaryWrapper], conf.get(MAX_RETAINED_DEAD_EXECUTORS))
8083
{ count => cleanupExecutors(count) }
8184

@@ -89,7 +92,8 @@ private[spark] class AppStatusListener(
8992

9093
kvstore.onFlush {
9194
if (!live) {
92-
flush(update(_, _))
95+
val now = System.nanoTime()
96+
flush(update(_, now))
9397
}
9498
}
9599

@@ -834,7 +838,11 @@ private[spark] class AppStatusListener(
834838
// Flush updates if necessary. Executor heartbeat is an event that happens periodically. Flush
835839
// here to ensure the staleness of Spark UI doesn't last more that the executor heartbeat
836840
// interval.
837-
flush(maybeUpdate(_, _))
841+
if (now - lastFlushTimeNs > liveUpdatePeriodNs) {
842+
flush(maybeUpdate(_, now))
843+
// Re-get the current system time because `flush` may be slow and `now` is stale.
844+
lastFlushTimeNs = System.nanoTime()
845+
}
838846
}
839847

840848
override def onStageExecutorMetrics(executorMetrics: SparkListenerStageExecutorMetrics): Unit = {
@@ -860,18 +868,17 @@ private[spark] class AppStatusListener(
860868
}
861869
}
862870

863-
/** Go through all `LiveEntity`s and use `entityFlushFunc(entity, now)` to flush them. */
864-
private def flush(entityFlushFunc: (LiveEntity, Long) => Unit): Unit = {
865-
val now = System.nanoTime()
871+
/** Go through all `LiveEntity`s and use `entityFlushFunc(entity)` to flush them. */
872+
private def flush(entityFlushFunc: LiveEntity => Unit): Unit = {
866873
liveStages.values.asScala.foreach { stage =>
867-
entityFlushFunc(stage, now)
868-
stage.executorSummaries.values.foreach(entityFlushFunc(_, now))
874+
entityFlushFunc(stage)
875+
stage.executorSummaries.values.foreach(entityFlushFunc)
869876
}
870-
liveJobs.values.foreach(entityFlushFunc(_, now))
871-
liveExecutors.values.foreach(entityFlushFunc(_, now))
872-
liveTasks.values.foreach(entityFlushFunc(_, now))
873-
liveRDDs.values.foreach(entityFlushFunc(_, now))
874-
pools.values.foreach(entityFlushFunc(_, now))
877+
liveJobs.values.foreach(entityFlushFunc)
878+
liveExecutors.values.foreach(entityFlushFunc)
879+
liveTasks.values.foreach(entityFlushFunc)
880+
liveRDDs.values.foreach(entityFlushFunc)
881+
pools.values.foreach(entityFlushFunc)
875882
}
876883

877884
/**

0 commit comments

Comments
 (0)