
Commit 92db56d

zsxwing authored and zoelin7 committed
[SPARK-25568][CORE] Continue to update the remaining accumulators when failing to update one accumulator
Ref: LIHADOOP-42001

Since we don't fail a job when `AccumulatorV2.merge` fails, we should try to update the remaining accumulators so that they can still report correct values.

Tested with the new unit test.

Closes apache#22586 from zsxwing/SPARK-25568.

Authored-by: Shixiong Zhu <zsxwing@gmail.com>
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
(cherry picked from commit b6b8a66)

RB=1489248 BUG=LIHADOOP-42001 G=superfriends-reviewers R=fli,mshen,yezhou,edlu A=yezhou
1 parent ee97ac7 commit 92db56d
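For context, a user-defined accumulator implements `AccumulatorV2`, and its `merge` method is called on the driver when a task finishes (that is the call site the `DAGScheduler` change below hardens). A minimal sketch of such an accumulator, where the `MaxLongAccumulator` name and its logic are illustrative and not part of this patch:

```scala
import org.apache.spark.util.AccumulatorV2

// Illustrative user-defined accumulator; if merge() throws on the driver,
// this patch logs the failure and keeps updating the other accumulators.
class MaxLongAccumulator extends AccumulatorV2[Long, Long] {
  private var _max: Long = Long.MinValue

  override def isZero: Boolean = _max == Long.MinValue
  override def copy(): MaxLongAccumulator = {
    val acc = new MaxLongAccumulator
    acc._max = _max
    acc
  }
  override def reset(): Unit = _max = Long.MinValue
  override def add(v: Long): Unit = _max = math.max(_max, v)
  override def merge(other: AccumulatorV2[Long, Long]): Unit = other match {
    case o: MaxLongAccumulator => _max = math.max(_max, o._max)
    case _ => throw new UnsupportedOperationException(
      s"Cannot merge ${getClass.getName} with ${other.getClass.getName}")
  }
  override def value: Long = _max
}
```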

3 files changed: 38 additions & 6 deletions


core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

Lines changed: 14 additions & 6 deletions
```diff
@@ -1127,9 +1127,10 @@ class DAGScheduler(
   private def updateAccumulators(event: CompletionEvent): Unit = {
     val task = event.task
     val stage = stageIdToStage(task.stageId)
-    try {
-      event.accumUpdates.foreach { updates =>
-        val id = updates.id
+
+    event.accumUpdates.foreach { updates =>
+      val id = updates.id
+      try {
         // Find the corresponding accumulator on the driver and update it
         val acc: AccumulatorV2[Any, Any] = AccumulatorContext.get(id) match {
           case Some(accum) => accum.asInstanceOf[AccumulatorV2[Any, Any]]
@@ -1143,10 +1144,17 @@ class DAGScheduler(
         event.taskInfo.setAccumulables(
           acc.toInfo(Some(updates.value), Some(acc.value)) +: event.taskInfo.accumulables)
         }
+      } catch {
+        case NonFatal(e) =>
+          // Log the class name to make it easy to find the bad implementation
+          val accumClassName = AccumulatorContext.get(id) match {
+            case Some(accum) => accum.getClass.getName
+            case None => "Unknown class"
+          }
+          logError(
+            s"Failed to update accumulator $id ($accumClassName) for task ${task.partitionId}",
+            e)
       }
-    } catch {
-      case NonFatal(e) =>
-        logError(s"Failed to update accumulators for task ${task.partitionId}", e)
     }
   }
 
```
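The essence of the change: the `try`/`catch` moves from around the whole `foreach` to inside it, so a non-fatal failure while merging one accumulator is logged and the remaining updates still run. A stand-alone sketch of that pattern, with hypothetical names rather than Spark code:

```scala
import scala.util.control.NonFatal

// Before: one failing update aborts the whole loop.
// After (as sketched here): each update is isolated, the failure is logged,
// and the remaining updates still run.
def applyAll(updates: Seq[() => Unit]): Unit = {
  updates.foreach { update =>
    try {
      update()
    } catch {
      case NonFatal(e) => println(s"update failed, continuing: $e")
    }
  }
}
```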

core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala

Lines changed: 20 additions & 0 deletions
```diff
@@ -1769,6 +1769,26 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
     assert(sc.parallelize(1 to 10, 2).count() === 10)
   }
 
+  test("misbehaved accumulator should not impact other accumulators") {
+    val bad = new LongAccumulator {
+      override def merge(other: AccumulatorV2[java.lang.Long, java.lang.Long]): Unit = {
+        throw new DAGSchedulerSuiteDummyException
+      }
+    }
+    sc.register(bad, "bad")
+    val good = sc.longAccumulator("good")
+
+    sc.parallelize(1 to 10, 2).foreach { item =>
+      bad.add(1)
+      good.add(1)
+    }
+
+    // This is to ensure the `bad` accumulator did fail to update its value
+    assert(bad.value == 0L)
+    // Should be able to update the "good" accumulator
+    assert(good.value == 10L)
+  }
+
   /**
    * The job will be failed on first task throwing a DAGSchedulerSuiteDummyException.
    * Any subsequent task WILL throw a legitimate java.lang.UnsupportedOperationException.
```

docs/rdd-programming-guide.md

Lines changed: 4 additions & 0 deletions
```diff
@@ -1465,6 +1465,10 @@ jsc.sc().register(myVectorAcc, "MyVectorAcc1");
 
 Note that, when programmers define their own type of AccumulatorV2, the resulting type can be different than that of the elements added.
 
+*Warning*: When a Spark task finishes, Spark will try to merge the accumulated updates in this task to an accumulator.
+If it fails, Spark will ignore the failure and still mark the task successful and continue to run other tasks. Hence,
+a buggy accumulator will not impact a Spark job, but it may not get updated correctly although a Spark job is successful.
+
 </div>
 
 <div data-lang="python" markdown="1">
```
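A practical consequence of the warning above is that a job can succeed while a buggy accumulator silently keeps its zero value. A hedged sketch of a driver-side sanity check, assuming an existing SparkContext `sc` and illustrative names:

```scala
// Count processed records with a built-in accumulator and cross-check it
// after the action; if a custom accumulator's merge() had thrown, its value
// could stay at zero even though the job itself succeeded.
val processed = sc.longAccumulator("processed")
val data = sc.parallelize(1 to 100, 4)

data.foreach(_ => processed.add(1))

require(processed.value == data.count(),
  s"accumulator 'processed' reported ${processed.value}, expected ${data.count()}")
```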
