Skip to content

Commit e3ac56c

Browse files
viirya authored and dongjoon-hyun committed
[SPARK-31484][CORE] Add stage attempt number to temp checkpoint filename to avoid file already existing exception
### What changes were proposed in this pull request?

To avoid a file-already-exists exception when creating a checkpoint file, this PR proposes to add the stage attempt number to the temporary checkpoint file name.

### Why are the changes needed?

On our production clusters, we have seen checkpointing failures. A failed stage can leave a partially written checkpoint file behind, and the task of the retried stage that writes the checkpoint file can then fail with `FileAlreadyExistsException` when creating the same file, like:

```
org.apache.hadoop.fs.FileAlreadyExistsException: /path_to_checkpoint/rdd-114/.part-03154-attempt-0 for client xxx.xxx.xxx.xxx already exists
  at org.apache.hadoop.hdfs.server.namenode.FSDirWriteFileOp.startFile(FSDirWriteFileOp.java:359)
  at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInt(FSNamesystem.java:2353)
  at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFile(FSNamesystem.java:2273)
  at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.create(NameNodeRpcServer.java:728)
  at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.create(ClientNamenodeProtocolServerSideTranslatorPB.java:413)
  at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
  at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:447)
  at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:989)
  at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:851)
  at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:794)
  at java.security.AccessController.doPrivileged(Native Method)
  at javax.security.auth.Subject.doAs(Subject.java:422)
  at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
  at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2490)
  at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
  at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
  at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
  at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
  at org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:121)
  at org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:88)
  at org.apache.hadoop.hdfs.DFSOutputStream.newStreamForCreate(DFSOutputStream.java:270)
  at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1263)
  at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1205)
  at org.apache.hadoop.hdfs.DistributedFileSystem$8.doCall(DistributedFileSystem.java:473)
  at org.apache.hadoop.hdfs.DistributedFileSystem$8.doCall(DistributedFileSystem.java:470)
  at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
  at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:470)
  at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:411)
  at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:929)
  at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:910)
  at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:872)
  at org.apache.spark.rdd.ReliableCheckpointRDD$.writePartitionToCheckpointFile(ReliableCheckpointRDD.scala:204)
```

### Does this PR introduce any user-facing change?

Yes. Users won't see the checkpoint file-already-exists exception after this PR.

### How was this patch tested?

Added a unit test.

Closes #28255 from viirya/delete-temp-checkpoint.

Authored-by: Liang-Chi Hsieh <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
1 parent 74aed8c commit e3ac56c

2 files changed

Lines changed: 36 additions & 5 deletions

File tree

core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -199,8 +199,8 @@ private[spark] object ReliableCheckpointRDD extends Logging {
199199

200200
val finalOutputName = ReliableCheckpointRDD.checkpointFileName(ctx.partitionId())
201201
val finalOutputPath = new Path(outputDir, finalOutputName)
202-
val tempOutputPath =
203-
new Path(outputDir, s".$finalOutputName-attempt-${ctx.attemptNumber()}")
202+
val tempOutputPath = new Path(outputDir,
203+
s".$finalOutputName-attempt-${ctx.stageAttemptNumber()}-${ctx.attemptNumber()}")
204204

205205
val bufferSize = env.conf.get(BUFFER_SIZE)
206206

@@ -218,11 +218,16 @@ private[spark] object ReliableCheckpointRDD extends Logging {
218218
}
219219
val serializer = env.serializer.newInstance()
220220
val serializeStream = serializer.serializeStream(fileOutputStream)
221-
Utils.tryWithSafeFinally {
221+
Utils.tryWithSafeFinallyAndFailureCallbacks {
222222
serializeStream.writeAll(iterator)
223-
} {
223+
} (catchBlock = {
224+
val deleted = fs.delete(tempOutputPath, false)
225+
if (!deleted) {
226+
logInfo(s"Failed to delete tempOutputPath $tempOutputPath.")
227+
}
228+
}, finallyBlock = {
224229
serializeStream.close()
225-
}
230+
})
226231

227232
if (!fs.rename(tempOutputPath, finalOutputPath)) {
228233
if (!fs.exists(finalOutputPath)) {

core/src/test/scala/org/apache/spark/CheckpointSuite.scala

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import org.apache.spark.internal.config.CACHE_CHECKPOINT_PREFERRED_LOCS_EXPIRE_T
2828
import org.apache.spark.internal.config.UI._
2929
import org.apache.spark.io.CompressionCodec
3030
import org.apache.spark.rdd._
31+
import org.apache.spark.shuffle.FetchFailedException
3132
import org.apache.spark.storage.{BlockId, StorageLevel, TestBlockId}
3233
import org.apache.spark.util.Utils
3334

@@ -642,4 +643,29 @@ class CheckpointStorageSuite extends SparkFunSuite with LocalSparkContext {
642643
assert(preferredLoc == checkpointedRDD.cachedPreferredLocations.get(partiton))
643644
}
644645
}
646+
647+
test("SPARK-31484: checkpoint should not fail in retry") {
648+
withTempDir { checkpointDir =>
649+
val conf = new SparkConf()
650+
.set(UI_ENABLED.key, "false")
651+
sc = new SparkContext("local[1]", "test", conf)
652+
sc.setCheckpointDir(checkpointDir.toString)
653+
val rdd = sc.makeRDD(1 to 200, numSlices = 4).repartition(1).mapPartitions { iter =>
654+
iter.map { i =>
655+
if (i > 100 && TaskContext.get().stageAttemptNumber() == 0) {
656+
// throw new SparkException("Make first attempt failed.")
657+
// Throw FetchFailedException to explicitly trigger stage resubmission.
658+
// A normal exception will only trigger task resubmission in the same stage.
659+
throw new FetchFailedException(null, 0, 0L, 0, 0, "Fake")
660+
} else {
661+
i
662+
}
663+
}
664+
}
665+
rdd.checkpoint()
666+
assert(rdd.collect().toSeq === (1 to 200))
667+
// Verify that RDD is checkpointed
668+
assert(rdd.firstParent.isInstanceOf[ReliableCheckpointRDD[_]])
669+
}
670+
}
645671
}

0 commit comments

Comments
 (0)