Skip to content

Commit de7ba3b

Browse files
pan3793 authored and dongjoon-hyun committed
[SPARK-53631][CORE] Optimize memory and perf on SHS bootstrap
### What changes were proposed in this pull request? Core ideas: 1. Change the log replay thread pool to have a bounded queue, and block task submission when the queue is full. Currently, the log replay thread pool uses an unbounded queue, when there are a large number (e.g., millions) of event logs under `spark.history.fs.logDirectory`, all tasks will be queued at the thread pool queue without blocking the scanning thread, and in the next schedule, enqueue again ... https://stackoverflow.com/questions/4521983/java-executorservice-that-blocks-on-submission-after-a-certain-queue-size 2. Move log compaction to a dedicated thread pool. Replaying and compaction are different types of workloads, isolating them from each other could improve the resource utilization. ### Why are the changes needed? Improve performance and reduce memory usage on the SHS bootstrap with empty KV cache, when there are tons of event logs. ### Does this PR introduce _any_ user-facing change? No functionality changes, but brings a new config `spark.history.fs.numCompactThreads` ### How was this patch tested? 
Tested on an internal cluster, starting SHS with an empty `spark.history.store.path` and ~650k event logs under `spark.history.fs.logDirectory`, the related configs are ``` spark.history.fs.cleaner.maxNum 650000 spark.history.fs.logDirectory hdfs://foo/spark2-history spark.history.fs.update.interval 5s spark.history.provider org.apache.spark.deploy.history.FsHistoryProvider spark.history.store.maxDiskUsage 100GB spark.history.store.path /foo/bar/historyStore spark.history.fs.numReplayThreads 64 spark.history.fs.numCompactThreads 4 spark.history.store.hybridStore.enabled true spark.history.store.hybridStore.maxMemoryUsage 16g spark.history.store.hybridStore.diskBackend ROCKSDB ``` - `spark.history.store.path` is configured to an HDD path - we disable `spark.eventLog.rolling.enabled` so `numCompactThreads` has no heavy work It's much faster than before, and metrics show better CPU utilization and lower memory usage. <img width="2546" height="480" alt="bf51f797a11527ce82036669f96cf50b" src="https://github.com/user-attachments/assets/4db521b0-cf1c-4b93-a06d-27fdaf1ccec4" /> (before vs. after, the 3rd figure is "memory used") ### Was this patch authored or co-authored using generative AI tooling? No. Closes #52382 from pan3793/SPARK-53631. Authored-by: Cheng Pan <chengpan@apache.org> Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
1 parent 06f7ad2 commit de7ba3b

7 files changed

Lines changed: 218 additions & 11 deletions

File tree

common/utils-java/src/main/java/org/apache/spark/internal/LogKeys.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -328,6 +328,7 @@ public enum LogKeys implements LogKey {
328328
LAST_ACCESS_TIME,
329329
LAST_COMMITTED_CHECKPOINT_ID,
330330
LAST_COMMIT_BASED_CHECKPOINT_ID,
331+
LAST_SCAN_TIME,
331332
LAST_VALID_TIME,
332333
LATEST_BATCH_ID,
333334
LATEST_COMMITTED_BATCH_ID,

core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala

Lines changed: 28 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,9 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
102102
private val CLEAN_INTERVAL_S = conf.get(History.CLEANER_INTERVAL_S)
103103

104104
// Number of threads used to replay event logs.
105-
private val NUM_PROCESSING_THREADS = conf.get(History.NUM_REPLAY_THREADS)
105+
private val numReplayThreads = conf.get(History.NUM_REPLAY_THREADS)
106+
// Number of threads used to compact rolling event logs.
107+
private val numCompactThreads = conf.get(History.NUM_COMPACT_THREADS)
106108

107109
private val logDir = conf.get(History.HISTORY_LOG_DIR)
108110

@@ -209,7 +211,20 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
209211
*/
210212
private val replayExecutor: ExecutorService = {
211213
if (!Utils.isTesting) {
212-
ThreadUtils.newDaemonFixedThreadPool(NUM_PROCESSING_THREADS, "log-replay-executor")
214+
ThreadUtils.newDaemonBlockingThreadPoolExecutorService(
215+
numReplayThreads, 1024, "log-replay-executor")
216+
} else {
217+
ThreadUtils.sameThreadExecutorService()
218+
}
219+
}
220+
221+
/**
222+
* Fixed size thread pool to compact log files.
223+
*/
224+
private val compactExecutor: ExecutorService = {
225+
if (!Utils.isTesting) {
226+
ThreadUtils.newDaemonBlockingThreadPoolExecutorService(
227+
numCompactThreads, 1024, "log-compact-executor")
213228
} else {
214229
ThreadUtils.sameThreadExecutorService()
215230
}
@@ -431,7 +446,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
431446
initThread.interrupt()
432447
initThread.join()
433448
}
434-
Seq(pool, replayExecutor).foreach { executor =>
449+
Seq(pool, replayExecutor, compactExecutor).foreach { executor =>
435450
executor.shutdown()
436451
if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
437452
executor.shutdownNow()
@@ -487,15 +502,16 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
487502
var count: Int = 0
488503
try {
489504
val newLastScanTime = clock.getTimeMillis()
490-
logDebug(s"Scanning $logDir with lastScanTime==$lastScanTime")
505+
logInfo(log"Scanning ${MDC(HISTORY_DIR, logDir)} with " +
506+
log"lastScanTime=${MDC(LAST_SCAN_TIME, lastScanTime)}")
491507

492508
// Mark entries that are processing as not stale. Such entries do not have a chance to be
493509
// updated with the new 'lastProcessed' time and thus any entity that completes processing
494510
// right after this check and before the check for stale entities will be identified as stale
495511
// and will be deleted from the UI until the next 'checkForLogs' run.
496512
val notStale = mutable.HashSet[String]()
497513
val updated = Option(fs.listStatus(new Path(logDir)))
498-
.map(_.toImmutableArraySeq).getOrElse(Nil)
514+
.map(_.toImmutableArraySeq).getOrElse(Seq.empty)
499515
.filter { entry => isAccessible(entry.getPath) }
500516
.filter { entry =>
501517
if (isProcessing(entry.getPath)) {
@@ -612,11 +628,11 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
612628
}
613629

614630
if (updated.nonEmpty) {
615-
logDebug(s"New/updated attempts found: ${updated.size} ${updated.map(_.rootPath)}")
631+
logInfo(log"New/updated attempts found: ${MDC(NUM_ATTEMPT, updated.size)}")
616632
}
617633

618634
updated.foreach { entry =>
619-
submitLogProcessTask(entry.rootPath) { () =>
635+
submitLogProcessTask(entry.rootPath, replayExecutor) { () =>
620636
mergeApplicationListing(entry, newLastScanTime, true)
621637
}
622638
}
@@ -788,7 +804,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
788804

789805
// triggering another task for compaction task only if it succeeds
790806
if (succeeded) {
791-
submitLogProcessTask(rootPath) { () => compact(reader) }
807+
submitLogProcessTask(rootPath, compactExecutor) { () => compact(reader) }
792808
}
793809
}
794810
}
@@ -1456,13 +1472,14 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
14561472
}
14571473

14581474
/** NOTE: 'task' should ensure it executes 'endProcessing' at the end */
1459-
private def submitLogProcessTask(rootPath: Path)(task: Runnable): Unit = {
1475+
private def submitLogProcessTask(
1476+
rootPath: Path, pool: ExecutorService)(task: Runnable): Unit = {
14601477
try {
14611478
processing(rootPath)
1462-
replayExecutor.submit(task)
1479+
pool.submit(task)
14631480
} catch {
14641481
// let the iteration over the updated entries break, since an exception on
1465-
// replayExecutor.submit (..) indicates the ExecutorService is unable
1482+
// pool.submit (..) indicates the ExecutorService is unable
14661483
// to take any more submissions at this time
14671484
case e: Exception =>
14681485
logError(s"Exception while submitting task", e)

core/src/main/scala/org/apache/spark/internal/config/History.scala

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -228,6 +228,12 @@ private[spark] object History {
228228
.intConf
229229
.createWithDefaultFunction(() => Math.ceil(Runtime.getRuntime.availableProcessors() / 4f).toInt)
230230

231+
val NUM_COMPACT_THREADS = ConfigBuilder("spark.history.fs.numCompactThreads")
232+
.version("4.1.0")
233+
.doc("Number of threads that will be used by history server to compact event logs.")
234+
.intConf
235+
.createWithDefaultFunction(() => Math.ceil(Runtime.getRuntime.availableProcessors() / 4f).toInt)
236+
231237
val RETAINED_APPLICATIONS = ConfigBuilder("spark.history.retainedApplications")
232238
.version("1.0.0")
233239
.doc("The number of applications to retain UI data for in the cache. If this cap is " +
Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.util

import java.util
import java.util.concurrent._

// scalastyle:off
/**
 * This thread pool executor throttles the submission of new tasks by using a semaphore.
 * Task submissions require permits, task completions release permits.
 * <p>
 * NOTE: [[invoke*]] methods are not supported, you should either use the [[submit]] methods
 * or the [[execute]] method.
 * <p>
 * This is inspired by
 * <a href="https://github.com/apache/incubator-retired-s4/blob/0.6.0-Final/subprojects/s4-comm/src/main/java/org/apache/s4/comm/staging/BlockingThreadPoolExecutorService.java">
 * Apache S4 BlockingThreadPoolExecutorService</a>
 */
// scalastyle:on
private[spark] class BlockingThreadPoolExecutorService(
    nThreads: Int, workQueueSize: Int, threadFactory: ThreadFactory)
  extends ExecutorService {

  // One permit per task that may be in flight: up to nThreads running plus
  // workQueueSize waiting in the queue. submit/execute block once exhausted,
  // and each task releases its permit when it finishes.
  private val permits = new Semaphore(nThreads + workQueueSize)

  // Bounded queue sized to match the permit count (fixes the `workQuque` typo).
  private val workQueue = new LinkedBlockingQueue[Runnable](nThreads + workQueueSize)

  // Fixed-size pool; the semaphore above guarantees the bounded queue never rejects.
  private val delegate = new ThreadPoolExecutor(
    nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, workQueue, threadFactory)

  override def shutdown(): Unit = delegate.shutdown()

  override def shutdownNow(): util.List[Runnable] = delegate.shutdownNow()

  override def isShutdown: Boolean = delegate.isShutdown

  override def isTerminated: Boolean = delegate.isTerminated

  override def awaitTermination(timeout: Long, unit: TimeUnit): Boolean =
    delegate.awaitTermination(timeout, unit)

  override def submit[T](task: Callable[T]): Future[T] = {
    try permits.acquire() catch {
      case e: InterruptedException =>
        Thread.currentThread.interrupt()
        // JDK failed future (Java 9+) instead of Guava's Futures.immediateFailedFuture,
        // dropping the Guava dependency.
        return CompletableFuture.failedFuture[T](e)
    }
    delegate.submit(new CallableWithPermitRelease(task))
  }

  override def submit[T](task: Runnable, result: T): Future[T] = {
    try permits.acquire() catch {
      case e: InterruptedException =>
        Thread.currentThread.interrupt()
        return CompletableFuture.failedFuture[T](e)
    }
    delegate.submit(new RunnableWithPermitRelease(task), result)
  }

  override def submit(task: Runnable): Future[_] = {
    try permits.acquire() catch {
      case e: InterruptedException =>
        Thread.currentThread.interrupt()
        return CompletableFuture.failedFuture[Any](e)
    }
    delegate.submit(new RunnableWithPermitRelease(task))
  }

  override def execute(command: Runnable): Unit = {
    try permits.acquire() catch {
      case _: InterruptedException =>
        Thread.currentThread.interrupt()
        // No permit was acquired, so run the raw command. Wrapping it in
        // RunnableWithPermitRelease here (as the original did) would release a
        // permit that was never taken, silently growing the semaphore's capacity.
        delegate.execute(command)
        return
    }
    delegate.execute(new RunnableWithPermitRelease(command))
  }

  override def invokeAll[T](
      tasks: util.Collection[_ <: Callable[T]]): util.List[Future[T]] =
    throw new UnsupportedOperationException("Not implemented")

  override def invokeAll[T](
      tasks: util.Collection[_ <: Callable[T]],
      timeout: Long, unit: TimeUnit): util.List[Future[T]] =
    throw new UnsupportedOperationException("Not implemented")

  override def invokeAny[T](tasks: util.Collection[_ <: Callable[T]]): T =
    throw new UnsupportedOperationException("Not implemented")

  override def invokeAny[T](
      tasks: util.Collection[_ <: Callable[T]], timeout: Long, unit: TimeUnit): T =
    throw new UnsupportedOperationException("Not implemented")

  /**
   * Releases a permit after the task is executed.
   */
  private class RunnableWithPermitRelease(delegate: Runnable) extends Runnable {
    override def run(): Unit = try delegate.run() finally permits.release()
  }

  /**
   * Releases a permit after the task is completed.
   */
  private class CallableWithPermitRelease[T](delegate: Callable[T]) extends Callable[T] {
    override def call(): T = try delegate.call() finally permits.release()
  }
}

core/src/main/scala/org/apache/spark/util/ThreadUtils.scala

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -188,6 +188,23 @@ private[spark] object ThreadUtils {
188188
rejectedExecutionHandler)
189189
}
190190

191+
/**
192+
* Simliar to newDaemonFixedThreadPool, but with a bound workQueue, task submission will
193+
* be blocked when queue is full.
194+
*
195+
* @param nThreads the number of threads in the pool
196+
* @param workQueueSize the capacity of the queue to use for holding tasks before they are
197+
* executed. Task submission will be blocked when queue is full.
198+
* @param prefix thread names are formatted as prefix-ID, where ID is a unique, sequentially
199+
* assigned integer.
200+
* @return BlockingThreadPoolExecutorService
201+
*/
202+
def newDaemonBlockingThreadPoolExecutorService(
203+
nThreads: Int, workQueueSize: Int, prefix: String): ExecutorService = {
204+
val threadFactory = namedThreadFactory(prefix)
205+
new BlockingThreadPoolExecutorService(nThreads, workQueueSize, threadFactory)
206+
}
207+
191208
/**
192209
* Wrapper over ScheduledThreadPoolExecutor the pool with daemon threads.
193210
*/

core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,40 @@ class ThreadUtilsSuite extends SparkFunSuite {
9898
}
9999
}
100100

101+
test("newDaemonBlockingThreadPoolExecutorService") {
102+
val nThread = 3
103+
val workQueueSize = 5
104+
val submithreadsLatch = new CountDownLatch(nThread + workQueueSize + 1)
105+
val latch = new CountDownLatch(1)
106+
val blockingPool = ThreadUtils.newDaemonBlockingThreadPoolExecutorService(
107+
nThread, workQueueSize, "ThreadUtilsSuite-newDaemonBlockingThreadPoolExecutorService")
108+
109+
try {
110+
val submitThread = new Thread(() => {
111+
(0 until nThread + workQueueSize + 1).foreach { i =>
112+
blockingPool.execute(() => {
113+
latch.await(10, TimeUnit.SECONDS)
114+
})
115+
submithreadsLatch.countDown()
116+
}
117+
})
118+
submitThread.setDaemon(true)
119+
submitThread.start()
120+
121+
// the last one task submission will be blocked until previous tasks completed
122+
eventually(timeout(10.seconds)) {
123+
assert(submithreadsLatch.getCount === 1L)
124+
}
125+
latch.countDown()
126+
eventually(timeout(10.seconds)) {
127+
assert(submithreadsLatch.getCount === 0L)
128+
assert(!submitThread.isAlive)
129+
}
130+
} finally {
131+
blockingPool.shutdownNow()
132+
}
133+
}
134+
101135
test("sameThread") {
102136
val callerThreadName = Thread.currentThread().getName()
103137
val f = Future {

docs/monitoring.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -329,6 +329,14 @@ Security options for the Spark History Server are covered more detail in the
329329
</td>
330330
<td>2.0.0</td>
331331
</tr>
332+
<tr>
333+
<td>spark.history.fs.numCompactThreads</td>
334+
<td>25% of available cores</td>
335+
<td>
336+
Number of threads that will be used by history server to compact event logs.
337+
</td>
338+
<td>4.1.0</td>
339+
</tr>
332340
<tr>
333341
<td>spark.history.store.maxDiskUsage</td>
334342
<td>10g</td>

0 commit comments

Comments
 (0)