@@ -19,19 +19,19 @@ package org.apache.spark.deploy.history
 
 import java.io.{File, FileNotFoundException, IOException}
 import java.util.{Date, ServiceLoader, UUID}
-import java.util.concurrent.{ExecutorService, TimeUnit}
+import java.util.concurrent.{ConcurrentHashMap, ExecutorService, Future, TimeUnit}
 import java.util.zip.{ZipEntry, ZipOutputStream}
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable
+import scala.concurrent.ExecutionException
 import scala.util.Try
 import scala.xml.Node
 
 import com.fasterxml.jackson.annotation.JsonIgnore
 import com.google.common.io.ByteStreams
 import com.google.common.util.concurrent.MoreExecutors
-import org.apache.hadoop.fs.{FileStatus, Path}
-import org.apache.hadoop.fs.permission.FsAction
+import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
 import org.apache.hadoop.hdfs.DistributedFileSystem
 import org.apache.hadoop.hdfs.protocol.HdfsConstants
 import org.apache.hadoop.security.AccessControlException
@@ -111,7 +111,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
111111 " ; groups with admin permissions" + HISTORY_UI_ADMIN_ACLS_GROUPS .toString)
112112
113113 private val hadoopConf = SparkHadoopUtil .get.newConfiguration(conf)
114- private val fs = new Path (logDir).getFileSystem(hadoopConf)
114+ // Visible for testing
115+ private [history] val fs : FileSystem = new Path (logDir).getFileSystem(hadoopConf)
115116
116117 // Used by check event thread and clean log thread.
117118 // Scheduled thread pool size must be one, otherwise it will have concurrent issues about fs
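Since the provider's tests live in the same org.apache.spark.deploy.history package, widening fs to private[history] and typing it explicitly as FileSystem lets a suite reach the provider's filesystem handle directly instead of re-deriving it from the configuration. A hypothetical usage sketch (the suite-side names conf and testDir are illustrative, not from this commit):

    // From a test class in package org.apache.spark.deploy.history:
    val provider = new FsHistoryProvider(conf)
    // Visible here because the suite shares the `history` package qualifier.
    val providerFs: FileSystem = provider.fs
    providerFs.mkdirs(new Path(testDir.getAbsolutePath))  // stage log fixtures on the same FS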
@@ -155,6 +156,25 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     new HistoryServerDiskManager(conf, path, listing, clock)
   }
 
+  private val blacklist = new ConcurrentHashMap[String, Long]
+
+  // Visible for testing
+  private[history] def isBlacklisted(path: Path): Boolean = {
+    blacklist.containsKey(path.getName)
+  }
+
+  private def blacklist(path: Path): Unit = {
+    blacklist.put(path.getName, clock.getTimeMillis())
+  }
+
+  /**
+   * Removes expired entries in the blacklist, according to the provided `expireTimeInSeconds`.
+   */
+  private def clearBlacklist(expireTimeInSeconds: Long): Unit = {
+    val expiredThreshold = clock.getTimeMillis() - expireTimeInSeconds * 1000
+    blacklist.asScala.retain((_, creationTime) => creationTime >= expiredThreshold)
+  }
+
   private val activeUIs = new mutable.HashMap[(String, Option[String]), LoadedAppUI]()
 
   /**
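Taken together these members form a time-stamped blacklist: entries are keyed by file name (path.getName), stamped with the current clock time on insertion, and evicted once they are older than expireTimeInSeconds. A minimal self-contained sketch of the same semantics (the object and method names are illustrative, not part of the patch):

    import java.util.concurrent.ConcurrentHashMap
    import scala.collection.JavaConverters._

    object BlacklistSketch {
      private val entries = new ConcurrentHashMap[String, Long]

      def add(name: String, nowMs: Long): Unit = entries.put(name, nowMs)
      def isBlacklisted(name: String): Boolean = entries.containsKey(name)

      // Mirrors clearBlacklist: keep only entries newer than the threshold.
      def clear(nowMs: Long, expireTimeInSeconds: Long): Unit = {
        val expiredThreshold = nowMs - expireTimeInSeconds * 1000
        entries.asScala.retain((_, addedAt) => addedAt >= expiredThreshold)
      }

      def main(args: Array[String]): Unit = {
        add("app-1.inprogress", nowMs = 0L)
        clear(nowMs = 30 * 1000, expireTimeInSeconds = 60)  // inside the window: kept
        assert(isBlacklisted("app-1.inprogress"))
        clear(nowMs = 61 * 1000, expireTimeInSeconds = 60)  // past the window: dropped
        assert(!isBlacklisted("app-1.inprogress"))
      }
    }

retain is the pre-2.13 name of filterInPlace, matching the Scala versions Spark supported at the time. Note that keying on path.getName rather than the full path means two equally named files in different directories would share one entry; the provider only scans a single log directory, so this does not matter in practice.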
@@ -412,7 +432,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
           // reading a garbage file is safe, but we would log an error which can be scary to
           // the end-user.
           !entry.getPath().getName().startsWith(".") &&
-            SparkHadoopUtil.get.checkAccessPermission(entry, FsAction.READ)
+            !isBlacklisted(entry.getPath)
         }
         .filter { entry =>
           try {
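Worth noting on this hunk: previously each scan pre-filtered entries with SparkHadoopUtil.get.checkAccessPermission, which re-derived readability from the FileStatus owner, group, and permission bits on the history server side; that answer can disagree with the filesystem itself once HDFS ACLs are involved. The patch instead delegates the decision to the filesystem: it simply attempts the read, and only files that have already failed with an AccessControlException (and are therefore blacklisted) are filtered out here.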
@@ -446,32 +466,37 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
       logDebug(s"New/updated attempts found: ${updated.size} ${updated.map(_.getPath)}")
     }
 
-    val tasks = updated.map { entry =>
+    val tasks = updated.flatMap { entry =>
       try {
-        replayExecutor.submit(new Runnable {
+        val task: Future[Unit] = replayExecutor.submit(new Runnable {
           override def run(): Unit = mergeApplicationListing(entry, newLastScanTime)
-        })
+        }, Unit)
+        Some(task -> entry.getPath)
       } catch {
         // let the iteration over the updated entries break, since an exception on
         // replayExecutor.submit(..) indicates the ExecutorService is unable
         // to take any more submissions at this time
         case e: Exception =>
           logError(s"Exception while submitting event log for replay", e)
-          null
+          None
       }
-    }.filter(_ != null)
+    }
 
     pendingReplayTasksCount.addAndGet(tasks.size)
 
     // Wait for all tasks to finish. This makes sure that checkForLogs
     // is not scheduled again while some tasks are already running in
     // the replayExecutor.
-    tasks.foreach { task =>
+    tasks.foreach { case (task, path) =>
       try {
         task.get()
       } catch {
         case e: InterruptedException =>
           throw e
+        case e: ExecutionException if e.getCause.isInstanceOf[AccessControlException] =>
+          // We don't have read permissions on the log file
+          logWarning(s"Unable to read log $path", e.getCause)
+          blacklist(path)
         case e: Exception =>
           logError("Exception while merging application listings", e)
       } finally {
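The new catch clause relies on how java.util.concurrent futures report failure: Future.get() never rethrows the task's exception directly but wraps whatever run() threw in an ExecutionException, so the AccessControlException has to be recovered through getCause. A minimal sketch of that mechanism (assuming hadoop-common on the classpath; the object name is illustrative):

    import java.util.concurrent.{ExecutionException, Executors, Future}
    import org.apache.hadoop.security.AccessControlException

    object ReplayFailureSketch {
      def main(args: Array[String]): Unit = {
        val pool = Executors.newFixedThreadPool(1)
        // submit(Runnable, T) returns a typed Future[T]; the patch uses the
        // same overload to keep a Future[Unit] per entry instead of Future[_].
        val task: Future[Unit] = pool.submit(new Runnable {
          override def run(): Unit = throw new AccessControlException("Permission denied")
        }, ())
        try {
          task.get()
        } catch {
          case e: ExecutionException if e.getCause.isInstanceOf[AccessControlException] =>
            // This is the point where the real code calls blacklist(path).
            println(s"would blacklist the log: ${e.getCause.getMessage}")
        } finally {
          pool.shutdown()
        }
      }
    }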
@@ -694,6 +719,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
         listing.delete(classOf[LogInfo], log.logPath)
       }
     }
+    // Clean the blacklist from the expired entries.
+    clearBlacklist(CLEAN_INTERVAL_S)
   }
 
   /**
@@ -871,13 +898,17 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
   }
 
   private def deleteLog(log: Path): Unit = {
-    try {
-      fs.delete(log, true)
-    } catch {
-      case _: AccessControlException =>
-        logInfo(s"No permission to delete $log, ignoring.")
-      case ioe: IOException =>
-        logError(s"IOException in cleaning $log", ioe)
+    if (isBlacklisted(log)) {
+      logDebug(s"Skipping deleting $log as we don't have permissions on it.")
+    } else {
+      try {
+        fs.delete(log, true)
+      } catch {
+        case _: AccessControlException =>
+          logInfo(s"No permission to delete $log, ignoring.")
+        case ioe: IOException =>
+          logError(s"IOException in cleaning $log", ioe)
+      }
     }
   }
 
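With this final guard, an unreadable log is touched at most once per blacklist window: the failed replay blacklists it, subsequent scans filter it out, the cleaner skips deleting it (avoiding a second AccessControlException in the logs), and clearBlacklist(CLEAN_INTERVAL_S) re-admits it after one clean interval, so a later permissions fix is picked up without restarting the history server.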