@@ -324,6 +324,10 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
    val app = try {
      load(appId)
    } catch {
      case _: NoSuchElementException if this.conf.get(ON_DEMAND_ENABLED) =>
        val name = Utils.nameForAppAndAttempt(appId, attemptId)
        loadFromFallbackLocation(appId, attemptId,
          RollingEventLogFilesWriter.EVENT_LOG_DIR_NAME_PREFIX + name)
mridulm (Contributor) commented on Jul 22, 2025:

We should not assume it will be RollingEventLogFilesWriter; users don't need to be running with the default enabled, right?
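
For context, a minimal sketch of the layout assumption under discussion; the names below are illustrative, and the single-file layout is recalled from Spark's event log conventions rather than taken from this diff:

  // Illustrative sketch, not the PR's code. Rolling event logs live in a
  // directory named with the "eventlog_v2_" prefix, while a single-file event
  // log is a plain file named after the app, which this fallback would miss:
  //   rolling:     <logDir>/eventlog_v2_<appId>[_<attemptId>]/   (directory)
  //   single-file: <logDir>/<appId>[_<attemptId>]                (one file)
  val appId = "app-20250722120000-0001"                   // hypothetical app id
  val attemptId: Option[String] = None
  val name = appId + attemptId.map("_" + _).getOrElse("") // like Utils.nameForAppAndAttempt
  val fallbackDir = "eventlog_v2_" + name                 // EVENT_LOG_DIR_NAME_PREFIX + name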

      case _: NoSuchElementException =>
        return None
    }
@@ -364,6 +368,16 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
    Some(loadedUI)
  }

  private def loadFromFallbackLocation(appId: String, attemptId: Option[String], logPath: String)
      : ApplicationInfoWrapper = {
    val date = new Date(0)
    val info = ApplicationAttemptInfo(attemptId, date, date, date, 0, "spark", false, "unknown")
    addListing(new ApplicationInfoWrapper(
      ApplicationInfo(appId, appId, None, None, None, None, List.empty),
viirya (Member) commented:

So supposedly, the appId should be correct to load the record, but the other info is dummy?

viirya (Member) commented:

And once periodic scanning happens, it will update the record with the correct information?

dongjoon-hyun (Member, Author) commented:

Yes, correct, @viirya ~

dongjoon-hyun (Member, Author) commented:

This is a kind of placeholder.

      List(new AttemptInfoWrapper(info, logPath, 0, Some(1), None, None, None, None))))
Contributor commented:

Shouldn't we rely on the event log for information like startTime, endTime, user, etc.? Will this not lead to incorrect information being displayed on the home page of the SHS?

dongjoon-hyun (Member, Author) commented:

This is only a dummy placeholder that allows the SHS to show the application logs before periodic scanning happens. The periodic scanning will keep it in sync.

BTW, how often do you think this fallback is used in production environments, @thejdeep? I'm curious whether you are thinking about turning off the periodic scanning.

thejdeep (Contributor) commented:

Oh, I see that the intention is just to have dummy placeholders until the scanning takes care of it.

If users operate a large Spark cluster, my two cents are that users may tend to access their apps on demand much more frequently, and it might just lead to an incorrect listing page. For example, we noticed that a good fraction of our SHS requests are on demand, since users would like to get their reports as soon as their app finishes and before checkForLogs completes.

dongjoon-hyun (Member, Author) commented on Jul 22, 2025:

> an incorrect listing page

Yes, and technically it's not exposed in the listing page. Could you build this PR and test it yourself?

dongjoon-hyun (Member, Author) commented on Jul 22, 2025:

> For example, we noticed that a good fraction of our SHS requests are on demand, since users would like to get their reports as soon as their app finishes and before checkForLogs completes.

It sounds like a limitation of single-file event logs, @thejdeep. If you have rolling event logs, the SHS already has the correct partial information while your jobs are running.
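
For readers following along, a minimal sketch of the application-side settings this refers to; both keys are existing Spark configs, and the values are illustrative:

  import org.apache.spark.SparkConf

  // Enable rolling event logs so the SHS can serve partial information from
  // already-rolled files while the application is still running.
  val conf = new SparkConf()
    .set("spark.eventLog.enabled", "true")
    .set("spark.eventLog.rolling.enabled", "true")     // the default in Spark 4 (SPARK-45771)
    .set("spark.eventLog.rolling.maxFileSize", "128m") // roll to a new file at this size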

dongjoon-hyun (Member, Author) commented on Jul 22, 2025:

Just questions to understand your use cases:

  • How do you handle Spark Streaming jobs with a single-file event log? Do your jobs still not use rolling event logs?
  • Are you assuming only Spark 2.x or 3.x jobs, given that Spark 4 jobs generate rolling event logs by default since SPARK-45771?

thejdeep (Contributor) commented:

Thanks for sharing the context, @dongjoon-hyun.

We currently do not use rolling event logs, since we only serve batch use cases at the moment. All applications are currently on 3.x.

I can build your PR locally and test it on single-file event logs to see how it works with listing and cleanup. I can get back to you by tomorrow at the earliest, if that works.

dongjoon-hyun (Member, Author) commented on Jul 22, 2025:

Thank you so much for the info and your efforts on reviewing this. Take your time.

Contributor commented:

@dongjoon-hyun, I wanted to get your thoughts on #51604 (comment).

Thank you!

      load(appId)
Contributor commented:

What is the behavior if the application does not exist (a typo in the user's query, for example)? Will the listing now have an invalid entry?

thejdeep (Contributor) commented:

+1. Do you think it would be better if we checked for the existence of the file at its location before adding an entry? This would keep parity with how checkForLogs works: we only add entries whose event log locations exist.

dongjoon-hyun (Member, Author) commented:

No, we had better avoid that because it requires the full path, including "s3://...", @thejdeep.

  }

  override def getEmptyListingHtml(): Seq[Node] = {
    <p>
      Did you specify the correct logging directory? Please verify your setting of
@@ -54,6 +54,12 @@ private[spark] object History {
    .checkValue(v => v > 0, "The update batchSize should be a positive integer.")
    .createWithDefault(Int.MaxValue)

  val ON_DEMAND_ENABLED = ConfigBuilder("spark.history.fs.update.onDemandEnabled")
    .version("4.1.0")
    .doc("Whether to look up rolling event log locations in an on-demand manner before listing files.")
    .booleanConf
    .createWithDefault(true)

  val CLEANER_ENABLED = ConfigBuilder("spark.history.fs.cleaner.enabled")
    .version("1.4.0")
    .doc("Whether the History Server should periodically clean up event logs from storage")
@@ -1640,6 +1640,31 @@ abstract class FsHistoryProviderSuite extends SparkFunSuite with Matchers with P
    }
  }

test("SPARK-52914: Support spark.history.fs.update.onDemandEnabled") {
Seq(true, false).foreach { onDemandEnabled =>
withTempDir { dir =>
val conf = createTestConf(true)
conf.set(HISTORY_LOG_DIR, dir.getAbsolutePath)
conf.set(ON_DEMAND_ENABLED, onDemandEnabled)
val hadoopConf = SparkHadoopUtil.newConfiguration(conf)
val provider = new FsHistoryProvider(conf)

val writer1 = new RollingEventLogFilesWriter("app1", None, dir.toURI, conf, hadoopConf)
writer1.start()
writeEventsToRollingWriter(writer1, Seq(
SparkListenerApplicationStart("app1", Some("app1"), 0, "user", None),
SparkListenerJobStart(1, 0, Seq.empty)), rollFile = false)
writer1.stop()

assert(provider.getListing().length === 0)
assert(dir.listFiles().length === 1)
dongjoon-hyun (Member, Author) commented:

This is the test coverage, @thejdeep.

        assert(provider.getAppUI("app1", None).isDefined == onDemandEnabled)

        provider.stop()
      }
    }
  }

test("SPARK-36354: EventLogFileReader should skip rolling event log directories with no logs") {
withTempDir { dir =>
val conf = createTestConf(true)