diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 3cd0c218a36fd..e383dc1576817 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -370,6 +370,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
taskScheduler.start()
val applicationId: String = taskScheduler.applicationId()
+ val applicationAttemptId: String = taskScheduler.applicationAttemptId()
conf.set("spark.app.id", applicationId)
env.blockManager.initialize(applicationId)
@@ -385,8 +386,8 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
// Optionally log Spark events
private[spark] val eventLogger: Option[EventLoggingListener] = {
if (isEventLogEnabled) {
- val logger =
- new EventLoggingListener(applicationId, eventLogDir.get, conf, hadoopConfiguration)
+ val logger = new EventLoggingListener(
+ applicationId, applicationAttemptId, eventLogDir.get, conf, hadoopConfiguration)
logger.start()
listenerBus.addListener(logger)
Some(logger)
@@ -1735,7 +1736,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
// Note: this code assumes that the task scheduler has been initialized and has contacted
// the cluster manager to get an application ID (in case the cluster manager provides one).
listenerBus.post(SparkListenerApplicationStart(appName, Some(applicationId),
- startTime, sparkUser))
+ startTime, sparkUser, applicationAttemptId))
}
/** Post the application end event */
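Taken together with the scheduler changes below, the attempt ID read here comes from a small delegation chain: SparkContext asks the TaskScheduler, TaskSchedulerImpl forwards to its SchedulerBackend, and only the YARN cluster backend overrides the empty-string default. A rough sketch of the chain, assuming yarn-cluster mode (all names are from this patch):

    // SparkContext:
    val applicationAttemptId: String = taskScheduler.applicationAttemptId()
    // TaskSchedulerImpl:
    //   override def applicationAttemptId(): String = backend.applicationAttemptId()
    // YarnClusterSchedulerBackend (roughly):
    //   sc.getConf.getOption("spark.yarn.app.attemptid").getOrElse("")
    // Any other backend: "" (the SchedulerBackend default)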
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala
index 553bf3cb945ab..e78eefe5686ea 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala
@@ -26,7 +26,8 @@ private[spark] case class ApplicationHistoryInfo(
endTime: Long,
lastUpdated: Long,
sparkUser: String,
- completed: Boolean = false)
+ completed: Boolean = false,
+ appAttemptId: String = "")
private[spark] abstract class ApplicationHistoryProvider {
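Because the new field comes last and defaults to the empty string, existing constructions of ApplicationHistoryInfo keep compiling unchanged. A quick sketch with hypothetical values:

    val legacy = ApplicationHistoryInfo("app-1", "Pi", 0L, 10L, 10L, "alice", completed = true)
    val attempt = ApplicationHistoryInfo("app-1", "Pi", 0L, 10L, 10L, "alice", true, "2")
    legacy.appAttemptId  // "" -- read everywhere as "no attempt information"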
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index 3e3d6ff29faf0..2889d7a01a98d 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -208,10 +208,17 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
if (!logInfos.isEmpty) {
val newApps = new mutable.LinkedHashMap[String, FsApplicationHistoryInfo]()
def addIfAbsent(info: FsApplicationHistoryInfo) = {
- if (!newApps.contains(info.id) ||
- newApps(info.id).logPath.endsWith(EventLoggingListener.IN_PROGRESS) &&
+      val key =
+        if (info.appAttemptId.isEmpty) {
+          info.id
+        } else {
+          info.id + "_" + info.appAttemptId
+        }
+
+ if (!newApps.contains(key) ||
+ newApps(key).logPath.endsWith(EventLoggingListener.IN_PROGRESS) &&
!info.logPath.endsWith(EventLoggingListener.IN_PROGRESS)) {
- newApps += (info.id -> info)
+ newApps += (key -> info)
}
}
@@ -309,7 +316,8 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
appListener.endTime.getOrElse(-1L),
getModificationTime(eventLog).get,
appListener.sparkUser.getOrElse(NOT_STARTED),
- isApplicationCompleted(eventLog))
+ isApplicationCompleted(eventLog),
+ appListener.appAttemptId.getOrElse(""))
} finally {
logInput.close()
}
@@ -410,5 +418,7 @@ private class FsApplicationHistoryInfo(
endTime: Long,
lastUpdated: Long,
sparkUser: String,
- completed: Boolean = true)
- extends ApplicationHistoryInfo(id, name, startTime, endTime, lastUpdated, sparkUser, completed)
+ completed: Boolean = true,
+    appAttemptId: String = "")
+ extends ApplicationHistoryInfo(
+ id, name, startTime, endTime, lastUpdated, sparkUser, completed, appAttemptId)
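The effect of the new composite key is that two attempts of the same application no longer evict each other from newApps. A standalone, runnable sketch of just the key logic (names and values hypothetical):

    object DedupKeySketch {
      // Mirrors the key construction in addIfAbsent above.
      def key(id: String, appAttemptId: String): String =
        if (appAttemptId.isEmpty) id else id + "_" + appAttemptId

      def main(args: Array[String]): Unit = {
        println(key("application_1", ""))   // application_1   (log written before this change)
        println(key("application_1", "1"))  // application_1_1 (first attempt)
        println(key("application_1", "2"))  // application_1_2 (second attempt kept, not dropped)
      }
    }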
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala
index 26ebc75971c66..9c4d9e64bb127 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala
@@ -22,6 +22,9 @@ import javax.servlet.http.HttpServletRequest
import scala.xml.Node
import org.apache.spark.ui.{WebUIPage, UIUtils}
+import scala.collection.immutable.ListMap
+import scala.collection.mutable.{ArrayBuffer, HashMap}
private[spark] class HistoryPage(parent: HistoryServer) extends WebUIPage("") {
@@ -34,18 +37,31 @@ private[spark] class HistoryPage(parent: HistoryServer) extends WebUIPage("") {
val requestedIncomplete =
Option(request.getParameter("showIncomplete")).getOrElse("false").toBoolean
- val allApps = parent.getApplicationList().filter(_.completed != requestedIncomplete)
- val actualFirst = if (requestedFirst < allApps.size) requestedFirst else 0
- val apps = allApps.slice(actualFirst, Math.min(actualFirst + pageSize, allApps.size))
-
+    val allAppsAndAttempts =
+      parent.getApplicationList().filter(_.completed != requestedIncomplete)
+    val (hasAttemptInfo, appToAttemptMap) = getApplicationLevelList(allAppsAndAttempts)
+
+    val allAppsSize = allAppsAndAttempts.size
+
+    val actualFirst = if (requestedFirst < allAppsSize) requestedFirst else 0
+    val apps =
+      allAppsAndAttempts.slice(actualFirst, Math.min(actualFirst + pageSize, allAppsSize))
+    val appWithAttemptsDisplayList =
+      appToAttemptMap.slice(actualFirst, Math.min(actualFirst + pageSize, allAppsSize))
+
val actualPage = (actualFirst / pageSize) + 1
- val last = Math.min(actualFirst + pageSize, allApps.size) - 1
- val pageCount = allApps.size / pageSize + (if (allApps.size % pageSize > 0) 1 else 0)
+ val last = Math.min(actualFirst + pageSize, allAppsSize) - 1
+ val pageCount = allAppsSize / pageSize + (if (allAppsSize % pageSize > 0) 1 else 0)
val secondPageFromLeft = 2
val secondPageFromRight = pageCount - 1
- val appTable = UIUtils.listingTable(appHeader, appRow, apps)
+ val appTable =
+ if (hasAttemptInfo) {
+ UIUtils.listingTable(appWithAttemptHeader, appWithAttemptRow, appWithAttemptsDisplayList)
+ } else {
+ UIUtils.listingTable(appHeader, appRow, apps)
+ }
val providerConfig = parent.getProviderConfig()
val content =
@@ -59,7 +75,7 @@ private[spark] class HistoryPage(parent: HistoryServer) extends WebUIPage("") {
// to the first and last page. If the current page +/- `plusOrMinus` is greater
// than the 2nd page from the first page or less than the 2nd page from the last
// page, `...` will be displayed.
- if (allApps.size > 0) {
+ if (allAppsSize > 0) {
val leftSideIndices =
rangeIndices(actualPage - plusOrMinus until actualPage, 1 < _, requestedIncomplete)
val rightSideIndices =
@@ -67,7 +83,7 @@ private[spark] class HistoryPage(parent: HistoryServer) extends WebUIPage("") {
requestedIncomplete)
- Showing {actualFirst + 1}-{last + 1} of {allApps.size}
+ Showing {actualFirst + 1}-{last + 1} of {allAppsSize}
{if (requestedIncomplete) "(Incomplete applications)"}
{
@@ -113,6 +129,36 @@ private[spark] class HistoryPage(parent: HistoryServer) extends WebUIPage("") {
UIUtils.basicSparkPage(content, "History Server")
}
+
+  private def getApplicationLevelList(appAndAttemptList: Iterable[ApplicationHistoryInfo]) = {
+    // Group each application's attempts under its application ID. If no entry
+    // carries attempt-specific information, return false so that the caller
+    // falls back to the previous single-row-per-application UI.
+    var hasAttemptInfo = false
+    val appToAttemptInfo = new HashMap[String, ArrayBuffer[ApplicationHistoryInfo]]
+    for (appAttempt <- appAndAttemptList) {
+      if (appAttempt.appAttemptId.nonEmpty) {
+        hasAttemptInfo = true
+      }
+      val currentAttempts =
+        appToAttemptInfo.getOrElseUpdate(appAttempt.id, new ArrayBuffer[ApplicationHistoryInfo])
+      currentAttempts += appAttempt
+    }
+    val sortedMap = ListMap(appToAttemptInfo.toSeq.sortWith(_._1 > _._1): _*)
+    (hasAttemptInfo, sortedMap)
+  }
+
private val appHeader = Seq(
"App ID",
@@ -128,6 +174,16 @@ private[spark] class HistoryPage(parent: HistoryServer) extends WebUIPage("") {
range.filter(condition).map(nextPage =>
{nextPage} )
}
+
+ private val appWithAttemptHeader = Seq(
+ "App ID",
+ "App Name",
+ "Attempt ID",
+ "Started",
+ "Completed",
+ "Duration",
+ "Spark User",
+ "Last Updated")
private def appRow(info: ApplicationHistoryInfo): Seq[Node] = {
val uiAddress = HistoryServer.UI_PATH_PREFIX + s"/${info.id}"
@@ -146,6 +202,69 @@ private[spark] class HistoryPage(parent: HistoryServer) extends WebUIPage("") {
{lastUpdated} |
}
+
+  private def getAttemptURI(
+      attemptInfo: ApplicationHistoryInfo,
+      returnEmptyIfAttemptInfoNull: Boolean = true) = {
+    if (attemptInfo.appAttemptId.isEmpty) {
+      if (returnEmptyIfAttemptInfoNull) {
+        attemptInfo.appAttemptId
+      } else {
+        HistoryServer.UI_PATH_PREFIX + s"/${attemptInfo.id}"
+      }
+    } else {
+      HistoryServer.UI_PATH_PREFIX + s"/${attemptInfo.id}_${attemptInfo.appAttemptId}"
+    }
+  }
+
+  private def firstAttemptRow(attemptInfo: ApplicationHistoryInfo) = {
+    val uiAddress = getAttemptURI(attemptInfo)
+    val startTime = UIUtils.formatDate(attemptInfo.startTime)
+    val endTime = UIUtils.formatDate(attemptInfo.endTime)
+    val duration = UIUtils.formatDuration(attemptInfo.endTime - attemptInfo.startTime)
+    val lastUpdated = UIUtils.formatDate(attemptInfo.lastUpdated)
+    val attemptId = attemptInfo.appAttemptId
+    <td><a href={uiAddress}>{attemptId}</a></td>
+    <td>{startTime}</td>
+    <td>{endTime}</td>
+    <td>{duration}</td>
+    <td>{attemptInfo.sparkUser}</td>
+    <td>{lastUpdated}</td>
+  }
+
+  private def attemptRow(attemptInfo: ApplicationHistoryInfo) = {
+    <tr>
+      {firstAttemptRow(attemptInfo)}
+    </tr>
+  }
+
+  private def appWithAttemptRow(
+      appAttemptsInfo: (String, ArrayBuffer[ApplicationHistoryInfo])): Seq[Node] = {
+    val applicationId = appAttemptsInfo._1
+    val info = appAttemptsInfo._2
+    val rowSpan = info.length
+    val rowSpanString = rowSpan.toString
+    val applicationName = info(0).name
+    val lastAttemptURI = getAttemptURI(info(0), returnEmptyIfAttemptInfoNull = false)
+    val x = new xml.NodeBuffer
+    x +=
+      <tr>
+        <td rowspan={rowSpanString}><a href={lastAttemptURI}>{applicationId}</a></td>
+        <td rowspan={rowSpanString}>{applicationName}</td>
+        {firstAttemptRow(info(0))}
+      </tr>;
+    for (i <- 1 until rowSpan) {
+      x += attemptRow(info(i))
+    }
+    x
+  }
private def makePageLink(linkPage: Int, showIncomplete: Boolean): String = {
"/?" + Array(
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ApplicationEventListener.scala b/core/src/main/scala/org/apache/spark/scheduler/ApplicationEventListener.scala
index 6d39a5e3fa64c..a591c7e046d5d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ApplicationEventListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ApplicationEventListener.scala
@@ -26,6 +26,7 @@ package org.apache.spark.scheduler
private[spark] class ApplicationEventListener extends SparkListener {
var appName: Option[String] = None
var appId: Option[String] = None
+ var appAttemptId: Option[String] = None
var sparkUser: Option[String] = None
var startTime: Option[Long] = None
var endTime: Option[Long] = None
@@ -35,6 +36,7 @@ private[spark] class ApplicationEventListener extends SparkListener {
override def onApplicationStart(applicationStart: SparkListenerApplicationStart) {
appName = Some(applicationStart.appName)
appId = applicationStart.appId
+ appAttemptId = Some(applicationStart.appAttemptId)
startTime = Some(applicationStart.time)
sparkUser = Some(applicationStart.sparkUser)
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
index 30075c172bdb1..a6782c1406c82 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
@@ -47,6 +47,7 @@ import org.apache.spark.util.{JsonProtocol, Utils}
*/
private[spark] class EventLoggingListener(
appId: String,
+    appAttemptId: String,
logBaseDir: String,
sparkConf: SparkConf,
hadoopConf: Configuration)
@@ -54,6 +55,9 @@ private[spark] class EventLoggingListener(
import EventLoggingListener._
+ def this(appId: String, logBaseDir: String, sparkConf: SparkConf, hadoopConf: Configuration) =
+ this(appId, "", logBaseDir, sparkConf, hadoopConf)
+
def this(appId: String, logBaseDir: String, sparkConf: SparkConf) =
this(appId, logBaseDir, sparkConf, SparkHadoopUtil.get.newConfiguration(sparkConf))
@@ -80,7 +84,7 @@ private[spark] class EventLoggingListener(
private[scheduler] val loggedEvents = new ArrayBuffer[JValue]
// Visible for tests only.
- private[scheduler] val logPath = getLogPath(logBaseDir, appId)
+ private[scheduler] val logPath = getLogPath(logBaseDir, appId, appAttemptId)
/**
* Creates the log file in the configured log directory.
@@ -262,8 +266,26 @@ private[spark] object EventLoggingListener extends Logging {
* @return A path which consists of file-system-safe characters.
*/
def getLogPath(logBaseDir: String, appId: String): String = {
+ getLogPath(logBaseDir, appId, "")
+ }
+
+  /**
+   * Return a file-system-safe path to the log directory for the given application attempt.
+   *
+   * @param logBaseDir The base directory for the log directory of the given application.
+   * @param appId A unique app ID.
+   * @param appAttemptId A unique attempt ID of appId, or an empty string for a single attempt.
+   * @return A path which consists of file-system-safe characters.
+   */
+  def getLogPath(logBaseDir: String, appId: String, appAttemptId: String): String = {
val name = appId.replaceAll("[ :/]", "-").replaceAll("[${}'\"]", "_").toLowerCase
- Utils.resolveURI(logBaseDir) + "/" + name.stripSuffix("/")
+
+    if (appAttemptId.isEmpty) {
+      Utils.resolveURI(logBaseDir) + "/" + name.stripSuffix("/")
+    } else {
+      Utils.resolveURI(logBaseDir) + "/" + name.stripSuffix("/") + "_" + appAttemptId
+    }
}
/**
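With the overload above, the attempt ID is appended to the sanitized application ID, so each attempt logs into its own directory. Expected results, assuming a local logBaseDir of /tmp/spark-events:

    EventLoggingListener.getLogPath("/tmp/spark-events", "app 1")
    // => file:/tmp/spark-events/app-1
    EventLoggingListener.getLogPath("/tmp/spark-events", "app 1", "2")
    // => file:/tmp/spark-events/app-1_2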
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala
index 992c477493d8e..6a6ab0c82f310 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala
@@ -41,4 +41,11 @@ private[spark] trait SchedulerBackend {
*/
def applicationId(): String = appId
+  /**
+   * Get the attempt ID associated with this application run, if the cluster
+   * manager supports multiple attempts.
+   *
+   * @return The application attempt ID, or an empty string if none is set.
+   */
+  def applicationAttemptId(): String = ""
+
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
index dd28ddb31de1f..e2ed4fe7b4c96 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
@@ -110,8 +110,8 @@ case class SparkListenerExecutorMetricsUpdate(
extends SparkListenerEvent
@DeveloperApi
-case class SparkListenerApplicationStart(appName: String, appId: Option[String], time: Long,
- sparkUser: String) extends SparkListenerEvent
+case class SparkListenerApplicationStart(appName: String, appId: Option[String],
+ time: Long, sparkUser: String, appAttemptId: String = "") extends SparkListenerEvent
@DeveloperApi
case class SparkListenerApplicationEnd(time: Long) extends SparkListenerEvent
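The trailing default keeps existing four-argument constructions of the event source-compatible. Both forms compile (values hypothetical):

    SparkListenerApplicationStart("app", Some("application_1"), 0L, "alice")        // attempt = ""
    SparkListenerApplicationStart("app", Some("application_1"), 0L, "alice", "2")   // YARN attempt 2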
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
index ed3418676e077..d612409c81b6f 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
@@ -78,4 +78,12 @@ private[spark] trait TaskScheduler {
* Process a lost executor
*/
def executorLost(executorId: String, reason: ExecutorLossReason): Unit
+
+  /**
+   * Get the application's attempt ID associated with the job.
+   *
+   * @return The application attempt ID, or an empty string if none is set.
+   */
+  def applicationAttemptId(): String = ""
+
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 7a9cf1c2e7f30..7e83cc945be39 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -514,6 +514,8 @@ private[spark] class TaskSchedulerImpl(
}
override def applicationId(): String = backend.applicationId()
+
+ override def applicationAttemptId(): String = backend.applicationAttemptId()
}
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index 8e20864db5673..71a53a844ddc8 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -192,7 +192,8 @@ private[spark] object JsonProtocol {
("App Name" -> applicationStart.appName) ~
("App ID" -> applicationStart.appId.map(JString(_)).getOrElse(JNothing)) ~
("Timestamp" -> applicationStart.time) ~
- ("User" -> applicationStart.sparkUser)
+ ("User" -> applicationStart.sparkUser) ~
+ ("appAttemptId" -> applicationStart.appAttemptId)
}
def applicationEndToJson(applicationEnd: SparkListenerApplicationEnd): JValue = {
@@ -553,7 +554,8 @@ private[spark] object JsonProtocol {
val appId = Utils.jsonOption(json \ "App ID").map(_.extract[String])
val time = (json \ "Timestamp").extract[Long]
val sparkUser = (json \ "User").extract[String]
- SparkListenerApplicationStart(appName, appId, time, sparkUser)
+    val appAttemptId =
+      Utils.jsonOption(json \ "App Attempt ID").map(_.extract[String]).getOrElse("")
+ SparkListenerApplicationStart(appName, appId, time, sparkUser, appAttemptId)
}
def applicationEndFromJson(json: JValue): SparkListenerApplicationEnd = {
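A round-trip sketch of the serialization above (with Spark on the classpath; values hypothetical). Note that an application-start event written by an older Spark, which lacks the field, deserializes with an empty attempt ID:

    val event = SparkListenerApplicationStart("app", Some("application_1"), 0L, "alice", "2")
    val json = JsonProtocol.applicationStartToJson(event)
    // compact(render(json)) gives, roughly:
    //   {"App Name":"app","App ID":"application_1","Timestamp":0,"User":"alice","App Attempt ID":"2"}
    assert(JsonProtocol.applicationStartFromJson(json) == event)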
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index e966bfba7bb7d..b77943e1b57b3 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -92,6 +92,11 @@ private[spark] class ApplicationMaster(
// Propagate the application ID so that YarnClusterSchedulerBackend can pick it up.
System.setProperty("spark.yarn.app.id", appAttemptId.getApplicationId().toString())
+
+      // Propagate the attempt ID so that, when event logging is enabled,
+      // each attempt's logs are written to a different directory.
+      System.setProperty("spark.yarn.app.attemptid", appAttemptId.getAttemptId().toString())
+
}
logInfo("ApplicationAttemptId: " + appAttemptId)
diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
index b1de81e6a8b0f..f20f4dcb00d64 100644
--- a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
+++ b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
@@ -46,5 +46,13 @@ private[spark] class YarnClusterSchedulerBackend(
logError("Application ID is not set.")
super.applicationId
}
-
+
+  override def applicationAttemptId(): String =
+    // In YARN cluster mode, spark.yarn.app.attemptid is expected to be set
+    // before the user application is launched, so if it is not set,
+    // something is wrong.
+    sc.getConf.getOption("spark.yarn.app.attemptid").getOrElse {
+      logError("Application attempt ID is not set.")
+      super.applicationAttemptId
+    }
}
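For reference, the handoff between the two YARN-side pieces: the ApplicationMaster sets the JVM system property before the user class runs, and a SparkConf created in the same JVM (the yarn-cluster driver) loads all spark.* system properties, which is how the getOption call above sees it. A minimal sketch, assuming attempt 2:

    // In the ApplicationMaster JVM, before the driver's SparkContext exists:
    System.setProperty("spark.yarn.app.attemptid", "2")
    // Later, in the driver (same JVM in yarn-cluster mode):
    val conf = new org.apache.spark.SparkConf()  // loads spark.* system properties
    conf.getOption("spark.yarn.app.attemptid")   // Some("2")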