Skip to content

Commit 9f5afd4

Browse files
committed
add a new method to avoid file truncate
1 parent de62b5a commit 9f5afd4

4 files changed

Lines changed: 24 additions & 16 deletions

File tree

core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -289,10 +289,7 @@ public Option<MapStatus> stop(boolean success) {
289289
try {
290290
for (DiskBlockObjectWriter writer : partitionWriters) {
291291
// This method explicitly does _not_ throw exceptions:
292-
File file = writer.revertPartialWritesAndClose();
293-
if (!file.delete()) {
294-
logger.error("Error while deleting file {}", file.getAbsolutePath());
295-
}
292+
writer.deleteHeldFile();
296293
}
297294
} finally {
298295
partitionWriters = null;

core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -262,6 +262,27 @@ private[spark] class DiskBlockObjectWriter(
262262
file
263263
}
264264

265+
/**
266+
* Reverts write metrics that haven't been committed yet and delete the file held
267+
* by current `DiskBlockObjectWriter`. Callers should invoke this function when there
268+
* are runtime exceptions in file writing process and the file is no longer needed.
269+
*/
270+
def deleteHeldFile(): Unit = {
271+
Utils.tryWithSafeFinally {
272+
if (initialized) {
273+
writeMetrics.decBytesWritten(reportedPosition - committedPosition)
274+
writeMetrics.decRecordsWritten(numRecordsWritten)
275+
closeResources()
276+
}
277+
} {
278+
if (file.exists()) {
279+
if (!file.delete()) {
280+
logWarning(s"Error deleting $file")
281+
}
282+
}
283+
}
284+
}
285+
265286
/**
266287
* Writes a key-value pair.
267288
*/

core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -250,12 +250,7 @@ class ExternalAppendOnlyMap[K, V, C](
250250
if (!success) {
251251
// This code path only happens if an exception was thrown above before we set success;
252252
// close our stuff and let the exception be thrown further
253-
writer.revertPartialWritesAndClose()
254-
if (file.exists()) {
255-
if (!file.delete()) {
256-
logWarning(s"Error deleting ${file}")
257-
}
258-
}
253+
writer.deleteHeldFile()
259254
}
260255
}
261256

core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -321,12 +321,7 @@ private[spark] class ExternalSorter[K, V, C](
321321
if (!success) {
322322
// This code path only happens if an exception was thrown above before we set success;
323323
// close our stuff and let the exception be thrown further
324-
writer.revertPartialWritesAndClose()
325-
if (file.exists()) {
326-
if (!file.delete()) {
327-
logWarning(s"Error deleting ${file}")
328-
}
329-
}
324+
writer.deleteHeldFile()
330325
}
331326
}
332327

0 commit comments

Comments
 (0)