Commit 04f026c
[SPARK-41054][UI][CORE] Support RocksDB as KVStore in live UI
### What changes were proposed in this pull request?

Support using RocksDB as the KVStore in the live UI. The location of the RocksDB files can be set via the configuration `spark.ui.store.path`. The configuration is optional; the default KV store remains in memory.

Note: let's make it RocksDB only for now instead of supporting both LevelDB and RocksDB. RocksDB builds on LevelDB with improvements to both writes and reads. Furthermore, we can reuse the RocksDBFileManager from streaming to replicate the local RocksDB files to DFS, and the DFS replica can then serve the Spark history server.

### Why are the changes needed?

The current architecture of the Spark live UI and the Spark history server (SHS) is too simple to serve large clusters and heavy workloads:

- Spark stores all the live UI data in memory. The size can reach a few GBs and affects the driver's stability (OOM).
- The live UI stores at most 1000 queries, and we can't simply raise that limit under the current architecture. Memory profiling shows that storing one query execution detail takes about 800 KB, while storing one task takes about 0.3 KB. So for 1000 SQL queries with 1000 × 2000 tasks, the query execution and task data alone take about 1.4 GB. Since the UI also stores data for jobs/stages/executors, holding 10k queries may take more than 14 GB.
- SHS has to parse the JSON-format event log on initial start. Uncompressed event logs can be a few GBs, and parsing can be quite slow; some users reported waiting more than half an hour.

With RocksDB as the KVStore, we can improve the stability of the Spark driver and speed up SHS startup.

### Does this PR introduce _any_ user-facing change?

Yes, supporting RocksDB as the KVStore in the live UI. The location of the RocksDB files can be set via the configuration `spark.ui.store.path`. The configuration is optional; the default KV store remains in memory.

### How was this patch tested?

New unit tests.

Preview of the doc change:

<img width="895" alt="image" src="https://user-images.githubusercontent.com/1097932/203184691-b6815990-b7b0-422b-aded-8e1771c0c167.png">

Closes #38567 from gengliangwang/liveUIKVStore.

Authored-by: Gengliang Wang <[email protected]>
Signed-off-by: Gengliang Wang <[email protected]>
1 parent 2513368 commit 04f026c
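
For readers who want to try the feature, here is a minimal sketch (not part of the patch; the directory path is an example value) of opting a Spark application into the new disk-backed live UI store:

```scala
// Minimal sketch, assuming a Spark build that includes this patch: setting the
// optional spark.ui.store.path routes the driver's live UI data to RocksDB on disk.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("LiveUIRocksDBDemo")
  // Example directory; leaving this unset keeps the default in-memory store.
  .config("spark.ui.store.path", "/tmp/spark-live-ui")
  .getOrCreate()
```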

File tree: 6 files changed (+96, -42 lines)

core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala

Lines changed: 6 additions & 35 deletions
```diff
@@ -19,7 +19,6 @@ package org.apache.spark.deploy.history
 
 import java.io.{File, FileNotFoundException, IOException}
 import java.lang.{Long => JLong}
-import java.nio.file.Files
 import java.util.{Date, NoSuchElementException, ServiceLoader}
 import java.util.concurrent.{ConcurrentHashMap, ExecutorService, TimeUnit}
 import java.util.zip.ZipOutputStream
@@ -36,15 +35,12 @@ import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
 import org.apache.hadoop.hdfs.DistributedFileSystem
 import org.apache.hadoop.hdfs.protocol.HdfsConstants
 import org.apache.hadoop.security.AccessControlException
-import org.fusesource.leveldbjni.internal.NativeDB
-import org.rocksdb.RocksDBException
 
 import org.apache.spark.{SecurityManager, SparkConf, SparkException}
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
 import org.apache.spark.internal.config.History._
-import org.apache.spark.internal.config.History.HybridStoreDiskBackend._
 import org.apache.spark.internal.config.Status._
 import org.apache.spark.internal.config.Tests.IS_TESTING
 import org.apache.spark.internal.config.UI._
@@ -136,34 +132,9 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     HybridStoreDiskBackend.withName(conf.get(History.HYBRID_STORE_DISK_BACKEND))
 
   // Visible for testing.
-  private[history] val listing: KVStore = storePath.map { path =>
-    val dir = hybridStoreDiskBackend match {
-      case LEVELDB => "listing.ldb"
-      case ROCKSDB => "listing.rdb"
-    }
-    val dbPath = Files.createDirectories(new File(path, dir).toPath()).toFile()
-    Utils.chmod700(dbPath)
-
-    val metadata = FsHistoryProviderMetadata(CURRENT_LISTING_VERSION,
-      AppStatusStore.CURRENT_VERSION, logDir)
-
-    try {
-      open(dbPath, metadata, conf)
-    } catch {
-      // If there's an error, remove the listing database and any existing UI database
-      // from the store directory, since it's extremely likely that they'll all contain
-      // incompatible information.
-      case _: UnsupportedStoreVersionException | _: MetadataMismatchException =>
-        logInfo("Detected incompatible DB versions, deleting...")
-        path.listFiles().foreach(Utils.deleteRecursively)
-        open(dbPath, metadata, conf)
-      case dbExc @ (_: NativeDB.DBException | _: RocksDBException) =>
-        // Get rid of the corrupted data and re-create it.
-        logWarning(s"Failed to load disk store $dbPath :", dbExc)
-        Utils.deleteRecursively(dbPath)
-        open(dbPath, metadata, conf)
-    }
-  }.getOrElse(new InMemoryStore())
+  private[history] val listing: KVStore = {
+    KVUtils.createKVStore(storePath, hybridStoreDiskBackend, conf)
+  }
 
   private val diskManager = storePath.map { path =>
     new HistoryServerDiskManager(conf, path, listing, clock)
@@ -1444,7 +1415,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
   }
 }
 
-private[history] object FsHistoryProvider {
+private[spark] object FsHistoryProvider {
 
   private val APPL_START_EVENT_PREFIX = "{\"Event\":\"SparkListenerApplicationStart\""
 
@@ -1459,10 +1430,10 @@ private[history] object FsHistoryProvider {
    * db, if the version does not match this value, the FsHistoryProvider will throw away
    * all data and re-generate the listing data from the event logs.
    */
-  private[history] val CURRENT_LISTING_VERSION = 1L
+  val CURRENT_LISTING_VERSION = 1L
 }
 
-private[history] case class FsHistoryProviderMetadata(
+private[spark] case class FsHistoryProviderMetadata(
     version: Long,
     uiVersion: Long,
     logDir: String)
```

core/src/main/scala/org/apache/spark/internal/config/Status.scala

Lines changed: 7 additions & 0 deletions
```diff
@@ -70,4 +70,11 @@ private[spark] object Status {
       .version("3.0.0")
       .booleanConf
       .createWithDefault(false)
+
+  val LIVE_UI_LOCAL_STORE_DIR = ConfigBuilder("spark.ui.store.path")
+    .doc("Local directory where to cache application information for live UI. By default this is " +
+      "not set, meaning all application information will be kept in memory.")
+    .version("3.4.0")
+    .stringConf
+    .createOptional
 }
```
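
As a quick illustration of the new entry (an assumed sketch for code inside Spark, not part of the diff), `createOptional` yields an entry that reads back as an `Option[String]`, which is what drives the fallback to the in-memory store:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.internal.config.Status.LIVE_UI_LOCAL_STORE_DIR

val conf = new SparkConf()
conf.get(LIVE_UI_LOCAL_STORE_DIR)                        // None: live UI stays in memory
conf.set(LIVE_UI_LOCAL_STORE_DIR, "/tmp/spark-live-ui")  // example path
conf.get(LIVE_UI_LOCAL_STORE_DIR)                        // Some("/tmp/spark-live-ui"): RocksDB on disk
```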

core/src/main/scala/org/apache/spark/status/AppStatusStore.scala

Lines changed: 10 additions & 2 deletions
```diff
@@ -17,17 +17,20 @@
 
 package org.apache.spark.status
 
+import java.io.File
 import java.util.{List => JList}
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable.HashMap
 
 import org.apache.spark.{JobExecutionStatus, SparkConf, SparkContext}
+import org.apache.spark.internal.config.History.HybridStoreDiskBackend
+import org.apache.spark.internal.config.Status.LIVE_UI_LOCAL_STORE_DIR
 import org.apache.spark.status.api.v1
 import org.apache.spark.storage.FallbackStorage.FALLBACK_BLOCK_MANAGER_ID
 import org.apache.spark.ui.scope._
 import org.apache.spark.util.Utils
-import org.apache.spark.util.kvstore.{InMemoryStore, KVStore}
+import org.apache.spark.util.kvstore.KVStore
 
 /**
  * A wrapper around a KVStore that provides methods for accessing the API data stored within.
@@ -769,7 +772,12 @@ private[spark] object AppStatusStore {
   def createLiveStore(
       conf: SparkConf,
       appStatusSource: Option[AppStatusSource] = None): AppStatusStore = {
-    val store = new ElementTrackingStore(new InMemoryStore(), conf)
+    val storePath = conf.get(LIVE_UI_LOCAL_STORE_DIR).map(new File(_))
+    // For the disk-based KV store of live UI, let's simply make it ROCKSDB only for now,
+    // instead of supporting both LevelDB and RocksDB. RocksDB is built based on LevelDB with
+    // improvements on writes and reads.
+    val kvStore = KVUtils.createKVStore(storePath, HybridStoreDiskBackend.ROCKSDB, conf)
+    val store = new ElementTrackingStore(kvStore, conf)
     val listener = new AppStatusListener(store, conf, true, appStatusSource)
     new AppStatusStore(store, listener = Some(listener))
   }
```
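
A hedged sketch of the resulting behavior (example paths; assumes a caller inside Spark): `createLiveStore` now wraps either the old in-memory store or a RocksDB-backed one, depending on whether the config is set:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.status.AppStatusStore

// Default: no path configured, so the ElementTrackingStore wraps an InMemoryStore.
val inMemory = AppStatusStore.createLiveStore(new SparkConf())

// With the path set, the ElementTrackingStore wraps a RocksDB-backed store instead.
val diskConf = new SparkConf().set("spark.ui.store.path", "/tmp/spark-live-ui")
val onDisk = AppStatusStore.createLiveStore(diskConf)
```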

core/src/main/scala/org/apache/spark/status/KVUtils.scala

Lines changed: 48 additions & 2 deletions
```diff
@@ -18,16 +18,21 @@
 package org.apache.spark.status
 
 import java.io.File
+import java.nio.file.Files
 
 import scala.annotation.meta.getter
 import scala.collection.JavaConverters._
 import scala.reflect.{classTag, ClassTag}
 
 import com.fasterxml.jackson.annotation.JsonInclude
 import com.fasterxml.jackson.module.scala.DefaultScalaModule
+import org.fusesource.leveldbjni.internal.NativeDB
+import org.rocksdb.RocksDBException
 
 import org.apache.spark.SparkConf
+import org.apache.spark.deploy.history.{FsHistoryProvider, FsHistoryProviderMetadata}
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.History
 import org.apache.spark.internal.config.History.HYBRID_STORE_DISK_BACKEND
 import org.apache.spark.internal.config.History.HybridStoreDiskBackend
 import org.apache.spark.internal.config.History.HybridStoreDiskBackend._
@@ -62,10 +67,14 @@ private[spark] object KVUtils extends Logging {
    * the store's metadata.
    * @param conf SparkConf use to get `HYBRID_STORE_DISK_BACKEND`
    */
-  def open[M: ClassTag](path: File, metadata: M, conf: SparkConf): KVStore = {
+  def open[M: ClassTag](
+      path: File,
+      metadata: M,
+      conf: SparkConf,
+      diskBackend: Option[HybridStoreDiskBackend.Value] = None): KVStore = {
     require(metadata != null, "Metadata is required.")
 
-    val db = backend(conf) match {
+    val db = diskBackend.getOrElse(backend(conf)) match {
       case LEVELDB => new LevelDB(path, new KVStoreScalaSerializer())
       case ROCKSDB => new RocksDB(path, new KVStoreScalaSerializer())
     }
@@ -80,6 +89,43 @@ private[spark] object KVUtils extends Logging {
     db
   }
 
+  def createKVStore(
+      storePath: Option[File],
+      diskBackend: HybridStoreDiskBackend.Value,
+      conf: SparkConf): KVStore = {
+    storePath.map { path =>
+      val dir = diskBackend match {
+        case LEVELDB => "listing.ldb"
+        case ROCKSDB => "listing.rdb"
+      }
+
+      val dbPath = Files.createDirectories(new File(path, dir).toPath()).toFile()
+      Utils.chmod700(dbPath)
+
+      val metadata = FsHistoryProviderMetadata(
+        FsHistoryProvider.CURRENT_LISTING_VERSION,
+        AppStatusStore.CURRENT_VERSION,
+        conf.get(History.HISTORY_LOG_DIR))
+
+      try {
+        open(dbPath, metadata, conf, Some(diskBackend))
+      } catch {
+        // If there's an error, remove the listing database and any existing UI database
+        // from the store directory, since it's extremely likely that they'll all contain
+        // incompatible information.
+        case _: UnsupportedStoreVersionException | _: MetadataMismatchException =>
+          logInfo("Detected incompatible DB versions, deleting...")
+          path.listFiles().foreach(Utils.deleteRecursively)
+          open(dbPath, metadata, conf, Some(diskBackend))
+        case dbExc @ (_: NativeDB.DBException | _: RocksDBException) =>
+          // Get rid of the corrupted data and re-create it.
+          logWarning(s"Failed to load disk store $dbPath :", dbExc)
+          Utils.deleteRecursively(dbPath)
+          open(dbPath, metadata, conf, Some(diskBackend))
+      }
+    }.getOrElse(new InMemoryStore())
+  }
+
   /** Turns a KVStoreView into a Scala sequence, applying a filter. */
   def viewToSeq[T](
       view: KVStoreView[T],
```
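
To make the new helper's contract concrete, here is an illustrative call (example paths; for code inside Spark): with no store path, `createKVStore` degenerates to the previous in-memory behavior, and with a path it opens a `listing.rdb` (or `listing.ldb`) directory under it, wiping and recreating the database on version mismatch or corruption:

```scala
import java.io.File
import org.apache.spark.SparkConf
import org.apache.spark.internal.config.History.HybridStoreDiskBackend
import org.apache.spark.status.KVUtils

val conf = new SparkConf()

// No path: falls back to an InMemoryStore, matching the behavior before this patch.
val inMem = KVUtils.createKVStore(None, HybridStoreDiskBackend.ROCKSDB, conf)

// With a path: opens /tmp/spark-live-ui/listing.rdb, deleting and re-creating it
// if the stored metadata is incompatible or the database is corrupted.
val onDisk = KVUtils.createKVStore(
  Some(new File("/tmp/spark-live-ui")), HybridStoreDiskBackend.ROCKSDB, conf)
```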

core/src/test/scala/org/apache/spark/status/AppStatusStoreSuite.scala

Lines changed: 16 additions & 3 deletions
```diff
@@ -22,7 +22,7 @@ import scala.util.Random
 import org.apache.spark.{SparkConf, SparkFunSuite}
 import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.internal.config.History.{HYBRID_STORE_DISK_BACKEND, HybridStoreDiskBackend}
-import org.apache.spark.internal.config.Status.LIVE_ENTITY_UPDATE_PERIOD
+import org.apache.spark.internal.config.Status.{LIVE_ENTITY_UPDATE_PERIOD, LIVE_UI_LOCAL_STORE_DIR}
 import org.apache.spark.resource.ResourceProfile
 import org.apache.spark.scheduler.{SparkListenerStageSubmitted, SparkListenerTaskStart, StageInfo, TaskInfo, TaskLocality}
 import org.apache.spark.status.api.v1.SpeculationStageSummary
@@ -88,7 +88,19 @@ class AppStatusStoreSuite extends SparkFunSuite {
       live: Boolean): AppStatusStore = {
     val conf = new SparkConf()
     if (live) {
-      return AppStatusStore.createLiveStore(conf)
+      if (disk) {
+        val testDir = Utils.createTempDir()
+        conf.set(LIVE_UI_LOCAL_STORE_DIR, testDir.getCanonicalPath)
+      }
+      val liveStore = AppStatusStore.createLiveStore(conf)
+      if (disk) {
+        val rocksDBCreated = liveStore.store match {
+          case e: ElementTrackingStore => !e.usingInMemoryStore
+          case _ => false
+        }
+        assert(rocksDBCreated)
+      }
+      return liveStore
     }
 
     val store: KVStore = if (disk) {
@@ -106,7 +118,8 @@ class AppStatusStoreSuite extends SparkFunSuite {
     val baseCases = Seq(
       "disk rocksdb" -> createAppStore(disk = true, HybridStoreDiskBackend.ROCKSDB, live = false),
       "in memory" -> createAppStore(disk = false, live = false),
-      "in memory live" -> createAppStore(disk = false, live = true)
+      "in memory live" -> createAppStore(disk = false, live = true),
+      "rocksdb live" -> createAppStore(disk = true, HybridStoreDiskBackend.ROCKSDB, live = true)
     )
     if (Utils.isMacOnAppleSilicon) {
       baseCases
```

docs/configuration.md

Lines changed: 9 additions & 0 deletions
```diff
@@ -1379,6 +1379,15 @@ Apart from these, the following properties are also available, and may be useful
   </td>
   <td>1.1.1</td>
 </tr>
+<tr>
+  <td><code>spark.ui.store.path</code></td>
+  <td>None</td>
+  <td>
+    Local directory where to cache application information for live UI.
+    By default this is not set, meaning all application information will be kept in memory.
+  </td>
+  <td>3.4.0</td>
+</tr>
 <tr>
   <td><code>spark.ui.killEnabled</code></td>
   <td>true</td>
```
