Skip to content

Commit 4550f61

Browse files
committed
Restore incrementEpoch() call.
1 parent 813433a commit 4550f61

File tree

2 files changed

+10
-1
lines changed

2 files changed

+10
-1
lines changed

core/src/main/scala/org/apache/spark/MapOutputTracker.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -532,7 +532,7 @@ private[spark] class MapOutputTrackerMaster(
532532
None
533533
}
534534

535-
private def incrementEpoch() {
535+
def incrementEpoch() {
536536
epochLock.synchronized {
537537
epoch += 1
538538
logDebug("Increasing epoch to " + epoch)

core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1224,6 +1224,15 @@ class DAGScheduler(
12241224
logInfo("waiting: " + waitingStages)
12251225
logInfo("failed: " + failedStages)
12261226

1227+
// This call to increment the epoch may not be strictly necessary, but it is retained
1228+
// for now in order to minimize the changes in behavior from an earlier version of the
1229+
// code. This existing behavior of always incrementing the epoch following any
1230+
// successful shuffle map stage completion may have benefits by causing unneeded
1231+
// cached map outputs to be cleaned up earlier on executors. In the future we can
1232+
// consider removing this call, but this will require some extra investigation.
1233+
// See https://github.com/apache/spark/pull/17955/files#r117385673 for more details.
1234+
mapOutputTracker.incrementEpoch()
1235+
12271236
clearCacheLocs()
12281237

12291238
if (!shuffleStage.isAvailable) {

0 commit comments

Comments
 (0)