Skip to content

Commit 19e1fb4

Browse files
committed
Address Thomas' comments
The biggest changes here include: (1) Periodically checking for event log updates in the background, instead of on refresh, (2) Using a Long instead of a linearly scaling HashSet to keep track of the applications whose UIs are purposefully not rendered, and (3) Adding Spark user as a new column. The last of these adds a new field (sparkUser) to the ApplicationStart event.
1 parent 248cb3d commit 19e1fb4

8 files changed

Lines changed: 156 additions & 80 deletions

File tree

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1103,7 +1103,7 @@ class SparkContext(
11031103

11041104
/** Post the application start event */
11051105
private def postApplicationStart() {
1106-
listenerBus.post(SparkListenerApplicationStart(appName, startTime))
1106+
listenerBus.post(SparkListenerApplicationStart(appName, startTime, sparkUser))
11071107
}
11081108

11091109
/**

core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala

Lines changed: 113 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,6 @@ package org.apache.spark.deploy.history
2020
import javax.servlet.http.HttpServletRequest
2121

2222
import scala.collection.mutable
23-
import scala.concurrent._
24-
import scala.concurrent.ExecutionContext.Implicits.global
2523

2624
import org.apache.hadoop.fs.{FileStatus, Path}
2725
import org.eclipse.jetty.servlet.ServletContextHandler
@@ -56,14 +54,44 @@ class HistoryServer(
5654
import HistoryServer._
5755

5856
private val fileSystem = Utils.getHadoopFileSystem(baseLogDir)
59-
private val bindHost = Utils.localHostName()
60-
private val publicHost = Option(System.getenv("SPARK_PUBLIC_DNS")).getOrElse(bindHost)
57+
private val localHost = Utils.localHostName()
58+
private val publicHost = Option(System.getenv("SPARK_PUBLIC_DNS")).getOrElse(localHost)
6159
private val port = requestedPort
6260
private val securityManager = new SecurityManager(conf)
6361
private val indexPage = new IndexPage(this)
6462

6563
// A timestamp of when the disk was last accessed to check for log updates
66-
private var lastLogCheck = -1L
64+
private var lastLogCheckTime = -1L
65+
66+
// If an application is last updated after this threshold, then its UI is retained
67+
private var updateTimeThreshold = -1L
68+
69+
// Number of applications hidden from the UI because the application limit has been reached
70+
private var numApplicationsHidden = 0
71+
72+
@volatile private var stopped = false
73+
74+
/**
75+
* A background thread that periodically checks for event log updates on disk.
76+
*
77+
* If a log check is invoked manually in the middle of a period, this thread re-adjusts the
78+
* time at which it performs the next log check to maintain the same period as before.
79+
*/
80+
private val logCheckingThread = new Thread {
81+
override def run() {
82+
while (!stopped) {
83+
val now = System.currentTimeMillis
84+
if (now - lastLogCheckTime > UPDATE_INTERVAL_MS) {
85+
checkForLogs()
86+
Thread.sleep(UPDATE_INTERVAL_MS)
87+
} else {
88+
// If the user has manually checked for logs recently, wait until
89+
// UPDATE_INTERVAL_MS after the last check time
90+
Thread.sleep(lastLogCheckTime + UPDATE_INTERVAL_MS - now)
91+
}
92+
}
93+
}
94+
}
6795

6896
private val handlers = Seq[ServletContextHandler](
6997
createStaticHandler(STATIC_RESOURCE_DIR, "/static"),
@@ -74,13 +102,20 @@ class HistoryServer(
74102
// A mapping of application ID to its history information, which includes the rendered UI
75103
val appIdToInfo = mutable.HashMap[String, ApplicationHistoryInfo]()
76104

77-
// A set of recently removed applications that the server should avoid re-rendering
78-
val appIdBlacklist = mutable.HashSet[String]()
105+
/**
106+
* Start the history server.
107+
*
108+
* This starts a background thread that periodically synchronizes information displayed on
109+
* this UI with the event logs in the provided base directory.
110+
*/
111+
def start() {
112+
logCheckingThread.start()
113+
}
79114

80-
/** Bind to the HTTP server behind this web interface */
115+
/** Bind to the HTTP server behind this web interface. */
81116
override def bind() {
82117
try {
83-
serverInfo = Some(startJettyServer(bindHost, port, handlers, conf))
118+
serverInfo = Some(startJettyServer("0.0.0.0", port, handlers, conf))
84119
logInfo("Started HistoryServer at http://%s:%d".format(publicHost, boundPort))
85120
} catch {
86121
case e: Exception =>
@@ -91,7 +126,8 @@ class HistoryServer(
91126
}
92127

93128
/**
94-
* Asynchronously check for any updates to event logs in the base directory.
129+
* Check for any updates to event logs in the base directory. This is only effective once
130+
* the server has been bound.
95131
*
96132
* If a new finished application is found, the server renders the associated SparkUI
97133
* from the application's event logs, attaches this UI to itself, and stores metadata
@@ -100,41 +136,54 @@ class HistoryServer(
100136
* If the logs for an existing finished application are no longer found, the server
101137
* removes all associated information and detaches the SparkUI.
102138
*/
103-
def checkForLogs() {
104-
if (logCheckReady) {
105-
lastLogCheck = System.currentTimeMillis
106-
val asyncCheck = future {
139+
def checkForLogs() = synchronized {
140+
if (serverInfo.isDefined) {
141+
lastLogCheckTime = System.currentTimeMillis
142+
logDebug("Checking for logs. Time is now %d.".format(lastLogCheckTime))
143+
try {
107144
val logStatus = fileSystem.listStatus(new Path(baseLogDir))
108145
val logDirs = if (logStatus != null) logStatus.filter(_.isDir).toSeq else Seq[FileStatus]()
109146

110147
// Forget about any SparkUIs that can no longer be found
111-
val appIds = logDirs.map { dir => getAppId(dir.getPath.toString) }
148+
val mostRecentAppIds = logDirs.map { dir => getAppId(dir.getPath.toString) }
112149
appIdToInfo.foreach { case (appId, info) =>
113-
if (!appIds.contains(appId)) {
150+
if (!mostRecentAppIds.contains(appId)) {
114151
detachUI(info.ui)
115152
appIdToInfo.remove(appId)
116-
appIdBlacklist.clear()
153+
updateTimeThreshold = -1L
117154
}
118155
}
119-
appIdBlacklist.retain(appIds.contains)
156+
157+
// Keep track of the number of applications hidden from the UI this round
158+
var _numApplicationsHidden = 0
120159

121160
// Render SparkUI for any new completed applications
122161
logDirs.foreach { dir =>
123162
val path = dir.getPath.toString
124163
val appId = getAppId(path)
125164
val lastUpdated = getModificationTime(dir)
126-
if (!appIdToInfo.contains(appId) && !appIdBlacklist.contains(appId)) {
127-
maybeRenderUI(appId, path, lastUpdated)
165+
if (!appIdToInfo.contains(appId)) {
166+
if (lastUpdated > updateTimeThreshold) {
167+
maybeRenderUI(appId, path, lastUpdated)
168+
} else {
169+
// This application was previously blacklisted due to the application limit
170+
_numApplicationsHidden += 1
171+
}
128172
}
129173
// If the cap is reached, remove the least recently updated application
130174
if (appIdToInfo.size > RETAINED_APPLICATIONS) {
131175
removeOldestApp()
176+
_numApplicationsHidden += 1
132177
}
133178
}
179+
180+
numApplicationsHidden = _numApplicationsHidden
181+
182+
} catch {
183+
case t: Throwable => logError("Exception in checking for event log updates", t)
134184
}
135-
asyncCheck.onFailure { case t =>
136-
logError("Unable to synchronize HistoryServer with files on disk: ", t)
137-
}
185+
} else {
186+
logWarning("Attempted to check for event log updates before binding the server.")
138187
}
139188
}
140189

@@ -161,11 +210,12 @@ class HistoryServer(
161210
if (success && appListener.applicationStarted) {
162211
attachUI(ui)
163212
val appName = appListener.appName
164-
ui.setAppName("%s (finished)".format(appName))
213+
val sparkUser = appListener.sparkUser
165214
val startTime = appListener.startTime
166215
val endTime = appListener.endTime
167-
val info = ApplicationHistoryInfo(appName, startTime, endTime, lastUpdated, logPath, ui)
168-
appIdToInfo(appId) = info
216+
ui.setAppName("%s (finished)".format(appName))
217+
appIdToInfo(appId) =
218+
ApplicationHistoryInfo(appName, startTime, endTime, lastUpdated, sparkUser, logPath, ui)
169219
} else {
170220
logWarning("Reconstructing application UI was unsuccessful. Either no event logs were" +
171221
"found or the event signaling application start is missing: %s".format(logPath))
@@ -175,9 +225,27 @@ class HistoryServer(
175225
}
176226
}
177227

228+
/**
229+
* Remove the oldest application and detach its associated UI.
230+
*
231+
* As an optimization, record the last updated time of this application as the minimum
232+
* update time threshold. Only applications with a last updated time that exceeds this
233+
* threshold will be retained by the server. This avoids re-rendering an old application
234+
* that is recently removed.
235+
*/
236+
private def removeOldestApp() {
237+
val appToRemove = appIdToInfo.toSeq.minBy { case (_, info) => info.lastUpdated }
238+
appToRemove match { case (id, info) =>
239+
appIdToInfo.remove(id)
240+
detachUI(info.ui)
241+
updateTimeThreshold = info.lastUpdated
242+
}
243+
}
244+
178245
/** Stop the server and close the file system. */
179246
override def stop() {
180247
super.stop()
248+
stopped = true
181249
fileSystem.close()
182250
}
183251

@@ -187,39 +255,26 @@ class HistoryServer(
187255
/** Return the address of this server. */
188256
def getAddress: String = "http://" + publicHost + ":" + boundPort
189257

190-
/** Return the total number of application logs found, blacklisted or not. */
191-
def getTotalApplications: Int = appIdToInfo.size + appIdBlacklist.size
258+
/** Return the total number of application logs found, whether or not the UI is retained. */
259+
def getTotalApplications: Int = appIdToInfo.size + numApplicationsHidden
192260

193261
/** Return when this directory was last modified. */
194262
private def getModificationTime(dir: FileStatus): Long = {
195-
val logFiles = fileSystem.listStatus(dir.getPath)
196-
if (logFiles != null) {
197-
logFiles.map(_.getModificationTime).max
198-
} else {
199-
dir.getModificationTime
200-
}
201-
}
202-
203-
/**
204-
* Remove the oldest application and detach its associated UI. As an optimization, add the
205-
* application to a blacklist to avoid re-rendering it the next time.
206-
*/
207-
private def removeOldestApp() {
208-
val appToRemove = appIdToInfo.toSeq.minBy { case (_, info) => info.lastUpdated }
209-
appToRemove match { case (id, info) =>
210-
appIdToInfo.remove(id)
211-
detachUI(info.ui)
212-
appIdBlacklist.add(id)
263+
try {
264+
val logFiles = fileSystem.listStatus(dir.getPath)
265+
if (logFiles != null) {
266+
logFiles.map(_.getModificationTime).max
267+
} else {
268+
dir.getModificationTime
269+
}
270+
} catch {
271+
case t: Throwable =>
272+
logError("Exception in accessing modification time of %s".format(dir.getPath), t)
273+
-1L
213274
}
214275
}
215-
216-
/** Return whether the last log check has happened sufficiently long ago. */
217-
private def logCheckReady: Boolean = {
218-
System.currentTimeMillis - lastLogCheck > UPDATE_INTERVAL_SECONDS * 1000
219-
}
220276
}
221277

222-
223278
/**
224279
* The recommended way of starting and stopping a HistoryServer is through the scripts
225280
* start-history-server.sh and stop-history-server.sh. The path to a base log directory
@@ -233,18 +288,19 @@ class HistoryServer(
233288
object HistoryServer {
234289
private val conf = new SparkConf
235290

236-
// Minimum interval between each check for logs, which requires a disk access (seconds)
237-
private val UPDATE_INTERVAL_SECONDS = conf.getInt("spark.history.updateInterval", 5)
291+
// Interval between each check for event log updates
292+
val UPDATE_INTERVAL_MS = conf.getInt("spark.history.updateInterval", 10) * 1000
238293

239294
// How many applications to retain
240-
private val RETAINED_APPLICATIONS = conf.getInt("spark.deploy.retainedApplications", 20)
295+
val RETAINED_APPLICATIONS = conf.getInt("spark.deploy.retainedApplications", 250)
241296

242-
private val STATIC_RESOURCE_DIR = SparkUI.STATIC_RESOURCE_DIR
297+
val STATIC_RESOURCE_DIR = SparkUI.STATIC_RESOURCE_DIR
243298

244299
def main(argStrings: Array[String]) {
245300
val args = new HistoryServerArguments(argStrings)
246301
val server = new HistoryServer(args.logDir, args.port, conf)
247302
server.bind()
303+
server.start()
248304

249305
// Wait until the end of the world... or if the HistoryServer process is manually stopped
250306
while(true) { Thread.sleep(Int.MaxValue) }
@@ -258,6 +314,7 @@ private[spark] case class ApplicationHistoryInfo(
258314
startTime: Long,
259315
endTime: Long,
260316
lastUpdated: Long,
317+
sparkUser: String,
261318
logPath: String,
262319
ui: SparkUI) {
263320
def started = startTime != -1

core/src/main/scala/org/apache/spark/deploy/history/IndexPage.scala

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -26,24 +26,27 @@ import org.apache.spark.ui.{UIUtils, WebUI}
2626
private[spark] class IndexPage(parent: HistoryServer) {
2727

2828
def render(request: HttpServletRequest): Seq[Node] = {
29-
parent.checkForLogs()
30-
31-
// Populate app table, with most recently modified app first
3229
val appRows = parent.appIdToInfo.values.toSeq.sortBy { app => -app.lastUpdated }
3330
val appTable = UIUtils.listingTable(appHeader, appRow, appRows)
3431
val content =
3532
<div class="row-fluid">
3633
<div class="span12">
3734
<ul class="unstyled">
3835
<li><strong>Event Log Location: </strong> {parent.baseLogDir}</li>
39-
<h4>
40-
Showing {parent.appIdToInfo.size}/{parent.getTotalApplications} Finished Applications
41-
</h4>
42-
{appTable}
4336
</ul>
37+
{
38+
if (parent.appIdToInfo.size > 0) {
39+
<h4>
40+
Showing {parent.appIdToInfo.size}/{parent.getTotalApplications}
41+
Finished Application{if (parent.getTotalApplications > 1) "s" else ""}
42+
</h4> ++
43+
appTable
44+
} else {
45+
<h4>No Finished Applications Found</h4>
46+
}
47+
}
4448
</div>
4549
</div>
46-
4750
UIUtils.basicSparkPage(content, "History Server")
4851
}
4952

@@ -52,6 +55,7 @@ private[spark] class IndexPage(parent: HistoryServer) {
5255
"Started",
5356
"Finished",
5457
"Duration",
58+
"Spark User",
5559
"Log Directory",
5660
"Last Updated")
5761

@@ -62,13 +66,15 @@ private[spark] class IndexPage(parent: HistoryServer) {
6266
val endTime = if (info.finished) WebUI.formatDate(info.endTime) else "Not finished"
6367
val difference = if (info.started && info.finished) info.endTime - info.startTime else -1L
6468
val duration = if (difference > 0) WebUI.formatDuration(difference) else "---"
69+
val sparkUser = if (info.started) info.sparkUser else "Unknown user"
6570
val logDirectory = parent.getAppId(info.logPath)
6671
val lastUpdated = WebUI.formatDate(info.lastUpdated)
6772
<tr>
6873
<td><a href={uiAddress}>{appName}</a></td>
6974
<td>{startTime}</td>
7075
<td>{endTime}</td>
7176
<td>{duration}</td>
77+
<td>{sparkUser}</td>
7278
<td>{logDirectory}</td>
7379
<td>{lastUpdated}</td>
7480
</tr>

core/src/main/scala/org/apache/spark/scheduler/ApplicationEventListener.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ package org.apache.spark.scheduler
2626
*/
2727
private[spark] class ApplicationEventListener extends SparkListener {
2828
var appName = "<Not Started>"
29+
var sparkUser = "<Not Started>"
2930
var startTime = -1L
3031
var endTime = -1L
3132

@@ -41,6 +42,7 @@ private[spark] class ApplicationEventListener extends SparkListener {
4142
override def onApplicationStart(applicationStart: SparkListenerApplicationStart) {
4243
appName = applicationStart.appName
4344
startTime = applicationStart.time
45+
sparkUser = applicationStart.sparkUser
4446
}
4547

4648
override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd) {

0 commit comments

Comments
 (0)