Skip to content

Commit eee2f5a

Browse files
author
Marcelo Vanzin
committed
Ensure server.stop() is called when shutting down.
Also remove the cleanup code from the fs provider. It would be better to clean up, but there's a race between that code's cleanup and Hadoop's shutdown hook, which closes all file systems kept in the cache. So if you try to clean up the fs provider in a shut down hook, you may end up with ugly exceptions in the output. But leave the stop() functionality around in case it's useful for future provider implementations.
1 parent bda2fa1 commit eee2f5a

2 files changed

Lines changed: 7 additions & 11 deletions

File tree

core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ class FsHistoryProvider(conf: SparkConf) extends ApplicationHistoryProvider
5454
*/
5555
private val logCheckingThread = new Thread("LogCheckingThread") {
5656
override def run() = Utils.logUncaughtExceptions {
57-
while (!stopped) {
57+
while (true) {
5858
val now = System.currentTimeMillis
5959
if (now - lastLogCheckTime > UPDATE_INTERVAL_MS) {
6060
Thread.sleep(UPDATE_INTERVAL_MS)
@@ -68,8 +68,6 @@ class FsHistoryProvider(conf: SparkConf) extends ApplicationHistoryProvider
6868
}
6969
}
7070

71-
@volatile private var stopped = false
72-
7371
initialize()
7472

7573
private def initialize() {
@@ -87,13 +85,6 @@ class FsHistoryProvider(conf: SparkConf) extends ApplicationHistoryProvider
8785
logCheckingThread.start()
8886
}
8987

90-
override def stop() = {
91-
stopped = true
92-
logCheckingThread.interrupt()
93-
logCheckingThread.join()
94-
fs.close()
95-
}
96-
9788
override def getListing(offset: Int, count: Int) = {
9889
val list = appList.get()
9990
val theOffset = if (offset < list.size) offset else 0

core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -195,9 +195,14 @@ object HistoryServer {
195195
val server = new HistoryServer(conf, provider, securityManager, port)
196196
server.bind()
197197

198+
Runtime.getRuntime().addShutdownHook(new Thread("HistoryServerStopper") {
199+
override def run() = {
200+
server.stop()
201+
}
202+
})
203+
198204
// Wait until the end of the world... or if the HistoryServer process is manually stopped
199205
while(true) { Thread.sleep(Int.MaxValue) }
200-
server.stop()
201206
}
202207

203208
def initSecurity() {

0 commit comments

Comments
 (0)