Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
taskScheduler.start()

val applicationId: String = taskScheduler.applicationId()
val applicationAttemptId: String = taskScheduler.applicationAttemptId()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this need to be public?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just kept it on similar lines as applicationId. An application with different attempt id, is similar to different application in many ways.

conf.set("spark.app.id", applicationId)

env.blockManager.initialize(applicationId)
Expand All @@ -385,8 +386,8 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
// Optionally log Spark events
private[spark] val eventLogger: Option[EventLoggingListener] = {
if (isEventLogEnabled) {
val logger =
new EventLoggingListener(applicationId, eventLogDir.get, conf, hadoopConfiguration)
val logger = new EventLoggingListener(
applicationId, applicationAttemptId, eventLogDir.get, conf, hadoopConfiguration)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: indented too far

logger.start()
listenerBus.addListener(logger)
Some(logger)
Expand Down Expand Up @@ -1735,7 +1736,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
// Note: this code assumes that the task scheduler has been initialized and has contacted
// the cluster manager to get an application ID (in case the cluster manager provides one).
listenerBus.post(SparkListenerApplicationStart(appName, Some(applicationId),
startTime, sparkUser))
startTime, sparkUser, applicationAttemptId))
}

/** Post the application end event */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ private[spark] case class ApplicationHistoryInfo(
endTime: Long,
lastUpdated: Long,
sparkUser: String,
completed: Boolean = false)
completed: Boolean = false,
appAttemptId: String = "")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Option[String]?


private[spark] abstract class ApplicationHistoryProvider {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,10 +208,17 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
if (!logInfos.isEmpty) {
val newApps = new mutable.LinkedHashMap[String, FsApplicationHistoryInfo]()
def addIfAbsent(info: FsApplicationHistoryInfo) = {
if (!newApps.contains(info.id) ||
newApps(info.id).logPath.endsWith(EventLoggingListener.IN_PROGRESS) &&
val key =
if (info.appAttemptId.equals("")) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 to Sean's suggestion of using Option[String]. That would make this check cleaner.

info.id
} else {
info.id + "_" + info.appAttemptId
}

if (!newApps.contains(key) ||
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Feels like you should add parentheses here somewhere to clarify in which order the conditions should be parsed, since you're mixing $$ and ||.

newApps(key).logPath.endsWith(EventLoggingListener.IN_PROGRESS) &&
!info.logPath.endsWith(EventLoggingListener.IN_PROGRESS)) {
newApps += (info.id -> info)
newApps += (key -> info)
}
}

Expand Down Expand Up @@ -309,7 +316,8 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
appListener.endTime.getOrElse(-1L),
getModificationTime(eventLog).get,
appListener.sparkUser.getOrElse(NOT_STARTED),
isApplicationCompleted(eventLog))
isApplicationCompleted(eventLog),
appListener.appAttemptId.getOrElse(""))
} finally {
logInput.close()
}
Expand Down Expand Up @@ -410,5 +418,7 @@ private class FsApplicationHistoryInfo(
endTime: Long,
lastUpdated: Long,
sparkUser: String,
completed: Boolean = true)
extends ApplicationHistoryInfo(id, name, startTime, endTime, lastUpdated, sparkUser, completed)
completed: Boolean = true,
appAttemptId: String ="")
extends ApplicationHistoryInfo(
id, name, startTime, endTime, lastUpdated, sparkUser, completed, appAttemptId)
137 changes: 128 additions & 9 deletions core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ import javax.servlet.http.HttpServletRequest
import scala.xml.Node

import org.apache.spark.ui.{WebUIPage, UIUtils}
import scala.collection.immutable.ListMap
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: group with other scala.* imports

import scala.collection.mutable.HashMap
import scala.collection.mutable.ArrayBuffer

private[spark] class HistoryPage(parent: HistoryServer) extends WebUIPage("") {

Expand All @@ -34,18 +37,31 @@ private[spark] class HistoryPage(parent: HistoryServer) extends WebUIPage("") {
val requestedIncomplete =
Option(request.getParameter("showIncomplete")).getOrElse("false").toBoolean

val allApps = parent.getApplicationList().filter(_.completed != requestedIncomplete)
val actualFirst = if (requestedFirst < allApps.size) requestedFirst else 0
val apps = allApps.slice(actualFirst, Math.min(actualFirst + pageSize, allApps.size))

val allCompletedAppsNAttempts =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AppsAndAttempts? No need to sacrifice readability for two characters.

parent.getApplicationList().filter(_.completed != requestedIncomplete)
val (hasAttemptInfo, appToAttemptMap) = getApplicationLevelList(allCompletedAppsNAttempts)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm a little confused about this code, so let me suggest something different. Why not turn the listing into this:

// Map application ID to a list of attempts
val appAttempts: LinkedHashMap[String, Seq[ApplicationHistoryInfo]] = ...

That should make it easier to handle apps with a single and multiple attemps with code that's mostly the same.

To avoid exploding memory usage, the code that does this translation (from Seq[ApplicationHistoryInfo] to the above map) should also take care of only looking at the app list for the current page being requested (instead of building a map for all applications and then slicing it).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BTW with the map you could do:

val hasAttemptInfo = appAttempts.exists { case (k, v) => v.size > 1 }

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi,

If at a particular time, there are few event logs, which have attempt info, and there are few, which do no have. Then we display the same number of columns irrespective of the event log having attempt info or not.

To do above, we need just one flag of hasAttemptInfo for all the applications we have, and it is not per application. If we do that map logic, then we will need to iterate again on that map, after creating the same.

Regarding that memory exploding, if we first slice, and at a time ( in the case, when history server is restarting with some event logs already there ), then we might see one page's UI without attempt info, and another page with attempt UI. Is it fine?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should slice on the applications, and then build the table with all the applications' attempts. So every page shows "x" applications, not attempts. After you apply the pagination logic, iterating is cheap ("x" is a small number), so calculating hasAttemptInfo separately is easy.

Perhaps a more efficient way of doing this would be to make ApplicationHistoryProvider.getListing return that map, then this code would be much simpler. It would just need to slice the map instead of transform a flat list into something different.

I just think the code you currently have is a little confusing, and a little inefficient. If you have a really long list of applications, it would waste a lot of cycles reading through it and copying things that would just be thrown away.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just to make my suggestions a little more concrete, I'm suggesting something like this:

  • ApplicationHistoryInfo becomes ApplicationAttemptInfo

  • Add a new ApplicationHistoryInfo as follows:

    private[spark] class ApplicationHistoryInfo(id: String, attempts: Seq[ApplicationAttemptInfo])

  • getListing() remains unchanged, but note the data it returns is different now.

So now you have a list of applications being returned, and each application has information about all its attempts. Now it's much easier to slice that list and choose what to render in HistoryPage. It's a list, so it integrates nicely with UIUtils.listingTable, unlike the map I suggested before.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @vanzin ,

Thanks for adding the concrete suggestion. I will incorporate the same.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @vanzin ,

If we change to this data structure, then logic at getAppUI and getApplicationLevelList at FsHistoryProvider will get a bit more complicated.

Also, If there is any extension written for ApplicationHistoryInfo, that will also get impacted.

As per my understanding, what we are trying to achieve is get the list itself as a mapping from application id => List of Attempts.

I will prefer a utility method to get such mapped list, rather than modifying the structure.

Following is the basis for the same ( regarding complication of FsHistoryProvider only ):

  1. As we get the event logs of attempts one by one, logic to either create the new FsApplicationHistoryInfo or add to existing FsApplicationHistoryInfo object's attempt info list, needs to be incorporated.
  2. Order within the attempt info for an application, will require it's separate attention.
  3. Transition from In-Progress attempt to Complete will require another level of indirection.
  4. Inside getUI method also, we will require two level of indirections.

Please consider these points, as compared to the utility method to get the same while creating HistoryPage.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, those methods will get a little more complicated. But without changing those, HistoryPage will have to implement pretty much the same transformation. I think doing that in HistoryPage is actually more complicated, and much less efficient.

If you build the better data structure in FsHistoryProvider, it's done once. If you do it in HistoryPage, it's done for every client request. So you need to be doubly careful about memory and cpu usage, because for large application lists, copying the whole thing would be very expensive.

The "level of indirection" you mention is a hashtable lookup, which is very cheap. That will get you the list of attemps for a particular application, and processing that list (which in the most common case will have a single entry) is rather easy.


val allAppsSize = allCompletedAppsNAttempts.size

val actualFirst = if (requestedFirst < allAppsSize) requestedFirst else 0
val apps =
allCompletedAppsNAttempts.slice(actualFirst, Math.min(actualFirst + pageSize, allAppsSize))
val appWithAttemptsDisplayList =
appToAttemptMap.slice(actualFirst, Math.min(actualFirst + pageSize, allAppsSize))

val actualPage = (actualFirst / pageSize) + 1
val last = Math.min(actualFirst + pageSize, allApps.size) - 1
val pageCount = allApps.size / pageSize + (if (allApps.size % pageSize > 0) 1 else 0)
val last = Math.min(actualFirst + pageSize, allAppsSize) - 1
val pageCount = allAppsSize / pageSize + (if (allAppsSize % pageSize > 0) 1 else 0)

val secondPageFromLeft = 2
val secondPageFromRight = pageCount - 1

val appTable = UIUtils.listingTable(appHeader, appRow, apps)
val appTable =
if (hasAttemptInfo) {
UIUtils.listingTable(appWithAttemptHeader, appWithAttemptRow, appWithAttemptsDisplayList)
} else {
UIUtils.listingTable(appHeader, appRow, apps)
}
val providerConfig = parent.getProviderConfig()
val content =
<div class="row-fluid">
Expand All @@ -59,15 +75,15 @@ private[spark] class HistoryPage(parent: HistoryServer) extends WebUIPage("") {
// to the first and last page. If the current page +/- `plusOrMinus` is greater
// than the 2nd page from the first page or less than the 2nd page from the last
// page, `...` will be displayed.
if (allApps.size > 0) {
if (allAppsSize > 0) {
val leftSideIndices =
rangeIndices(actualPage - plusOrMinus until actualPage, 1 < _, requestedIncomplete)
val rightSideIndices =
rangeIndices(actualPage + 1 to actualPage + plusOrMinus, _ < pageCount,
requestedIncomplete)

<h4>
Showing {actualFirst + 1}-{last + 1} of {allApps.size}
Showing {actualFirst + 1}-{last + 1} of {allAppsSize}
{if (requestedIncomplete) "(Incomplete applications)"}
<span style="float: right">
{
Expand Down Expand Up @@ -113,6 +129,36 @@ private[spark] class HistoryPage(parent: HistoryServer) extends WebUIPage("") {
</div>
UIUtils.basicSparkPage(content, "History Server")
}

private def getApplicationLevelList (appNattemptList: Iterable[ApplicationHistoryInfo]) ={
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

return type?

// Create HashMap as per the multiple attempts for one application.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Feels like this should be in a scaladoc comment.

// If there is no attempt specific stuff, then
// do return false, to indicate the same, so that previous UI gets displayed.
var hasAttemptInfo = false
val appToAttemptInfo = new HashMap[String, ArrayBuffer[ApplicationHistoryInfo]]
for( appAttempt <- appNattemptList) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All throughout the code: for (...) {, if (...) {. Note the placement of the space.

if(!appAttempt.appAttemptId.equals("")){
hasAttemptInfo = true
val attemptId = appAttempt.appAttemptId.toInt
if(appToAttemptInfo.contains(appAttempt.id)){
val currentAttempts = appToAttemptInfo.get(appAttempt.id).get
currentAttempts += appAttempt
appToAttemptInfo.put( appAttempt.id, currentAttempts)
} else {
val currentAttempts = new ArrayBuffer[ApplicationHistoryInfo]()
currentAttempts += appAttempt
appToAttemptInfo.put( appAttempt.id, currentAttempts )
}
}else {
val currentAttempts = new ArrayBuffer[ApplicationHistoryInfo]()
currentAttempts += appAttempt
appToAttemptInfo.put(appAttempt.id, currentAttempts)
}
}
val sortedMap = ListMap(appToAttemptInfo.toSeq.sortWith(_._1 > _._1):_*)
(hasAttemptInfo, sortedMap)
}


private val appHeader = Seq(
"App ID",
Expand All @@ -128,6 +174,16 @@ private[spark] class HistoryPage(parent: HistoryServer) extends WebUIPage("") {
range.filter(condition).map(nextPage =>
<a href={makePageLink(nextPage, showIncomplete)}> {nextPage} </a>)
}

private val appWithAttemptHeader = Seq(
"App ID",
"App Name",
"Attempt ID",
"Started",
"Completed",
"Duration",
"Spark User",
"Last Updated")

private def appRow(info: ApplicationHistoryInfo): Seq[Node] = {
val uiAddress = HistoryServer.UI_PATH_PREFIX + s"/${info.id}"
Expand All @@ -146,6 +202,69 @@ private[spark] class HistoryPage(parent: HistoryServer) extends WebUIPage("") {
<td sorttable_customkey={info.lastUpdated.toString}>{lastUpdated}</td>
</tr>
}

private def getAttemptURI(attemptInfo: ApplicationHistoryInfo,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: style:

private def foo(
    arg1: Type1,
    arg2: Type2): RetType {

returnEmptyIfAttemptInfoNull: Boolean = true ) = {
if (attemptInfo.appAttemptId.equals("")) {
if(returnEmptyIfAttemptInfoNull) {
attemptInfo.appAttemptId
} else {
HistoryServer.UI_PATH_PREFIX + s"/${attemptInfo.id}"
}
} else {
HistoryServer.UI_PATH_PREFIX + s"/${attemptInfo.id}" + "_" + s"${attemptInfo.appAttemptId}"
}
}

private def firstAttemptRow(attemptInfo : ApplicationHistoryInfo) = {
val uiAddress =
if (attemptInfo.appAttemptId.equals("")) {
attemptInfo.appAttemptId
} else {
HistoryServer.UI_PATH_PREFIX + s"/${attemptInfo.id}" + "_" + s"${attemptInfo.appAttemptId}"
}

val startTime = UIUtils.formatDate(attemptInfo.startTime)
val endTime = UIUtils.formatDate(attemptInfo.endTime)
val duration = UIUtils.formatDuration(attemptInfo.endTime - attemptInfo.startTime)
val lastUpdated = UIUtils.formatDate(attemptInfo.lastUpdated)
val attemptId = attemptInfo.appAttemptId
<td><a href={uiAddress}>{attemptId}</a></td>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: indentation

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does scala actually consider this line as a continuation of the previous line? The inline XML syntax has always looked super weird to me...

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it do so. [ For case, even I took some time to figure out the same ]

<td sorttable_customkey={attemptInfo.startTime.toString}>{startTime}</td>
<td sorttable_customkey={attemptInfo.endTime.toString}>{endTime}</td>
<td sorttable_customkey={(attemptInfo.endTime - attemptInfo.startTime).toString}>
{duration}</td>
<td>{attemptInfo.sparkUser}</td>
<td sorttable_customkey={attemptInfo.lastUpdated.toString}>{lastUpdated}</td>
}

private def attemptRow(attemptInfo: ApplicationHistoryInfo) = {
<tr>
{firstAttemptRow(attemptInfo)}
</tr>
}

private def appWithAttemptRow(
appAttemptsInfo: (String,ArrayBuffer[ApplicationHistoryInfo])): Seq[Node] = {
val applicationId = appAttemptsInfo._1
val info = appAttemptsInfo._2
val rowSpan = info.length
val rowSpanString = rowSpan.toString
val applicatioName = info(0).name
val lastAttemptURI = getAttemptURI(info(0), false)
val ttAttempts = info.slice(1, rowSpan -1)
val x = new xml.NodeBuffer
x +=
<tr>
<td rowspan={rowSpanString}><a href={lastAttemptURI}>{applicationId}</a></td>
<td rowspan={rowSpanString}>{applicatioName}</td>
{ firstAttemptRow(info(0)) }
</tr>;
for( i <- 1 until rowSpan ){
x += attemptRow(info(i))
}
x
}

private def makePageLink(linkPage: Int, showIncomplete: Boolean): String = {
"/?" + Array(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ package org.apache.spark.scheduler
private[spark] class ApplicationEventListener extends SparkListener {
var appName: Option[String] = None
var appId: Option[String] = None
var appAttemptId: Option[String] = None
var sparkUser: Option[String] = None
var startTime: Option[Long] = None
var endTime: Option[Long] = None
Expand All @@ -35,6 +36,7 @@ private[spark] class ApplicationEventListener extends SparkListener {
override def onApplicationStart(applicationStart: SparkListenerApplicationStart) {
appName = Some(applicationStart.appName)
appId = applicationStart.appId
appAttemptId = Some(applicationStart.appAttemptId)
startTime = Some(applicationStart.time)
sparkUser = Some(applicationStart.sparkUser)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,17 @@ import org.apache.spark.util.{JsonProtocol, Utils}
*/
private[spark] class EventLoggingListener(
appId: String,
appAttemptId : String,
logBaseDir: String,
sparkConf: SparkConf,
hadoopConf: Configuration)
extends SparkListener with Logging {

import EventLoggingListener._

def this(appId: String, logBaseDir: String, sparkConf: SparkConf, hadoopConf: Configuration) =
this(appId, "", logBaseDir, sparkConf, hadoopConf)

def this(appId: String, logBaseDir: String, sparkConf: SparkConf) =
this(appId, logBaseDir, sparkConf, SparkHadoopUtil.get.newConfiguration(sparkConf))

Expand All @@ -80,7 +84,7 @@ private[spark] class EventLoggingListener(
private[scheduler] val loggedEvents = new ArrayBuffer[JValue]

// Visible for tests only.
private[scheduler] val logPath = getLogPath(logBaseDir, appId)
private[scheduler] val logPath = getLogPath(logBaseDir, appId, appAttemptId)

/**
* Creates the log file in the configured log directory.
Expand Down Expand Up @@ -262,8 +266,26 @@ private[spark] object EventLoggingListener extends Logging {
* @return A path which consists of file-system-safe characters.
*/
def getLogPath(logBaseDir: String, appId: String): String = {
getLogPath(logBaseDir, appId, "")
}

/**
* Return a file-system-safe path to the log directory for the given application.
*
* @param logBaseDir A base directory for the path to the log directory for given application.
* @param appId A unique app ID.
* @param appAttemptId A unique attempt id of appId.
* @return A path which consists of file-system-safe characters.
*/

def getLogPath(logBaseDir: String, appId: String, appAttemptId : String): String = {
val name = appId.replaceAll("[ :/]", "-").replaceAll("[${}'\"]", "_").toLowerCase
Utils.resolveURI(logBaseDir) + "/" + name.stripSuffix("/")

if (appAttemptId.equals("")) {
Utils.resolveURI(logBaseDir) + "/" + name.stripSuffix("/")
} else {
Utils.resolveURI(logBaseDir) + "/" + name.stripSuffix("/") + "_" + appAttemptId
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,4 +41,11 @@ private[spark] trait SchedulerBackend {
*/
def applicationId(): String = appId

/**
* Get an application ID associated with the job.
*
* @return An application attempt id
*/
def applicationAttemptId(): String = ""

}
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,8 @@ case class SparkListenerExecutorMetricsUpdate(
extends SparkListenerEvent

@DeveloperApi
case class SparkListenerApplicationStart(appName: String, appId: Option[String], time: Long,
sparkUser: String) extends SparkListenerEvent
case class SparkListenerApplicationStart(appName: String, appId: Option[String],
time: Long, sparkUser: String, appAttemptId: String = "") extends SparkListenerEvent

@DeveloperApi
case class SparkListenerApplicationEnd(time: Long) extends SparkListenerEvent
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,4 +78,12 @@ private[spark] trait TaskScheduler {
* Process a lost executor
*/
def executorLost(executorId: String, reason: ExecutorLossReason): Unit

/**
* Get an application's attempt Id associated with the job.
*
* @return An application's Attempt ID
*/
def applicationAttemptId(): String = ""

}
Original file line number Diff line number Diff line change
Expand Up @@ -514,6 +514,8 @@ private[spark] class TaskSchedulerImpl(
}

override def applicationId(): String = backend.applicationId()

override def applicationAttemptId(): String = backend.applicationAttemptId()

}

Expand Down
Loading