41 changes: 40 additions & 1 deletion core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
@@ -22,6 +22,7 @@ import org.apache.spark.executor.TaskMetrics
import org.apache.spark.storage.BlockManagerId
import org.apache.spark.scheduler.TaskScheduler
import org.apache.spark.util.ActorLogReceive
import org.apache.spark.scheduler.ExecutorLossReason

/**
* A heartbeat from executors to the driver. This is a shared message used by several internal
@@ -32,18 +33,56 @@ private[spark] case class Heartbeat(
taskMetrics: Array[(Long, TaskMetrics)], // taskId -> TaskMetrics
blockManagerId: BlockManagerId)

private[spark] case object ExpireDeadHosts

private[spark] case class HeartbeatResponse(reregisterBlockManager: Boolean)

/**
* Lives in the driver to receive heartbeats from executors.
Contributor:

We should add a header comment that communicates that one of the functions of the HeartbeatReceiver is to expire executors.
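A sketch of what such a header might say (hypothetical wording, not the final text):

/**
 * Lives in the driver to receive heartbeats from executors. Relays task metrics to the
 * TaskScheduler, tracks when each executor was last seen, and expires executors whose
 * heartbeats stop arriving.
 */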

Contributor Author:

Hi Sryza, thanks for your review.

*/
private[spark] class HeartbeatReceiver(scheduler: TaskScheduler)
private[spark] class HeartbeatReceiver(sc: SparkContext, scheduler: TaskScheduler)
Contributor:

I would recommend limiting what we pass to this receiver to the following smaller set of things, instead of the whole SparkContext.

private[spark] class HeartbeatReceiver(
    conf: SparkConf,
    scheduler: TaskScheduler,
    executorAllocationClient: Option[ExecutorAllocationClient]) {
  ...
}

The SparkContext is an instance of ExecutorAllocationClient, so if dynamic allocation is enabled you just pass in Some(this) in SparkContext, otherwise None to indicate that killing is not supported.
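For illustration, the construction in SparkContext might then look like this (a sketch under that assumption; dynamicAllocationEnabled is a hypothetical flag here):

val client: Option[ExecutorAllocationClient] =
  if (dynamicAllocationEnabled) Some(this) else None
private val heartbeatReceiver = env.actorSystem.actorOf(
  Props(new HeartbeatReceiver(conf, taskScheduler, client)), "HeartbeatReceiver")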

Contributor Author:

It will not be easy to understand; on the other hand, the SparkContext is used in a lot of places.

Contributor:

Yes, but it's a bad design pattern to pass in the whole SparkContext everywhere, since it has such a wide interface, most of which we don't actually need. I would argue that it's actually easier to understand, because from the signature alone we know exactly what HeartbeatReceiver needs.

Anyway, not a big deal since this is private. I will merge it as is.

extends Actor with ActorLogReceive with Logging {

val executorLastSeen = new mutable.HashMap[String, Long]
Contributor:

Please add a comment on what the keys and values are:

// executor ID -> timestamp of when the last heartbeat from this executor was received
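With that comment applied, the declaration would read:

// executor ID -> timestamp of when the last heartbeat from this executor was received
val executorLastSeen = new mutable.HashMap[String, Long]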


import context.dispatcher
var  timeoutCheckingTask = context.system.scheduler.schedule(0.seconds,
Contributor:

too many spaces after var

10.milliseconds, self, ExpireDeadHosts)

val slaveTimeout = sc.conf.getLong("spark.storage.blockManagerSlaveTimeoutMs",
math.max(sc.conf.getInt("spark.executor.heartbeatInterval", 10000) * 3, 120000))

override def receiveWithLogging = {
case Heartbeat(executorId, taskMetrics, blockManagerId) =>
val response = HeartbeatResponse(
!scheduler.executorHeartbeatReceived(executorId, taskMetrics, blockManagerId))
Contributor:

not your code, but can you put this in a separate val:

val unknownExecutor = !scheduler.executorHeartbeatReceived(...)
val response = HeartbeatResponse(reregisterBlockManager = unknownExecutor)
...

heartbeatReceived(executorId)
sender ! response
case ExpireDeadHosts =>
expireDeadHosts()

Contributor:

very minor nit: remove space

}

private def heartbeatReceived(executorId: String) = {
executorLastSeen(executorId) = System.currentTimeMillis()
Contributor:

This can just be moved into the one place that it's called from
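Inlined at its single call site (and combined with the separate-val suggestion above), the handler would become, as a sketch:

case Heartbeat(executorId, taskMetrics, blockManagerId) =>
  val unknownExecutor = !scheduler.executorHeartbeatReceived(executorId, taskMetrics, blockManagerId)
  executorLastSeen(executorId) = System.currentTimeMillis()
  sender ! HeartbeatResponse(reregisterBlockManager = unknownExecutor)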

}

private def expireDeadHosts() {
logTrace("Checking for hosts with no recent heart beats in HeartbeatReceiver.")
Contributor:

nit: remove space between "heart" and "beats"

val now = System.currentTimeMillis()
val minSeenTime = now - slaveTimeout
for ((executorId, lastSeenMs) <- executorLastSeen) {
if (lastSeenMs < minSeenTime) {
Contributor:

I think it's easier to read if it's the following instead:

if (now - lastSeenMs > executorTimeout) {
  ... // remove it
}

val msg = "Removing Executor " + executorId + " with no recent heart beats: "
+(now - lastSeenMs) + "ms exceeds " + slaveTimeout + "ms"
logWarning(msg)
if (scheduler.isInstanceOf[org.apache.spark.scheduler.TaskSchedulerImpl]) {
Contributor:

I think it would be better to add a method to TaskScheduler for this.
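One way to do that, as a sketch, is to declare the method on the TaskScheduler trait so the cast disappears; SlaveLost is an existing ExecutorLossReason subclass that would fit here:

// In the TaskScheduler trait:
def executorLost(executorId: String, reason: ExecutorLossReason): Unit

// At the call site, instead of the isInstanceOf/asInstanceOf pair:
scheduler.executorLost(executorId, SlaveLost("Executor heartbeat timed out"))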

scheduler.asInstanceOf[org.apache.spark.scheduler.TaskSchedulerImpl]
  .executorLost(executorId, new ExecutorLossReason(""))
}
sc.killExecutor(executorId)
Contributor:

we can't do this since it's only available in YARN at the moment
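A defensive sketch, given that constraint, would be to attempt the kill only when dynamic executor management is in play (the guard shown is illustrative, not the merged fix):

if (sc.conf.getBoolean("spark.dynamicAllocation.enabled", false)) {
  sc.killExecutor(executorId)
} else {
  logWarning(s"Cannot kill executor $executorId: not supported by this cluster manager")
}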

executorLastSeen.remove(executorId)
Contributor:

What happens if a heartbeat from the executor gets delivered after we kill / remove it?

Contributor Author:

Because the Akka connection is still alive, we can kill the executor by sending a kill message to the ApplicationMaster.

Contributor:

I believe this code will correctly expire that executor as a dead host after a timeout.

}
}
}
}
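For reference, with the defaults above the computed timeout is math.max(10000 * 3, 120000) = 120000 ms, so an executor is expired after two minutes without a heartbeat unless spark.storage.blockManagerSlaveTimeoutMs overrides it.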
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -332,7 +332,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
private[spark] var (schedulerBackend, taskScheduler) =
SparkContext.createTaskScheduler(this, master)
private val heartbeatReceiver = env.actorSystem.actorOf(
Props(new HeartbeatReceiver(taskScheduler)), "HeartbeatReceiver")
Props(new HeartbeatReceiver(this, taskScheduler)), "HeartbeatReceiver")
@volatile private[spark] var dagScheduler: DAGScheduler = _
try {
dagScheduler = new DAGScheduler(this)