From 59277d698aef2910b8d22ad4b5ef539e959c3769 Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Sun, 7 Jul 2019 23:57:43 -0700 Subject: [PATCH 1/6] [SPARK-28294][CORE] Support `spark.history.fs.cleaner.maxNum` configuration --- .../deploy/history/FsHistoryProvider.scala | 77 ++++++++++++++----- .../spark/internal/config/History.scala | 7 ++ .../history/FsHistoryProviderSuite.scala | 50 ++++++++++++ 3 files changed, 115 insertions(+), 19 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index f2ee5994a8f74..59b44d7c8eeed 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -805,6 +805,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) */ private[history] def cleanLogs(): Unit = Utils.tryLog { val maxTime = clock.getTimeMillis() - conf.get(MAX_LOG_AGE_S) * 1000 + val maxNum = conf.get(MAX_LOG_NUM) val expired = listing.view(classOf[ApplicationInfoWrapper]) .index("oldestAttempt") @@ -817,23 +818,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) val (remaining, toDelete) = app.attempts.partition { attempt => attempt.info.lastUpdated.getTime() >= maxTime } - - if (remaining.nonEmpty) { - val newApp = new ApplicationInfoWrapper(app.info, remaining) - listing.write(newApp) - } - - toDelete.foreach { attempt => - logInfo(s"Deleting expired event log for ${attempt.logPath}") - val logPath = new Path(logDir, attempt.logPath) - listing.delete(classOf[LogInfo], logPath.toString()) - cleanAppData(app.id, attempt.info.attemptId, logPath.toString()) - deleteLog(fs, logPath) - } - - if (remaining.isEmpty) { - listing.delete(app.getClass(), app.id) - } + deleteAttemptLogs(app, remaining, toDelete) } // Delete log files that don't have a valid application and exceed the configured max age. @@ -851,10 +836,62 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) listing.delete(classOf[LogInfo], log.logPath) } } + + // If the number of files is bigger than MAX_LOG_NUM, + // clean up all completed attempts per application one by one. + val num = listing.view(classOf[LogInfo]).index("lastProcessed").asScala.count(_ => true) + var count = num - maxNum + if (count > 0) { + logInfo(s"Try to delete $count old event logs to keep $maxNum logs in total.") + val oldAttempts = listing.view(classOf[ApplicationInfoWrapper]) + .index("oldestAttempt") + .asScala + .toList + oldAttempts.foreach { app => + if (count > 0) { + // Applications may have multiple attempts, some of which may not be completed yet. + val (toDelete, remaining) = app.attempts.partition { attempt => + attempt.info.completed + } + count -= deleteAttemptLogs(app, remaining, toDelete) + } + } + if (count > 0) { + logError(s"Fail to clean up according to MAX_LOG_NUM policy ($maxNum).") + } + } + // Clean the blacklist from the expired entries. clearBlacklist(CLEAN_INTERVAL_S) } + private def deleteAttemptLogs( + app: ApplicationInfoWrapper, + remaining: List[AttemptInfoWrapper], + toDelete: List[AttemptInfoWrapper]): Int = { + if (remaining.nonEmpty) { + val newApp = new ApplicationInfoWrapper(app.info, remaining) + listing.write(newApp) + } + + var countDeleted = 0 + toDelete.foreach { attempt => + logInfo(s"Deleting expired event log for ${attempt.logPath}") + val logPath = new Path(logDir, attempt.logPath) + listing.delete(classOf[LogInfo], logPath.toString()) + cleanAppData(app.id, attempt.info.attemptId, logPath.toString()) + if (deleteLog(fs, logPath)) { + countDeleted += 1 + } + } + + if (remaining.isEmpty) { + listing.delete(app.getClass(), app.id) + } + + countDeleted + } + /** * Delete driver logs from the configured spark dfs dir that exceed the configured max age */ @@ -1066,12 +1103,13 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) throw new NoSuchElementException(s"Cannot find attempt $attemptId of $appId.")) } - private def deleteLog(fs: FileSystem, log: Path): Unit = { + private def deleteLog(fs: FileSystem, log: Path): Boolean = { + var deleted = false if (isBlacklisted(log)) { logDebug(s"Skipping deleting $log as we don't have permissions on it.") } else { try { - fs.delete(log, true) + deleted = fs.delete(log, true) } catch { case _: AccessControlException => logInfo(s"No permission to delete $log, ignoring.") @@ -1079,6 +1117,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) logError(s"IOException in cleaning $log", ioe) } } + deleted } private def isCompleted(name: String): Boolean = { diff --git a/core/src/main/scala/org/apache/spark/internal/config/History.scala b/core/src/main/scala/org/apache/spark/internal/config/History.scala index 1d73f01cb84d0..68785fa2b1f13 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/History.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/History.scala @@ -49,6 +49,13 @@ private[spark] object History { .timeConf(TimeUnit.SECONDS) .createWithDefaultString("7d") + // This is designed to be smaller than the default value of + // `dfs.namenode.fs-limits.max-directory-items` (1024 * 1024). + val MAX_LOG_NUM = ConfigBuilder("spark.history.fs.cleaner.maxNum") + .doc("The maximum number of log files in the event log directory.") + .intConf + .createWithDefault(1000000) + val LOCAL_STORE_DIR = ConfigBuilder("spark.history.store.path") .doc("Local directory where to cache application history information. By default this is " + "not set, meaning all history information will be kept in memory.") diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala index 571c6e3e579ba..aaf068e81db0a 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala @@ -1185,6 +1185,56 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging { assert(!mockedProvider.shouldReloadLog(logInfo, fileStatus)) } + test("log cleaner with the maximum number of log files") { + val clock = new ManualClock(0) + (5 to 0 by -1).foreach { num => + val log1_1 = newLogFile("app1", Some("attempt1"), inProgress = false) + writeFile(log1_1, true, None, + SparkListenerApplicationStart("app1", Some("app1"), 1L, "test", Some("attempt1")), + SparkListenerApplicationEnd(2L) + ) + log1_1.setLastModified(2L) + + val log2_1 = newLogFile("app2", Some("attempt1"), inProgress = false) + writeFile(log2_1, true, None, + SparkListenerApplicationStart("app2", Some("app2"), 3L, "test", Some("attempt1")), + SparkListenerApplicationEnd(4L) + ) + log2_1.setLastModified(4L) + + val log3_1 = newLogFile("app3", Some("attempt1"), inProgress = false) + writeFile(log3_1, true, None, + SparkListenerApplicationStart("app3", Some("app3"), 5L, "test", Some("attempt1")), + SparkListenerApplicationEnd(6L) + ) + log3_1.setLastModified(6L) + + val log1_2_incomplete = newLogFile("app1", Some("attempt2"), inProgress = false) + writeFile(log1_2_incomplete, true, None, + SparkListenerApplicationStart("app1", Some("app1"), 7L, "test", Some("attempt2")) + ) + log1_2_incomplete.setLastModified(8L) + + val log3_2 = newLogFile("app3", Some("attempt2"), inProgress = false) + writeFile(log3_2, true, None, + SparkListenerApplicationStart("app3", Some("app3"), 9L, "test", Some("attempt2")), + SparkListenerApplicationEnd(10L) + ) + log3_2.setLastModified(10L) + + val provider = new FsHistoryProvider(createTestConf().set(MAX_LOG_NUM.key, s"$num"), clock) + updateAndCheck(provider) { list => + assert(log1_1.exists() == (num > 4)) + assert(log1_2_incomplete.exists()) // Always exists for all configurations + + assert(log2_1.exists() == (num > 3)) + + assert(log3_1.exists() == (num > 2)) + assert(log3_2.exists() == (num > 2)) + } + } + } + /** * Asks the provider to check for logs and calls a function to perform checks on the updated * app list. Example: From 84ed42b3f18cff75142563ebca08fffa1e90cc85 Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Mon, 8 Jul 2019 11:49:43 -0700 Subject: [PATCH 2/6] Address comments --- .../spark/deploy/history/FsHistoryProvider.scala | 6 ++---- .../apache/spark/internal/config/History.scala | 3 ++- docs/monitoring.md | 15 ++++++++++++++- 3 files changed, 18 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index 59b44d7c8eeed..6fa9b72199fd2 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -839,7 +839,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) // If the number of files is bigger than MAX_LOG_NUM, // clean up all completed attempts per application one by one. - val num = listing.view(classOf[LogInfo]).index("lastProcessed").asScala.count(_ => true) + val num = listing.view(classOf[LogInfo]).index("lastProcessed").asScala.size var count = num - maxNum if (count > 0) { logInfo(s"Try to delete $count old event logs to keep $maxNum logs in total.") @@ -850,9 +850,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) oldAttempts.foreach { app => if (count > 0) { // Applications may have multiple attempts, some of which may not be completed yet. - val (toDelete, remaining) = app.attempts.partition { attempt => - attempt.info.completed - } + val (toDelete, remaining) = app.attempts.partition(_.info.completed) count -= deleteAttemptLogs(app, remaining, toDelete) } } diff --git a/core/src/main/scala/org/apache/spark/internal/config/History.scala b/core/src/main/scala/org/apache/spark/internal/config/History.scala index 68785fa2b1f13..4434b7a3ce2fc 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/History.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/History.scala @@ -49,8 +49,9 @@ private[spark] object History { .timeConf(TimeUnit.SECONDS) .createWithDefaultString("7d") - // This is designed to be smaller than the default value of + // This is designed to be safely smaller than the default value of // `dfs.namenode.fs-limits.max-directory-items` (1024 * 1024). + // If the directory hit the limit, new Spark applications will fail to create event logs. val MAX_LOG_NUM = ConfigBuilder("spark.history.fs.cleaner.maxNum") .doc("The maximum number of log files in the event log directory.") .intConf diff --git a/docs/monitoring.md b/docs/monitoring.md index 4017677861a75..3e13fdca1831b 100644 --- a/docs/monitoring.md +++ b/docs/monitoring.md @@ -190,7 +190,10 @@ Security options for the Spark History Server are covered more detail in the 1d How often the filesystem job history cleaner checks for files to delete. - Files are only deleted if they are older than spark.history.fs.cleaner.maxAge + Files are deleted in two cases. First, they are older than spark.history.fs.cleaner.maxAge. + Second, if the number of files in the event log directory is beyond + spark.history.fs.cleaner.maxNum, Spark tries to clean up the completed attempts + from the applications based on the order of their oldest attempt time. @@ -200,6 +203,16 @@ Security options for the Spark History Server are covered more detail in the Job history files older than this will be deleted when the filesystem history cleaner runs. + + spark.history.fs.cleaner.maxNum + 1000000 + + The number of maximum files in the event log directory. + Spark tries to clean up the completed attempt logs to maintain the log directory under this limit. + This should be smaller than the underlying file system limit like + `dfs.namenode.fs-limits.max-directory-items` in HDFS. + + spark.history.fs.endEventReparseChunkSize 1m From 3444b7321a44355091ae1dc20397f240b959ef0d Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Mon, 8 Jul 2019 12:33:31 -0700 Subject: [PATCH 3/6] Address comments --- .../org/apache/spark/deploy/history/FsHistoryProvider.scala | 2 +- docs/monitoring.md | 5 +++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index 6fa9b72199fd2..60e9c5b0f7f67 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -855,7 +855,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) } } if (count > 0) { - logError(s"Fail to clean up according to MAX_LOG_NUM policy ($maxNum).") + logWarning(s"Fail to clean up according to MAX_LOG_NUM policy ($maxNum).") } } diff --git a/docs/monitoring.md b/docs/monitoring.md index 3e13fdca1831b..5fab3a2ed0d4c 100644 --- a/docs/monitoring.md +++ b/docs/monitoring.md @@ -190,8 +190,9 @@ Security options for the Spark History Server are covered more detail in the 1d How often the filesystem job history cleaner checks for files to delete. - Files are deleted in two cases. First, they are older than spark.history.fs.cleaner.maxAge. - Second, if the number of files in the event log directory is beyond + Files are deleted if at least one of two conditions holds. + First, they're deleted if they're older than spark.history.fs.cleaner.maxAge. + They are also deleted if the number of files is more than spark.history.fs.cleaner.maxNum, Spark tries to clean up the completed attempts from the applications based on the order of their oldest attempt time. From 1264a8dedef83e9792b10209a1a29f16c12f52f6 Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Mon, 8 Jul 2019 12:50:55 -0700 Subject: [PATCH 4/6] Use Int.MaxValue by default --- .../scala/org/apache/spark/internal/config/History.scala | 5 +---- docs/monitoring.md | 2 +- 2 files changed, 2 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/internal/config/History.scala b/core/src/main/scala/org/apache/spark/internal/config/History.scala index 4434b7a3ce2fc..ca9af316dffd0 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/History.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/History.scala @@ -49,13 +49,10 @@ private[spark] object History { .timeConf(TimeUnit.SECONDS) .createWithDefaultString("7d") - // This is designed to be safely smaller than the default value of - // `dfs.namenode.fs-limits.max-directory-items` (1024 * 1024). - // If the directory hit the limit, new Spark applications will fail to create event logs. val MAX_LOG_NUM = ConfigBuilder("spark.history.fs.cleaner.maxNum") .doc("The maximum number of log files in the event log directory.") .intConf - .createWithDefault(1000000) + .createWithDefault(Int.MaxValue) val LOCAL_STORE_DIR = ConfigBuilder("spark.history.store.path") .doc("Local directory where to cache application history information. By default this is " + diff --git a/docs/monitoring.md b/docs/monitoring.md index 5fab3a2ed0d4c..79a9984db1229 100644 --- a/docs/monitoring.md +++ b/docs/monitoring.md @@ -206,7 +206,7 @@ Security options for the Spark History Server are covered more detail in the spark.history.fs.cleaner.maxNum - 1000000 + Int.MaxValue The number of maximum files in the event log directory. Spark tries to clean up the completed attempt logs to maintain the log directory under this limit. From 9cdc9cc6c1989b176f70fb06e763db61555b2d93 Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Mon, 8 Jul 2019 12:54:42 -0700 Subject: [PATCH 5/6] Remove `.toList` --- .../org/apache/spark/deploy/history/FsHistoryProvider.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index 60e9c5b0f7f67..5f9b18ce01279 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -846,7 +846,6 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) val oldAttempts = listing.view(classOf[ApplicationInfoWrapper]) .index("oldestAttempt") .asScala - .toList oldAttempts.foreach { app => if (count > 0) { // Applications may have multiple attempts, some of which may not be completed yet. From f2c93a0291877f3e4af03ab279fba93201054d14 Mon Sep 17 00:00:00 2001 From: Dongjoon Hyun Date: Mon, 8 Jul 2019 12:56:53 -0700 Subject: [PATCH 6/6] fix typo --- docs/monitoring.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/monitoring.md b/docs/monitoring.md index 79a9984db1229..0f7210c3b8bb2 100644 --- a/docs/monitoring.md +++ b/docs/monitoring.md @@ -208,7 +208,7 @@ Security options for the Spark History Server are covered more detail in the spark.history.fs.cleaner.maxNum Int.MaxValue - The number of maximum files in the event log directory. + The maximum number of files in the event log directory. Spark tries to clean up the completed attempt logs to maintain the log directory under this limit. This should be smaller than the underlying file system limit like `dfs.namenode.fs-limits.max-directory-items` in HDFS.