Skip to content

Commit 049fbfd

Browse files
gaborgsomogyi and Jackey Lee
authored and committed
[SPARK-26389][SS] Add force delete temp checkpoint configuration
## What changes were proposed in this pull request?
Not all users want to keep temporary checkpoint directories, and it is hard to restore from them. This PR adds a force-delete flag, which defaults to `false`. Additionally, it was not clear to users when a temporary checkpoint directory gets deleted, so log messages were added to explain this a bit more.
## How was this patch tested?
Existing + additional unit tests.
Closes apache#23732 from gaborgsomogyi/SPARK-26389.
Authored-by: Gabor Somogyi <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
1 parent f4ec123 commit 049fbfd

4 files changed

Lines changed: 51 additions & 6 deletions

File tree

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -907,6 +907,12 @@ object SQLConf {
907907
.stringConf
908908
.createOptional
909909

910+
val FORCE_DELETE_TEMP_CHECKPOINT_LOCATION =
911+
buildConf("spark.sql.streaming.forceDeleteTempCheckpointLocation")
912+
.doc("When true, enable temporary checkpoint locations force delete.")
913+
.booleanConf
914+
.createWithDefault(false)
915+
910916
val MIN_BATCHES_TO_RETAIN = buildConf("spark.sql.streaming.minBatchesToRetain")
911917
.internal()
912918
.doc("The minimum number of batches that must be retained and made recoverable.")

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,8 @@ case object RECONFIGURING extends State
5555
* and the results are committed transactionally to the given [[Sink]].
5656
*
5757
* @param deleteCheckpointOnStop whether to delete the checkpoint if the query is stopped without
58-
* errors
58+
* errors. Checkpoint deletion can be forced with the appropriate
59+
* Spark configuration.
5960
*/
6061
abstract class StreamExecution(
6162
override val sparkSession: SparkSession,
@@ -92,6 +93,7 @@ abstract class StreamExecution(
9293
fs.mkdirs(checkpointPath)
9394
checkpointPath.makeQualified(fs.getUri, fs.getWorkingDirectory).toUri.toString
9495
}
96+
logInfo(s"Checkpoint root $checkpointRoot resolved to $resolvedCheckpointRoot.")
9597

9698
def logicalPlan: LogicalPlan
9799

@@ -335,10 +337,13 @@ abstract class StreamExecution(
335337
postEvent(
336338
new QueryTerminatedEvent(id, runId, exception.map(_.cause).map(Utils.exceptionString)))
337339

338-
// Delete the temp checkpoint only when the query didn't fail
339-
if (deleteCheckpointOnStop && exception.isEmpty) {
340+
// Delete the temp checkpoint when either force delete enabled or the query didn't fail
341+
if (deleteCheckpointOnStop &&
342+
(sparkSession.sessionState.conf
343+
.getConf(SQLConf.FORCE_DELETE_TEMP_CHECKPOINT_LOCATION) || exception.isEmpty)) {
340344
val checkpointPath = new Path(resolvedCheckpointRoot)
341345
try {
346+
logInfo(s"Deleting checkpoint $checkpointPath.")
342347
val fs = checkpointPath.getFileSystem(sparkSession.sessionState.newHadoopConf())
343348
fs.delete(checkpointPath, true)
344349
} catch {

sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -221,9 +221,13 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo
221221
}
222222
}.getOrElse {
223223
if (useTempCheckpointLocation) {
224-
// Delete the temp checkpoint when a query is being stopped without errors.
225224
deleteCheckpointOnStop = true
226-
Utils.createTempDir(namePrefix = s"temporary").getCanonicalPath
225+
val tempDir = Utils.createTempDir(namePrefix = s"temporary").getCanonicalPath
226+
logWarning("Temporary checkpoint location created which is deleted normally when" +
227+
s" the query didn't fail: $tempDir. If it's required to delete it under any" +
228+
s" circumstances, please set ${SQLConf.FORCE_DELETE_TEMP_CHECKPOINT_LOCATION.key} to" +
229+
s" true. Important to know deleting temp checkpoint folder is best effort.")
230+
tempDir
227231
} else {
228232
throw new AnalysisException(
229233
"checkpointLocation must be specified either " +

sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -614,6 +614,21 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter {
614614
}
615615
}
616616

617+
test("configured checkpoint dir should not be deleted if a query is stopped without errors and" +
618+
" force temp checkpoint deletion enabled") {
619+
import testImplicits._
620+
withTempDir { checkpointPath =>
621+
withSQLConf(SQLConf.CHECKPOINT_LOCATION.key -> checkpointPath.getAbsolutePath,
622+
SQLConf.FORCE_DELETE_TEMP_CHECKPOINT_LOCATION.key -> "true") {
623+
val ds = MemoryStream[Int].toDS
624+
val query = ds.writeStream.format("console").start()
625+
assert(checkpointPath.exists())
626+
query.stop()
627+
assert(checkpointPath.exists())
628+
}
629+
}
630+
}
631+
617632
test("temp checkpoint dir should be deleted if a query is stopped without errors") {
618633
import testImplicits._
619634
val query = MemoryStream[Int].toDS.writeStream.format("console").start()
@@ -627,6 +642,17 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter {
627642
}
628643

629644
testQuietly("temp checkpoint dir should not be deleted if a query is stopped with an error") {
645+
testTempCheckpointWithFailedQuery(false)
646+
}
647+
648+
testQuietly("temp checkpoint should be deleted if a query is stopped with an error and force" +
649+
" temp checkpoint deletion enabled") {
650+
withSQLConf(SQLConf.FORCE_DELETE_TEMP_CHECKPOINT_LOCATION.key -> "true") {
651+
testTempCheckpointWithFailedQuery(true)
652+
}
653+
}
654+
655+
private def testTempCheckpointWithFailedQuery(checkpointMustBeDeleted: Boolean): Unit = {
630656
import testImplicits._
631657
val input = MemoryStream[Int]
632658
val query = input.toDS.map(_ / 0).writeStream.format("console").start()
@@ -638,7 +664,11 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter {
638664
intercept[StreamingQueryException] {
639665
query.awaitTermination()
640666
}
641-
assert(fs.exists(checkpointDir))
667+
if (!checkpointMustBeDeleted) {
668+
assert(fs.exists(checkpointDir))
669+
} else {
670+
assert(!fs.exists(checkpointDir))
671+
}
642672
}
643673

644674
test("SPARK-20431: Specify a schema by using a DDL-formatted string") {

0 commit comments

Comments
 (0)