@@ -88,6 +88,8 @@ private static class ClientPool {
private EventLoopGroup workerGroup;
private PooledByteBufAllocator pooledAllocator;

public PooledByteBufAllocator getPooledAllocator() { return pooledAllocator; }

public TransportClientFactory(
TransportContext context,
List<TransportClientBootstrap> clientBootstraps) {
@@ -53,6 +53,7 @@ public class TransportServer implements Closeable {

private ServerBootstrap bootstrap;
private ChannelFuture channelFuture;
private PooledByteBufAllocator allocator;
private int port = -1;

/**
@@ -78,6 +79,8 @@ public TransportServer(
}
}

public PooledByteBufAllocator getAllocator() { return allocator; }

public int getPort() {
if (port == -1) {
throw new IllegalStateException("Server not initialized");
@@ -92,7 +95,7 @@ private void init(String hostToBind, int portToBind) {
NettyUtils.createEventLoop(ioMode, conf.serverThreads(), conf.getModuleName() + "-server");
EventLoopGroup workerGroup = bossGroup;

- PooledByteBufAllocator allocator = NettyUtils.createPooledByteBufAllocator(
+ allocator = NettyUtils.createPooledByteBufAllocator(
conf.preferDirectBufs(), true /* allowCache */, conf.serverThreads());

bootstrap = new ServerBootstrap()
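The two getters added above expose the pooled Netty allocators for the server and client sides, which is what makes transport memory observable at all. A minimal sketch of turning them into on-heap/off-heap byte counts; the `metric`, `usedHeapMemory`, and `usedDirectMemory` calls assume Netty 4.1's PooledByteBufAllocatorMetric API (older Netty versions would need PoolArena metrics instead):

```scala
import io.netty.buffer.PooledByteBufAllocator

// Sums pooled-buffer usage across the exposed allocators.
def transportMemoryUsage(allocators: Seq[PooledByteBufAllocator]): (Long, Long) = {
  val onHeap = allocators.map(_.metric.usedHeapMemory).sum
  val offHeap = allocators.map(_.metric.usedDirectMemory).sum
  (onHeap, offHeap)
}
```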
core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala (4 additions & 2 deletions)
@@ -22,6 +22,7 @@ import java.util.concurrent.{ScheduledFuture, TimeUnit}
import scala.collection.mutable
import scala.concurrent.Future

import org.apache.spark.executor.{ExecutorMetrics, TaskMetrics}
import org.apache.spark.internal.Logging
import org.apache.spark.rpc.{RpcCallContext, RpcEnv, ThreadSafeRpcEndpoint}
import org.apache.spark.scheduler._
@@ -36,6 +37,7 @@ import org.apache.spark.util._
*/
private[spark] case class Heartbeat(
executorId: String,
executorMetrics: ExecutorMetrics,
accumUpdates: Array[(Long, Seq[AccumulatorV2[_, _]])], // taskId -> accumulator updates
blockManagerId: BlockManagerId)

@@ -120,14 +122,14 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock)
context.reply(true)

// Messages received from executors
- case heartbeat @ Heartbeat(executorId, accumUpdates, blockManagerId) =>
+ case heartbeat @ Heartbeat(executorId, executorMetrics, accumUpdates, blockManagerId) =>
if (scheduler != null) {
if (executorLastSeen.contains(executorId)) {
executorLastSeen(executorId) = clock.getTimeMillis()
eventLoopThread.submit(new Runnable {
override def run(): Unit = Utils.tryLogNonFatalError {
val unknownExecutor = !scheduler.executorHeartbeatReceived(
- executorId, accumUpdates, blockManagerId)
+ executorId, executorMetrics, accumUpdates, blockManagerId)
val response = HeartbeatResponse(reregisterBlockManager = unknownExecutor)
context.reply(response)
}
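The widened call above implies a matching change to `TaskScheduler.executorHeartbeatReceived`, which this excerpt does not show. A sketch of the extended signature, inferred purely from the call site (the real change lands in TaskScheduler/TaskSchedulerImpl):

```scala
import org.apache.spark.executor.ExecutorMetrics
import org.apache.spark.storage.BlockManagerId
import org.apache.spark.util.AccumulatorV2

private[spark] trait TaskScheduler {
  // Returns true if the driver knows the executor, false if it must re-register.
  def executorHeartbeatReceived(
      execId: String,
      executorMetrics: ExecutorMetrics,
      accumUpdates: Array[(Long, Seq[AccumulatorV2[_, _]])],
      blockManagerId: BlockManagerId): Boolean
}
```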
core/src/main/scala/org/apache/spark/SparkEnv.scala (3 additions & 0 deletions)
@@ -33,6 +33,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.memory.{MemoryManager, StaticMemoryManager, UnifiedMemoryManager}
import org.apache.spark.metrics.MetricsSystem
import org.apache.spark.network.BlockTransferService
import org.apache.spark.network.netty.NettyBlockTransferService
import org.apache.spark.rpc.{RpcEndpoint, RpcEndpointRef, RpcEnv}
import org.apache.spark.scheduler.{LiveListenerBus, OutputCommitCoordinator}
@@ -63,6 +64,7 @@ class SparkEnv (
val mapOutputTracker: MapOutputTracker,
val shuffleManager: ShuffleManager,
val broadcastManager: BroadcastManager,
val blockTransferService: BlockTransferService,
val blockManager: BlockManager,
val securityManager: SecurityManager,
val metricsSystem: MetricsSystem,
@@ -383,6 +385,7 @@ object SparkEnv extends Logging {
mapOutputTracker,
shuffleManager,
broadcastManager,
blockTransferService,
blockManager,
securityManager,
metricsSystem,
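Threading `blockTransferService` through the SparkEnv constructor means any component holding the env can reach the transfer service directly; the executor heartbeat in the next file relies on exactly this. An illustrative (not authoritative) access pattern:

```scala
import org.apache.spark.SparkEnv
import org.apache.spark.executor.ExecutorMetrics

// Illustrative only: refresh transport metrics from wherever the env is visible.
// Note updateMemMetrics is private[spark], so callers live inside the spark package.
val env = SparkEnv.get
val metrics = new ExecutorMetrics
env.blockTransferService.updateMemMetrics(metrics)
```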
core/src/main/scala/org/apache/spark/executor/Executor.scala (21 additions & 1 deletion)
@@ -102,6 +102,12 @@ private[spark] class Executor(
env.blockManager.initialize(conf.getAppId)
}

private val executorMetrics: ExecutorMetrics = new ExecutorMetrics
executorMetrics.setHostname(Utils.localHostName)
if (env.rpcEnv.address != null) {
executorMetrics.setPort(Some(env.rpcEnv.address.port))
}

// Whether to load classes in user jars before those in Spark jars
private val userClassPathFirst = conf.getBoolean("spark.executor.userClassPathFirst", false)

@@ -704,7 +710,21 @@
}
}

- val message = Heartbeat(executorId, accumUpdates.toArray, env.blockManager.blockManagerId)
env.blockTransferService.updateMemMetrics(this.executorMetrics)
val executorMetrics = if (isLocal) {
// When running locally, there is a chance that the executorMetrics could change
// out from under us. So, copy them here. In non-local mode this object would be
// serialized and de-serialized on its way to the driver. Perform that operation here
// to obtain the same result as non-local mode.
// TODO - Add a test that fails in local mode if we don't copy executorMetrics here.
Utils.deserialize[ExecutorMetrics](Utils.serialize(this.executorMetrics))
} else {
this.executorMetrics
}

val message = Heartbeat(
executorId, executorMetrics, accumUpdates.toArray, env.blockManager.blockManagerId)

try {
val response = heartbeatReceiverRef.askSync[HeartbeatResponse](
message, RpcTimeout(conf, "spark.executor.heartbeatInterval", "10s"))
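The local-mode branch above takes a defensive deep copy via a serialize/deserialize round trip. `Utils.serialize`/`Utils.deserialize` are thin wrappers over the JDK object streams, so the copy is equivalent to this self-contained sketch:

```scala
import java.io._

// Java serialization round trip: yields a deep copy of any Serializable value,
// matching what the driver would see after the message crosses the wire
// in non-local mode.
def deepCopy[T <: Serializable](value: T): T = {
  val bytes = new ByteArrayOutputStream()
  val out = new ObjectOutputStream(bytes)
  out.writeObject(value)
  out.close()
  val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
  try in.readObject().asInstanceOf[T] finally in.close()
}
```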
core/src/main/scala/org/apache/spark/executor/ExecutorMetrics.scala (new file, 72 additions)
@@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.executor

import org.apache.spark.annotation.DeveloperApi

/**
* :: DeveloperApi ::
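* Executor-level metrics sent to the driver with each heartbeat: the executor's
* host/port identity and the most recent memory usage of its transport layer.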
*/
@DeveloperApi
class ExecutorMetrics extends Serializable {

private var _hostname: String = ""
def hostname: String = _hostname
private[spark] def setHostname(value: String) = _hostname = value

private var _port: Option[Int] = None
def port: Option[Int] = _port
private[spark] def setPort(value: Option[Int]) = _port = value

  private[spark] def hostPort: String = {
    port match {
      case Some(p) => s"$hostname:$p"
      case None => hostname
    }
  }

private var _transportMetrics: TransportMetrics =
new TransportMetrics(System.currentTimeMillis(), 0L, 0L)
def transportMetrics: TransportMetrics = _transportMetrics
private[spark] def setTransportMetrics(value: TransportMetrics) = {
_transportMetrics = value
}
}

object ExecutorMetrics extends Serializable {
def apply(
hostName: String,
port: Option[Int],
transportMetrics: TransportMetrics): ExecutorMetrics = {
val execMetrics = new ExecutorMetrics
execMetrics.setHostname(hostName)
execMetrics.setPort(port)
execMetrics.setTransportMetrics(transportMetrics)
execMetrics
}
}

/**
* :: DeveloperApi ::
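* A timestamped snapshot of the transport layer's on-heap and off-heap memory usage.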
*/
@DeveloperApi
case class TransportMetrics(
    timeStamp: Long,
    onHeapSize: Long,
    offHeapSize: Long) extends Serializable

core/src/main/scala/org/apache/spark/memory/MemoryListener.scala (new file, 165 additions)
@@ -0,0 +1,165 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.memory

import scala.collection.mutable.HashMap

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.executor.{ExecutorMetrics, TransportMetrics}
import org.apache.spark.scheduler._
import org.apache.spark.scheduler.cluster.ExecutorInfo

/**
* :: DeveloperApi ::
* A SparkListener that prepares information to be displayed on the MemoryTab.
*/
@DeveloperApi
class MemoryListener extends SparkListener {
type ExecutorId = String
val activeExecutorIdToMem = new HashMap[ExecutorId, MemoryUIInfo]
// TODO This may use too much memory.
// There may be many removed executors (e.g. in Dynamic Allocation Mode).
val removedExecutorIdToMem = new HashMap[ExecutorId, MemoryUIInfo]
// A map that maintains the latest metrics of each active executor.
val latestExecIdToExecMetrics = new HashMap[ExecutorId, ExecutorMetrics]
  // A map that maintains all executors' memory information for each stage:
  // (stageId, attemptId) -> (executorId -> MemoryUIInfo)
val activeStagesToMem = new HashMap[(Int, Int), HashMap[ExecutorId, MemoryUIInfo]]
// TODO Get conf of the retained stages so that we don't need to handle them all.
// There may be many completed stages.
val completedStagesToMem = new HashMap[(Int, Int), HashMap[ExecutorId, MemoryUIInfo]]

  override def onExecutorMetricsUpdate(event: SparkListenerExecutorMetricsUpdate): Unit = {
    val executorId = event.execId
    // Guard against events that carry no metrics, rather than calling .get blindly.
    event.executorMetrics.foreach { executorMetrics =>
      activeExecutorIdToMem
        .getOrElseUpdate(executorId, new MemoryUIInfo)
        .updateMemUiInfo(executorMetrics)
      activeStagesToMem.foreach { case (_, stageMemMetrics) =>
        // If an executor was added while the stage was running, also update its
        // metrics in {{activeStagesToMem}}.
        if (!stageMemMetrics.contains(executorId)) {
          stageMemMetrics(executorId) = new MemoryUIInfo
        }
        stageMemMetrics(executorId).updateMemUiInfo(executorMetrics)
      }
      latestExecIdToExecMetrics(executorId) = executorMetrics
    }
  }

override def onExecutorAdded(event: SparkListenerExecutorAdded): Unit = {
val executorId = event.executorId
activeExecutorIdToMem.put(executorId, new MemoryUIInfo(event.executorInfo))
}

override def onExecutorRemoved(event: SparkListenerExecutorRemoved): Unit = {
val executorId = event.executorId
val info = activeExecutorIdToMem.remove(executorId)
latestExecIdToExecMetrics.remove(executorId)
removedExecutorIdToMem.getOrElseUpdate(executorId, info.getOrElse(new MemoryUIInfo))
}

  override def onStageSubmitted(event: SparkListenerStageSubmitted): Unit = {
    val stage = (event.stageInfo.stageId, event.stageInfo.attemptId)
    val memInfoMap = new HashMap[ExecutorId, MemoryUIInfo]
    activeExecutorIdToMem.keys.foreach { id =>
      memInfoMap(id) = new MemoryUIInfo
      latestExecIdToExecMetrics.get(id).foreach { metrics =>
        memInfoMap(id).updateMemUiInfo(metrics)
      }
    }
    activeStagesToMem(stage) = memInfoMap
  }

  override def onStageCompleted(event: SparkListenerStageCompleted): Unit = {
    val stage = (event.stageInfo.stageId, event.stageInfo.attemptId)
    // Refresh the stage's entries from {{activeExecutorIdToMem}}, in case an executor
    // was added while the stage was running and no {{SparkListenerExecutorMetricsUpdate}}
    // event arrived for it during the stage.
    activeStagesToMem.remove(stage).foreach { memInfoMap =>
      activeExecutorIdToMem.foreach { case (executorId, memUiInfo) =>
        if (!memInfoMap.contains(executorId)) {
          memInfoMap(executorId) = new MemoryUIInfo
          memInfoMap(executorId).copyMemUiInfo(memUiInfo)
        }
      }
      completedStagesToMem.put(stage, memInfoMap)
    }
  }
}

@DeveloperApi
class MemoryUIInfo {
var executorAddress: String = _
var transportInfo: Option[TransportMemSize] = None

def this(execInfo: ExecutorInfo) = {
this()
executorAddress = execInfo.executorHost
}

  def updateMemUiInfo(execMetrics: ExecutorMetrics): Unit = {
    if (transportInfo.isEmpty) {
      transportInfo = Some(new TransportMemSize)
    }
    executorAddress = execMetrics.hostPort
    transportInfo.get.updateTransMemSize(execMetrics.transportMetrics)
  }

  def copyMemUiInfo(memUiInfo: MemoryUIInfo): Unit = {
    executorAddress = memUiInfo.executorAddress
    // Copy from the source's transportInfo; the previous version only copied when
    // this object already had one, silently dropping the source's data otherwise.
    transportInfo = memUiInfo.transportInfo.map { src =>
      val copied = new TransportMemSize
      copied.copyTransMemSize(src)
      copied
    }
  }
}

@DeveloperApi
class TransportMemSize {
var onHeapSize: Long = _
var offHeapSize: Long = _
var peakOnHeapSizeTime: MemTime = new MemTime()
var peakOffHeapSizeTime: MemTime = new MemTime()

def updateTransMemSize(transportMetrics: TransportMetrics): Unit = {
val updatedOnHeapSize = transportMetrics.onHeapSize
val updatedOffHeapSize = transportMetrics.offHeapSize
val updateTime: Long = transportMetrics.timeStamp
onHeapSize = updatedOnHeapSize
offHeapSize = updatedOffHeapSize
if (updatedOnHeapSize >= peakOnHeapSizeTime.memorySize) {
peakOnHeapSizeTime = MemTime(updatedOnHeapSize, updateTime)
}
if (updatedOffHeapSize >= peakOffHeapSizeTime.memorySize) {
peakOffHeapSizeTime = MemTime(updatedOffHeapSize, updateTime)
}
}

def copyTransMemSize(transMemSize: TransportMemSize): Unit = {
onHeapSize = transMemSize.onHeapSize
offHeapSize = transMemSize.offHeapSize
peakOnHeapSizeTime = MemTime(transMemSize.peakOnHeapSizeTime.memorySize,
transMemSize.peakOnHeapSizeTime.timeStamp)
peakOffHeapSizeTime = MemTime(transMemSize.peakOffHeapSizeTime.memorySize,
transMemSize.peakOffHeapSizeTime.timeStamp)
}
}

@DeveloperApi
case class MemTime(memorySize: Long = 0L, timeStamp: Long = System.currentTimeMillis)
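MemoryListener is a plain SparkListener, so it can be attached to any SparkContext. A hypothetical wiring sketch (`sc` stands in for an existing SparkContext):

```scala
import org.apache.spark.SparkContext
import org.apache.spark.memory.MemoryListener

val sc: SparkContext = ??? // an existing context
val memoryListener = new MemoryListener
sc.addSparkListener(memoryListener)

// Later (e.g. when rendering the memory tab), read the accumulated state:
memoryListener.activeExecutorIdToMem.foreach { case (execId, info) =>
  val onHeap = info.transportInfo.map(_.onHeapSize).getOrElse(0L)
  println(s"$execId at ${info.executorAddress}: $onHeap transport on-heap bytes")
}
```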
core/src/main/scala/org/apache/spark/network/BlockTransferService.scala
@@ -24,6 +24,7 @@ import scala.concurrent.{Future, Promise}
import scala.concurrent.duration.Duration
import scala.reflect.ClassTag

import org.apache.spark.executor.ExecutorMetrics
import org.apache.spark.internal.Logging
import org.apache.spark.network.buffer.{ManagedBuffer, NioManagedBuffer}
import org.apache.spark.network.shuffle.{BlockFetchingListener, ShuffleClient}
@@ -39,6 +40,11 @@ abstract class BlockTransferService extends ShuffleClient with Closeable with Logging
*/
def init(blockDataManager: BlockDataManager): Unit

/**
* Collect the transfer service's current memory metrics into the given `executorMetrics`.
*/
private[spark] def updateMemMetrics(executorMetrics: ExecutorMetrics): Unit

/**
* Tear down the transfer service.
*/
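One way NettyBlockTransferService could implement the new hook, tying together the allocator getters from the first two files. This is a sketch, not the PR's actual implementation: the field names `server` and `clientFactory` are assumptions, and the `metric` calls assume Netty 4.1.

```scala
import org.apache.spark.executor.{ExecutorMetrics, TransportMetrics}

private[spark] override def updateMemMetrics(executorMetrics: ExecutorMetrics): Unit = {
  val serverMetric = server.getAllocator.metric                // TransportServer getter above
  val clientMetric = clientFactory.getPooledAllocator.metric   // TransportClientFactory getter above
  executorMetrics.setTransportMetrics(TransportMetrics(
    timeStamp = System.currentTimeMillis(),
    onHeapSize = serverMetric.usedHeapMemory + clientMetric.usedHeapMemory,
    offHeapSize = serverMetric.usedDirectMemory + clientMetric.usedDirectMemory))
}
```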