@@ -62,6 +62,9 @@ private[spark] class TaskSchedulerImpl(
   // Threshold above which we warn user initial TaskSet may be starved
   val STARVATION_TIMEOUT = conf.getLong("spark.starvation.timeout", 15000)
 
+  // CPUs to request per task
+  val CPUS_PER_TASK = conf.getInt("spark.task.cpus", 1)
Contributor:
Is this intentionally left as an undocumented parameter?

Contributor Author:
Hmm - we could document this in docs/configuration.md -- but I don't think this is a commonly used flag, and it has been around for a while I guess.

Contributor Author:
I guess documenting it is better -- added a commit for that.

Contributor:
Ah I see that it wasn't part of your change. Anyways it might make sense to document it.


+
   // TaskSetManagers are not thread safe, so any access to one should be synchronized
   // on this class.
   val activeTaskSets = new HashMap[String, TaskSetManager]
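
For context (not part of this diff): a minimal sketch of a driver program that opts into the flag discussed above, assuming the standard SparkConf API; the app name, master, and job here are made up for illustration.

import org.apache.spark.{SparkConf, SparkContext}

// Illustrative only: request 2 CPUs per task so each task occupies two executor cores.
object CpusPerTaskExample extends App {
  val conf = new SparkConf()
    .setAppName("cpus-per-task-example")
    .setMaster("local[4]")
    .set("spark.task.cpus", "2")
  val sc = new SparkContext(conf)
  // With 4 local cores and spark.task.cpus = 2, at most 2 tasks should run at once.
  println(sc.parallelize(1 to 100, 4).map(_ * 2).sum())
  sc.stop()
}
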
@@ -228,16 +231,18 @@ private[spark] class TaskSchedulerImpl(
         for (i <- 0 until shuffledOffers.size) {
           val execId = shuffledOffers(i).executorId
           val host = shuffledOffers(i).host
-          for (task <- taskSet.resourceOffer(execId, host, availableCpus(i), maxLocality)) {
-            tasks(i) += task
-            val tid = task.taskId
-            taskIdToTaskSetId(tid) = taskSet.taskSet.id
-            taskIdToExecutorId(tid) = execId
-            activeExecutorIds += execId
-            executorsByHost(host) += execId
-            availableCpus(i) -= taskSet.CPUS_PER_TASK
-            assert (availableCpus(i) >= 0)
-            launchedTask = true
+          if (availableCpus(i) >= CPUS_PER_TASK) {
+            for (task <- taskSet.resourceOffer(execId, host, availableCpus(i), maxLocality)) {
+              tasks(i) += task
+              val tid = task.taskId
+              taskIdToTaskSetId(tid) = taskSet.taskSet.id
+              taskIdToExecutorId(tid) = execId
+              activeExecutorIds += execId
+              executorsByHost(host) += execId
+              availableCpus(i) -= CPUS_PER_TASK
+              assert (availableCpus(i) >= 0)
+              launchedTask = true
+            }
           }
         }
       } while (launchedTask)
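
To make the arithmetic of the new guard concrete, here is a small standalone sketch (hypothetical names and values, not scheduler code) of how the per-offer bookkeeping behaves when spark.task.cpus is 2.

// Hypothetical walk-through of the guard above with spark.task.cpus = 2.
object CpuOfferSketch extends App {
  val cpusPerTask = 2
  val availableCpus = Array(3, 1) // two offers: 3 free cores and 1 free core
  var launched = 0
  for (i <- availableCpus.indices) {
    // Mirrors the new check: only hand out this slot while the executor still has
    // at least CPUS_PER_TASK cores free.
    if (availableCpus(i) >= cpusPerTask) {
      availableCpus(i) -= cpusPerTask // a launched task now consumes 2 cores, not 1
      launched += 1
    }
  }
  // Prints "launched=1, remaining=List(1, 1)": the 3-core offer runs one task and
  // strands one core; the 1-core offer runs nothing instead of over-subscribing.
  println(s"launched=$launched, remaining=${availableCpus.toList}")
}
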
@@ -56,9 +56,6 @@ private[spark] class TaskSetManager(
 {
   val conf = sched.sc.conf
 
-  // CPUs to request per task
-  val CPUS_PER_TASK = conf.getInt("spark.task.cpus", 1)
-
   /*
    * Sometimes if an executor is dead or in an otherwise invalid state, the driver
    * does not realize right away leading to repeated task failures. If enabled,
@@ -388,7 +385,7 @@ private[spark] class TaskSetManager(
       maxLocality: TaskLocality.TaskLocality)
     : Option[TaskDescription] =
   {
-    if (!isZombie && availableCpus >= CPUS_PER_TASK) {
+    if (!isZombie) {
       val curTime = clock.getTime()
 
       var allowedLocality = getAllowedLocalityLevel(curTime)
@@ -89,7 +89,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
         scheduler.statusUpdate(taskId, state, data.value)
         if (TaskState.isFinished(state)) {
           if (executorActor.contains(executorId)) {
-            freeCores(executorId) += 1
+            freeCores(executorId) += scheduler.CPUS_PER_TASK
             makeOffers(executorId)
           } else {
             // Ignoring the update since we don't know about the executor.
@@ -140,7 +140,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
     // Launch tasks returned by a set of resource offers
     def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
       for (task <- tasks.flatten) {
-        freeCores(task.executorId) -= 1
+        freeCores(task.executorId) -= scheduler.CPUS_PER_TASK
         executorActor(task.executorId) ! LaunchTask(task)
       }
     }
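
The two backend hunks above keep the free-core bookkeeping symmetric: launching a task subtracts CPUS_PER_TASK and a finished task adds it back. A toy model of that invariant (hypothetical names, not backend code):

// Toy model of the coarse-grained backend's core accounting with spark.task.cpus = 2.
object FreeCoresSketch extends App {
  val cpusPerTask = 2
  val freeCores = scala.collection.mutable.Map("executor0" -> 4)

  def launchTask(execId: String): Unit = freeCores(execId) -= cpusPerTask // on LaunchTask
  def finishTask(execId: String): Unit = freeCores(execId) += cpusPerTask // on a finished StatusUpdate

  launchTask("executor0"); launchTask("executor0")
  assert(freeCores("executor0") == 0) // four cores hold exactly two 2-CPU tasks
  finishTask("executor0")
  assert(freeCores("executor0") == 2) // finishing one task frees both of its cores
  println(freeCores)
}
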
@@ -246,7 +246,7 @@ private[spark] class MesosSchedulerBackend(
     val cpuResource = Resource.newBuilder()
       .setName("cpus")
       .setType(Value.Type.SCALAR)
-      .setScalar(Value.Scalar.newBuilder().setValue(1).build())
+      .setScalar(Value.Scalar.newBuilder().setValue(scheduler.CPUS_PER_TASK).build())
       .build()
     MesosTaskInfo.newBuilder()
       .setTaskId(taskId)
@@ -293,4 +293,37 @@ class TaskSchedulerImplSuite extends FunSuite with LocalSparkContext with Loggin
     assert(count > 0)
     assert(count < numTrials)
   }
+
+  test("Scheduler correctly accounts for multiple CPUs per task") {
+    sc = new SparkContext("local", "TaskSchedulerImplSuite")
+    val taskCpus = 2
+
+    sc.conf.set("spark.task.cpus", taskCpus.toString)
+    val taskScheduler = new TaskSchedulerImpl(sc)
+    taskScheduler.initialize(new FakeSchedulerBackend)
+    // Need to initialize a DAGScheduler for the taskScheduler to use for callbacks.
+    val dagScheduler = new DAGScheduler(sc, taskScheduler) {
+      override def taskStarted(task: Task[_], taskInfo: TaskInfo) {}
+      override def executorAdded(execId: String, host: String) {}
+    }
+
+    val numFreeCores = 1
+    val singleCoreWorkerOffers = Seq(new WorkerOffer("executor0", "host0", numFreeCores),
+      new WorkerOffer("executor1", "host1", numFreeCores))
+
+    // No tasks should run as we only have 1 core free.
+    val taskSet = FakeTask.createTaskSet(1)
+    taskScheduler.submitTasks(taskSet)
+    var taskDescriptions = taskScheduler.resourceOffers(singleCoreWorkerOffers).flatten
+    assert(0 === taskDescriptions.length)
+
+    // Now change the offers to have 2 cores in one executor and verify if it
+    // is chosen.
+    val multiCoreWorkerOffers = Seq(new WorkerOffer("executor0", "host0", taskCpus),
+      new WorkerOffer("executor1", "host1", numFreeCores))
+    taskScheduler.submitTasks(taskSet)
+    taskDescriptions = taskScheduler.resourceOffers(multiCoreWorkerOffers).flatten
+    assert(1 === taskDescriptions.length)
+    assert("executor0" === taskDescriptions(0).executorId)
+  }
 }