Skip to content

Commit 5757993

Browse files
committed
Emit warning when task size > 100KB
1 parent 0e2109d commit 5757993

4 files changed

Lines changed: 19 additions & 0 deletions

File tree

core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,9 @@ class DAGScheduler(
110110
// resubmit failed stages
111111
val POLL_TIMEOUT = 10L
112112

113+
// Warns the user if a stage contains a task with size greater than this value (in KB)
114+
val TASK_SIZE_TO_WARN = 100
115+
113116
private val eventProcessActor: ActorRef = env.actorSystem.actorOf(Props(new Actor {
114117
override def preStart() {
115118
context.system.scheduler.schedule(RESUBMIT_TIMEOUT milliseconds, RESUBMIT_TIMEOUT milliseconds) {
@@ -430,6 +433,18 @@ class DAGScheduler(
430433
handleExecutorLost(execId)
431434

432435
case BeginEvent(task, taskInfo) =>
436+
for (
437+
job <- idToActiveJob.get(task.stageId);
438+
stage <- stageIdToStage.get(task.stageId);
439+
stageInfo <- stageToInfos.get(stage)
440+
) {
441+
if (taskInfo.serializedSize > TASK_SIZE_TO_WARN * 1024 && !stageInfo.emittedTaskSizeWarning) {
442+
stageInfo.emittedTaskSizeWarning = true
443+
logWarning(("Stage %d (%s) contains a task of very large " +
444+
"size (%d KB). The maximum recommended task size is %d KB.").format(
445+
task.stageId, stageInfo.name, taskInfo.serializedSize / 1024, TASK_SIZE_TO_WARN))
446+
}
447+
}
433448
listenerBus.post(SparkListenerTaskStart(task, taskInfo))
434449

435450
case GettingResultEvent(task, taskInfo) =>

core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,4 +33,5 @@ class StageInfo(
3333
val name = stage.name
3434
val numPartitions = stage.numPartitions
3535
val numTasks = stage.numTasks
36+
var emittedTaskSizeWarning = false
3637
}

core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,8 @@ class TaskInfo(
4646

4747
var failed = false
4848

49+
var serializedSize: Int = 0
50+
4951
def markGettingResult(time: Long = System.currentTimeMillis) {
5052
gettingResultTime = time
5153
}

core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -377,6 +377,7 @@ private[spark] class ClusterTaskSetManager(
377377
logInfo("Serialized task %s:%d as %d bytes in %d ms".format(
378378
taskSet.id, index, serializedTask.limit, timeTaken))
379379
val taskName = "task %s:%d".format(taskSet.id, index)
380+
info.serializedSize = serializedTask.limit
380381
if (taskAttempts(index).size == 1)
381382
taskStarted(task,info)
382383
return Some(new TaskDescription(taskId, execId, taskName, index, serializedTask))

0 commit comments

Comments (0)