core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -38,7 +38,8 @@ private[spark] class CoarseGrainedExecutorBackend(
     executorId: String,
     hostPort: String,
     cores: Int,
-    env: SparkEnv)
+    env: SparkEnv,
+    executorLogUrl: String)
   extends Actor with ActorLogReceive with ExecutorBackend with Logging {
 
   Utils.checkHostPort(hostPort, "Expected hostport")
@@ -57,7 +58,7 @@
     case RegisteredExecutor =>
       logInfo("Successfully registered with driver")
       val (hostname, _) = Utils.parseHostPort(hostPort)
-      executor = new Executor(executorId, hostname, env, isLocal = false)
+      executor = new Executor(executorId, hostname, env, isLocal = false, executorLogUrl)
 
     case RegisterExecutorFailed(message) =>
       logError("Slave registration failed: " + message)
@@ -111,7 +112,8 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
       hostname: String,
       cores: Int,
       appId: String,
-      workerUrl: Option[String]) {
+      workerUrl: Option[String],
+      executorLogUrl: String) {
 
     SignalLogger.register(log)

@@ -156,7 +158,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
     val sparkHostPort = hostname + ":" + boundPort
     env.actorSystem.actorOf(
       Props(classOf[CoarseGrainedExecutorBackend],
-        driverUrl, executorId, sparkHostPort, cores, env),
+        driverUrl, executorId, sparkHostPort, cores, env, executorLogUrl),
       name = "Executor")
     workerUrl.foreach { url =>
       env.actorSystem.actorOf(Props(classOf[WorkerWatcher], url), name = "WorkerWatcher")
@@ -171,15 +173,17 @@
       System.err.println(
         // Worker url is used in spark standalone mode to enforce fate-sharing with worker
         "Usage: CoarseGrainedExecutorBackend <driverUrl> <executorId> <hostname> " +
-          "<cores> <appid> [<workerUrl>] ")
+          "<cores> <appid> [<workerUrl>] [executorLogUrl <executorLogUrl>]")
       System.exit(1)
 
     // NB: These arguments are provided by SparkDeploySchedulerBackend (for standalone mode)
     // and CoarseMesosSchedulerBackend (for mesos mode).
     case 5 =>
-      run(args(0), args(1), args(2), args(3).toInt, args(4), None)
+      run(args(0), args(1), args(2), args(3).toInt, args(4), None, "")
+    case 7 =>
+      run(args(0), args(1), args(2), args(3).toInt, args(4), None, args(6))
     case x if x > 5 =>
-      run(args(0), args(1), args(2), args(3).toInt, args(4), Some(args(5)))
+      run(args(0), args(1), args(2), args(3).toInt, args(4), Some(args(5)), "")
     }
   }
 }
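A note on the argument contract (not part of the diff): the new `case 7` branch assumes args(5) is the literal tag `executorLogUrl` and args(6) is the URL itself, matching both the usage string and the YARN launch command later in this diff. A minimal sketch of the expected argv, with all values invented for illustration:

```scala
// Hypothetical 7-argument launch, as emitted by ExecutorRunnable below.
val argv = Array(
  "akka.tcp://sparkDriver@driver-host:7077/user/CoarseGrainedScheduler", // driverUrl
  "1",                              // executorId
  "executor-host",                  // hostname
  "4",                              // cores
  "application_1400000000000_0001", // appId
  "executorLogUrl",                 // tag at args(5), skipped by `case 7`
  "https://nm-host:8042/node/containerlogs/container_0001/") // read back as args(6)
assert(argv.length == 7) // hits `case 7`, so run(...) receives argv(6) as executorLogUrl
```

Note that a 6-argument launch still falls through to the `case x if x > 5` branch and treats args(5) as the worker URL.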
4 changes: 3 additions & 1 deletion core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -43,7 +43,8 @@ private[spark] class Executor(
     executorId: String,
     executorHostname: String,
     env: SparkEnv,
-    isLocal: Boolean = false)
+    isLocal: Boolean = false,
+    executorLogUrl: String = "")
   extends Logging
 {

@@ -79,6 +80,7 @@ private[spark] class Executor(

   if (!isLocal) {
     env.metricsSystem.registerSource(executorSource)
+    env.blockManager.setExecutorLogUrl(executorLogUrl)
     env.blockManager.initialize(conf.getAppId)
   }

core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
@@ -80,7 +80,11 @@ case class SparkListenerEnvironmentUpdate(environmentDetails: Map[String, Seq[(String, String)]])
   extends SparkListenerEvent
 
 @DeveloperApi
-case class SparkListenerBlockManagerAdded(time: Long, blockManagerId: BlockManagerId, maxMem: Long)
+case class SparkListenerBlockManagerAdded(
+    time: Long,
+    blockManagerId: BlockManagerId,
+    maxMem: Long,
+    executorLogUrl: String = "")
   extends SparkListenerEvent
 
 @DeveloperApi
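Because `SparkListenerBlockManagerAdded` is a `@DeveloperApi` event, the new field is visible to user listeners once this patch is applied. A minimal sketch of consuming it (the listener class and its registration are illustrative, not part of the change):

```scala
import org.apache.spark.scheduler.{SparkListener, SparkListenerBlockManagerAdded}

// Hypothetical listener: report each executor's log URL as its block
// manager registers; executors that never set a URL carry the "" default.
class ExecutorLogUrlListener extends SparkListener {
  override def onBlockManagerAdded(added: SparkListenerBlockManagerAdded): Unit = {
    if (added.executorLogUrl.nonEmpty) {
      println(s"Executor ${added.blockManagerId.executorId} logs at ${added.executorLogUrl}")
    }
  }
}

// Usage: sc.addSparkListener(new ExecutorLogUrlListener)
```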

core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -154,6 +154,7 @@ private[spark] class BlockManager(
   @volatile private var cachedPeers: Seq[BlockManagerId] = _
   private val peerFetchLock = new Object
   private var lastPeerFetchTime = 0L
+  private var executorLogUrl = ""
 
   /* The compression codec to use. Note that the "lazy" val is necessary because we want to delay
    * the initialization of the compression codec until it is first used. The reason is that a Spark
@@ -202,7 +203,7 @@ private[spark] class BlockManager(
       blockManagerId
     }
 
-    master.registerBlockManager(blockManagerId, maxMemory, slaveActor)
+    master.registerBlockManager(blockManagerId, maxMemory, slaveActor, executorLogUrl)
 
     // Register Executors' configuration with the local shuffle service, if one should exist.
     if (externalShuffleServiceEnabled && !blockManagerId.isDriver) {
Expand Down Expand Up @@ -256,6 +257,10 @@ private[spark] class BlockManager(
     }
   }
 
+  def setExecutorLogUrl(url: String): Unit = {
+    executorLogUrl = url
+  }
+
   /**
    * Re-register with the master and report all blocks to it. This will be called by the heart beat
    * thread if our heartbeat to the block manager indicates that we were not registered.
@@ -265,7 +270,7 @@
   def reregister(): Unit = {
     // TODO: We might need to rate limit re-registering.
     logInfo("BlockManager re-registering with master")
-    master.registerBlockManager(blockManagerId, maxMemory, slaveActor)
+    master.registerBlockManager(blockManagerId, maxMemory, slaveActor, executorLogUrl)
     reportAllBlocks()
   }

core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
@@ -46,9 +46,13 @@ class BlockManagerMaster(
   }
 
   /** Register the BlockManager's id with the driver. */
-  def registerBlockManager(blockManagerId: BlockManagerId, maxMemSize: Long, slaveActor: ActorRef) {
+  def registerBlockManager(
+      blockManagerId: BlockManagerId,
+      maxMemSize: Long,
+      slaveActor: ActorRef,
+      executorLogUrl: String) {
     logInfo("Trying to register BlockManager")
-    tell(RegisterBlockManager(blockManagerId, maxMemSize, slaveActor))
+    tell(RegisterBlockManager(blockManagerId, maxMemSize, slaveActor, executorLogUrl))
     logInfo("Registered BlockManager")
   }

core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
@@ -66,8 +66,8 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus: LiveListenerBus)
   }
 
   override def receiveWithLogging = {
-    case RegisterBlockManager(blockManagerId, maxMemSize, slaveActor) =>
-      register(blockManagerId, maxMemSize, slaveActor)
+    case RegisterBlockManager(blockManagerId, maxMemSize, slaveActor, executorLogUrl) =>
+      register(blockManagerId, maxMemSize, slaveActor, executorLogUrl)
       sender ! true
 
     case UpdateBlockInfo(
@@ -325,7 +325,11 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus: LiveListenerBus)
     ).map(_.flatten.toSeq)
   }
 
-  private def register(id: BlockManagerId, maxMemSize: Long, slaveActor: ActorRef) {
+  private def register(
+      id: BlockManagerId,
+      maxMemSize: Long,
+      slaveActor: ActorRef,
+      executorLogUrl: String) {
     val time = System.currentTimeMillis()
     if (!blockManagerInfo.contains(id)) {
       blockManagerIdByExecutor.get(id.executorId) match {
@@ -344,7 +348,7 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus: LiveListenerBus)
       blockManagerInfo(id) = new BlockManagerInfo(
         id, System.currentTimeMillis(), maxMemSize, slaveActor)
     }
-    listenerBus.post(SparkListenerBlockManagerAdded(time, id, maxMemSize))
+    listenerBus.post(SparkListenerBlockManagerAdded(time, id, maxMemSize, executorLogUrl))
   }
 
   private def updateBlockInfo(
core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
@@ -52,7 +52,8 @@ private[spark] object BlockManagerMessages {
   case class RegisterBlockManager(
       blockManagerId: BlockManagerId,
       maxMemSize: Long,
-      sender: ActorRef)
+      sender: ActorRef,
+      executorLogUrl: String)
     extends ToBlockManagerMaster
 
   case class UpdateBlockInfo(
core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala
@@ -76,7 +76,8 @@ class StorageStatusListener extends SparkListener {
       val blockManagerId = blockManagerAdded.blockManagerId
       val executorId = blockManagerId.executorId
       val maxMem = blockManagerAdded.maxMem
-      val storageStatus = new StorageStatus(blockManagerId, maxMem)
+      val executorLogUrl = blockManagerAdded.executorLogUrl
+      val storageStatus = new StorageStatus(blockManagerId, maxMem, executorLogUrl)
       executorIdToStorageStatus(executorId) = storageStatus
     }
   }
core/src/main/scala/org/apache/spark/storage/StorageStatus.scala
@@ -30,7 +30,10 @@ import org.apache.spark.annotation.DeveloperApi
  * class cannot mutate the source of the information. Accesses are not thread-safe.
  */
 @DeveloperApi
-class StorageStatus(val blockManagerId: BlockManagerId, val maxMem: Long) {
+class StorageStatus(
+    val blockManagerId: BlockManagerId,
+    val maxMem: Long,
+    val executorLogUrl: String = "") {
 
   /**
    * Internal representation of the blocks stored in this block manager.
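The `""` default keeps existing `StorageStatus` call sites source-compatible: a status built the old way simply reports no log URL. A small sketch, with invented host and container values:

```scala
import org.apache.spark.storage.{BlockManagerId, StorageStatus}

// Old two-argument construction still compiles and yields an empty URL.
val legacy = new StorageStatus(BlockManagerId("0", "host-a", 7077), 512L * 1024 * 1024)
assert(legacy.executorLogUrl.isEmpty)

// New call sites (e.g. StorageStatusListener above) pass the URL through.
val withUrl = new StorageStatus(
  BlockManagerId("1", "host-b", 7077),
  512L * 1024 * 1024,
  "https://host-b:8042/node/containerlogs/container_0001/")
assert(withUrl.executorLogUrl.nonEmpty)
```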
11 changes: 9 additions & 2 deletions core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
@@ -40,7 +40,8 @@ private case class ExecutorSummaryInfo(
     totalInputBytes: Long,
     totalShuffleRead: Long,
     totalShuffleWrite: Long,
-    maxMemory: Long)
+    maxMemory: Long,
+    executorLogUrl: String)
 
 private[ui] class ExecutorsPage(
     parent: ExecutorsTab,
@@ -80,6 +81,7 @@ private[ui] class ExecutorsPage(
           </span>
         </th>
         {if (threadDumpEnabled) <th class="sorttable_nosort">Thread Dump</th> else Seq.empty}
+        <th>Log</th>
       </thead>
       <tbody>
         {execInfoSorted.map(execRow)}
@@ -148,6 +150,9 @@ private[ui] class ExecutorsPage(
           Seq.empty
         }
       }
+      <td>
+        <a href={info.executorLogUrl}>Log</a>
+      </td>
     </tr>
   }

@@ -168,6 +173,7 @@ private[ui] class ExecutorsPage(
     val totalInputBytes = listener.executorToInputBytes.getOrElse(execId, 0L)
     val totalShuffleRead = listener.executorToShuffleRead.getOrElse(execId, 0L)
     val totalShuffleWrite = listener.executorToShuffleWrite.getOrElse(execId, 0L)
+    val executorLogUrl = status.executorLogUrl
 
     new ExecutorSummaryInfo(
       execId,
@@ -183,7 +189,8 @@ private[ui] class ExecutorsPage(
       totalInputBytes,
       totalShuffleRead,
       totalShuffleWrite,
-      maxMem
+      maxMem,
+      executorLogUrl
     )
   }
 }
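One caveat with the new column: standalone and Mesos backends register with an empty string, so `info.executorLogUrl` can be `""` and the anchor above then renders a dead link. A possible guard, sketched here as a hypothetical helper rather than part of this diff:

```scala
import scala.xml.Elem

// Render the anchor only when a URL was actually reported.
def logCell(executorLogUrl: String): Elem =
  if (executorLogUrl.nonEmpty) <td><a href={executorLogUrl}>Log</a></td>
  else <td></td>
```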
yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
@@ -82,8 +82,17 @@ class ExecutorRunnable(
     credentials.writeTokenStorageToStream(dob)
     ctx.setTokens(ByteBuffer.wrap(dob.getData()))
 
+    var nmAddress = yarnConf.get(YarnConfiguration.NM_WEBAPP_ADDRESS,
+      YarnConfiguration.DEFAULT_NM_WEBAPP_ADDRESS)
+    if (nmAddress.startsWith("0.0.0.0:")) {
+      val port = nmAddress.substring(8)
+      nmAddress = s"$hostname:$port"
+    }
+    val containerId = container.getId()
+    val containerLogUrl = s"https://$nmAddress/node/containerlogs/$containerId/"
+
     val commands = prepareCommand(masterAddress, slaveId, hostname, executorMemory, executorCores,
-      appId, localResources)
+      appId, localResources, containerLogUrl)
 
     logInfo(s"Setting up executor with environment: $env")
     logInfo("Setting up executor with commands: " + commands)
@@ -118,7 +127,8 @@ class ExecutorRunnable(
       executorMemory: Int,
       executorCores: Int,
       appId: String,
-      localResources: HashMap[String, LocalResource]): List[String] = {
+      localResources: HashMap[String, LocalResource],
+      executorLogUrl: String): List[String] = {
     // Extra options for the JVM
     val javaOpts = ListBuffer[String]()

@@ -200,6 +210,8 @@ class ExecutorRunnable(
       hostname.toString,
       executorCores.toString,
       appId,
+      "executorLogUrl",
+      executorLogUrl,
       "1>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout",
       "2>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr")
