@@ -18,12 +18,13 @@
 package org.apache.spark.deploy.history
 
 import java.io.{File, FileNotFoundException, IOException}
-import java.util.{Date, ServiceLoader, UUID}
+import java.util.{Date, ServiceLoader}
 import java.util.concurrent.{ExecutorService, TimeUnit}
 import java.util.zip.{ZipEntry, ZipOutputStream}
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable
+import scala.io.Source
 import scala.util.Try
 import scala.xml.Node
 
@@ -125,6 +126,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
   private val pendingReplayTasksCount = new java.util.concurrent.atomic.AtomicInteger(0)
 
   private val storePath = conf.get(LOCAL_STORE_DIR).map(new File(_))
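+  // When this flag is on, a size change on an in-progress log only invalidates any cached
+  // UI for the attempt instead of forcing an immediate re-parse (see checkForLogs below).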
+  private val fastInProgressParsing = conf.get(FAST_IN_PROGRESS_PARSING)
 
   // Visible for testing.
   private[history] val listing: KVStore = storePath.map { path =>
@@ -402,13 +404,13 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
    */
   private[history] def checkForLogs(): Unit = {
     try {
-      val newLastScanTime = getNewLastScanTime()
+      val newLastScanTime = clock.getTimeMillis()
       logDebug(s"Scanning $logDir with lastScanTime==$lastScanTime")
 
       val updated = Option(fs.listStatus(new Path(logDir))).map(_.toSeq).getOrElse(Nil)
         .filter { entry =>
           !entry.isDirectory() &&
-            // FsHistoryProvider generates a hidden file which can't be read. Accidentally
+            // FsHistoryProvider used to generate a hidden file which can't be read. Accidentally
             // reading a garbage file is safe, but we would log an error which can be scary to
             // the end-user.
             !entry.getPath().getName().startsWith(".") &&
@@ -417,15 +419,23 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
         .filter { entry =>
           try {
             val info = listing.read(classOf[LogInfo], entry.getPath().toString())
-            if (info.fileSize < entry.getLen()) {
-              // Log size has changed, it should be parsed.
-              true
-            } else {
+
+            if (info.appId.isDefined) {
               // If the SHS view has a valid application, update the time the file was last seen so
               // that the entry is not deleted from the SHS listing.
-              if (info.appId.isDefined) {
-                listing.write(info.copy(lastProcessed = newLastScanTime))
+              listing.write(info.copy(lastProcessed = newLastScanTime))
+            }
+
+            if (info.fileSize < entry.getLen()) {
+              if (info.appId.isDefined && fastInProgressParsing) {
+                // When fast in-progress parsing is on, we don't need to re-parse when the
+                // size changes, but we do need to invalidate any existing UIs.
+                invalidateUI(info.appId.get, info.attemptId)
+                false
+              } else {
+                true
               }
+            } else {
               false
             }
           } catch {
@@ -449,7 +459,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
       val tasks = updated.map { entry =>
         try {
           replayExecutor.submit(new Runnable {
-            override def run(): Unit = mergeApplicationListing(entry, newLastScanTime)
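+            // The third argument enables the parse optimizations (halting early once listing
+            // data is found, or skipping ahead to look for the end event) on this first pass.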
+            override def run(): Unit = mergeApplicationListing(entry, newLastScanTime, true)
           })
         } catch {
           // let the iteration over the updated entries break, since an exception on
@@ -542,25 +552,6 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     }
   }
 
-  private[history] def getNewLastScanTime(): Long = {
-    val fileName = "." + UUID.randomUUID().toString
-    val path = new Path(logDir, fileName)
-    val fos = fs.create(path)
-
-    try {
-      fos.close()
-      fs.getFileStatus(path).getModificationTime
-    } catch {
-      case e: Exception =>
-        logError("Exception encountered when attempting to update last scan time", e)
-        lastScanTime.get()
-    } finally {
-      if (!fs.delete(path, true)) {
-        logWarning(s"Error deleting ${path}")
-      }
-    }
-  }
-
   override def writeEventLogs(
       appId: String,
       attemptId: Option[String],
@@ -607,7 +598,10 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
   /**
   * Replay the given log file, saving the application in the listing db.
   */
-  protected def mergeApplicationListing(fileStatus: FileStatus, scanTime: Long): Unit = {
+  protected def mergeApplicationListing(
+      fileStatus: FileStatus,
+      scanTime: Long,
+      enableOptimizations: Boolean): Unit = {
     val eventsFilter: ReplayEventsFilter = { eventString =>
       eventString.startsWith(APPL_START_EVENT_PREFIX) ||
       eventString.startsWith(APPL_END_EVENT_PREFIX) ||
@@ -616,32 +610,118 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     }
 
     val logPath = fileStatus.getPath()
+    val appCompleted = isCompleted(logPath.getName())
+    val reparseChunkSize = conf.get(END_EVENT_REPARSE_CHUNK_SIZE)
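+    // END_EVENT_REPARSE_CHUNK_SIZE is read as the number of trailing bytes to re-parse when
+    // looking for the end event; a value of 0 disables that path (see shouldHalt below).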
+
+    // Enable halt support in listener if:
+    // - app in progress && fast parsing enabled
+    // - skipping to end event is enabled (regardless of in-progress state)
+    val shouldHalt = enableOptimizations &&
+      ((!appCompleted && fastInProgressParsing) || reparseChunkSize > 0)
+
     val bus = new ReplayListenerBus()
-    val listener = new AppListingListener(fileStatus, clock)
+    val listener = new AppListingListener(fileStatus, clock, shouldHalt)
     bus.addListener(listener)
-    replay(fileStatus, bus, eventsFilter = eventsFilter)
-
-    val (appId, attemptId) = listener.applicationInfo match {
-      case Some(app) =>
-        // Invalidate the existing UI for the reloaded app attempt, if any. See LoadedAppUI for a
-        // discussion on the UI lifecycle.
-        synchronized {
-          activeUIs.get((app.info.id, app.attempts.head.info.attemptId)).foreach { ui =>
-            ui.invalidate()
-            ui.ui.store.close()
+
+    logInfo(s"Parsing $logPath for listing data...")
+    Utils.tryWithResource(EventLoggingListener.openEventLog(logPath, fs)) { in =>
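+      // "!appCompleted" marks the stream as possibly truncated: an in-progress log may end
+      // mid-line, which the replay bus then tolerates instead of reporting an error.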
+      bus.replay(in, logPath.toString, !appCompleted, eventsFilter)
+    }
+
+    // If enabled above, the listing listener will halt parsing when there's enough information to
+    // create a listing entry. When the app is completed, or fast parsing is disabled, we still need
+    // to replay until the end of the log file to try to find the app end event. Instead of reading
+    // and parsing line by line, this code skips bytes from the underlying stream so that it is
+    // positioned somewhere close to the end of the log file.
+    //
+    // Because the application end event is written while some Spark subsystems such as the
+    // scheduler are still active, there is no guarantee that the end event will be the last
+    // in the log. So, to be safe, the code uses a configurable chunk to be re-parsed at
+    // the end of the file, and retries parsing the whole log later if the needed data is
+    // still not found.
+    //
+    // Note that skipping bytes in compressed files is still not cheap, but there are still some
+    // minor gains over the normal log parsing done by the replay bus.
+    //
+    // This code re-opens the file so that it knows where it's skipping to. This isn't as cheap as
+    // just skipping from the current position, but there isn't a good way to detect what the
+    // current position is, since the replay listener bus buffers data internally.
+    val lookForEndEvent = shouldHalt && (appCompleted || !fastInProgressParsing)
+    if (lookForEndEvent && listener.applicationInfo.isDefined) {
+      Utils.tryWithResource(EventLoggingListener.openEventLog(logPath, fs)) { in =>
+        val target = fileStatus.getLen() - reparseChunkSize
+        if (target > 0) {
+          logInfo(s"Looking for end event; skipping $target bytes from $logPath...")
+          var skipped = 0L
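+          // InputStream.skip may skip fewer bytes than requested, so loop until the whole
+          // target has been consumed.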
+          while (skipped < target) {
+            skipped += in.skip(target - skipped)
           }
         }
 
+        val source = Source.fromInputStream(in).getLines()
+
+        // Because skipping may leave the stream in the middle of a line, read the next line
+        // before replaying.
+        if (target > 0) {
+          source.next()
+        }
+
+        bus.replay(source, logPath.toString, !appCompleted, eventsFilter)
+      }
+    }
+
+    logInfo(s"Finished parsing $logPath")
+
+    listener.applicationInfo match {
+      case Some(app) if !lookForEndEvent || app.attempts.head.info.completed =>
+        // In this case, we either didn't care about the end event, or we found it. So the
+        // listing data is good.
+        invalidateUI(app.info.id, app.attempts.head.info.attemptId)
         addListing(app)
-        (Some(app.info.id), app.attempts.head.info.attemptId)
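+        // Record the size that was parsed, so the next scan only revisits this log if it has
+        // grown since.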
+        listing.write(LogInfo(logPath.toString(), scanTime, Some(app.info.id),
+          app.attempts.head.info.attemptId, fileStatus.getLen()))
+
+        // For a finished log, remove the corresponding "in progress" entry from the listing DB if
+        // the file is really gone.
+        if (appCompleted) {
+          val inProgressLog = logPath.toString() + EventLoggingListener.IN_PROGRESS
+          try {
+            // Fetch the entry first to avoid an RPC when it's already removed.
+            listing.read(classOf[LogInfo], inProgressLog)
+            if (!fs.isFile(new Path(inProgressLog))) {
+              listing.delete(classOf[LogInfo], inProgressLog)
+            }
+          } catch {
+            case _: NoSuchElementException =>
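+              // No listing entry exists for the in-progress file, so there is nothing to
+              // clean up.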
+          }
+        }
+
+      case Some(_) =>
+        // In this case, the attempt was expected to be marked as finished but was not. That can
+        // happen when the end event lies earlier in the file than the re-parsed trailing chunk,
+        // so call the method again to re-parse the whole log.
+        logInfo(s"Reparsing $logPath since end event was not found.")
+        mergeApplicationListing(fileStatus, scanTime, false)
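+        // Passing enableOptimizations = false forces a full parse this time, so the recursion
+        // cannot trigger again.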
 
       case _ =>
         // If the app hasn't written down its app ID to the logs, still record the entry in the
         // listing db, with an empty ID. This will make the log eligible for deletion if the app
         // does not make progress after the configured max log age.
-        (None, None)
+        listing.write(LogInfo(logPath.toString(), scanTime, None, None, fileStatus.getLen()))
+    }
+  }
+
+  /**
+   * Invalidate an existing UI for a given app attempt. See LoadedAppUI for a discussion on the
+   * UI lifecycle.
+   */
+  private def invalidateUI(appId: String, attemptId: Option[String]): Unit = {
+    synchronized {
+      activeUIs.get((appId, attemptId)).foreach { ui =>
+        ui.invalidate()
+        ui.ui.store.close()
+      }
     }
-    listing.write(LogInfo(logPath.toString(), scanTime, appId, attemptId, fileStatus.getLen()))
   }
 
   /**
@@ -696,29 +776,6 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     }
   }
 
-  /**
-   * Replays the events in the specified log file on the supplied `ReplayListenerBus`.
-   * `ReplayEventsFilter` determines what events are replayed.
-   */
-  private def replay(
-      eventLog: FileStatus,
-      bus: ReplayListenerBus,
-      eventsFilter: ReplayEventsFilter = SELECT_ALL_FILTER): Unit = {
-    val logPath = eventLog.getPath()
-    val isCompleted = !logPath.getName().endsWith(EventLoggingListener.IN_PROGRESS)
-    logInfo(s"Replaying log path: $logPath")
-    // Note that the eventLog may have *increased* in size since when we grabbed the filestatus,
-    // and when we read the file here. That is OK -- it may result in an unnecessary refresh
-    // when there is no update, but will not result in missing an update. We *must* prevent
-    // an error the other way -- if we report a size bigger (ie later) than the file that is
-    // actually read, we may never refresh the app. FileStatus is guaranteed to be static
-    // after it's created, so we get a file size that is no bigger than what is actually read.
-    Utils.tryWithResource(EventLoggingListener.openEventLog(logPath, fs)) { in =>
-      bus.replay(in, logPath.toString, !isCompleted, eventsFilter)
-      logInfo(s"Finished parsing $logPath")
-    }
-  }
-
   /**
   * Rebuilds the application state store from its event log.
   */
@@ -741,8 +798,13 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     } replayBus.addListener(listener)
 
     try {
-      replay(eventLog, replayBus)
+      val path = eventLog.getPath()
+      logInfo(s"Parsing $path to re-build UI...")
+      Utils.tryWithResource(EventLoggingListener.openEventLog(path, fs)) { in =>
+        replayBus.replay(in, path.toString(), maybeTruncated = !isCompleted(path.toString()))
+      }
       trackingStore.close(false)
+      logInfo(s"Finished parsing $path")
     } catch {
       case e: Exception =>
         Utils.tryLogNonFatalError {
@@ -881,6 +943,10 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     }
   }
 
+  private def isCompleted(name: String): Boolean = {
+    !name.endsWith(EventLoggingListener.IN_PROGRESS)
+  }
+
 }
 
 private[history] object FsHistoryProvider {
@@ -945,11 +1011,17 @@ private[history] class ApplicationInfoWrapper(
 
 }
 
-private[history] class AppListingListener(log: FileStatus, clock: Clock) extends SparkListener {
+private[history] class AppListingListener(
+    log: FileStatus,
+    clock: Clock,
+    haltEnabled: Boolean) extends SparkListener {
 
   private val app = new MutableApplicationInfo()
   private val attempt = new MutableAttemptInfo(log.getPath().getName(), log.getLen())
 
+  private var gotEnvUpdate = false
+  private var halted = false
+
   override def onApplicationStart(event: SparkListenerApplicationStart): Unit = {
     app.id = event.appId.orNull
     app.name = event.appName
@@ -958,6 +1030,8 @@ private[history] class AppListingListener(log: FileStatus, clock: Clock) extends
     attempt.startTime = new Date(event.time)
     attempt.lastUpdated = new Date(clock.getTimeMillis())
     attempt.sparkUser = event.sparkUser
+
+    checkProgress()
   }
 
   override def onApplicationEnd(event: SparkListenerApplicationEnd): Unit = {
@@ -968,11 +1042,18 @@ private[history] class AppListingListener(log: FileStatus, clock: Clock) extends
   }
 
   override def onEnvironmentUpdate(event: SparkListenerEnvironmentUpdate): Unit = {
-    val allProperties = event.environmentDetails("Spark Properties").toMap
-    attempt.viewAcls = allProperties.get("spark.ui.view.acls")
-    attempt.adminAcls = allProperties.get("spark.admin.acls")
-    attempt.viewAclsGroups = allProperties.get("spark.ui.view.acls.groups")
-    attempt.adminAclsGroups = allProperties.get("spark.admin.acls.groups")
+    // Only parse the first env update, since any future changes don't have any effect on
+    // the ACLs set for the UI.
+    if (!gotEnvUpdate) {
+      val allProperties = event.environmentDetails("Spark Properties").toMap
+      attempt.viewAcls = allProperties.get("spark.ui.view.acls")
+      attempt.adminAcls = allProperties.get("spark.admin.acls")
+      attempt.viewAclsGroups = allProperties.get("spark.ui.view.acls.groups")
+      attempt.adminAclsGroups = allProperties.get("spark.admin.acls.groups")
+
+      gotEnvUpdate = true
+      checkProgress()
+    }
   }
 
   override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
@@ -989,6 +1070,17 @@ private[history] class AppListingListener(log: FileStatus, clock: Clock) extends
     }
   }
 
+  /**
+   * Throws a halt exception to stop replay if enough data to create the app listing has been
+   * read.
+   */
+  private def checkProgress(): Unit = {
+    if (haltEnabled && !halted && app.id != null && gotEnvUpdate) {
+      halted = true
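+      // HaltReplayException is recognized by ReplayListenerBus as a request to stop replaying
+      // early, rather than as a parse error.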
+      throw new HaltReplayException()
+    }
+  }
+
   private class MutableApplicationInfo {
     var id: String = null
     var name: String = null