Skip to content

Commit bdaff12

Browse files
author
Sital Kedia
committed
Fix tests
1 parent 1e6e88a commit bdaff12

File tree

2 files changed

+7
-10
lines changed

2 files changed

+7
-10
lines changed

core/src/main/scala/org/apache/spark/MapOutputTracker.scala

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -426,16 +426,14 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf,
426426

427427
/** Get the epoch for map output for a shuffle, if it is available */
428428
def getEpochForMapOutput(shuffleId: Int, mapId: Int): Option[Long] = {
429-
val arrayOpt = mapStatuses.get(shuffleId)
430-
if (arrayOpt.isDefined && arrayOpt.get != null) {
431-
val array = arrayOpt.get
432-
array.synchronized {
433-
if (array(mapId) != null) {
434-
return Some(epochForMapStatus(shuffleId)(mapId))
435-
}
436-
}
429+
if (mapId < 0) {
430+
return None
437431
}
438-
None
432+
for {
433+
mapStatus <- mapStatuses.get(shuffleId).flatMap { mapStatusArray =>
434+
Option(mapStatusArray(mapId))
435+
}
436+
} yield epochForMapStatus(shuffleId)(mapId)
439437
}
440438

441439
/**

core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1295,7 +1295,6 @@ class DAGScheduler(
12951295
val failedStage = stageIdToStage(task.stageId)
12961296
val mapStage = shuffleIdToMapStage(shuffleId)
12971297

1298-
val epochForMapOutput = mapOutputTracker.getEpochForMapOutput(shuffleId, mapId)
12991298
// It is possible that the map output was regenerated by rerun of the stage and the
13001299
// fetch failure is being reported for stale map output. In that case, we should just
13011300
// ignore the fetch failure and relaunch the task with latest map output info.

0 commit comments

Comments
 (0)