diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index bc72c8970319c..78692a6ef7da5 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -38,7 +38,8 @@ private[spark] class CoarseGrainedExecutorBackend(
     executorId: String,
     hostPort: String,
     cores: Int,
-    env: SparkEnv)
+    env: SparkEnv,
+    executorLogUrl: String)
   extends Actor with ActorLogReceive with ExecutorBackend with Logging {

   Utils.checkHostPort(hostPort, "Expected hostport")
@@ -57,7 +58,7 @@ private[spark] class CoarseGrainedExecutorBackend(
     case RegisteredExecutor =>
       logInfo("Successfully registered with driver")
       val (hostname, _) = Utils.parseHostPort(hostPort)
-      executor = new Executor(executorId, hostname, env, isLocal = false)
+      executor = new Executor(executorId, hostname, env, isLocal = false, executorLogUrl)

     case RegisterExecutorFailed(message) =>
       logError("Slave registration failed: " + message)
@@ -111,7 +112,8 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
       hostname: String,
       cores: Int,
       appId: String,
-      workerUrl: Option[String]) {
+      workerUrl: Option[String],
+      executorLogUrl: String) {

     SignalLogger.register(log)

@@ -156,7 +158,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
       val sparkHostPort = hostname + ":" + boundPort
       env.actorSystem.actorOf(
         Props(classOf[CoarseGrainedExecutorBackend],
-          driverUrl, executorId, sparkHostPort, cores, env),
+          driverUrl, executorId, sparkHostPort, cores, env, executorLogUrl),
         name = "Executor")
       workerUrl.foreach { url =>
         env.actorSystem.actorOf(Props(classOf[WorkerWatcher], url), name = "WorkerWatcher")
@@ -171,15 +173,17 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
        System.err.println(
          // Worker url is used in spark standalone mode to enforce fate-sharing with worker
          "Usage: CoarseGrainedExecutorBackend " +
-           " <driverUrl> <executorId> <hostname> <cores> <appId> [<workerUrl>]")
+           " <driverUrl> <executorId> <hostname> <cores> <appId> [<workerUrl>] [executorLogUrl <executorLogUrl>]")
        System.exit(1)

      // NB: These arguments are provided by SparkDeploySchedulerBackend (for standalone mode)
      // and CoarseMesosSchedulerBackend (for mesos mode).
      case 5 =>
-        run(args(0), args(1), args(2), args(3).toInt, args(4), None)
+        run(args(0), args(1), args(2), args(3).toInt, args(4), None, "")
+      case 7 if args(5) == "executorLogUrl" =>
+        run(args(0), args(1), args(2), args(3).toInt, args(4), None, args(6))
      case x if x > 5 =>
-        run(args(0), args(1), args(2), args(3).toInt, args(4), Some(args(5)))
+        run(args(0), args(1), args(2), args(3).toInt, args(4), Some(args(5)), "")
    }
  }
}
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 312bb3a1daaa3..ae3aada75ef7c 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -43,7 +43,8 @@ private[spark] class Executor(
     executorId: String,
     executorHostname: String,
     env: SparkEnv,
-    isLocal: Boolean = false)
+    isLocal: Boolean = false,
+    executorLogUrl: String = "")
   extends Logging
 {
@@ -79,6 +80,7 @@ private[spark] class Executor(

   if (!isLocal) {
     env.metricsSystem.registerSource(executorSource)
+    env.blockManager.setExecutorLogUrl(executorLogUrl)
     env.blockManager.initialize(conf.getAppId)
   }

diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
index dd28ddb31de1f..c68c19e95a528 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
@@ -80,7 +80,11 @@ case class SparkListenerEnvironmentUpdate(environmentDetails: Map[String, Seq[(S
   extends SparkListenerEvent

 @DeveloperApi
-case class SparkListenerBlockManagerAdded(time: Long, blockManagerId: BlockManagerId, maxMem: Long)
+case class SparkListenerBlockManagerAdded(
+    time: Long,
+    blockManagerId: BlockManagerId,
+    maxMem: Long,
+    executorLogUrl: String = "")
   extends SparkListenerEvent

 @DeveloperApi
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 8bc5a1cd18b64..82f0a7a3b2f8f 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -154,6 +154,7 @@ private[spark] class BlockManager(
   @volatile private var cachedPeers: Seq[BlockManagerId] = _
   private val peerFetchLock = new Object
   private var lastPeerFetchTime = 0L
+  private var executorLogUrl = ""

   /* The compression codec to use. Note that the "lazy" val is necessary because we want to delay
    * the initialization of the compression codec until it is first used. The reason is that a Spark
@@ -202,7 +203,7 @@
       blockManagerId
     }

-    master.registerBlockManager(blockManagerId, maxMemory, slaveActor)
+    master.registerBlockManager(blockManagerId, maxMemory, slaveActor, executorLogUrl)

     // Register Executors' configuration with the local shuffle service, if one should exist.
     if (externalShuffleServiceEnabled && !blockManagerId.isDriver) {
@@ -256,6 +257,10 @@
     }
   }

+  def setExecutorLogUrl(url: String): Unit = {
+    executorLogUrl = url
+  }
+
   /**
    * Re-register with the master and report all blocks to it. This will be called by the heart beat
    * thread if our heartbeat to the block manager indicates that we were not registered.
@@ -265,7 +270,7 @@
   def reregister(): Unit = {
     // TODO: We might need to rate limit re-registering.
logInfo("BlockManager re-registering with master") - master.registerBlockManager(blockManagerId, maxMemory, slaveActor) + master.registerBlockManager(blockManagerId, maxMemory, slaveActor, executorLogUrl) reportAllBlocks() } diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala index b63c7f191155c..5a8960f578a85 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala @@ -46,9 +46,13 @@ class BlockManagerMaster( } /** Register the BlockManager's id with the driver. */ - def registerBlockManager(blockManagerId: BlockManagerId, maxMemSize: Long, slaveActor: ActorRef) { + def registerBlockManager( + blockManagerId: BlockManagerId, + maxMemSize: Long, + slaveActor: ActorRef, + executorLogUrl: String) { logInfo("Trying to register BlockManager") - tell(RegisterBlockManager(blockManagerId, maxMemSize, slaveActor)) + tell(RegisterBlockManager(blockManagerId, maxMemSize, slaveActor, executorLogUrl)) logInfo("Registered BlockManager") } diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala index 64133464d8daa..217f84297c84a 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala @@ -66,8 +66,8 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus } override def receiveWithLogging = { - case RegisterBlockManager(blockManagerId, maxMemSize, slaveActor) => - register(blockManagerId, maxMemSize, slaveActor) + case RegisterBlockManager(blockManagerId, maxMemSize, slaveActor, executoLogUrl) => + register(blockManagerId, maxMemSize, slaveActor, executoLogUrl) sender ! 

     case UpdateBlockInfo(

@@ -325,7 +325,11 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
     ).map(_.flatten.toSeq)
   }

-  private def register(id: BlockManagerId, maxMemSize: Long, slaveActor: ActorRef) {
+  private def register(
+      id: BlockManagerId,
+      maxMemSize: Long,
+      slaveActor: ActorRef,
+      executorLogUrl: String) {
     val time = System.currentTimeMillis()
     if (!blockManagerInfo.contains(id)) {
       blockManagerIdByExecutor.get(id.executorId) match {
@@ -344,7 +348,7 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
       blockManagerInfo(id) = new BlockManagerInfo(
         id, System.currentTimeMillis(), maxMemSize, slaveActor)
     }
-    listenerBus.post(SparkListenerBlockManagerAdded(time, id, maxMemSize))
+    listenerBus.post(SparkListenerBlockManagerAdded(time, id, maxMemSize, executorLogUrl))
   }

   private def updateBlockInfo(
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
index 3f32099d08cc9..29cc0de8a258b 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
@@ -52,7 +52,8 @@ private[spark] object BlockManagerMessages {
   case class RegisterBlockManager(
       blockManagerId: BlockManagerId,
       maxMemSize: Long,
-      sender: ActorRef)
+      sender: ActorRef,
+      executorLogUrl: String)
     extends ToBlockManagerMaster

   case class UpdateBlockInfo(
diff --git a/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala b/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala
index def49e80a3605..494b026bb0ec6 100644
--- a/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala
+++ b/core/src/main/scala/org/apache/spark/storage/StorageStatusListener.scala
@@ -76,7 +76,8 @@ class StorageStatusListener extends SparkListener {
       val blockManagerId = blockManagerAdded.blockManagerId
       val executorId = blockManagerId.executorId
       val maxMem = blockManagerAdded.maxMem
-      val storageStatus = new StorageStatus(blockManagerId, maxMem)
+      val executorLogUrl = blockManagerAdded.executorLogUrl
+      val storageStatus = new StorageStatus(blockManagerId, maxMem, executorLogUrl)
       executorIdToStorageStatus(executorId) = storageStatus
     }
   }
diff --git a/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala b/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
index 2bd6b749be261..6a632c0b19ec8 100644
--- a/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
+++ b/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
@@ -30,7 +30,10 @@ import org.apache.spark.annotation.DeveloperApi
  * class cannot mutate the source of the information. Accesses are not thread-safe.
  */
 @DeveloperApi
-class StorageStatus(val blockManagerId: BlockManagerId, val maxMem: Long) {
+class StorageStatus(
+    val blockManagerId: BlockManagerId,
+    val maxMem: Long,
+    val executorLogUrl: String = "") {

   /**
    * Internal representation of the blocks stored in this block manager.
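With the changes above, the log URL travels from the container launch command through RegisterBlockManager into SparkListenerBlockManagerAdded and StorageStatus, so any SparkListener can observe it. A minimal sketch of a consumer (not part of this patch; the listener name and output format are made up for illustration):

import org.apache.spark.scheduler.{SparkListener, SparkListenerBlockManagerAdded}

// Hypothetical listener: prints the log URL reported with each block manager registration.
class ExecutorLogUrlListener extends SparkListener {
  override def onBlockManagerAdded(added: SparkListenerBlockManagerAdded): Unit = {
    if (added.executorLogUrl.nonEmpty) {
      println(s"Executor ${added.blockManagerId.executorId} logs at ${added.executorLogUrl}")
    }
  }
}

// Registered on the driver with: sc.addSparkListener(new ExecutorLogUrlListener)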
diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
index 363cb96de7998..0829acb90a8cd 100644
--- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
@@ -40,7 +40,8 @@ private case class ExecutorSummaryInfo(
     totalInputBytes: Long,
     totalShuffleRead: Long,
     totalShuffleWrite: Long,
-    maxMemory: Long)
+    maxMemory: Long,
+    executorLogUrl: String)

 private[ui] class ExecutorsPage(
     parent: ExecutorsTab,
@@ -80,6 +81,7 @@ private[ui] class ExecutorsPage(
           {if (threadDumpEnabled) <th class="sorted">Thread Dump</th> else Seq.empty}
+          <th>Logs</th>
         </thead>
         <tbody>
           {execInfoSorted.map(execRow)}
         </tbody>
       </table>
@@ -148,6 +150,9 @@ private[ui] class ExecutorsPage(
             Seq.empty
           }
         }
+        <td>
+          <a href={info.executorLogUrl}>Log</a>
+        </td>
       </tr>
   }

@@ -168,6 +173,7 @@ private[ui] class ExecutorsPage(
     val totalInputBytes = listener.executorToInputBytes.getOrElse(execId, 0L)
     val totalShuffleRead = listener.executorToShuffleRead.getOrElse(execId, 0L)
     val totalShuffleWrite = listener.executorToShuffleWrite.getOrElse(execId, 0L)
+    val executorLogUrl = status.executorLogUrl

     new ExecutorSummaryInfo(
       execId,
@@ -183,7 +189,8 @@ private[ui] class ExecutorsPage(
       totalInputBytes,
       totalShuffleRead,
       totalShuffleWrite,
-      maxMem
+      maxMem,
+      executorLogUrl
     )
   }
 }
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
index ee2002a35f523..5df907daa8bcc 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
@@ -82,8 +82,19 @@ class ExecutorRunnable(
     credentials.writeTokenStorageToStream(dob)
     ctx.setTokens(ByteBuffer.wrap(dob.getData()))

+    // The NM webapp address may be bound to the wildcard interface; substitute the
+    // executor's hostname so the log link is reachable from the driver's web UI.
+    var nmAddress = yarnConf.get(YarnConfiguration.NM_WEBAPP_ADDRESS,
+      YarnConfiguration.DEFAULT_NM_WEBAPP_ADDRESS)
+    if (nmAddress.startsWith("0.0.0.0:")) {
+      val port = nmAddress.stripPrefix("0.0.0.0:")
+      nmAddress = s"$hostname:$port"
+    }
+    val containerId = container.getId()
+    val containerLogUrl = s"http://$nmAddress/node/containerlogs/$containerId/"
+
     val commands = prepareCommand(masterAddress, slaveId, hostname, executorMemory, executorCores,
-      appId, localResources)
+      appId, localResources, containerLogUrl)

     logInfo(s"Setting up executor with environment: $env")
     logInfo("Setting up executor with commands: " + commands)
@@ -118,7 +129,8 @@ class ExecutorRunnable(
       executorMemory: Int,
       executorCores: Int,
       appId: String,
-      localResources: HashMap[String, LocalResource]): List[String] = {
+      localResources: HashMap[String, LocalResource],
+      executorLogUrl: String): List[String] = {

     // Extra options for the JVM
     val javaOpts = ListBuffer[String]()
@@ -200,6 +212,8 @@ class ExecutorRunnable(
       hostname.toString,
       executorCores.toString,
       appId,
+      "executorLogUrl",
+      executorLogUrl,
       "1>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout",
       "2>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr")
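For reference, a self-contained sketch of the URL derivation performed in ExecutorRunnable above. The sample hostname and container id are placeholders, not values from the patch; YarnConfiguration.NM_WEBAPP_ADDRESS defaults to "0.0.0.0:8042".

// Sketch of the container log URL derivation; placeholders, not part of the patch.
def containerLogUrl(nmWebappAddress: String, hostname: String, containerId: String): String = {
  // A wildcard bind address is useless in a link, so substitute the host
  // that actually runs the container.
  val address =
    if (nmWebappAddress.startsWith("0.0.0.0:")) {
      s"$hostname:${nmWebappAddress.stripPrefix("0.0.0.0:")}"
    } else {
      nmWebappAddress
    }
  s"http://$address/node/containerlogs/$containerId/"
}

// containerLogUrl("0.0.0.0:8042", "node1.example.com", "container_1419523437079_0001_01_000002")
// returns "http://node1.example.com:8042/node/containerlogs/container_1419523437079_0001_01_000002/"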