diff --git a/common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java b/common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java index b50e043d5c9ce..290fcecf8a733 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java +++ b/common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java @@ -88,6 +88,8 @@ private static class ClientPool { private EventLoopGroup workerGroup; private PooledByteBufAllocator pooledAllocator; + public PooledByteBufAllocator getPooledAllocator() { return pooledAllocator; } + public TransportClientFactory( TransportContext context, List<TransportClientBootstrap> clientBootstraps) { diff --git a/common/network-common/src/main/java/org/apache/spark/network/server/TransportServer.java b/common/network-common/src/main/java/org/apache/spark/network/server/TransportServer.java index 047c5f3f1f094..e1d44f3a8caf1 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/server/TransportServer.java +++ b/common/network-common/src/main/java/org/apache/spark/network/server/TransportServer.java @@ -53,6 +53,7 @@ public class TransportServer implements Closeable { private ServerBootstrap bootstrap; private ChannelFuture channelFuture; + private PooledByteBufAllocator allocator; private int port = -1; /** @@ -78,6 +79,8 @@ public TransportServer( } } + public PooledByteBufAllocator getAllocator() { return allocator; } + public int getPort() { if (port == -1) { throw new IllegalStateException("Server not initialized"); } @@ -92,7 +95,7 @@ private void init(String hostToBind, int portToBind) { NettyUtils.createEventLoop(ioMode, conf.serverThreads(), conf.getModuleName() + "-server"); EventLoopGroup workerGroup = bossGroup; - PooledByteBufAllocator allocator = NettyUtils.createPooledByteBufAllocator( + allocator = NettyUtils.createPooledByteBufAllocator( conf.preferDirectBufs(), true /* allowCache */, conf.serverThreads()); bootstrap = new ServerBootstrap() diff --git a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala index 5242ab6f55235..d0050e988ea79 100644 --- a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala +++ b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala @@ -22,6 +22,7 @@ import java.util.concurrent.{ScheduledFuture, TimeUnit} import scala.collection.mutable import scala.concurrent.Future +import org.apache.spark.executor.{ExecutorMetrics, TaskMetrics} import org.apache.spark.internal.Logging import org.apache.spark.rpc.{RpcCallContext, RpcEnv, ThreadSafeRpcEndpoint} import org.apache.spark.scheduler._ @@ -36,6 +37,7 @@ import org.apache.spark.util._ */ private[spark] case class Heartbeat( executorId: String, + executorMetrics: ExecutorMetrics, accumUpdates: Array[(Long, Seq[AccumulatorV2[_, _]])], // taskId -> accumulator updates blockManagerId: BlockManagerId) @@ -120,14 +122,14 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock) context.reply(true) // Messages received from executors - case heartbeat @ Heartbeat(executorId, accumUpdates, blockManagerId) => + case heartbeat @ Heartbeat(executorId, executorMetrics, accumUpdates, blockManagerId) => if (scheduler != null) { if (executorLastSeen.contains(executorId)) { executorLastSeen(executorId) = clock.getTimeMillis() eventLoopThread.submit(new Runnable { override def run(): Unit = Utils.tryLogNonFatalError { val unknownExecutor = 
!scheduler.executorHeartbeatReceived( - executorId, accumUpdates, blockManagerId) + executorId, executorMetrics, accumUpdates, blockManagerId) val response = HeartbeatResponse(reregisterBlockManager = unknownExecutor) context.reply(response) } diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala index f4a59f069a5f9..ed8463e1b8cb7 100644 --- a/core/src/main/scala/org/apache/spark/SparkEnv.scala +++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala @@ -33,6 +33,7 @@ import org.apache.spark.internal.Logging import org.apache.spark.internal.config._ import org.apache.spark.memory.{MemoryManager, StaticMemoryManager, UnifiedMemoryManager} import org.apache.spark.metrics.MetricsSystem +import org.apache.spark.network.BlockTransferService import org.apache.spark.network.netty.NettyBlockTransferService import org.apache.spark.rpc.{RpcEndpoint, RpcEndpointRef, RpcEnv} import org.apache.spark.scheduler.{LiveListenerBus, OutputCommitCoordinator} @@ -63,6 +64,7 @@ class SparkEnv ( val mapOutputTracker: MapOutputTracker, val shuffleManager: ShuffleManager, val broadcastManager: BroadcastManager, + val blockTransferService: BlockTransferService, val blockManager: BlockManager, val securityManager: SecurityManager, val metricsSystem: MetricsSystem, @@ -383,6 +385,7 @@ object SparkEnv extends Logging { mapOutputTracker, shuffleManager, broadcastManager, + blockTransferService, blockManager, securityManager, metricsSystem, diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index 83469c5ff0600..b1a4eb89be46d 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -102,6 +102,12 @@ private[spark] class Executor( env.blockManager.initialize(conf.getAppId) } + private val executorMetrics: ExecutorMetrics = new ExecutorMetrics + executorMetrics.setHostname(Utils.localHostName) + if (env.rpcEnv.address != null) { + executorMetrics.setPort(Some(env.rpcEnv.address.port)) + } + // Whether to load classes in user jars before those in Spark jars private val userClassPathFirst = conf.getBoolean("spark.executor.userClassPathFirst", false) @@ -704,7 +710,21 @@ private[spark] class Executor( } } - val message = Heartbeat(executorId, accumUpdates.toArray, env.blockManager.blockManagerId) + env.blockTransferService.updateMemMetrics(this.executorMetrics) + val executorMetrics = if (isLocal) { + // When running locally, there is a chance that the executorMetrics could change + // out from under us. So, copy them here. In non-local mode this object would be + // serialized and de-serialized on its way to the driver. Perform that operation here + // to obtain the same result as non-local mode. + // TODO - Add a test that fails in local mode if we don't copy executorMetrics here. 
+ Utils.deserialize[ExecutorMetrics](Utils.serialize(this.executorMetrics)) + } else { + this.executorMetrics + } + + val message = Heartbeat( + executorId, executorMetrics, accumUpdates.toArray, env.blockManager.blockManagerId) + + try { val response = heartbeatReceiverRef.askSync[HeartbeatResponse]( message, RpcTimeout(conf, "spark.executor.heartbeatInterval", "10s")) diff --git a/core/src/main/scala/org/apache/spark/executor/ExecutorMetrics.scala b/core/src/main/scala/org/apache/spark/executor/ExecutorMetrics.scala new file mode 100644 index 0000000000000..5d03ba6df96a6 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/executor/ExecutorMetrics.scala @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.executor + +import org.apache.spark.annotation.DeveloperApi + +/** + * :: DeveloperApi :: + */ +@DeveloperApi +class ExecutorMetrics extends Serializable { + + private var _hostname: String = "" + def hostname: String = _hostname + private[spark] def setHostname(value: String): Unit = _hostname = value + + private var _port: Option[Int] = None + def port: Option[Int] = _port + private[spark] def setPort(value: Option[Int]): Unit = _port = value + + private[spark] def hostPort: String = { + port match { + case None => hostname + case Some(p) => hostname + ":" + p + } + } + + private var _transportMetrics: TransportMetrics = + new TransportMetrics(System.currentTimeMillis(), 0L, 0L) + def transportMetrics: TransportMetrics = _transportMetrics + private[spark] def setTransportMetrics(value: TransportMetrics): Unit = { + _transportMetrics = value + } +} + +object ExecutorMetrics extends Serializable { + def apply( + hostName: String, + port: Option[Int], + transportMetrics: TransportMetrics): ExecutorMetrics = { + val execMetrics = new ExecutorMetrics + execMetrics.setHostname(hostName) + execMetrics.setPort(port) + execMetrics.setTransportMetrics(transportMetrics) + execMetrics + } +} + +/** + * :: DeveloperApi :: + */ +@DeveloperApi +case class TransportMetrics( + timeStamp: Long, + onHeapSize: Long, + offHeapSize: Long) extends Serializable +
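For context, the local-mode branch in Executor.scala above relies on a serialization round trip to snapshot the metrics object so it cannot change out from under the driver. The same idiom can be reproduced with plain JDK streams; a minimal standalone sketch (independent of Spark's Utils helpers):

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Deep copy via a serialization round trip: the copy shares no mutable state
// with the original, which is what the heartbeat path needs when driver and
// executor live in the same JVM.
def deepCopy[T <: java.io.Serializable](obj: T): T = {
  val bytes = new ByteArrayOutputStream()
  val out = new ObjectOutputStream(bytes)
  try out.writeObject(obj) finally out.close()
  val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
  try in.readObject().asInstanceOf[T] finally in.close()
}
```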
diff --git a/core/src/main/scala/org/apache/spark/network/BlockTransferService.scala b/core/src/main/scala/org/apache/spark/network/BlockTransferService.scala index cb9d389dd7ea6..b0e341d340584 100644 --- a/core/src/main/scala/org/apache/spark/network/BlockTransferService.scala +++ b/core/src/main/scala/org/apache/spark/network/BlockTransferService.scala @@ -24,6 +24,7 @@ import scala.concurrent.{Future, Promise} import scala.concurrent.duration.Duration import scala.reflect.ClassTag +import org.apache.spark.executor.ExecutorMetrics import org.apache.spark.internal.Logging import org.apache.spark.network.buffer.{ManagedBuffer, NioManagedBuffer} import org.apache.spark.network.shuffle.{BlockFetchingListener, ShuffleClient} @@ -39,6 +40,11 @@ abstract class BlockTransferService extends ShuffleClient with Closeable with Lo */ def init(blockDataManager: BlockDataManager): Unit + /** + * Update the given executorMetrics with the current memory metrics of this transfer service. + */ + private[spark] def updateMemMetrics(executorMetrics: ExecutorMetrics): Unit + /** * Tear down the transfer service. */ diff --git a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala index b75e91b660969..1a8911a0f9a59 100644 --- a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala +++ b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala @@ -18,12 +18,16 @@ package org.apache.spark.network.netty import java.nio.ByteBuffer +import java.util.{List => JList} import scala.collection.JavaConverters._ import scala.concurrent.{Future, Promise} import scala.reflect.ClassTag -import org.apache.spark.{SecurityManager, SparkConf} +import io.netty.buffer._ + +import org.apache.spark.{SecurityManager, SparkConf, SparkEnv} +import org.apache.spark.executor.{ExecutorMetrics, TransportMetrics} import org.apache.spark.network._ import org.apache.spark.network.buffer.ManagedBuffer import org.apache.spark.network.client.{RpcResponseCallback, TransportClientBootstrap, TransportClientFactory} @@ -34,7 +38,7 @@ import org.apache.spark.network.shuffle.protocol.UploadBlock import org.apache.spark.network.util.JavaUtils import org.apache.spark.serializer.JavaSerializer import org.apache.spark.storage.{BlockId, StorageLevel} -import org.apache.spark.util.Utils +import org.apache.spark.util.{Clock, SystemClock, Utils} /** * A BlockTransferService that uses Netty to fetch a set of blocks at time. @@ -57,6 +61,38 @@ private[spark] class NettyBlockTransferService( private[this] var server: TransportServer = _ private[this] var clientFactory: TransportClientFactory = _ private[this] var appId: String = _ + private[this] var clock: Clock = new SystemClock() + + /** + * Use a different clock for this transfer service. This is mainly used for testing.
+ */ + def setClock(newClock: Clock): Unit = { + clock = newClock + } + + private[spark] override def updateMemMetrics(executorMetrics: ExecutorMetrics): Unit = { + val currentTime = clock.getTimeMillis() + val clientPooledAllocator = clientFactory.getPooledAllocator() + val serverAllocator = server.getAllocator() + val clientOffHeapSize = sumOfMetrics(clientPooledAllocator.directArenas()) + val clientOnHeapSize = sumOfMetrics(clientPooledAllocator.heapArenas()) + val serverOffHeapSize = sumOfMetrics(serverAllocator.directArenas()) + val serverOnHeapSize = sumOfMetrics(serverAllocator.heapArenas()) + logDebug(s"Current Netty Client offheap size is $clientOffHeapSize, " + + s"Client heap size is $clientOnHeapSize, Server offheap size is $serverOffHeapSize, " + + s"server heap size is $serverOnHeapSize, executor id is " + + s"${SparkEnv.get.blockManager.blockManagerId.executorId}") + executorMetrics.setTransportMetrics(TransportMetrics(currentTime, + clientOnHeapSize + serverOnHeapSize, clientOffHeapSize + serverOffHeapSize)) + } + + private def sumOfMetrics(arenaMetricList: JList[PoolArenaMetric]): Long = { + arenaMetricList.asScala.map { arena => + arena.chunkLists().asScala.map { chunkList => + chunkList.iterator().asScala.map(_.chunkSize().toLong).sum + }.sum + }.sum + } override def init(blockDataManager: BlockDataManager): Unit = { val rpcHandler = new NettyBlockRpcServer(conf.getAppId, serializer, blockDataManager)
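As a sanity check, the arena walk in sumOfMetrics can be exercised against a standalone pooled allocator. A rough probe, assuming the Netty 4.0-style metric accessors used above (heapArenas()/directArenas() returning PoolArenaMetric lists); note that summing chunkSize() measures memory reserved by the pool in chunks, not bytes currently handed out to buffers:

```scala
import java.util.{List => JList}

import scala.collection.JavaConverters._

import io.netty.buffer.{PoolArenaMetric, PooledByteBufAllocator}

object AllocatorProbe {
  // Same rollup as sumOfMetrics: arenas -> chunk lists -> chunk sizes.
  def totalChunkBytes(arenas: JList[PoolArenaMetric]): Long =
    arenas.asScala.map { arena =>
      arena.chunkLists().asScala.map(_.iterator().asScala.map(_.chunkSize().toLong).sum).sum
    }.sum

  def main(args: Array[String]): Unit = {
    val alloc = PooledByteBufAllocator.DEFAULT
    val buf = alloc.directBuffer(1024 * 1024) // force at least one chunk into existence
    println(s"on-heap chunk bytes:  ${totalChunkBytes(alloc.heapArenas())}")
    println(s"off-heap chunk bytes: ${totalChunkBytes(alloc.directArenas())}")
    buf.release()
  }
}
```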
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index aab177f257a8c..829ae7be2b7e2 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -34,7 +34,7 @@ import org.apache.commons.lang3.SerializationUtils import org.apache.spark._ import org.apache.spark.broadcast.Broadcast -import org.apache.spark.executor.TaskMetrics +import org.apache.spark.executor.{ExecutorMetrics, TaskMetrics} import org.apache.spark.internal.Logging import org.apache.spark.network.util.JavaUtils import org.apache.spark.partial.{ApproximateActionListener, ApproximateEvaluator, PartialResult} @@ -229,16 +229,18 @@ class DAGScheduler( } /** - * Update metrics for in-progress tasks and let the master know that the BlockManager is still - * alive. Return true if the driver knows about the given block manager. Otherwise, return false, - * indicating that the block manager should re-register. + * Update metrics for a live executor and its in-progress tasks, and let the master know that + * the BlockManager is still alive. Return true if the driver knows about the given block + * manager. Otherwise, return false, indicating that the block manager should re-register. */ def executorHeartbeatReceived( execId: String, + executorMetrics: ExecutorMetrics, // (taskId, stageId, stageAttemptId, accumUpdates) accumUpdates: Array[(Long, Int, Int, Seq[AccumulableInfo])], blockManagerId: BlockManagerId): Boolean = { - listenerBus.post(SparkListenerExecutorMetricsUpdate(execId, accumUpdates)) + listenerBus.post( + SparkListenerExecutorMetricsUpdate(execId, accumUpdates, Some(executorMetrics))) blockManagerMaster.driverEndpoint.askSync[Boolean]( BlockManagerHeartbeat(blockManagerId), new RpcTimeout(600 seconds, "BlockManagerHeartbeat")) } diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala index aecb3a980e7c1..c8e5ef1872456 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala @@ -23,7 +23,7 @@ import java.nio.charset.StandardCharsets import java.util.Locale import scala.collection.mutable -import scala.collection.mutable.ArrayBuffer +import scala.collection.mutable.{ArrayBuffer, HashMap} import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, FSDataOutputStream, Path} @@ -33,6 +33,7 @@ import org.json4s.jackson.JsonMethods._ import org.apache.spark.{SPARK_VERSION, SparkConf} import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.executor.{ExecutorMetrics, TransportMetrics} import org.apache.spark.internal.Logging import org.apache.spark.io.CompressionCodec import org.apache.spark.util.{JsonProtocol, Utils} @@ -87,6 +88,10 @@ private[spark] class EventLoggingListener( // Visible for tests only. private[scheduler] val logPath = getLogPath(logBaseDir, appId, appAttemptId, compressionCodecName) + private val executorIdToLatestMetrics = new HashMap[String, SparkListenerExecutorMetricsUpdate] + private val executorIdToModifiedMaxMetrics = + new HashMap[String, SparkListenerExecutorMetricsUpdate] + /** * Creates the log file in the configured log directory. */ @@ -145,8 +150,23 @@ private[spark] class EventLoggingListener( } } + // On each stage submission and completion, log the executor memory metrics gathered since + // the previous stage boundary. Whenever an executor metrics update arrives, refresh the + // running sets {{executorIdToLatestMetrics}} and {{executorIdToModifiedMaxMetrics}}. + // Since stage submit and complete times may interleave, maintain both the latest and the + // max metrics for each time segment: at every boundary the accumulated max entries are + // logged and cleared, and the next segment's max is re-seeded from the latest events.
+ private def updateAndLogExecutorMemoryMetrics(): Unit = { + executorIdToModifiedMaxMetrics.foreach { case (_, metrics) => logEvent(metrics) } + executorIdToModifiedMaxMetrics.clear() + executorIdToLatestMetrics.foreach { case (_, metrics) => logEvent(metrics) } + } + // Events that do not trigger a flush - override def onStageSubmitted(event: SparkListenerStageSubmitted): Unit = logEvent(event) + override def onStageSubmitted(event: SparkListenerStageSubmitted): Unit = { + updateAndLogExecutorMemoryMetrics() + logEvent(event) + } override def onTaskStart(event: SparkListenerTaskStart): Unit = logEvent(event) @@ -160,6 +180,7 @@ private[spark] class EventLoggingListener( // Events that trigger a flush override def onStageCompleted(event: SparkListenerStageCompleted): Unit = { + updateAndLogExecutorMemoryMetrics() logEvent(event, flushLogger = true) } @@ -186,11 +207,14 @@ private[spark] class EventLoggingListener( override def onApplicationEnd(event: SparkListenerApplicationEnd): Unit = { logEvent(event, flushLogger = true) } + override def onExecutorAdded(event: SparkListenerExecutorAdded): Unit = { logEvent(event, flushLogger = true) } override def onExecutorRemoved(event: SparkListenerExecutorRemoved): Unit = { + executorIdToLatestMetrics.remove(event.executorId) + executorIdToModifiedMaxMetrics.remove(event.executorId) logEvent(event, flushLogger = true) } @@ -213,8 +237,15 @@ // No-op because logging every update would be overkill override def onBlockUpdated(event: SparkListenerBlockUpdated): Unit = {} - // No-op because logging every update would be overkill - override def onExecutorMetricsUpdate(event: SparkListenerExecutorMetricsUpdate): Unit = { } + // Track executor metrics for logging on stage start and end + override def onExecutorMetricsUpdate(event: SparkListenerExecutorMetricsUpdate): Unit = { + // We only track the executor metrics within each stage, so we drop the per-task metrics, + // which are quite verbose + val eventWithoutTaskMetrics = SparkListenerExecutorMetricsUpdate( + event.execId, Seq.empty, event.executorMetrics) + executorIdToLatestMetrics(eventWithoutTaskMetrics.execId) = eventWithoutTaskMetrics + updateModifiedMetrics(eventWithoutTaskMetrics) + } override def onOtherEvent(event: SparkListenerEvent): Unit = { if (event.logEvent) { @@ -259,6 +290,55 @@ private[spark] class EventLoggingListener( SparkListenerEnvironmentUpdate(redactedEnvironmentDetails) } + /** + * Process an executor metrics update and refresh our stored cache of events. + * If the event's executor ID is not yet tracked, start tracking it, beginning with this event. + * If it is tracked, merge the stored and incoming metrics, keeping the component-wise maximum. + * The merge only happens when both events carry executorMetrics; when processing historical + * data created without executorMetrics, we simply cache the latest event. + * @param latestEvent the latest event received, used to update our map of stored metrics.
+ */ + private def updateModifiedMetrics(latestEvent: SparkListenerExecutorMetricsUpdate): Unit = { + val executorId = latestEvent.execId + executorIdToModifiedMaxMetrics.get(executorId) match { + case None => + executorIdToModifiedMaxMetrics(executorId) = latestEvent + case Some(prevEvent) => + if (prevEvent.executorMetrics.isEmpty || + latestEvent.executorMetrics.isEmpty) { + executorIdToModifiedMaxMetrics(executorId) = latestEvent + } else { + val prevTransportMetrics = prevEvent.executorMetrics.get.transportMetrics + val latestTransportMetrics = latestEvent.executorMetrics.get.transportMetrics + var timeStamp: Long = prevTransportMetrics.timeStamp + + val onHeapSize = + if (latestTransportMetrics.onHeapSize > prevTransportMetrics.onHeapSize) { + timeStamp = latestTransportMetrics.timeStamp + latestTransportMetrics.onHeapSize + } else { + prevTransportMetrics.onHeapSize + } + val offHeapSize = + if (latestTransportMetrics.offHeapSize > prevTransportMetrics.offHeapSize) { + timeStamp = latestTransportMetrics.timeStamp + latestTransportMetrics.offHeapSize + } else { + prevTransportMetrics.offHeapSize + } + val updatedExecMetrics = ExecutorMetrics(prevEvent.executorMetrics.get.hostname, + prevEvent.executorMetrics.get.port, + TransportMetrics(timeStamp, onHeapSize, offHeapSize)) + val modifiedEvent = SparkListenerExecutorMetricsUpdate( + prevEvent.execId, prevEvent.accumUpdates, Some(updatedExecMetrics)) + executorIdToModifiedMaxMetrics(executorId) = modifiedEvent + } + } + } } private[spark] object EventLoggingListener extends Logging {
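To make the interval bookkeeping concrete, here is a self-contained toy trace of the latest/peak tracking for a single executor. It uses bare (time, onHeap, offHeap) triples instead of full events; the numbers mirror the ones EventLoggingListenerSuite checks further down:

```scala
case class M(time: Long, onHeap: Long, offHeap: Long)

object PeakTrace {
  var latest: Option[M] = None
  var peak: Option[M] = None // analogue of executorIdToModifiedMaxMetrics

  def onUpdate(m: M): Unit = {
    latest = Some(m)
    peak = Some(peak.fold(m) { p =>
      // component-wise max, stamped with the time of whichever peak won
      var t = p.time
      val on = if (m.onHeap > p.onHeap) { t = m.time; m.onHeap } else p.onHeap
      val off = if (m.offHeap > p.offHeap) { t = m.time; m.offHeap } else p.offHeap
      M(t, on, off)
    })
  }

  // At each stage boundary the peak is logged and cleared; the latest event is
  // logged as well, so replay always sees an unmodified update per segment.
  def onStageBoundary(): Seq[M] = {
    val logged = peak.toSeq ++ latest.toSeq
    peak = None
    logged
  }

  def main(args: Array[String]): Unit = {
    Seq(M(1, 20, 10), M(2, 30, 10)).foreach(onUpdate)
    println(onStageBoundary()) // List(M(2,30,10), M(2,30,10))
    Seq(M(3, 30, 30), M(4, 20, 25)).foreach(onUpdate)
    println(onStageBoundary()) // List(M(3,30,30), M(4,20,25))
  }
}
```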
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala index bc2e530716686..e8859aaf3e64f 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala @@ -26,7 +26,7 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo import org.apache.spark.{SparkConf, TaskEndReason} import org.apache.spark.annotation.DeveloperApi -import org.apache.spark.executor.TaskMetrics +import org.apache.spark.executor.{ExecutorMetrics, TaskMetrics} import org.apache.spark.scheduler.cluster.ExecutorInfo import org.apache.spark.storage.{BlockManagerId, BlockUpdatedInfo} import org.apache.spark.ui.SparkUI @@ -139,11 +139,13 @@ case class SparkListenerBlockUpdated(blockUpdatedInfo: BlockUpdatedInfo) extends * Periodic updates from executors. * @param execId executor id * @param accumUpdates sequence of (taskId, stageId, stageAttemptId, accumUpdates) + * @param executorMetrics keeps track of the TransportMetrics for an executor. Added in Spark 2.3. */ @DeveloperApi case class SparkListenerExecutorMetricsUpdate( execId: String, - accumUpdates: Seq[(Long, Int, Int, Seq[AccumulableInfo])]) + accumUpdates: Seq[(Long, Int, Int, Seq[AccumulableInfo])], + executorMetrics: Option[ExecutorMetrics]) extends SparkListenerEvent @DeveloperApi diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala index 3de7d1f7de22b..6cede660ffdb7 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala @@ -17,6 +17,7 @@ package org.apache.spark.scheduler +import org.apache.spark.executor.ExecutorMetrics import org.apache.spark.scheduler.SchedulingMode.SchedulingMode import org.apache.spark.storage.BlockManagerId import org.apache.spark.util.AccumulatorV2 @@ -74,6 +75,7 @@ private[spark] trait TaskScheduler { */ def executorHeartbeatReceived( execId: String, + executorMetrics: ExecutorMetrics, accumUpdates: Array[(Long, Seq[AccumulatorV2[_, _]])], blockManagerId: BlockManagerId): Boolean diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index 1b6bc9139f9c9..6c920209c7890 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -28,6 +28,7 @@ import scala.util.Random import org.apache.spark._ import org.apache.spark.TaskState.TaskState +import org.apache.spark.executor.ExecutorMetrics import org.apache.spark.internal.Logging import org.apache.spark.internal.config import org.apache.spark.scheduler.SchedulingMode.SchedulingMode @@ -450,6 +451,7 @@ private[spark] class TaskSchedulerImpl private[scheduler]( */ override def executorHeartbeatReceived( execId: String, + executorMetrics: ExecutorMetrics, accumUpdates: Array[(Long, Seq[AccumulatorV2[_, _]])], blockManagerId: BlockManagerId): Boolean = { // (taskId, stageId, stageAttemptId, accumUpdates) @@ -461,7 +463,8 @@ private[spark] class TaskSchedulerImpl private[scheduler]( } } } - dagScheduler.executorHeartbeatReceived(execId, accumUpdatesWithTaskIds, blockManagerId) + dagScheduler.executorHeartbeatReceived( + execId, executorMetrics, accumUpdatesWithTaskIds, blockManagerId) } def handleTaskGettingResult(taskSetManager: TaskSetManager, tid: Long): Unit = synchronized { diff --git a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala index 7d31ac54a7177..eaf85752c319f 100644 --- a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala @@ -31,6 +31,7 @@ import org.apache.spark.ui.JettyUtils._ import org.apache.spark.ui.env.{EnvironmentListener, EnvironmentTab} import org.apache.spark.ui.exec.{ExecutorsListener, ExecutorsTab} import org.apache.spark.ui.jobs.{JobProgressListener, JobsTab, StagesTab} +import org.apache.spark.ui.memory.{MemoryListener, MemoryTab} import org.apache.spark.ui.scope.RDDOperationGraphListener import org.apache.spark.ui.storage.{StorageListener, StorageTab} import org.apache.spark.util.Utils @@ -47,6 +48,7 @@ private[spark] class SparkUI private ( val executorsListener: ExecutorsListener, val jobProgressListener: JobProgressListener, val storageListener: StorageListener, + val memoryListener: MemoryListener, val operationGraphListener: RDDOperationGraphListener, var appName: 
String, val basePath: String, @@ -71,6 +73,7 @@ private[spark] class SparkUI private ( attachTab(new StorageTab(this)) attachTab(new EnvironmentTab(this)) attachTab(new ExecutorsTab(this)) + attachTab(new MemoryTab(this)) attachHandler(createStaticHandler(SparkUI.STATIC_RESOURCE_DIR, "/static")) attachHandler(createRedirectHandler("/", "/jobs/", basePath = basePath)) attachHandler(ApiRootResource.getServletHandler(this)) @@ -210,16 +213,18 @@ private[spark] object SparkUI { val storageStatusListener = new StorageStatusListener(conf) val executorsListener = new ExecutorsListener(storageStatusListener, conf) val storageListener = new StorageListener(storageStatusListener) + val memoryListener = new MemoryListener val operationGraphListener = new RDDOperationGraphListener(conf) listenerBus.addListener(environmentListener) listenerBus.addListener(storageStatusListener) listenerBus.addListener(executorsListener) listenerBus.addListener(storageListener) + listenerBus.addListener(memoryListener) listenerBus.addListener(operationGraphListener) new SparkUI(sc, conf, securityManager, environmentListener, storageStatusListener, - executorsListener, _jobProgressListener, storageListener, operationGraphListener, - appName, basePath, startTime) + executorsListener, _jobProgressListener, storageListener, memoryListener, + operationGraphListener, appName, basePath, startTime) } } diff --git a/core/src/main/scala/org/apache/spark/ui/memory/MemoryPage.scala b/core/src/main/scala/org/apache/spark/ui/memory/MemoryPage.scala new file mode 100644 index 0000000000000..b674f074fe62e --- /dev/null +++ b/core/src/main/scala/org/apache/spark/ui/memory/MemoryPage.scala @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.ui.memory + +import javax.servlet.http.HttpServletRequest + +import scala.xml.{Node, NodeSeq} + +import org.apache.spark.ui.{UIUtils, WebUIPage} + +private[ui] class MemoryPage(parent: MemoryTab) extends WebUIPage("") { + private val memoryListener = parent.memoryListener + private val progressListener = parent.progressListener + + def render(request: HttpServletRequest): Seq[Node] = { + + val activeExecutorIdToMem = memoryListener.activeExecutorIdToMem + val removedExecutorIdToMem = memoryListener.removedExecutorIdToMem + val completedStages = progressListener.completedStages.reverse.toSeq + val failedStages = progressListener.failedStages.reverse.toSeq + val numberCompletedStages = progressListener.numCompletedStages + val numberFailedStages = progressListener.numFailedStages + val activeMemInfoSorted = activeExecutorIdToMem.toSeq.sortBy(_._1) + val removedMemInfoSorted = removedExecutorIdToMem.toSeq.sortBy(_._1) + val shouldShowActiveExecutors = activeExecutorIdToMem.nonEmpty + val shouldShowRemovedExecutors = removedExecutorIdToMem.nonEmpty + val shouldShowCompletedStages = completedStages.nonEmpty + val shouldShowFailedStages = failedStages.nonEmpty + + val activeExecMemTable = new MemTableBase(activeMemInfoSorted, memoryListener) + val removedExecMemTable = new MemTableBase(removedMemInfoSorted, memoryListener) + val completedStagesTable = new StagesTableBase( + completedStages, parent.basePath, progressListener) + val failedStagesTable = new StagesTableBase(failedStages, parent.basePath, progressListener) + + val summary: NodeSeq = +
+ <div> + <ul class="unstyled"> + <li><strong>Active Executors:</strong> {activeExecutorIdToMem.size}</li> + <li><strong>Removed Executors:</strong> {removedExecutorIdToMem.size}</li> + <li><strong>Completed Stages:</strong> {numberCompletedStages}</li> + <li><strong>Failed Stages:</strong> {numberFailedStages}</li> + </ul> + </div> + + var content = summary + if (shouldShowActiveExecutors) { + content ++= <h4>Active Executors ({activeExecutorIdToMem.size})</h4>
++ + activeExecMemTable.toNodeSeq + }
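For readers unfamiliar with Scala's XML literals, the header-plus-table pattern used throughout this page composes everything into a flat NodeSeq. A minimal standalone illustration (the names here are made up for the sketch):

```scala
import scala.xml.{Node, NodeSeq}

object NodeSeqDemo extends App {
  val execIds = Seq("exec-1", "exec-2")
  // An XML literal with interpolated Scala expressions in braces.
  val table: Seq[Node] =
    <table class="table">
      <tbody>{execIds.map(id => <tr><td>{id}</td></tr>)}</tbody>
    </table>

  var content: NodeSeq = NodeSeq.Empty
  content ++= <h4>Active Executors ({execIds.size})</h4> ++ table
  println(content.mkString)
}
```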

+ if (shouldShowRemovedExecutors) { + content ++= <h4>Removed Executors ({removedMemInfoSorted.size})</h4>

++ + removedExecMemTable.toNodeSeq + } + if (shouldShowCompletedStages) { + content ++= <h4>Completed Stages ({numberCompletedStages})</h4> ++ + completedStagesTable.toNodeSeq + } + if (shouldShowFailedStages) { + content ++= <h4>Failed Stages ({numberFailedStages})</h4>
++ + failedStagesTable.toNodeSeq + } + + UIUtils.headerSparkPage("Memory Usage", content, parent) + } +} diff --git a/core/src/main/scala/org/apache/spark/ui/memory/MemoryTab.scala b/core/src/main/scala/org/apache/spark/ui/memory/MemoryTab.scala new file mode 100644 index 0000000000000..ce008d4c26cfe --- /dev/null +++ b/core/src/main/scala/org/apache/spark/ui/memory/MemoryTab.scala @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.ui.memory + +import scala.collection.mutable.HashMap + +import org.apache.spark.annotation.DeveloperApi +import org.apache.spark.executor.{ExecutorMetrics, TransportMetrics} +import org.apache.spark.scheduler._ +import org.apache.spark.scheduler.cluster.ExecutorInfo +import org.apache.spark.ui.{SparkUI, SparkUITab} + +private[ui] class MemoryTab(parent: SparkUI) extends SparkUITab(parent, "memory") { + val memoryListener = parent.memoryListener + val progressListener = parent.jobProgressListener + attachPage(new MemoryPage(this)) + attachPage(new StageMemoryPage(this)) +} + +/** + * :: DeveloperApi :: + * A SparkListener that prepares information to be displayed on the MemoryTab. + */ +@DeveloperApi +class MemoryListener extends SparkListener { + type ExecutorId = String + val activeExecutorIdToMem = new HashMap[ExecutorId, MemoryUIInfo] + // TODO: This may use too much memory. + // There may be many removed executors (e.g. in dynamic allocation mode). + val removedExecutorIdToMem = new HashMap[ExecutorId, MemoryUIInfo] + // A map that maintains the latest metrics of each active executor. + val latestExecIdToExecMetrics = new HashMap[ExecutorId, ExecutorMetrics] + // A map that maintains the memory information of all executors for each stage: + // (stageId, attemptId) -> HashMap[executorId, MemoryUIInfo] + val activeStagesToMem = new HashMap[(Int, Int), HashMap[ExecutorId, MemoryUIInfo]] + // TODO: Respect the retained-stages conf so that we don't need to keep them all. + // There may be many completed stages.
+ val completedStagesToMem = new HashMap[(Int, Int), HashMap[ExecutorId, MemoryUIInfo]] + + override def onExecutorMetricsUpdate(event: SparkListenerExecutorMetricsUpdate): Unit = { + val executorId = event.execId + // Guarded with foreach: events replayed from old logs may carry no executorMetrics + event.executorMetrics.foreach { executorMetrics => + activeExecutorIdToMem + .getOrElseUpdate(executorId, new MemoryUIInfo) + .updateMemUiInfo(executorMetrics) + activeStagesToMem.foreach { case (_, stageMemMetrics) => + // If an executor was added while the stage was running, we also update the metrics + // for that executor in {{activeStagesToMem}} + if (!stageMemMetrics.contains(executorId)) { + stageMemMetrics(executorId) = new MemoryUIInfo + } + stageMemMetrics(executorId).updateMemUiInfo(executorMetrics) + } + latestExecIdToExecMetrics(executorId) = executorMetrics + } + } + + override def onExecutorAdded(event: SparkListenerExecutorAdded): Unit = { + val executorId = event.executorId + activeExecutorIdToMem.put(executorId, new MemoryUIInfo(event.executorInfo)) + } + + override def onExecutorRemoved(event: SparkListenerExecutorRemoved): Unit = { + val executorId = event.executorId + val info = activeExecutorIdToMem.remove(executorId) + latestExecIdToExecMetrics.remove(executorId) + removedExecutorIdToMem.getOrElseUpdate(executorId, info.getOrElse(new MemoryUIInfo)) + } + + override def onStageSubmitted(event: SparkListenerStageSubmitted): Unit = { + val stage = (event.stageInfo.stageId, event.stageInfo.attemptId) + val memInfoMap = new HashMap[ExecutorId, MemoryUIInfo] + activeExecutorIdToMem.foreach { case (id, _) => + memInfoMap(id) = new MemoryUIInfo + val latestExecMetrics = latestExecIdToExecMetrics.get(id) + latestExecMetrics match { + case None => // Do nothing + case Some(metrics) => + memInfoMap(id).updateMemUiInfo(metrics) + } + } + activeStagesToMem(stage) = memInfoMap + }
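The per-stage windowing above can be summarized in a dozen lines. A toy model with plain Longs standing in for MemoryUIInfo (all names invented for the sketch): each submitted stage starts from a snapshot of the latest metrics, and live updates fan out to every open stage window.

```scala
import scala.collection.mutable

object StageWindowSketch {
  val latestPerExec = new mutable.HashMap[String, Long]
  val activeStages = new mutable.HashMap[Int, mutable.HashMap[String, Long]]

  // A freshly submitted stage is seeded with the latest known metrics.
  def onStageSubmitted(stage: Int): Unit =
    activeStages(stage) = latestPerExec.clone()

  // Every update refreshes the global latest view and all open stage windows.
  def onMetricsUpdate(exec: String, value: Long): Unit = {
    latestPerExec(exec) = value
    activeStages.values.foreach(window => window(exec) = value)
  }

  def main(args: Array[String]): Unit = {
    onMetricsUpdate("exec-1", 10)
    onStageSubmitted(1)
    onMetricsUpdate("exec-2", 30) // joins mid-stage; the window still picks it up
    println(activeStages(1))      // Map(exec-1 -> 10, exec-2 -> 30)
  }
}
```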
+ + override def onStageCompleted(event: SparkListenerStageCompleted): Unit = { + val stage = (event.stageInfo.stageId, event.stageInfo.attemptId) + // We need to refresh {{activeStagesToMem}} with {{activeExecutorIdToMem}} in case an + // executor was added while the stage was running and no + // {{SparkListenerExecutorMetricsUpdate}} event arrived during the stage. + activeStagesToMem.get(stage).foreach { memInfoMap => + activeExecutorIdToMem.foreach { case (executorId, memUiInfo) => + if (!memInfoMap.contains(executorId)) { + memInfoMap(executorId) = new MemoryUIInfo + memInfoMap(executorId).copyMemUiInfo(memUiInfo) + } + } + } + completedStagesToMem.put(stage, activeStagesToMem.remove(stage).get) + } +} + +@DeveloperApi +class MemoryUIInfo { + var executorAddress: String = _ + var transportInfo: Option[TransportMemSize] = None + + def this(execInfo: ExecutorInfo) = { + this() + executorAddress = execInfo.executorHost + } + + def updateMemUiInfo(execMetrics: ExecutorMetrics): Unit = { + if (transportInfo.isEmpty) { + transportInfo = Some(new TransportMemSize) + } + executorAddress = execMetrics.hostPort + transportInfo.get.updateTransMemSize(execMetrics.transportMetrics) + } + + def copyMemUiInfo(memUiInfo: MemoryUIInfo): Unit = { + executorAddress = memUiInfo.executorAddress + memUiInfo.transportInfo.foreach { src => + val copied = new TransportMemSize + copied.copyTransMemSize(src) + transportInfo = Some(copied) + } + } +} + +@DeveloperApi +class TransportMemSize { + var onHeapSize: Long = _ + var offHeapSize: Long = _ + var peakOnHeapSizeTime: MemTime = new MemTime() + var peakOffHeapSizeTime: MemTime = new MemTime() + + def updateTransMemSize(transportMetrics: TransportMetrics): Unit = { + val updatedOnHeapSize = transportMetrics.onHeapSize + val updatedOffHeapSize = transportMetrics.offHeapSize + val updateTime: Long = transportMetrics.timeStamp + onHeapSize = updatedOnHeapSize + offHeapSize = updatedOffHeapSize + if (updatedOnHeapSize >= peakOnHeapSizeTime.memorySize) { + peakOnHeapSizeTime = MemTime(updatedOnHeapSize, updateTime) + } + if (updatedOffHeapSize >= peakOffHeapSizeTime.memorySize) { + peakOffHeapSizeTime = MemTime(updatedOffHeapSize, updateTime) + } + } + + def copyTransMemSize(transMemSize: TransportMemSize): Unit = { + onHeapSize = transMemSize.onHeapSize + offHeapSize = transMemSize.offHeapSize + peakOnHeapSizeTime = MemTime(transMemSize.peakOnHeapSizeTime.memorySize, + transMemSize.peakOnHeapSizeTime.timeStamp) + peakOffHeapSizeTime = MemTime(transMemSize.peakOffHeapSizeTime.memorySize, + transMemSize.peakOffHeapSizeTime.timeStamp) + } +} + +@DeveloperApi +case class MemTime(memorySize: Long = 0L, timeStamp: Long = System.currentTimeMillis) diff --git a/core/src/main/scala/org/apache/spark/ui/memory/MemoryTable.scala b/core/src/main/scala/org/apache/spark/ui/memory/MemoryTable.scala new file mode 100644 index 0000000000000..74036ba44b0f8 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/ui/memory/MemoryTable.scala @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.spark.ui.memory + +import java.util.Date + +import scala.xml.Node + +import org.apache.spark.scheduler.StageInfo +import org.apache.spark.ui.jobs.JobProgressListener +import org.apache.spark.ui.UIUtils +import org.apache.spark.util.Utils + + +private[ui] class MemTableBase( + memInfos: Seq[(String, MemoryUIInfo)], + listener: MemoryListener) { + + protected def columns: Seq[Node] = { + <th>Executor ID</th> + <th>Address</th> + <th>Network Memory (on-heap)</th> + <th>Network Memory (off-heap)</th> + <th>Peak Network Memory (on-heap) / Peak Time</th> + <th>Peak Network Memory (off-heap) / Peak Time</th> + } + + def toNodeSeq: Seq[Node] = { + listener.synchronized { + memTable(showRow, memInfos) + } + } + + protected def memTable[T](makeRow: T => Seq[Node], rows: Seq[T]): Seq[Node] = { + <table class="table table-bordered table-condensed sortable"> + <thead>{columns}</thead> + <tbody> + {rows.map(r => makeRow(r))} + </tbody> + </table> + } + + /** Render an HTML row representing an executor */ + private def showRow(info: (String, MemoryUIInfo)): Seq[Node] = { + <tr> + <td> + {info._1} + </td> + <td> + {info._2.executorAddress} + </td> + {if (info._2.transportInfo.isDefined) { + <td> + {Utils.bytesToString(info._2.transportInfo.get.onHeapSize)} + </td> + <td> + {Utils.bytesToString(info._2.transportInfo.get.offHeapSize)} + </td> + <td> + {Utils.bytesToString(info._2.transportInfo.get.peakOnHeapSizeTime.memorySize)} + / + {UIUtils.formatDate(info._2.transportInfo.get.peakOnHeapSizeTime.timeStamp)} + </td> + <td> + {Utils.bytesToString(info._2.transportInfo.get.peakOffHeapSizeTime.memorySize)} + / + {UIUtils.formatDate(info._2.transportInfo.get.peakOffHeapSizeTime.timeStamp)} + </td> + } else { + <td>N/A</td> + <td>N/A</td> + <td>N/A</td> + <td>N/A</td> + }} + </tr> + } +} + +private[ui] class StagesTableBase( + stageInfos: Seq[StageInfo], + basePath: String, + listener: JobProgressListener) { + protected def columns: Seq[Node] = { + <th>Stage Id</th> + <th>Description</th> + <th>Submitted</th> + } + + def toNodeSeq: Seq[Node] = { + listener.synchronized { + stagesTable(showRow, stageInfos) + } + } + + protected def stagesTable[T](makeRow: T => Seq[Node], rows: Seq[T]): Seq[Node] = { + <table class="table table-bordered table-condensed sortable"> + <thead>{columns}</thead> + <tbody> + {rows.map(r => makeRow(r))} + </tbody> + </table> + } + + private def showRow(info: StageInfo): Seq[Node] = { + val submissionTime = info.submissionTime match { + case Some(t) => UIUtils.formatDate(new Date(t)) + case None => "Unknown" + } + + <tr> + <td>{info.stageId}</td> + <td>{makeDescription(info)}</td> + <td>{submissionTime}</td> + </tr> +} + + private def makeDescription(s: StageInfo): Seq[Node] = { + val basePathUri = UIUtils.prependBaseUri(basePath) + val nameLinkUri = s"$basePathUri/memory/stage?id=${s.stageId}&attempt=${s.attemptId}" + <div> + <a href={nameLinkUri}>{s.name}</a> + </div>
+ } +} diff --git a/core/src/main/scala/org/apache/spark/ui/memory/StageMemoryPage.scala b/core/src/main/scala/org/apache/spark/ui/memory/StageMemoryPage.scala new file mode 100644 index 0000000000000..b5059dce2a522 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/ui/memory/StageMemoryPage.scala @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.ui.memory + +import javax.servlet.http.HttpServletRequest + +import scala.xml.Node + +import org.apache.spark.ui.{UIUtils, WebUIPage} + +/** Page showing memory information for a given stage */ +private[ui] class StageMemoryPage(parent: MemoryTab) extends WebUIPage("stage") { + private val memoryListener = parent.memoryListener + + def render(request: HttpServletRequest): Seq[Node] = { + memoryListener.synchronized { + val parameterId = request.getParameter("id") + require(parameterId != null && parameterId.nonEmpty, "Missing id parameter") + + val parameterAttempt = request.getParameter("attempt") + require(parameterAttempt != null && parameterAttempt.nonEmpty, "Missing attempt parameter") + + val stage = (parameterId.toInt, parameterAttempt.toInt) + + val finishedStageToMem = memoryListener.completedStagesToMem + val content = if (finishedStageToMem.get(stage).isDefined) { + val executorIdToMem = finishedStageToMem(stage).toSeq.sortBy(_._1) + val execMemTable = new MemTableBase(executorIdToMem, memoryListener) +

<h4>Executors ({executorIdToMem.size})</h4>
++ + execMemTable.toNodeSeq + } else { + Seq.empty + } + UIUtils.headerSparkPage("Stage Detail Memory Usage", content, parent) + } + } +} diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala index 8296c4294242c..3073584f28b55 100644 --- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala +++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala @@ -235,9 +235,15 @@ private[spark] object JsonProtocol { def executorMetricsUpdateToJson(metricsUpdate: SparkListenerExecutorMetricsUpdate): JValue = { val execId = metricsUpdate.execId + val executorMetrics = metricsUpdate.executorMetrics val accumUpdates = metricsUpdate.accumUpdates + val metricsJson: JValue = executorMetrics match { + case Some(metrics) => executorMetricsToJson(metrics) + case None => JNothing + } ("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.metricsUpdate) ~ ("Executor ID" -> execId) ~ + ("Executor Metrics Updated" -> metricsJson) ~ ("Metrics Updated" -> accumUpdates.map { case (taskId, stageId, stageAttemptId, updates) => ("Task ID" -> taskId) ~ ("Stage ID" -> stageId) ~ @@ -333,6 +339,19 @@ } } + def executorMetricsToJson(executorMetrics: ExecutorMetrics): JValue = { + val transportMetrics = transportMetricsToJson(executorMetrics.transportMetrics) + ("Executor Hostname" -> executorMetrics.hostname) ~ + ("Executor Port" -> executorMetrics.port.map(new JInt(_)).getOrElse(JNothing)) ~ + ("TransportMetrics" -> transportMetrics) + } + + def transportMetricsToJson(transportMetrics: TransportMetrics): JValue = { + ("TimeStamp" -> transportMetrics.timeStamp) ~ + ("OnHeapSize" -> transportMetrics.onHeapSize) ~ + ("OffHeapSize" -> transportMetrics.offHeapSize) + } + def taskMetricsToJson(taskMetrics: TaskMetrics): JValue = { val shuffleReadMetrics: JValue = ("Remote Blocks Fetched" -> taskMetrics.shuffleReadMetrics.remoteBlocksFetched) ~ @@ -664,6 +683,7 @@ def executorMetricsUpdateFromJson(json: JValue): SparkListenerExecutorMetricsUpdate = { val execInfo = (json \ "Executor ID").extract[String] + val executorMetrics = executorMetricsFromJson(json \ "Executor Metrics Updated") val accumUpdates = (json \ "Metrics Updated").extract[List[JValue]].map { json => val taskId = (json \ "Task ID").extract[Long] val stageId = (json \ "Stage ID").extract[Int] @@ -672,7 +692,7 @@ (json \ "Accumulator Updates").extract[List[JValue]].map(accumulableInfoFromJson) (taskId, stageId, stageAttemptId, updates) } - SparkListenerExecutorMetricsUpdate(execInfo, accumUpdates) + SparkListenerExecutorMetricsUpdate(execInfo, accumUpdates, Some(executorMetrics)) } /** --------------------------------------------------------------------- * @@ -777,6 +797,25 @@ } } + def executorMetricsFromJson(json: JValue): ExecutorMetrics = { + val metrics = new ExecutorMetrics + if (json == JNothing) { + return metrics + } + metrics.setHostname((json \ "Executor Hostname").extract[String]) + metrics.setPort(Utils.jsonOption(json \ "Executor Port").map(_.extract[Int])) + metrics.setTransportMetrics(transportMetricsFromJson(json \ "TransportMetrics")) + metrics + } + + def transportMetricsFromJson(json: JValue): TransportMetrics = { + TransportMetrics( + (json \ "TimeStamp").extract[Long], + (json \ "OnHeapSize").extract[Long], + (json \ "OffHeapSize").extract[Long]) + } + 
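A quick round-trip exercise of the new serializers; a hedged sketch that assumes it runs inside the org.apache.spark package (e.g. from a suite), since JsonProtocol is private[spark]:

```scala
import org.json4s.jackson.JsonMethods.{compact, parse, render}

import org.apache.spark.executor.{ExecutorMetrics, TransportMetrics}
import org.apache.spark.scheduler.SparkListenerExecutorMetricsUpdate
import org.apache.spark.util.JsonProtocol

val metrics = ExecutorMetrics("host-1", Some(80), TransportMetrics(1L, 20L, 10L))
val event = SparkListenerExecutorMetricsUpdate("exec-1", Seq.empty, Some(metrics))

// Serialize to a JSON string and read it back through the new code paths.
val json = compact(render(JsonProtocol.executorMetricsUpdateToJson(event)))
val back = JsonProtocol.executorMetricsUpdateFromJson(parse(json))
assert(back.executorMetrics.get.transportMetrics.onHeapSize == 20L)
```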
def taskMetricsFromJson(json: JValue): TaskMetrics = { val metrics = TaskMetrics.empty if (json == JNothing) { diff --git a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala index 88916488c0def..f95d450c5642b 100644 --- a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala +++ b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala @@ -29,7 +29,7 @@ import org.mockito.Matchers._ import org.mockito.Mockito.{mock, spy, verify, when} import org.scalatest.{BeforeAndAfterEach, PrivateMethodTester} -import org.apache.spark.executor.TaskMetrics +import org.apache.spark.executor.{ExecutorMetrics, TaskMetrics} import org.apache.spark.rpc.{RpcCallContext, RpcEndpoint, RpcEndpointRef, RpcEnv} import org.apache.spark.scheduler._ import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._ @@ -77,7 +77,7 @@ class HeartbeatReceiverSuite heartbeatReceiverClock = new ManualClock heartbeatReceiver = new HeartbeatReceiver(sc, heartbeatReceiverClock) heartbeatReceiverRef = sc.env.rpcEnv.setupEndpoint("heartbeat", heartbeatReceiver) - when(scheduler.executorHeartbeatReceived(any(), any(), any())).thenReturn(true) + when(scheduler.executorHeartbeatReceived(any(), any(), any(), any())).thenReturn(true) } /** @@ -214,7 +214,7 @@ class HeartbeatReceiverSuite val metrics = TaskMetrics.empty val blockManagerId = BlockManagerId(executorId, "localhost", 12345) val response = heartbeatReceiverRef.askSync[HeartbeatResponse]( - Heartbeat(executorId, Array(1L -> metrics.accumulators()), blockManagerId)) + Heartbeat(executorId, null, Array(1L -> metrics.accumulators()), blockManagerId)) if (executorShouldReregister) { assert(response.reregisterBlockManager) } else { @@ -222,6 +222,7 @@ class HeartbeatReceiverSuite // Additionally verify that the scheduler callback is called with the correct parameters verify(scheduler).executorHeartbeatReceived( Matchers.eq(executorId), + Matchers.eq(null), Matchers.eq(Array(1L -> metrics.accumulators())), Matchers.eq(blockManagerId)) } diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index a10941b579fe2..2e07699493b00 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -30,6 +30,7 @@ import org.scalatest.time.SpanSugar._ import org.apache.spark._ import org.apache.spark.broadcast.BroadcastManager +import org.apache.spark.executor.ExecutorMetrics import org.apache.spark.rdd.RDD import org.apache.spark.scheduler.SchedulingMode.SchedulingMode import org.apache.spark.shuffle.{FetchFailedException, MetadataFetchFailedException} @@ -116,6 +117,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou override def stop() = {} override def executorHeartbeatReceived( execId: String, + executorMetrics: ExecutorMetrics, accumUpdates: Array[(Long, Seq[AccumulatorV2[_, _]])], blockManagerId: BlockManagerId): Boolean = true override def submitTasks(taskSet: TaskSet) = { @@ -562,6 +564,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou override def defaultParallelism(): Int = 2 override def executorHeartbeatReceived( execId: String, + executorMetrics: ExecutorMetrics, accumUpdates: Array[(Long, Seq[AccumulatorV2[_, _]])], blockManagerId: BlockManagerId): Boolean = true override def executorLost(executorId: 
String, reason: ExecutorLossReason): Unit = {} diff --git a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala index 4c3d0b102152c..84e9e37fde2b6 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala @@ -136,6 +136,135 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit "a fine:mind$dollar{bills}.1", None, Some("lz4"))) } + test("test event logger logging executor metrics") { + import org.apache.spark.scheduler.cluster._ + import org.apache.spark.ui.memory._ + val conf = EventLoggingListenerSuite.getLoggingConf(testDirPath) + val eventLogger = new EventLoggingListener("test-memListener", None, testDirPath.toUri(), conf) + val execId = "exec-1" + val hostName = "host-1" + val port: Option[Int] = Some(80) + + eventLogger.start() + eventLogger.onExecutorAdded(SparkListenerExecutorAdded( + 0L, execId, new ExecutorInfo(hostName, 1, Map.empty))) + + // stage 1 and stage 2 submitted + eventLogger.onStageSubmitted(MemoryListenerSuite.createStageStartEvent(1)) + eventLogger.onStageSubmitted(MemoryListenerSuite.createStageStartEvent(2)) + val execMetrics1 = MemoryListenerSuite.createExecutorMetrics(hostName, port, 1L, 20, 10) + eventLogger.onExecutorMetricsUpdate(MemoryListenerSuite.createExecutorMetricsUpdateEvent( + execId, execMetrics1)) + val execMetrics2 = MemoryListenerSuite.createExecutorMetrics(hostName, port, 2L, 30, 10) + eventLogger.onExecutorMetricsUpdate(MemoryListenerSuite.createExecutorMetricsUpdateEvent( + execId, execMetrics2)) + // stage 1 completed + eventLogger.onStageCompleted(MemoryListenerSuite.createStageEndEvent(1)) + // stage 3 submitted + eventLogger.onStageSubmitted(MemoryListenerSuite.createStageStartEvent(3)) + val execMetrics3 = MemoryListenerSuite.createExecutorMetrics(hostName, port, 3L, 30, 30) + eventLogger.onExecutorMetricsUpdate(MemoryListenerSuite.createExecutorMetricsUpdateEvent( + execId, execMetrics3)) + val execMetrics4 = MemoryListenerSuite.createExecutorMetrics(hostName, port, 4L, 20, 25) + eventLogger.onExecutorMetricsUpdate(MemoryListenerSuite.createExecutorMetricsUpdateEvent( + execId, execMetrics4)) + // stage 2 completed + eventLogger.onStageCompleted(MemoryListenerSuite.createStageEndEvent(2)) + val execMetrics5 = MemoryListenerSuite.createExecutorMetrics(hostName, port, 5L, 15, 15) + eventLogger.onExecutorMetricsUpdate(MemoryListenerSuite.createExecutorMetricsUpdateEvent( + execId, execMetrics5)) + val execMetrics6 = MemoryListenerSuite.createExecutorMetrics(hostName, port, 6L, 25, 10) + eventLogger.onExecutorMetricsUpdate(MemoryListenerSuite.createExecutorMetricsUpdateEvent( + execId, execMetrics6)) + // stage 3 completed + eventLogger.onStageCompleted(MemoryListenerSuite.createStageEndEvent(3)) + + eventLogger.onExecutorRemoved(SparkListenerExecutorRemoved(7L, execId, "")) + + // In total there are 15 logged events, including: + // 2 events of executor Added/Removed + // 6 events of stage Submitted/Completed + // 7 events of executorMetrics update (3 combined metrics and 4 original metrics) + assert(eventLogger.loggedEvents.size === 15) + eventLogger.stop() + + val logData = EventLoggingListener.openEventLog(new Path(eventLogger.logPath), fileSystem) + val lines = readLines(logData) + Utils.tryWithSafeFinally { + // in total there are 16 lines, including the SparkListenerLogStart event and 15
other events + assert(lines.size === 16) + + val listenerBus = new LiveListenerBus(sc) + val memoryListener = new MemoryListener + listenerBus.addListener(memoryListener) + + val sparkEvents: Seq[SparkListenerEvent] = lines.map { line => + val event = JsonProtocol.sparkEventFromJson(parse(line)) + listenerBus.postToAll(event) + event + } + + // Make sure there always an original {{SparkListenerExecutorMetricsUpdate}} event updated + // before each stage complete. + val latestMetricsStage1 = getLatestExecutorMetricsBeforeStageEnd(sparkEvents, 1).get + val latestMetricsStage2 = getLatestExecutorMetricsBeforeStageEnd(sparkEvents, 2).get + val latestMetricsStage3 = getLatestExecutorMetricsBeforeStageEnd(sparkEvents, 3).get + assertMetrics(execId, (hostName, 2L, 30, 10), latestMetricsStage1) + assertMetrics(execId, (hostName, 4L, 20, 25), latestMetricsStage2) + assertMetrics(execId, (hostName, 6L, 25, 10), latestMetricsStage3) + + // Following is an integration test with [[org.apache.spark.ui.memory.MemoryListener]], make + // sure the events logged in history file can work correctly. + val mapForStage1 = memoryListener.completedStagesToMem((1, 0)) + val transMetrics1 = mapForStage1(execId).transportInfo.get + MemoryListenerSuite.assertTransMetrics( + 30, 10, MemTime(30, 2L), MemTime(10, 2L), transMetrics1) + val mapForStage2 = memoryListener.completedStagesToMem((2, 0)) + val transMetrics2 = mapForStage2(execId).transportInfo.get + MemoryListenerSuite.assertTransMetrics( + 20, 25, MemTime(30, 3L), MemTime(30, 3L), transMetrics2) + val mapForStage3 = memoryListener.completedStagesToMem((3, 0)) + val transMetrics3 = mapForStage3(execId).transportInfo.get + MemoryListenerSuite.assertTransMetrics( + 25, 10, MemTime(30, 3L), MemTime(30, 3L), transMetrics3) + } { + logData.close() + } + + def getLatestExecutorMetricsBeforeStageEnd( + events: Seq[SparkListenerEvent], + stageId: Int): Option[SparkListenerExecutorMetricsUpdate] = { + val itr = events.iterator + var latestMetrics: Option[SparkListenerExecutorMetricsUpdate] = None + var isStageSubmitted: Boolean = false + while(itr.hasNext) { + val event = itr.next() + event match { + case ss: SparkListenerStageSubmitted if ss.stageInfo.stageId == stageId => + isStageSubmitted = true + case sc: SparkListenerStageCompleted if sc.stageInfo.stageId == stageId => + return latestMetrics + case emu: SparkListenerExecutorMetricsUpdate if isStageSubmitted => + latestMetrics = Some(emu) + case _ => // Do nothing for other events + } + } + latestMetrics + } + + def assertMetrics( + execId: String, + metricsDetails: (String, Long, Long, Long), + event: SparkListenerExecutorMetricsUpdate): Unit = { + val execMetrics = event.executorMetrics + assert(execId === event.execId) + assert(metricsDetails._1 === execMetrics.hostname) + assert(metricsDetails._2 === execMetrics.transportMetrics.timeStamp) + assert(metricsDetails._3 === execMetrics.transportMetrics.onHeapSize) + assert(metricsDetails._4 === execMetrics.transportMetrics.offHeapSize) + } + } + /* ----------------- * * Actual test logic * * ----------------- */ @@ -170,7 +299,7 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit // Verify file contains exactly the two events logged val logData = EventLoggingListener.openEventLog(new Path(eventLogger.logPath), fileSystem) - try { + Utils.tryWithSafeFinally { val lines = readLines(logData) val logStart = SparkListenerLogStart(SPARK_VERSION) assert(lines.size === 3) @@ -180,7 +309,7 @@ class EventLoggingListenerSuite extends 
diff --git a/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala
index ba56af8215cd7..201111274d204 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.scheduler
 
 import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkFunSuite}
+import org.apache.spark.executor.ExecutorMetrics
 import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
 import org.apache.spark.storage.BlockManagerId
 import org.apache.spark.util.AccumulatorV2
@@ -87,6 +88,7 @@ private class DummyTaskScheduler extends TaskScheduler {
   override def applicationAttemptId(): Option[String] = None
   def executorHeartbeatReceived(
       execId: String,
+      executorMetrics: ExecutorMetrics,
       accumUpdates: Array[(Long, Seq[AccumulatorV2[_, _]])],
       blockManagerId: BlockManagerId): Boolean = true
 }
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index a8b9604899838..b0daf13b682c1 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -34,7 +34,7 @@ import org.scalatest.concurrent.Timeouts._
 
 import org.apache.spark._
 import org.apache.spark.broadcast.BroadcastManager
-import org.apache.spark.executor.DataReadMethod
+import org.apache.spark.executor.{DataReadMethod, ExecutorMetrics}
 import org.apache.spark.internal.config._
 import org.apache.spark.memory.UnifiedMemoryManager
 import org.apache.spark.network.{BlockDataManager, BlockTransferService}
@@ -1269,6 +1269,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
       listener.onBlockFetchSuccess("mockBlockId", new NioManagedBuffer(ByteBuffer.allocate(1)))
     }
 
+    override def updateMemMetrics(executorMetrics: ExecutorMetrics): Unit = {}
+
     override def close(): Unit = {}
 
     override def hostName: String = { "MockBlockTransferServiceHost" }
diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
index 93964a2d56743..d3625a02ae7cf 100644
--- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
@@ -323,7 +323,7 @@ class JobProgressListenerSuite extends SparkFunSuite with LocalSparkContext with
     listener.onTaskStart(SparkListenerTaskStart(1, 0, makeTaskInfo(1236L)))
     listener.onTaskStart(SparkListenerTaskStart(1, 0, makeTaskInfo(1237L)))
 
-    listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate(execId, Array(
+    listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate(execId, null, Array(
       (1234L, 0, 0, makeTaskMetrics(0).accumulators().map(AccumulatorSuite.makeInfo)),
       (1235L, 0, 0, makeTaskMetrics(100).accumulators().map(AccumulatorSuite.makeInfo)),
       (1236L, 1, 0, makeTaskMetrics(200).accumulators().map(AccumulatorSuite.makeInfo)))))
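JobProgressListenerSuite above (and SQLListenerSuite below) pass null for the new executorMetrics field because those listeners consume only the accumulator updates. A small stand-alone sketch, with stand-in types rather than Spark's, of why that is safe for these tests but would break anything that dereferences the field, which is why the JSON round-trip tests construct a real ExecutorMetrics instead:

object NullMetricsDemo extends App {
  final class MetricsStub(val hostname: String)
  case class UpdateEvent(execId: String, metrics: MetricsStub)

  // A listener that reads only the executor id never dereferences `metrics`,
  // so a null placeholder is harmless here:
  def onUpdate(e: UpdateEvent): String = s"accumulator update from ${e.execId}"
  println(onUpdate(UpdateEvent("exec-1", null)))

  // Anything that serializes the event would dereference the field and fail:
  def toJsonLike(e: UpdateEvent): String =
    s"""{"execId": "${e.execId}", "host": "${e.metrics.hostname}"}"""
  try println(toJsonLike(UpdateEvent("exec-1", null)))
  catch { case _: NullPointerException => println("null metrics cannot be serialized") }
}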
diff --git a/core/src/test/scala/org/apache/spark/ui/memory/MemoryListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/memory/MemoryListenerSuite.scala
new file mode 100644
index 0000000000000..d6ed2d9493cc2
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/ui/memory/MemoryListenerSuite.scala
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ui.memory
+
+import org.apache.spark._
+import org.apache.spark.executor._
+import org.apache.spark.scheduler._
+import org.apache.spark.scheduler.cluster._
+
+class MemoryListenerSuite extends SparkFunSuite {
+
+  test("test stages use executor metrics updated in previous stages") {
+    val listener = new MemoryListener
+    val execId1 = "exec-1"
+    val host1 = "host-1"
+    val port: Option[Int] = Some(80)
+
+    listener.onExecutorAdded(
+      SparkListenerExecutorAdded(1L, execId1, new ExecutorInfo(host1, 1, Map.empty)))
+
+    // stage 1, no metrics update
+    listener.onStageSubmitted(MemoryListenerSuite.createStageStartEvent(1))
+    listener.onStageCompleted(MemoryListenerSuite.createStageEndEvent(1))
+
+    // multiple metrics updates in stage 2
+    listener.onStageSubmitted(MemoryListenerSuite.createStageStartEvent(2))
+    val execMetrics1 = MemoryListenerSuite.createExecutorMetrics(host1, port, 2L, 20, 10)
+    listener.onExecutorMetricsUpdate(MemoryListenerSuite.createExecutorMetricsUpdateEvent(
+      execId1, execMetrics1))
+    val execMetrics2 = MemoryListenerSuite.createExecutorMetrics(host1, port, 3L, 30, 5)
+    listener.onExecutorMetricsUpdate(MemoryListenerSuite.createExecutorMetricsUpdateEvent(
+      execId1, execMetrics2))
+    val execMetrics3 = MemoryListenerSuite.createExecutorMetrics(host1, port, 4L, 15, 15)
+    listener.onExecutorMetricsUpdate(MemoryListenerSuite.createExecutorMetricsUpdateEvent(
+      execId1, execMetrics3))
+    listener.onStageCompleted(MemoryListenerSuite.createStageEndEvent(2))
+
+    // stage 3 and stage 4 don't get metrics updates
+    listener.onStageSubmitted(MemoryListenerSuite.createStageStartEvent(3))
+    listener.onStageCompleted(MemoryListenerSuite.createStageEndEvent(3))
+    listener.onStageSubmitted(MemoryListenerSuite.createStageStartEvent(4))
+    listener.onStageCompleted(MemoryListenerSuite.createStageEndEvent(4))
+
+    // no transport metrics for stage 1 since no update arrived during stage 1
+    val mapForStage1 = listener.completedStagesToMem((1, 0))
+    assert(mapForStage1.get(execId1).get.transportInfo === None)
+
+    // stage 2 gets aggregated values when more than one metrics update arrives
+    val mapForStage2 = listener.completedStagesToMem((2, 0))
+    val transMetrics2 = mapForStage2(execId1).transportInfo.get
+    MemoryListenerSuite.assertTransMetrics(
+      15, 15, MemTime(30, 3), MemTime(15, 4), transMetrics2)
+
+    // both stage 3 and stage 4 reuse the metrics last updated in stage 2
+    val mapForStage3 = listener.completedStagesToMem((3, 0))
+    val memInfo3 = mapForStage3(execId1)
+    assert(memInfo3.transportInfo.isDefined)
+    val transMetrics3 = memInfo3.transportInfo.get
+    MemoryListenerSuite.assertTransMetrics(
+      15, 15, MemTime(15, 4), MemTime(15, 4), transMetrics3)
+
+    val mapForStage4 = listener.completedStagesToMem((4, 0))
+    val memInfo4 = mapForStage4(execId1)
+    assert(memInfo4.transportInfo.isDefined)
+    val transMetrics4 = memInfo4.transportInfo.get
+    MemoryListenerSuite.assertTransMetrics(
+      15, 15, MemTime(15, 4), MemTime(15, 4), transMetrics4)
+
+    listener.onExecutorRemoved(SparkListenerExecutorRemoved(0L, execId1, ""))
+  }
+
+  test("test multiple executors with multiple stages") {
+    val listener = new MemoryListener
+    val (execId1, execId2, execId3) = ("exec-1", "exec-2", "exec-3")
+    val (host1, host2, host3) = ("host-1", "host-2", "host-3")
+    val (port1, port2, port3): (Option[Int], Option[Int], Option[Int]) =
+      (Some(80), Some(80), Some(80))
+
+    // two executors added first
+    listener.onExecutorAdded(
+      SparkListenerExecutorAdded(1L, execId1, new ExecutorInfo(host1, 1, Map.empty)))
+    listener.onExecutorAdded(
+      SparkListenerExecutorAdded(2L, execId2, new ExecutorInfo(host2, 1, Map.empty)))
+
+    // three executors run in one stage; one of them is removed before the stage completes
+    listener.onStageSubmitted(MemoryListenerSuite.createStageStartEvent(1))
+    val exec1Metrics = MemoryListenerSuite.createExecutorMetrics(host1, port1, 3L, 20, 10)
+    listener.onExecutorMetricsUpdate(MemoryListenerSuite.createExecutorMetricsUpdateEvent(
+      execId1, exec1Metrics))
+    val exec2Metrics = MemoryListenerSuite.createExecutorMetrics(host2, port2, 4L, 15, 5)
+    listener.onExecutorMetricsUpdate(MemoryListenerSuite.createExecutorMetricsUpdateEvent(
+      execId2, exec2Metrics))
+    // one more executor added while the stage is running
+    listener.onExecutorAdded(
+      SparkListenerExecutorAdded(0L, execId3, new ExecutorInfo(host3, 1, Map.empty)))
+    val exec3Metrics = MemoryListenerSuite.createExecutorMetrics(host3, port3, 5L, 30, 15)
+    listener.onExecutorMetricsUpdate(MemoryListenerSuite.createExecutorMetricsUpdateEvent(
+      execId3, exec3Metrics))
+
+    assert(listener.activeExecutorIdToMem.size === 3)
+    assert(listener.removedExecutorIdToMem.isEmpty)
+
+    // executor 2 removed before the stage completes
+    listener.onExecutorRemoved(SparkListenerExecutorRemoved(6L, execId2, ""))
+    listener.onStageCompleted(MemoryListenerSuite.createStageEndEvent(1))
+    (2 to 3).foreach { i =>
+      listener.onStageSubmitted(MemoryListenerSuite.createStageStartEvent(i))
+      listener.onStageCompleted(MemoryListenerSuite.createStageEndEvent(i))
+    }
+
+    // all stages are completed, so there are no active stages now
+    assert(listener.activeStagesToMem.isEmpty)
+
+    listener.onStageSubmitted(MemoryListenerSuite.createStageStartEvent(4))
+    listener.onExecutorMetricsUpdate(MemoryListenerSuite.createExecutorMetricsUpdateEvent(
+      execId2, new ExecutorMetrics))
+
+    assert(listener.activeExecutorIdToMem.size === 3)
+    assert(listener.activeStagesToMem.size === 1)
+
+    listener.onStageCompleted(MemoryListenerSuite.createStageEndEvent(4))
+
+    assert(listener.activeStagesToMem.isEmpty)
+    assert(listener.completedStagesToMem.size === 4)
+    assert(listener.removedExecutorIdToMem.size === 1)
+
+    listener.onExecutorRemoved(SparkListenerExecutorRemoved(7L, execId1, ""))
+    listener.onExecutorRemoved(SparkListenerExecutorRemoved(8L, execId3, ""))
+
+    assert(listener.removedExecutorIdToMem.size === 3)
+
+    // {{completedStagesToMem}} keeps the metrics of both removed executors and
+    // newly added executors
+    val mapForStage1 = listener.completedStagesToMem((1, 0))
+    assert(mapForStage1.size === 3)
+
+    val transMetrics1 = mapForStage1(execId1).transportInfo.get
+    val transMetrics2 = mapForStage1(execId2).transportInfo.get
+    val transMetrics3 = mapForStage1(execId3).transportInfo.get
+
+    MemoryListenerSuite.assertTransMetrics(
+      20, 10, MemTime(20, 3), MemTime(10, 3), transMetrics1)
+    MemoryListenerSuite.assertTransMetrics(
+      15, 5, MemTime(15, 4), MemTime(5, 4), transMetrics2)
+    MemoryListenerSuite.assertTransMetrics(
+      30, 15, MemTime(30, 5), MemTime(15, 5), transMetrics3)
+  }
+}
+
+object MemoryListenerSuite extends SparkFunSuite {
+  def createStageStartEvent(stageId: Int): SparkListenerStageSubmitted = {
+    val stageInfo = new StageInfo(stageId, 0, stageId.toString, 0, Seq.empty, Seq.empty, "")
+    SparkListenerStageSubmitted(stageInfo)
+  }
+
+  def createStageEndEvent(stageId: Int): SparkListenerStageCompleted = {
+    val stageInfo = new StageInfo(stageId, 0, stageId.toString, 0, Seq.empty, Seq.empty, "")
+    SparkListenerStageCompleted(stageInfo)
+  }
+
+  def createExecutorMetricsUpdateEvent(
+      execId: String,
+      executorMetrics: ExecutorMetrics): SparkListenerExecutorMetricsUpdate = {
+    SparkListenerExecutorMetricsUpdate(execId, executorMetrics, Seq.empty)
+  }
+
+  def createExecutorMetrics(
+      hostname: String,
+      port: Option[Int],
+      timeStamp: Long,
+      onHeapSize: Long,
+      offHeapSize: Long): ExecutorMetrics = {
+    ExecutorMetrics(hostname, port, TransportMetrics(timeStamp, onHeapSize, offHeapSize))
+  }
+
+  def assertTransMetrics(
+      onHeapSize: Long,
+      offHeapSize: Long,
+      peakOnHeapSizeTime: MemTime,
+      peakOffHeapSizeTime: MemTime,
+      transMemSize: TransportMemSize): Unit = {
+    assert(onHeapSize === transMemSize.onHeapSize)
+    assert(offHeapSize === transMemSize.offHeapSize)
+    assert(peakOnHeapSizeTime === transMemSize.peakOnHeapSizeTime)
+    assert(peakOffHeapSizeTime === transMemSize.peakOffHeapSizeTime)
+  }
+}
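The aggregation these assertions encode is: keep the latest sample per executor, plus the peak (value, time) for on-heap and off-heap memory separately, resolving ties toward the later timestamp. A self-contained sketch under those assumptions follows; the names (Sample, TransportMem) are illustrative, not the actual MemoryListener implementation:

object PeakTrackingDemo extends App {
  case class MemTime(size: Long, time: Long)
  case class Sample(time: Long, onHeap: Long, offHeap: Long)

  case class TransportMem(
      onHeapSize: Long = 0L,
      offHeapSize: Long = 0L,
      peakOnHeap: MemTime = MemTime(0L, 0L),
      peakOffHeap: MemTime = MemTime(0L, 0L)) {
    // The latest sample always wins for the current sizes; peaks keep the
    // largest value seen so far, preferring the newer timestamp on ties (>=).
    def update(s: Sample): TransportMem = TransportMem(
      onHeapSize = s.onHeap,
      offHeapSize = s.offHeap,
      peakOnHeap = if (s.onHeap >= peakOnHeap.size) MemTime(s.onHeap, s.time) else peakOnHeap,
      peakOffHeap = if (s.offHeap >= peakOffHeap.size) MemTime(s.offHeap, s.time) else peakOffHeap)
  }

  // The three stage-2 updates from the first test above:
  val samples = Seq(Sample(2L, 20, 10), Sample(3L, 30, 5), Sample(4L, 15, 15))
  val agg = samples.foldLeft(TransportMem())(_ update _)
  // Matches assertTransMetrics(15, 15, MemTime(30, 3), MemTime(15, 4), ...)
  assert(agg == TransportMem(15, 15, MemTime(30, 3), MemTime(15, 4)))
}

The tie-toward-newer rule is what the event-logging test's stage-2 expectation (peak MemTime(30, 3L) after equal on-heap readings at times 2L and 3L) implies.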
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index a64dbeae47294..44c872af2cc85 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -82,6 +82,13 @@ class JsonProtocolSuite extends SparkFunSuite {
   val executorAdded = SparkListenerExecutorAdded(executorAddedTime, "exec1",
     new ExecutorInfo("Hostee.awesome.com", 11, logUrlMap))
   val executorRemoved = SparkListenerExecutorRemoved(executorRemovedTime, "exec2", "test reason")
+  val executorMetrics = {
+    val execMetrics = new ExecutorMetrics
+    execMetrics.setHostname("host-1")
+    execMetrics.setPort(Some(80))
+    execMetrics.setTransportMetrics(TransportMetrics(0L, 10, 10))
+    execMetrics
+  }
   val executorBlacklisted = SparkListenerExecutorBlacklisted(executorBlacklistedTime, "exec1", 22)
   val executorUnblacklisted =
     SparkListenerExecutorUnblacklisted(executorUnblacklistedTime, "exec1")
@@ -94,7 +101,7 @@ class JsonProtocolSuite extends SparkFunSuite {
       makeTaskMetrics(300L, 400L, 500L, 600L, 700, 800, hasHadoopInput = true, hasOutput = true)
         .accumulators().map(AccumulatorSuite.makeInfo)
         .zipWithIndex.map { case (a, i) => a.copy(id = i) }
-    SparkListenerExecutorMetricsUpdate("exec3", Seq((1L, 2, 3, accumUpdates)))
+    SparkListenerExecutorMetricsUpdate("exec3", executorMetrics, Seq((1L, 2, 3, accumUpdates)))
   }
 
   testEvent(stageSubmitted, stageSubmittedJsonString)
@@ -432,6 +439,25 @@ class JsonProtocolSuite extends SparkFunSuite {
     testAccumValue(Some("anything"), 123, JString("123"))
   }
 
+  test("ExecutorMetrics backward compatibility") {
+    // ExecutorMetrics is newly added
+    val accumUpdates =
+      makeTaskMetrics(300L, 400L, 500L, 600L, 700, 800, hasHadoopInput = true, hasOutput = true)
+        .accumulators().map(AccumulatorSuite.makeInfo)
+        .zipWithIndex.map { case (a, i) => a.copy(id = i) }
+    val executorMetricsUpdate = SparkListenerExecutorMetricsUpdate("exec3", new ExecutorMetrics,
+      Seq((1L, 2, 3, accumUpdates)))
+    assert(executorMetricsUpdate.executorMetrics != null)
+    assert(executorMetricsUpdate.executorMetrics.transportMetrics != null)
+    val newJson = JsonProtocol.executorMetricsUpdateToJson(executorMetricsUpdate)
+    val oldJson = newJson.removeField { case (field, _) => field == "Executor Metrics Updated" }
+    val newMetrics = JsonProtocol.executorMetricsUpdateFromJson(oldJson)
+    assert(newMetrics.executorMetrics.hostname === "")
+    assert(newMetrics.executorMetrics.port === None)
+    assert(newMetrics.executorMetrics.transportMetrics.onHeapSize === 0L)
+    assert(newMetrics.executorMetrics.transportMetrics.offHeapSize === 0L)
+    assert(newMetrics.executorMetrics.transportMetrics.timeStamp != 0L)
+  }
 }
@@ -1794,6 +1820,15 @@ private[spark] object JsonProtocolSuite extends Assertions {
       |{
       |  "Event": "SparkListenerExecutorMetricsUpdate",
       |  "Executor ID": "exec3",
+      |  "Executor Metrics Updated": {
+      |    "Executor Hostname": "host-1",
+      |    "Executor Port": 80,
+      |    "TransportMetrics": {
+      |      "TimeStamp": 0,
+      |      "OnHeapSize": 10,
+      |      "OffHeapSize": 10
+      |    }
+      |  },
       |  "Metrics Updated": [
       |    {
       |      "Task ID": 1,
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index feae76a087dec..ed6f89e93bc96 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -108,7 +108,14 @@ object MimaExcludes {
       ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.scheduler.SparkListenerBlockManagerAdded.apply"),
       ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.storage.StorageStatus.this"),
       ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.storage.StorageStatus.this"),
-      ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.status.api.v1.RDDDataDistribution.this")
+      ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.status.api.v1.RDDDataDistribution.this"),
+
+      // [SPARK-9103] Update SparkListenerExecutorMetricsUpdate with new executorMetrics
+      ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.scheduler.SparkListenerExecutorMetricsUpdate.apply"),
+      ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.scheduler.SparkListenerExecutorMetricsUpdate.copy"),
+      ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.scheduler.SparkListenerExecutorMetricsUpdate.this"),
+      ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.scheduler.SparkListenerExecutorMetricsUpdate$"),
+      ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.scheduler.SparkListenerExecutorMetricsUpdate.copy$default$2")
     )
 
     // Exclude rules for 2.1.x
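The backward-compatibility contract the new JsonProtocolSuite test pins down: reading a pre-patch log that lacks "Executor Metrics Updated" must yield defaults (empty hostname, no port, zero sizes) rather than fail, with the timestamp defaulting to the read time (hence the timeStamp != 0L assertion). Below is a hedged json4s sketch of that fallback, using the same parse/field-lookup style as the suite; the helper and stub type are illustrative, not Spark's actual JsonProtocol code:

object OldLogCompatDemo extends App {
  import org.json4s._
  import org.json4s.jackson.JsonMethods._

  implicit val fmt: Formats = DefaultFormats

  case class TransportMetricsStub(timeStamp: Long, onHeapSize: Long, offHeapSize: Long)

  // Absent field (JNothing) => zero sizes and a current-time timestamp, which
  // is consistent with the suite's `timeStamp != 0L` assertion for old logs.
  def transportMetricsFromJson(json: JValue): TransportMetricsStub = json match {
    case JNothing => TransportMetricsStub(System.currentTimeMillis(), 0L, 0L)
    case j => TransportMetricsStub(
      (j \ "TimeStamp").extract[Long],
      (j \ "OnHeapSize").extract[Long],
      (j \ "OffHeapSize").extract[Long])
  }

  // A pre-patch event has no "Executor Metrics Updated" field at all:
  val oldEvent = parse("""{"Event": "SparkListenerExecutorMetricsUpdate", "Executor ID": "exec3"}""")
  println(transportMetricsFromJson(oldEvent \ "Executor Metrics Updated" \ "TransportMetrics"))
}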
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
index e6cd41e4facf1..ef7e0c444911c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
@@ -139,7 +139,7 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext with JsonTest
 
     assert(listener.getExecutionMetrics(0).isEmpty)
 
-    listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", Seq(
+    listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", null, Seq(
       // (task id, stage id, stage attempt, accum updates)
       (0L, 0, 0, createTaskMetrics(accumulatorUpdates).accumulators().map(makeInfo)),
       (1L, 0, 0, createTaskMetrics(accumulatorUpdates).accumulators().map(makeInfo))
@@ -152,7 +152,7 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext with JsonTest
     listener.onOtherEvent(SparkListenerDriverAccumUpdates(0, Seq((999L, 2L))))
     checkAnswer(listener.getExecutionMetrics(0), accumulatorUpdates.mapValues(_ * 2))
 
-    listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", Seq(
+    listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", null, Seq(
       // (task id, stage id, stage attempt, accum updates)
       (0L, 0, 0, createTaskMetrics(accumulatorUpdates).accumulators().map(makeInfo)),
       (1L, 0, 0,
@@ -164,7 +164,7 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext with JsonTest
 
     // Retrying a stage should reset the metrics
     listener.onStageSubmitted(SparkListenerStageSubmitted(createStageInfo(0, 1)))
-    listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", Seq(
+    listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", null, Seq(
      // (task id, stage id, stage attempt, accum updates)
      (0L, 0, 1, createTaskMetrics(accumulatorUpdates).accumulators().map(makeInfo)),
      (1L, 0, 1, createTaskMetrics(accumulatorUpdates).accumulators().map(makeInfo))
@@ -204,7 +204,7 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext with JsonTest
 
     // Submit a new stage
     listener.onStageSubmitted(SparkListenerStageSubmitted(createStageInfo(1, 0)))
-    listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", Seq(
+    listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", null, Seq(
      // (task id, stage id, stage attempt, accum updates)
      (0L, 1, 0, createTaskMetrics(accumulatorUpdates).accumulators().map(makeInfo)),
      (1L, 1, 0, createTaskMetrics(accumulatorUpdates).accumulators().map(makeInfo))