@@ -126,11 +126,11 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
126126 // Disable the background thread during tests.
127127 if (! conf.contains(" spark.testing" )) {
128128 // A task that periodically checks for event log updates on disk.
129- pool.scheduleAtFixedRate (getRunner(checkForLogs), 0 , UPDATE_INTERVAL_S , TimeUnit .SECONDS )
129+ pool.scheduleWithFixedDelay (getRunner(checkForLogs), 0 , UPDATE_INTERVAL_S , TimeUnit .SECONDS )
130130
131131 if (conf.getBoolean(" spark.history.fs.cleaner.enabled" , false )) {
132132 // A task that periodically cleans event logs on disk.
133- pool.scheduleAtFixedRate (getRunner(cleanLogs), 0 , CLEAN_INTERVAL_S , TimeUnit .SECONDS )
133+ pool.scheduleWithFixedDelay (getRunner(cleanLogs), 0 , CLEAN_INTERVAL_S , TimeUnit .SECONDS )
134134 }
135135 }
136136 }
@@ -204,11 +204,25 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
204204 mod1 >= mod2
205205 }
206206
207- logInfos.sliding(20 , 20 ).foreach { batch =>
208- replayExecutor.submit(new Runnable {
209- override def run (): Unit = mergeApplicationListing(batch)
210- })
211- }
207+ logInfos.grouped(20 )
208+ .map { batch =>
209+ replayExecutor.submit(new Runnable {
210+ override def run (): Unit = mergeApplicationListing(batch)
211+ })
212+ }
213+ .foreach { task =>
214+ try {
215+ // Wait for all tasks to finish so that checkForLogs is not scheduled
216+ // again while tasks are still running in the replayExecutor. NOTE(review):
217+ // the Iterator chain looks lazy, so batches may be submitted one at a time.
218+ task.get()
219+ } catch {
220+ case e : InterruptedException =>
221+ throw e
222+ case e : Exception =>
223+ logError(" Exception while merging application listings" , e)
224+ }
225+ }
212226
213227 lastModifiedTime = newLastModifiedTime
214228 } catch {
0 commit comments