@@ -95,6 +95,7 @@ <h4 style="clear: left; display: inline-block;">Executors</h4>
       Shuffle Write</span></th>
       <th><span data-toggle="tooltip" data-placement="left" title="Logs">Logs</span></th>
       <th><span data-toggle="tooltip" data-placement="left" title="Thread Dump">Thread Dump</span></th>
+      <th><span data-toggle="tooltip" data-placement="left" title="Worker">Worker</span></th>
     </tr>
   </thead>
   <tbody>
@@ -116,6 +116,12 @@ function formatLogsCells(execLogs, type) {
     return result;
 }
 
+function formatWorkersCells(worker, type) {
+    if ((!worker) || (worker['ui_url'] === undefined)) return "";
+    if (type !== 'display') return worker['ui_url'];
+    return '<td><a href="' + worker['ui_url'] + '">Worker</a></td>';
+}
+
 // Determine Color Opacity from 0.5-1
 // activeTasks range from 0 to maxTasks
 function activeTasksAlpha(activeTasks, maxTasks) {
@@ -394,7 +400,8 @@ $(document).ready(function () {
             data: 'id', render: function (data, type) {
                 return type === 'display' ? ("<a href='threadDump/?executorId=" + data + "'>Thread Dump</a>" ) : data;
             }
-        }
+        },
+        {data: 'worker', render: formatWorkersCells}
     ],
     "order": [[0, "asc"]]
 };
@@ -160,6 +160,10 @@ private[deploy] class ExecutorRunner(
builder.environment.put("SPARK_LOG_URL_STDERR", s"${baseUrl}stderr")
builder.environment.put("SPARK_LOG_URL_STDOUT", s"${baseUrl}stdout")

// Add webUI worker urls and worker webUI urls
builder.environment.put("SPARK_WORKER_URL", workerUrl)
builder.environment.put("SPARK_WORKER_UI_URL", s"http://$publicAddress:$webUiPort")

process = builder.start()
val header = "Spark Executor Command: %s\n%s\n\n".format(
formattedCommand, "=" * 40)
@@ -59,7 +59,8 @@ private[spark] class CoarseGrainedExecutorBackend(
     rpcEnv.asyncSetupEndpointRefByURI(driverUrl).flatMap { ref =>
       // This is a very fast action so we can use "ThreadUtils.sameThread"
       driver = Some(ref)
-      ref.ask[Boolean](RegisterExecutor(executorId, self, hostname, cores, extractLogUrls))
+      ref.ask[Boolean](RegisterExecutor(executorId, self, hostname, cores, extractLogUrls,
+        extractWorkerUrls))
     }(ThreadUtils.sameThread).onComplete {
       // This is a very fast action so we can use "ThreadUtils.sameThread"
       case Success(msg) =>
@@ -75,6 +76,12 @@
       .map(e => (e._1.substring(prefix.length).toLowerCase, e._2))
   }
 
+  def extractWorkerUrls: Map[String, String] = {
+    val prefix = "SPARK_WORKER_"
+    sys.env.filterKeys(_.startsWith(prefix))
+      .map(e => (e._1.substring(prefix.length).toLowerCase, e._2))
+  }
+
   override def receive: PartialFunction[Any, Unit] = {
     case RegisteredExecutor =>
       logInfo("Successfully registered with driver")
@@ -51,7 +51,8 @@ private[spark] object CoarseGrainedClusterMessages {
       executorRef: RpcEndpointRef,
       hostname: String,
       cores: Int,
-      logUrls: Map[String, String])
+      logUrls: Map[String, String],
+      workerUrl: Map[String, String] = Map.empty)
     extends CoarseGrainedClusterMessage
 
   case class StatusUpdate(executorId: String, taskId: Long, state: TaskState,
@@ -146,7 +146,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
 
   override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
 
-    case RegisterExecutor(executorId, executorRef, hostname, cores, logUrls) =>
+    case RegisterExecutor(executorId, executorRef, hostname, cores, logUrls, workerUrl) =>
       if (executorDataMap.contains(executorId)) {
         executorRef.send(RegisterExecutorFailed("Duplicate executor ID: " + executorId))
         context.reply(true)
@@ -163,7 +163,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
         totalCoreCount.addAndGet(cores)
         totalRegisteredExecutors.addAndGet(1)
         val data = new ExecutorData(executorRef, executorRef.address, hostname,
-          cores, cores, logUrls)
+          cores, cores, logUrls, workerUrl)
         // This must be synchronized because variables mutated
         // in this block are read when requesting executors
         CoarseGrainedSchedulerBackend.this.synchronized {
@@ -34,5 +34,6 @@ private[cluster] class ExecutorData(
     override val executorHost: String,
     var freeCores: Int,
     override val totalCores: Int,
-    override val logUrlMap: Map[String, String]
-) extends ExecutorInfo(executorHost, totalCores, logUrlMap)
+    override val logUrlMap: Map[String, String],
+    override val workerUrl: Map[String, String]
+) extends ExecutorInfo(executorHost, totalCores, logUrlMap, workerUrl)

Contributor:
Let me also ping @JoshRosen about this change since it is adding a new field to ExecutorInfo.

@JoshRosen Does this change look good to you?

Contributor:
The only concern that I see here is in binary-compatibility in the ExecutorInfo constructor, but that's easily fixable by adding an overloaded constructor which accepts the old arguments.
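
A minimal sketch of such an overload (illustrative, not part of this diff), forwarding to the new primary constructor with an empty worker map:

```scala
// Hypothetical auxiliary constructor restoring the old three-argument
// signature, so code compiled against the previous ExecutorInfo still links:
def this(executorHost: String, totalCores: Int, logUrlMap: Map[String, String]) =
  this(executorHost, totalCores, logUrlMap, Map.empty)
```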

@@ -26,7 +26,8 @@ import org.apache.spark.annotation.DeveloperApi
 class ExecutorInfo(
     val executorHost: String,
     val totalCores: Int,
-    val logUrlMap: Map[String, String]) {
+    val logUrlMap: Map[String, String],
+    val workerUrl: Map[String, String] = Map.empty) {
 
   def canEqual(other: Any): Boolean = other.isInstanceOf[ExecutorInfo]
 
@@ -35,12 +36,13 @@
       (that canEqual this) &&
         executorHost == that.executorHost &&
         totalCores == that.totalCores &&
-        logUrlMap == that.logUrlMap
+        logUrlMap == that.logUrlMap &&
+        workerUrl == that.workerUrl
     case _ => false
   }
 
   override def hashCode(): Int = {
-    val state = Seq(executorHost, totalCores, logUrlMap)
+    val state = Seq(executorHost, totalCores, logUrlMap, workerUrl)
     state.map(_.hashCode()).foldLeft(0)((a, b) => 31 * a + b)
   }
 }
core/src/main/scala/org/apache/spark/status/api/v1/api.scala (2 additions & 1 deletion)
@@ -74,7 +74,8 @@ class ExecutorSummary private[spark](
     val totalShuffleRead: Long,
     val totalShuffleWrite: Long,
     val maxMemory: Long,
-    val executorLogs: Map[String, String])
+    val executorLogs: Map[String, String],
+    val worker: Map[String, String])
 
 class JobData private[spark](
     val jobId: Int,
@@ -96,6 +96,7 @@ private[spark] object ExecutorsPage {
     val totalShuffleRead = listener.executorToShuffleRead.getOrElse(execId, 0L)
     val totalShuffleWrite = listener.executorToShuffleWrite.getOrElse(execId, 0L)
     val executorLogs = listener.executorToLogUrls.getOrElse(execId, Map.empty)
+    val worker = listener.executorToWorkerUrls.getOrElse(execId, Map.empty)
 
     new ExecutorSummary(
       execId,
@@ -116,7 +117,8 @@
       totalShuffleRead,
       totalShuffleWrite,
       maxMem,
-      executorLogs
+      executorLogs,
+      worker
     )
   }
 }
@@ -59,6 +59,7 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener, conf: Spar
   val executorToShuffleRead = HashMap[String, Long]()
   val executorToShuffleWrite = HashMap[String, Long]()
   val executorToLogUrls = HashMap[String, Map[String, String]]()
+  val executorToWorkerUrls = HashMap[String, Map[String, String]]()
   val executorIdToData = HashMap[String, ExecutorUIData]()
 
   def activeStorageStatusList: Seq[StorageStatus] = storageStatusListener.storageStatusList
@@ -68,6 +69,7 @@
   override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded): Unit = synchronized {
     val eid = executorAdded.executorId
     executorToLogUrls(eid) = executorAdded.executorInfo.logUrlMap
+    executorToWorkerUrls(eid) = executorAdded.executorInfo.workerUrl
     executorToTotalCores(eid) = executorAdded.executorInfo.totalCores
     executorToTasksMax(eid) = executorToTotalCores(eid) / conf.getInt("spark.task.cpus", 1)
     executorIdToData(eid) = new ExecutorUIData(executorAdded.time)
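
Because the new field travels through the public SparkListener API, external tools can read it the same way the UI does. An illustrative listener (hypothetical class name, using the lowercased keys produced by extractWorkerUrls):

```scala
import org.apache.spark.scheduler.{SparkListener, SparkListenerExecutorAdded}

// Illustrative only: print each new executor's worker web UI link.
class WorkerUrlLogger extends SparkListener {
  override def onExecutorAdded(event: SparkListenerExecutorAdded): Unit = {
    event.executorInfo.workerUrl.get("ui_url").foreach { url =>
      println(s"Executor ${event.executorId} runs under worker UI $url")
    }
  }
}
```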
core/src/main/scala/org/apache/spark/util/JsonProtocol.scala (4 additions & 2 deletions)
@@ -440,7 +440,8 @@ private[spark] object JsonProtocol {
   def executorInfoToJson(executorInfo: ExecutorInfo): JValue = {
     ("Host" -> executorInfo.executorHost) ~
     ("Total Cores" -> executorInfo.totalCores) ~
-    ("Log Urls" -> mapToJson(executorInfo.logUrlMap))
+    ("Log Urls" -> mapToJson(executorInfo.logUrlMap)) ~
+    ("Worker" -> mapToJson(executorInfo.workerUrl))
   }
 
 /** ------------------------------ *
@@ -934,7 +935,8 @@
     val executorHost = (json \ "Host").extract[String]
     val totalCores = (json \ "Total Cores").extract[Int]
     val logUrls = mapFromJson(json \ "Log Urls").toMap
-    new ExecutorInfo(executorHost, totalCores, logUrls)
+    val worker = mapFromJson(json \ "Worker").toMap

Contributor:
This is a compatibility concern, actually: in order to handle replay of old logs, we need to treat this field as optional. See the examples in this file plus the block comment at the top of JsonProtocol for instructions. Make sure to add a test case which exercises the backwards-compatibility path, too.
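
A sketch of the backwards-compatible read, assuming the Utils.jsonOption helper used for other optional fields in JsonProtocol:

```scala
// Hedged sketch: "Worker" is absent from pre-change event logs, so default
// to an empty map instead of failing the replay.
val worker = Utils.jsonOption(json \ "Worker")
  .map(mapFromJson(_).toMap)
  .getOrElse(Map.empty[String, String])
```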

+    new ExecutorInfo(executorHost, totalCores, logUrls, worker)
   }
 
 /** -------------------------------- *
@@ -17,5 +17,6 @@
"totalShuffleRead" : 0,
"totalShuffleWrite" : 13180,
"maxMemory" : 278302556,
"executorLogs" : { }
"executorLogs" : { },
"worker" : { }
} ]
@@ -73,13 +73,15 @@ class JsonProtocolSuite extends SparkFunSuite {
BlockManagerId("Scarce", "to be counted...", 100))
val unpersistRdd = SparkListenerUnpersistRDD(12345)
val logUrlMap = Map("stderr" -> "mystderr", "stdout" -> "mystdout").toMap
val workerUrlMap = Map("url" -> "spark://[email protected]:32790",
"ui_url" -> "http://192.168.1.104:46445").toMap

Member:
I feel like these IPs should be something more generic; maybe check whether there are other places in the test code with IP-and-port strings you could copy? Not an actual problem, though, since what's in the string only has to match the string below.

Contributor (author):
Good idea. I've updated it, referring to the test cases in ClientSuite.scala. Thanks!

   val applicationStart = SparkListenerApplicationStart("The winner of all", Some("appId"),
     42L, "Garfield", Some("appAttempt"))
   val applicationStartWithLogs = SparkListenerApplicationStart("The winner of all", Some("appId"),
     42L, "Garfield", Some("appAttempt"), Some(logUrlMap))
   val applicationEnd = SparkListenerApplicationEnd(42L)
   val executorAdded = SparkListenerExecutorAdded(executorAddedTime, "exec1",
-    new ExecutorInfo("Hostee.awesome.com", 11, logUrlMap))
+    new ExecutorInfo("Hostee.awesome.com", 11, logUrlMap, workerUrlMap))
   val executorRemoved = SparkListenerExecutorRemoved(executorRemovedTime, "exec2", "test reason")
   val executorMetricsUpdate = {
     // Use custom accum ID for determinism
@@ -1749,6 +1751,10 @@
| "Log Urls" : {
| "stderr" : "mystderr",
| "stdout" : "mystdout"
| },
| "Worker" : {
| "url" : "spark://[email protected]:32790",
| "ui_url" : "http://192.168.1.104:46445"
| }
| }
|}
project/MimaExcludes.scala (4 additions)
@@ -39,6 +39,10 @@ object MimaExcludes {
     Seq(
       // [SPARK-16199][SQL] Add a method to list the referenced columns in data source Filter
       ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.sources.Filter.references")
+    ) ++
+    Seq(
+      // [SPARK-16520] [WEBUI] Link executors to corresponding worker pages
+      ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.scheduler.cluster.ExecutorInfo.this")
     )
   }
