core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala

@@ -41,7 +41,6 @@ private[spark] class ApplicationInfo(
@transient var coresGranted: Int = _
@transient var endTime: Long = _
@transient var appSource: ApplicationSource = _
@transient @volatile var appUIUrlAtHistoryServer: Option[String] = None

// A cap on the number of executors this application can have at any given time.
// By default, this is infinite. Only after the first allocation request is issued by the
@@ -66,7 +65,6 @@ private[spark] class ApplicationInfo(
nextExecutorId = 0
removedExecutors = new ArrayBuffer[ExecutorDesc]
executorLimit = desc.initialExecutorLimit.getOrElse(Integer.MAX_VALUE)
appUIUrlAtHistoryServer = None
}

private def newExecutorId(useID: Option[Int] = None): Int = {
@@ -136,11 +134,4 @@ private[spark] class ApplicationInfo(
System.currentTimeMillis() - startTime
}
}

/**
* Returns the original application UI URL, unless its address at the history server
* is defined.
*/
def curAppUIUrl: String = appUIUrlAtHistoryServer.getOrElse(desc.appUiUrl)

}
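
With appUIUrlAtHistoryServer and curAppUIUrl gone, callers can no longer ask an ApplicationInfo for a history-server address; the UI pages changed below branch on app.isFinished instead. A minimal sketch of that replacement logic (the helper name is hypothetical, not part of this PR):

def appUiLink(app: ApplicationInfo): Option[String] = {
  // Running apps still expose the driver's live UI; finished apps no longer
  // get a link from the master and must be looked up on the History Server.
  if (app.isFinished) None else Some(app.desc.appUiUrl)
}
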
core/src/main/scala/org/apache/spark/deploy/master/Master.scala (1 addition & 108 deletions)

@@ -17,35 +17,25 @@

package org.apache.spark.deploy.master

import java.io.FileNotFoundException
import java.net.URLEncoder
import java.text.SimpleDateFormat
import java.util.Date
import java.util.concurrent.{ConcurrentHashMap, ScheduledFuture, TimeUnit}
import java.util.concurrent.{ScheduledFuture, TimeUnit}

import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration.Duration
import scala.language.postfixOps
import scala.util.Random

import org.apache.hadoop.fs.Path

import org.apache.spark.{SecurityManager, SparkConf, SparkException}
import org.apache.spark.deploy.{ApplicationDescription, DriverDescription,
ExecutorState, SparkHadoopUtil}
import org.apache.spark.deploy.DeployMessages._
import org.apache.spark.deploy.history.HistoryServer
import org.apache.spark.deploy.master.DriverState.DriverState
import org.apache.spark.deploy.master.MasterMessages._
import org.apache.spark.deploy.master.ui.MasterWebUI
import org.apache.spark.deploy.rest.StandaloneRestServer
import org.apache.spark.internal.Logging
import org.apache.spark.metrics.MetricsSystem
import org.apache.spark.rpc._
import org.apache.spark.scheduler.{EventLoggingListener, ReplayListenerBus}
import org.apache.spark.serializer.{JavaSerializer, Serializer}
import org.apache.spark.ui.SparkUI
import org.apache.spark.util.{ThreadUtils, Utils}

private[deploy] class Master(
@@ -59,10 +49,6 @@ private[deploy] class Master(
private val forwardMessageThread =
ThreadUtils.newDaemonSingleThreadScheduledExecutor("master-forward-message-thread")

private val rebuildUIThread =
ThreadUtils.newDaemonSingleThreadExecutor("master-rebuild-ui-thread")
private val rebuildUIContext = ExecutionContext.fromExecutor(rebuildUIThread)

private val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)

private def createDateFormat = new SimpleDateFormat("yyyyMMddHHmmss") // For application IDs
@@ -85,8 +71,6 @@ private[deploy] class Master(
private val addressToApp = new HashMap[RpcAddress, ApplicationInfo]
private val completedApps = new ArrayBuffer[ApplicationInfo]
private var nextAppNumber = 0
// Using ConcurrentHashMap so that master-rebuild-ui-thread can add a UI after asyncRebuildUI
private val appIdToUI = new ConcurrentHashMap[String, SparkUI]

private val drivers = new HashSet[DriverInfo]
private val completedDrivers = new ArrayBuffer[DriverInfo]
@@ -199,7 +183,6 @@ private[deploy] class Master(
checkForWorkerTimeOutTask.cancel(true)
}
forwardMessageThread.shutdownNow()
rebuildUIThread.shutdownNow()
webUi.stop()
restServer.foreach(_.stop())
masterMetricsSystem.stop()
@@ -391,9 +374,6 @@ private[deploy] class Master(
case CheckForWorkerTimeOut =>
timeOutDeadWorkers()

case AttachCompletedRebuildUI(appId) =>
// An asyncRebuildSparkUI has completed, so we need to attach the rebuilt UI to the master webUi
Option(appIdToUI.get(appId)).foreach { ui => webUi.attachSparkUI(ui) }
}

override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
@@ -844,17 +824,13 @@ private[deploy] class Master(
if (completedApps.size >= RETAINED_APPLICATIONS) {
val toRemove = math.max(RETAINED_APPLICATIONS / 10, 1)
completedApps.take(toRemove).foreach { a =>
Option(appIdToUI.remove(a.id)).foreach { ui => webUi.detachSparkUI(ui) }
applicationMetricsSystem.removeSource(a.appSource)
}
completedApps.trimStart(toRemove)
}
completedApps += app // Remember it in our history
waitingApps -= app

// If application events are logged, use them to rebuild the UI
asyncRebuildSparkUI(app)

for (exec <- app.executors.values) {
killExecutor(exec)
}
@@ -953,89 +929,6 @@ private[deploy] class Master(
exec.state = ExecutorState.KILLED
}

/**
* Rebuild a new SparkUI from the given application's event logs.
* Return the UI if successful, else None
*/
private[master] def rebuildSparkUI(app: ApplicationInfo): Option[SparkUI] = {
val futureUI = asyncRebuildSparkUI(app)
ThreadUtils.awaitResult(futureUI, Duration.Inf)
}

/** Rebuild a new SparkUI asynchronously so as not to block the RPC event loop */
private[master] def asyncRebuildSparkUI(app: ApplicationInfo): Future[Option[SparkUI]] = {
val appName = app.desc.name
val notFoundBasePath = HistoryServer.UI_PATH_PREFIX + "/not-found"
val eventLogDir = app.desc.eventLogDir
.getOrElse {
// Event logging is disabled for this application
app.appUIUrlAtHistoryServer = Some(notFoundBasePath)
return Future.successful(None)
}
val futureUI = Future {
val eventLogFilePrefix = EventLoggingListener.getLogPath(
eventLogDir, app.id, appAttemptId = None, compressionCodecName = app.desc.eventLogCodec)
val fs = Utils.getHadoopFileSystem(eventLogDir, hadoopConf)
val inProgressExists = fs.exists(new Path(eventLogFilePrefix +
EventLoggingListener.IN_PROGRESS))

val eventLogFile = if (inProgressExists) {
// Event logging is enabled for this application, but the application is still in progress
logWarning(s"Application $appName is still in progress, it may be terminated abnormally.")
eventLogFilePrefix + EventLoggingListener.IN_PROGRESS
} else {
eventLogFilePrefix
}

val logInput = EventLoggingListener.openEventLog(new Path(eventLogFile), fs)
val replayBus = new ReplayListenerBus()
val ui = SparkUI.createHistoryUI(new SparkConf, replayBus, new SecurityManager(conf),
appName, HistoryServer.UI_PATH_PREFIX + s"/${app.id}", app.startTime)
try {
replayBus.replay(logInput, eventLogFile, inProgressExists)
} finally {
logInput.close()
}

Some(ui)
}(rebuildUIContext)

futureUI.onSuccess { case Some(ui) =>
appIdToUI.put(app.id, ui)
// `self` can be null if we are already in the process of shutting down
// This happens frequently in tests where `local-cluster` is used
if (self != null) {
self.send(AttachCompletedRebuildUI(app.id))
}
// Application UI is successfully rebuilt, so link the Master UI to it
// NOTE - app.appUIUrlAtHistoryServer is volatile
app.appUIUrlAtHistoryServer = Some(ui.basePath)
}(ThreadUtils.sameThread)

futureUI.onFailure {
case fnf: FileNotFoundException =>
// Event logging is enabled for this application, but no event logs are found
val title = s"Application history not found (${app.id})"
var msg = s"No event logs found for application $appName in ${app.desc.eventLogDir.get}."
logWarning(msg)
msg += " Did you specify the correct logging directory?"
msg = URLEncoder.encode(msg, "UTF-8")
app.appUIUrlAtHistoryServer = Some(notFoundBasePath + s"?msg=$msg&title=$title")

case e: Exception =>
// Relay exception message to application UI page
val title = s"Application history load error (${app.id})"
val exception = URLEncoder.encode(Utils.exceptionString(e), "UTF-8")
var msg = s"Exception in replaying log for application $appName!"
logError(msg, e)
msg = URLEncoder.encode(msg, "UTF-8")
app.appUIUrlAtHistoryServer =
Some(notFoundBasePath + s"?msg=$msg&exception=$exception&title=$title")
}(ThreadUtils.sameThread)

futureUI
}

/** Generate a new app ID given an app's submission date */
private def newApplicationId(submitDate: Date): String = {
val appId = "app-%s-%04d".format(createDateFormat.format(submitDate), nextAppNumber)
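
Since rebuildSparkUI and asyncRebuildSparkUI are deleted, the master never replays event logs anymore, so the UI of a completed application is only reachable through a History Server reading the same event log directory. A sketch of the relevant application-side configuration, assuming an example HDFS path (the keys are standard Spark settings):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.eventLog.enabled", "true")                        // write event logs
  .set("spark.eventLog.dir", "hdfs://namenode:8021/spark-logs") // example path
// A History Server pointed at the same directory serves the rebuilt UI:
//   spark.history.fs.logDirectory  hdfs://namenode:8021/spark-logs
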
core/src/main/scala/org/apache/spark/deploy/master/MasterMessages.scala

@@ -39,6 +39,4 @@ private[master] object MasterMessages {
case object BoundPortsRequest

case class BoundPortsResponse(rpcEndpointPort: Int, webUIPort: Int, restPort: Option[Int])

case class AttachCompletedRebuildUI(appId: String)
}
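
For context, AttachCompletedRebuildUI was a fire-and-forget message the rebuild thread sent to the master's own endpoint once a UI had been replayed; with asyncRebuildSparkUI gone there is no sender left. The pattern it implemented, condensed into a sketch (assumes Spark-internal code, since RpcEndpointRef is private[spark]):

import org.apache.spark.rpc.RpcEndpointRef

case class AttachCompletedRebuildUI(appId: String)

def notifyRebuilt(self: RpcEndpointRef, appId: String): Unit = {
  // `self` can be null while the master is shutting down (as the removed
  // code noted), so guard before sending the one-way message.
  if (self != null) self.send(AttachCompletedRebuildUI(appId))
}
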
core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala

@@ -75,7 +75,11 @@ private[ui] class ApplicationPage(parent: MasterWebUI) extends WebUIPage("app")
</li>
<li><strong>Submit Date:</strong> {app.submitDate}</li>
<li><strong>State:</strong> {app.state}</li>
<li><strong><a href={app.curAppUIUrl}>Application Detail UI</a></strong></li>
{
if (!app.isFinished) {
<li><strong><a href={app.desc.appUiUrl}>Application Detail UI</a></strong></li>
}
}
</ul>
</div>
</div>
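
The markup above leans on a quirk of Scala XML literals: an `if` without an `else` evaluates to () when the condition is false, and scala.xml's NodeBuffer drops Unit values, so no <li> is emitted for a finished application. The same pattern in isolation (the helper name is hypothetical):

import scala.xml.Node

def detailUiItem(app: ApplicationInfo): Seq[Node] = {
  // Finished apps get no link; running apps link to the live driver UI.
  if (app.isFinished) Seq.empty
  else <li><strong><a href={app.desc.appUiUrl}>Application Detail UI</a></strong></li>
}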

core/src/main/scala/org/apache/spark/deploy/master/ui/HistoryNotFoundPage.scala

This file was deleted.

core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala

@@ -206,7 +206,13 @@ private[ui] class MasterPage(parent: MasterWebUI) extends WebUIPage("") {
{killLink}
</td>
<td>
<a href={app.curAppUIUrl}>{app.desc.name}</a>
{
if (app.isFinished) {
app.desc.name
} else {
<a href={app.desc.appUiUrl}>{app.desc.name}</a>
}
}
</td>
<td>
{app.coresGranted}
core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala

@@ -19,8 +19,6 @@ package org.apache.spark.deploy.master.ui

import org.apache.spark.deploy.master.Master
import org.apache.spark.internal.Logging
import org.apache.spark.status.api.v1.{ApiRootResource, ApplicationInfo, ApplicationsListResource,
UIRoot}
import org.apache.spark.ui.{SparkUI, WebUI}
import org.apache.spark.ui.JettyUtils._

@@ -30,60 +28,26 @@ import org.apache.spark.ui.JettyUtils._
private[master]
class MasterWebUI(
val master: Master,
requestedPort: Int,
customMasterPage: Option[MasterPage] = None)
requestedPort: Int)
extends WebUI(master.securityMgr, master.securityMgr.getSSLOptions("standalone"),
requestedPort, master.conf, name = "MasterUI") with Logging with UIRoot {
requestedPort, master.conf, name = "MasterUI") with Logging {

val masterEndpointRef = master.self
val killEnabled = master.conf.getBoolean("spark.ui.killEnabled", true)

val masterPage = customMasterPage.getOrElse(new MasterPage(this))

initialize()

/** Initialize all components of the server. */
def initialize() {
val masterPage = new MasterPage(this)
attachPage(new ApplicationPage(this))
attachPage(new HistoryNotFoundPage(this))
attachPage(masterPage)
attachHandler(createStaticHandler(MasterWebUI.STATIC_RESOURCE_DIR, "/static"))
attachHandler(ApiRootResource.getServletHandler(this))
attachHandler(createRedirectHandler(
"/app/kill", "/", masterPage.handleAppKillRequest, httpMethods = Set("POST")))
attachHandler(createRedirectHandler(
"/driver/kill", "/", masterPage.handleDriverKillRequest, httpMethods = Set("POST")))
}

/** Attach a reconstructed UI to this Master UI. Only valid after bind(). */
def attachSparkUI(ui: SparkUI) {
assert(serverInfo.isDefined, "Master UI must be bound to a server before attaching SparkUIs")
ui.getHandlers.foreach(attachHandler)
}

/** Detach a reconstructed UI from this Master UI. Only valid after bind(). */
def detachSparkUI(ui: SparkUI) {
assert(serverInfo.isDefined, "Master UI must be bound to a server before detaching SparkUIs")
ui.getHandlers.foreach(detachHandler)
}

def getApplicationInfoList: Iterator[ApplicationInfo] = {
val state = masterPage.getMasterState
val activeApps = state.activeApps.sortBy(_.startTime).reverse
val completedApps = state.completedApps.sortBy(_.endTime).reverse
activeApps.iterator.map { ApplicationsListResource.convertApplicationInfo(_, false) } ++
completedApps.iterator.map { ApplicationsListResource.convertApplicationInfo(_, true) }
}

def getSparkUI(appId: String): Option[SparkUI] = {
val state = masterPage.getMasterState
val activeApps = state.activeApps.sortBy(_.startTime).reverse
val completedApps = state.completedApps.sortBy(_.endTime).reverse
(activeApps ++ completedApps).find { _.id == appId }.flatMap {
master.rebuildSparkUI
}
}
}

private[master] object MasterWebUI {
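
Finally, because MasterWebUI no longer mixes in UIRoot or mounts ApiRootResource, the master stops serving the /api/v1 REST endpoints for application data; that listing comes from the History Server instead. A sketch of fetching it, assuming an example host and the default History Server port of 18080:

import scala.io.Source

val json = Source.fromURL("http://history-server:18080/api/v1/applications").mkString
println(json) // JSON array of application records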