@@ -18,9 +18,14 @@
package org.apache.spark.executor

import java.nio.ByteBuffer
+import java.util.concurrent.TimeUnit

+import scala.concurrent.Await

import akka.actor._
import akka.remote._
+import akka.pattern.Patterns
Contributor: nit: alphabetize imports
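For reference, an alphabetized ordering of the akka imports being discussed would look like this (a suggested arrangement only, not part of the committed diff):

    import akka.actor._
    import akka.pattern.Patterns
    import akka.remote._
    import akka.util.Timeout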

+import akka.util.Timeout

import org.apache.spark.{SparkEnv, Logging, SecurityManager, SparkConf}
import org.apache.spark.TaskState.TaskState
@@ -101,26 +106,33 @@ private[spark] object CoarseGrainedExecutorBackend {
      workerUrl: Option[String]) {

    SparkHadoopUtil.get.runAsSparkUser { () =>
-      // Debug code
-      Utils.checkHost(hostname)

-      val conf = new SparkConf
-      // Create a new ActorSystem to run the backend, because we can't create a
-      // SparkEnv / Executor before getting started with all our system properties, etc
-      val (actorSystem, boundPort) = AkkaUtils.createActorSystem("sparkExecutor", hostname, 0,
-        conf, new SecurityManager(conf))
-      // set it
-      val sparkHostPort = hostname + ":" + boundPort
-      actorSystem.actorOf(
-        Props(classOf[CoarseGrainedExecutorBackend], driverUrl, executorId,
-          sparkHostPort, cores),
-        name = "Executor")
-      workerUrl.foreach {
-        url =>
-          actorSystem.actorOf(Props(classOf[WorkerWatcher], url), name = "WorkerWatcher")
-      }
-      actorSystem.awaitTermination()

+      // Debug code
+      Utils.checkHost(hostname)

+      // Bootstrap to fetch the driver's Spark properties.
+      val executorConf = new SparkConf
+      val (fetcher, _) = AkkaUtils.createActorSystem(
+        "driverPropsFetcher", hostname, 0, executorConf, new SecurityManager(executorConf))
+      val driver = fetcher.actorSelection(driverUrl)
+      val timeout = new Timeout(5, TimeUnit.MINUTES)
Contributor: Any particular reason you chose 5 minutes over the AkkaUtils.askTimeout?

Contributor Author: No, I didn't see AkkaUtils.askTimeout.
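As a point of comparison, a minimal sketch of what using the shared helper might look like here, assuming AkkaUtils.askTimeout(conf) returns a FiniteDuration derived from spark.akka.askTimeout (an assumption; the actual helper should be checked before relying on this):

    // Hypothetical alternative to the hard-coded 5-minute timeout above.
    val timeout = new Timeout(AkkaUtils.askTimeout(executorConf))
    val fut = Patterns.ask(driver, RetrieveSparkProps, timeout)
    val props = Await.result(fut, timeout.duration).asInstanceOf[Seq[(String, String)]]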

+      val fut = Patterns.ask(driver, RetrieveSparkProps, timeout)
+      val props = Await.result(fut, timeout.duration).asInstanceOf[Seq[(String, String)]]
+      fetcher.shutdown()

+      // Create a new ActorSystem to run the backend, because we can't create a
Contributor: This comment is no longer technically true -- I think we do have all the information to start a proper SparkEnv, whose actor system we could use instead. Also, RegisteredExecutor() no longer needs to take these properties.

Contributor Author: It requires changing Executor's constructor and hence other backends like Mesos. This is not a new issue introduced in this PR. I removed props from the RegisteredExecutor message.
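A rough sketch of the message change the author describes, hedged against the actual CoarseGrainedClusterMessages definitions (the "before" shape is an assumption): once the executor fetches the properties up front, the registration acknowledgement no longer needs to carry them.

    // Before (roughly): the driver shipped its properties back on registration.
    case class RegisteredExecutor(sparkProperties: Seq[(String, String)])
      extends CoarseGrainedClusterMessage

    // After: properties arrive via RetrieveSparkProps, so the ack can be a bare object.
    case object RegisteredExecutor extends CoarseGrainedClusterMessage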

+      // SparkEnv / Executor before getting started with all our system properties, etc
+      val driverConf = new SparkConf().setAll(props)
+      val (actorSystem, boundPort) = AkkaUtils.createActorSystem(
+        "sparkExecutor", hostname, 0, driverConf, new SecurityManager(driverConf))
+      // set it
+      val sparkHostPort = hostname + ":" + boundPort
+      actorSystem.actorOf(
+        Props(classOf[CoarseGrainedExecutorBackend], driverUrl, executorId, sparkHostPort, cores),
+        name = "Executor")
+      workerUrl.foreach { url =>
+        actorSystem.actorOf(Props(classOf[WorkerWatcher], url), name = "WorkerWatcher")
+      }
+      actorSystem.awaitTermination()
    }
  }

@@ -212,7 +212,7 @@ private[spark] class Executor(
        val serializedDirectResult = ser.serialize(directResult)
        logInfo("Serialized size of result for " + taskId + " is " + serializedDirectResult.limit)
        val serializedResult = {
-          if (serializedDirectResult.limit >= akkaFrameSize - 1024) {
+          if (serializedDirectResult.limit >= akkaFrameSize - AkkaUtils.reservedSizeBytes) {
            logInfo("Storing result for " + taskId + " in local BlockManager")
            val blockId = TaskResultBlockId(taskId)
            env.blockManager.putBytes(
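As a hedged sketch of the pattern this hunk belongs to (the IndirectTaskResult branch, the storage level, and the else branch below are assumptions based on the visible putBytes call, not text from this diff): a result that would not fit in an Akka frame after the reserved headroom is stashed in the local BlockManager and only a reference is sent to the driver, while smaller results are sent back directly.

    val serializedResult = {
      if (serializedDirectResult.limit >= akkaFrameSize - AkkaUtils.reservedSizeBytes) {
        // Too large for one Akka message: store the bytes locally and send back a handle.
        val blockId = TaskResultBlockId(taskId)
        env.blockManager.putBytes(blockId, serializedDirectResult, StorageLevel.MEMORY_AND_DISK_SER)
        ser.serialize(new IndirectTaskResult[Any](blockId))
      } else {
        // Small enough: ship the serialized bytes straight to the driver.
        serializedDirectResult
      }
    }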
@@ -22,11 +22,14 @@ import java.nio.ByteBuffer
import org.apache.spark.TaskState.TaskState
import org.apache.spark.scheduler.TaskDescription
import org.apache.spark.util.{SerializableBuffer, Utils}
+import org.apache.spark.SparkConf
Contributor: nit: import ordering (this should come first)
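Concretely, the suggested grouping would put the new import at the top of the org.apache.spark block (a suggestion only, not part of the diff):

    import org.apache.spark.SparkConf
    import org.apache.spark.TaskState.TaskState
    import org.apache.spark.scheduler.TaskDescription
    import org.apache.spark.util.{SerializableBuffer, Utils}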


private[spark] sealed trait CoarseGrainedClusterMessage extends Serializable

private[spark] object CoarseGrainedClusterMessages {

+  case object RetrieveSparkProps extends CoarseGrainedClusterMessage

  // Driver to executors
  case class LaunchTask(data: SerializableBuffer) extends CoarseGrainedClusterMessage

@@ -124,6 +124,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
        addressToExecutorId.get(address).foreach(removeExecutor(_,
          "remote Akka client disassociated"))

+      case RetrieveSparkProps =>
+        sender ! sparkProperties
    }

    // Make fake resource offers on all executors
@@ -143,14 +145,15 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
      for (task <- tasks.flatten) {
        val ser = SparkEnv.get.closureSerializer.newInstance()
        val serializedTask = ser.serialize(task)
-        if (serializedTask.limit >= akkaFrameSize - 1024) {
+        if (serializedTask.limit >= akkaFrameSize - AkkaUtils.reservedSizeBytes) {
          val taskSetId = scheduler.taskIdToTaskSetId(task.taskId)
          scheduler.activeTaskSets.get(taskSetId).foreach { taskSet =>
            try {
-              var msg = "Serialized task %s:%d was %d bytes which " +
-                "exceeds spark.akka.frameSize (%d bytes). " +
-                "Consider using broadcast variables for large values."
-              msg = msg.format(task.taskId, task.index, serializedTask.limit, akkaFrameSize)
+              var msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " +
+                "spark.akka.frameSize (%d bytes) - reserved (%d bytes). Consider increasing " +
+                "spark.akka.frameSize or using broadcast variables for large values."
+              msg = msg.format(task.taskId, task.index, serializedTask.limit, akkaFrameSize,
+                AkkaUtils.reservedSizeBytes)
              taskSet.abort(msg)
            } catch {
              case e: Exception => logError("Exception in error callback", e)
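With made-up numbers (a hypothetical taskId of 42, index 0, a 12 MB serialized task, and the default 10 MB frame size), the new message would render roughly as:

    Serialized task 42:0 was 12582912 bytes, which exceeds max allowed: spark.akka.frameSize (10485760 bytes) - reserved (204800 bytes). Consider increasing spark.akka.frameSize or using broadcast variables for large values.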
3 changes: 3 additions & 0 deletions core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
@@ -121,4 +121,7 @@ private[spark] object AkkaUtils extends Logging {
  def maxFrameSizeBytes(conf: SparkConf): Int = {
    conf.getInt("spark.akka.frameSize", 10) * 1024 * 1024
  }

+  /** Space reserved for extra data in an Akka message besides serialized task or task result. */
+  val reservedSizeBytes = 200 * 1024
}
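With the 10 MB default from maxFrameSizeBytes, the reservation leaves the following headroom for a serialized task or task result (a quick sanity check using the values shown above):

    val akkaFrameSize = 10 * 1024 * 1024                          // 10485760 bytes (default frame size)
    val maxPayload = akkaFrameSize - AkkaUtils.reservedSizeBytes  // 10485760 - 204800 = 10280960 bytes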
@@ -35,7 +35,7 @@ class CoarseGrainedSchedulerBackendSuite extends FunSuite with LocalSparkContext
    val thrown = intercept[SparkException] {
      larger.collect()
    }
-    assert(thrown.getMessage.contains("Consider using broadcast variables for large values"))
+    assert(thrown.getMessage.contains("using broadcast variables for large values"))
    val smaller = sc.parallelize(1 to 4).collect()
    assert(smaller.size === 4)
  }