Commit 78532fd
[SPARK-41244][UI] Introducing a Protobuf serializer for UI data on KV store
### What changes were proposed in this pull request?

Introduce a Protobuf serializer for the KV store, which is about 3 times as fast as the default serializer according to an end-to-end benchmark against RocksDB:

| Serializer | Avg Write time (μs) | Avg Read time (μs) | RocksDB File Total Size (MB) | Result total size in memory (MB) |
|----------------------------------|--------------------|-------------------|-----------------------------|---------------------------------|
| Spark's KV Serializer (JSON+GZip) | 352.2 | 119.26 | 837 | 868 |
| Protobuf | 109.9 | 34.3 | 858 | 2105 |

To move fast and make the PR review easier, this PR:
* covers only the class `JobDataWrapper`; more UI data classes can be handled later.
* does not add a configuration for setting the serializer in the SHS; that will be a follow-up.

### Why are the changes needed?

A faster serializer for the KV store. It also supports schema evolution, so in the future the SHS can leverage it as well. More details in the SPIP: https://docs.google.com/document/d/1cuKnFwlTodyVhUQPMuakq2YDaLH05jaY9FRu_aD1zMo/edit

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Closes #38779 from gengliangwang/protobuf.

Authored-by: Gengliang Wang <gengliang@apache.org>
Signed-off-by: Gengliang Wang <gengliang@apache.org>
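For context, a minimal Scala sketch of the shape such an end-to-end measurement can take against the RocksDB-backed store. This is illustrative only: the actual benchmark code is not part of this page, the path is made up, and it assumes `JobDataWrapper`'s natural key is its job id.

```scala
import java.io.File

import org.apache.spark.status.JobDataWrapper
import org.apache.spark.status.protobuf.KVStoreProtobufSerializer
import org.apache.spark.util.kvstore.RocksDB

// Hypothetical micro-benchmark shape; jobData is a populated JobDataWrapper
// whose construction is elided here.
def roundTripNanos(jobData: JobDataWrapper): (Long, Long) = {
  val db = new RocksDB(new File("/tmp/proto-bench.rdb"), new KVStoreProtobufSerializer())
  try {
    val w0 = System.nanoTime()
    db.write(jobData)
    val writeNanos = System.nanoTime() - w0

    val r0 = System.nanoTime()
    db.read(classOf[JobDataWrapper], jobData.info.jobId)
    val readNanos = System.nanoTime() - r0

    (writeNanos, readNanos)
  } finally {
    db.close()
  }
}
```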
1 parent 889c08b

13 files changed

Lines changed: 398 additions & 19 deletions

common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVStoreSerializer.java

Lines changed: 2 additions & 2 deletions
@@ -49,7 +49,7 @@ public KVStoreSerializer() {
     this.mapper = new ObjectMapper();
   }
 
-  public final byte[] serialize(Object o) throws Exception {
+  public byte[] serialize(Object o) throws Exception {
     if (o instanceof String) {
       return ((String) o).getBytes(UTF_8);
     } else {
@@ -62,7 +62,7 @@ public final byte[] serialize(Object o) throws Exception {
   }
 
   @SuppressWarnings("unchecked")
-  public final <T> T deserialize(byte[] data, Class<T> klass) throws Exception {
+  public <T> T deserialize(byte[] data, Class<T> klass) throws Exception {
     if (klass.equals(String.class)) {
       return (T) new String(data, UTF_8);
     } else {
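Dropping `final` from these two methods is what makes the serializer pluggable: a subclass can now intercept types it knows how to encode and fall back to the parent's JSON+GZip encoding for everything else. A minimal Scala sketch of that dispatch (hedged: the commit's actual `KVStoreProtobufSerializer` is not shown on this page, and the fallback behavior here is an assumption):

```scala
import org.apache.spark.status.JobDataWrapper
import org.apache.spark.status.protobuf.JobDataWrapperSerializer
import org.apache.spark.util.kvstore.KVStoreSerializer

// Sketch of a protobuf-backed serializer overriding the now non-final methods.
class ProtobufBackedSerializer extends KVStoreSerializer {
  override def serialize(o: AnyRef): Array[Byte] = o match {
    case j: JobDataWrapper => JobDataWrapperSerializer.serialize(j)
    case other => super.serialize(other) // fall back to JSON+GZip
  }

  override def deserialize[T](data: Array[Byte], klass: Class[T]): T =
    if (klass == classOf[JobDataWrapper]) {
      JobDataWrapperSerializer.deserialize(data).asInstanceOf[T]
    } else {
      super.deserialize(data, klass)
    }
}
```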

connector/protobuf/pom.xml

Lines changed: 1 addition & 1 deletion
@@ -122,7 +122,7 @@
       <plugin>
         <groupId>com.github.os72</groupId>
         <artifactId>protoc-jar-maven-plugin</artifactId>
-        <version>3.11.4</version>
+        <version>${protoc-jar-maven-plugin.version}</version>
         <!-- Generates Java classes for tests. TODO(Raghu): Generate descriptor files too. -->
         <executions>
           <execution>

core/pom.xml

Lines changed: 48 additions & 1 deletion
@@ -532,7 +532,12 @@
       <groupId>org.apache.commons</groupId>
       <artifactId>commons-crypto</artifactId>
     </dependency>
-
+    <dependency>
+      <groupId>com.google.protobuf</groupId>
+      <artifactId>protobuf-java</artifactId>
+      <version>${protobuf.version}</version>
+      <scope>compile</scope>
+    </dependency>
   </dependencies>
   <build>
     <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
@@ -616,6 +621,48 @@
           </execution>
         </executions>
       </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-shade-plugin</artifactId>
+        <configuration>
+          <shadedArtifactAttached>false</shadedArtifactAttached>
+          <shadeTestJar>true</shadeTestJar>
+          <artifactSet>
+            <includes>
+              <include>com.google.protobuf:*</include>
+            </includes>
+          </artifactSet>
+          <relocations>
+            <relocation>
+              <pattern>com.google.protobuf</pattern>
+              <shadedPattern>${spark.shade.packageName}.spark-core.protobuf</shadedPattern>
+              <includes>
+                <include>com.google.protobuf.**</include>
+              </includes>
+            </relocation>
+          </relocations>
+        </configuration>
+      </plugin>
+      <plugin>
+        <groupId>com.github.os72</groupId>
+        <artifactId>protoc-jar-maven-plugin</artifactId>
+        <version>${protoc-jar-maven-plugin.version}</version>
+        <executions>
+          <execution>
+            <phase>generate-sources</phase>
+            <goals>
+              <goal>run</goal>
+            </goals>
+            <configuration>
+              <protocArtifact>com.google.protobuf:protoc:${protobuf.version}</protocArtifact>
+              <protocVersion>${protobuf.version}</protocVersion>
+              <inputDirectories>
+                <include>src/main/protobuf</include>
+              </inputDirectories>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
     </plugins>
   </build>

core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto (new file)

Lines changed: 57 additions & 0 deletions

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

syntax = "proto3";
package org.apache.spark.status.protobuf;

enum JobExecutionStatus {
  UNSPECIFIED = 0;
  RUNNING = 1;
  SUCCEEDED = 2;
  FAILED = 3;
  UNKNOWN = 4;
}

message JobData {
  // All IDs are int64 for extendability, even when they are currently int32 in Spark.
  int64 job_id = 1;
  string name = 2;
  optional string description = 3;
  optional int64 submission_time = 4;
  optional int64 completion_time = 5;
  repeated int64 stage_ids = 6;
  optional string job_group = 7;
  JobExecutionStatus status = 8;
  int32 num_tasks = 9;
  int32 num_active_tasks = 10;
  int32 num_completed_tasks = 11;
  int32 num_skipped_tasks = 12;
  int32 num_failed_tasks = 13;
  int32 num_killed_tasks = 14;
  int32 num_completed_indices = 15;
  int32 num_active_stages = 16;
  int32 num_completed_stages = 17;
  int32 num_skipped_stages = 18;
  int32 num_failed_stages = 19;
  map<string, int32> kill_tasks_summary = 20;
}

message JobDataWrapper {
  JobData info = 1;
  repeated int32 skipped_stages = 2;
  optional int64 sql_execution_id = 3;
}
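Because several fields above are marked proto3 `optional`, the generated Java classes expose explicit presence checks (`hasDescription()`, `hasSubmissionTime()`, and so on) alongside the getters, which is what lets the deserialization code further down map unset fields onto `None` instead of conflating them with default values. A small hedged Scala sketch against the generated `StoreTypes` classes (field values are illustrative):

```scala
import org.apache.spark.status.protobuf.StoreTypes

// Build a JobData message, leaving the optional description unset.
val msg = StoreTypes.JobData.newBuilder()
  .setJobId(1L)
  .setName("count at <console>:1")
  .build()

// proto3 `optional` generates hasX() presence checks, so an unset field is
// distinguishable from an empty string.
val description: Option[String] =
  if (msg.hasDescription) Some(msg.getDescription) else None
assert(description.isEmpty)
```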

core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala

Lines changed: 1 addition & 1 deletion
@@ -133,7 +133,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
 
   // Visible for testing.
   private[history] val listing: KVStore = {
-    KVUtils.createKVStore(storePath, hybridStoreDiskBackend, conf)
+    KVUtils.createKVStore(storePath, live = false, conf)
   }
 
   private val diskManager = storePath.map { path =>

core/src/main/scala/org/apache/spark/status/AppStatusStore.scala

Lines changed: 1 addition & 5 deletions
@@ -24,7 +24,6 @@ import scala.collection.JavaConverters._
 import scala.collection.mutable.HashMap
 
 import org.apache.spark.{JobExecutionStatus, SparkConf, SparkContext}
-import org.apache.spark.internal.config.History.HybridStoreDiskBackend
 import org.apache.spark.internal.config.Status.LIVE_UI_LOCAL_STORE_DIR
 import org.apache.spark.status.api.v1
 import org.apache.spark.storage.FallbackStorage.FALLBACK_BLOCK_MANAGER_ID
@@ -773,10 +772,7 @@ private[spark] object AppStatusStore {
       conf: SparkConf,
       appStatusSource: Option[AppStatusSource] = None): AppStatusStore = {
     val storePath = conf.get(LIVE_UI_LOCAL_STORE_DIR).map(new File(_))
-    // For the disk-based KV store of live UI, let's simply make it ROCKSDB only for now,
-    // instead of supporting both LevelDB and RocksDB. RocksDB is built based on LevelDB with
-    // improvements on writes and reads.
-    val kvStore = KVUtils.createKVStore(storePath, HybridStoreDiskBackend.ROCKSDB, conf)
+    val kvStore = KVUtils.createKVStore(storePath, live = true, conf)
     val store = new ElementTrackingStore(kvStore, conf)
     val listener = new AppStatusListener(store, conf, true, appStatusSource)
     new AppStatusStore(store, listener = Some(listener))

core/src/main/scala/org/apache/spark/status/KVUtils.scala

Lines changed: 27 additions & 7 deletions
@@ -36,6 +36,7 @@ import org.apache.spark.internal.config.History
 import org.apache.spark.internal.config.History.HYBRID_STORE_DISK_BACKEND
 import org.apache.spark.internal.config.History.HybridStoreDiskBackend
 import org.apache.spark.internal.config.History.HybridStoreDiskBackend._
+import org.apache.spark.status.protobuf.KVStoreProtobufSerializer
 import org.apache.spark.util.Utils
 import org.apache.spark.util.kvstore._
 
@@ -71,12 +72,14 @@ private[spark] object KVUtils extends Logging {
       path: File,
       metadata: M,
       conf: SparkConf,
-      diskBackend: Option[HybridStoreDiskBackend.Value] = None): KVStore = {
+      diskBackend: Option[HybridStoreDiskBackend.Value] = None,
+      serializer: Option[KVStoreSerializer] = None): KVStore = {
     require(metadata != null, "Metadata is required.")
 
+    val kvSerializer = serializer.getOrElse(new KVStoreScalaSerializer())
     val db = diskBackend.getOrElse(backend(conf)) match {
-      case LEVELDB => new LevelDB(path, new KVStoreScalaSerializer())
-      case ROCKSDB => new RocksDB(path, new KVStoreScalaSerializer())
+      case LEVELDB => new LevelDB(path, kvSerializer)
+      case ROCKSDB => new RocksDB(path, kvSerializer)
     }
     val dbMeta = db.getMetadata(classTag[M].runtimeClass)
     if (dbMeta == null) {
@@ -91,9 +94,26 @@ private[spark] object KVUtils extends Logging {
 
   def createKVStore(
       storePath: Option[File],
-      diskBackend: HybridStoreDiskBackend.Value,
+      live: Boolean,
       conf: SparkConf): KVStore = {
     storePath.map { path =>
+      val diskBackend = if (live) {
+        // For the disk-based KV store of live UI, let's simply make it ROCKSDB only for now,
+        // instead of supporting both LevelDB and RocksDB. RocksDB is built based on LevelDB with
+        // improvements on writes and reads.
+        HybridStoreDiskBackend.ROCKSDB
+      } else {
+        HybridStoreDiskBackend.withName(conf.get(History.HYBRID_STORE_DISK_BACKEND))
+      }
+
+      val serializer = if (live) {
+        // For the disk-based KV store of live UI, let's simply use protobuf serializer only.
+        // The default serializer is slow since it is using JSON+GZip encoding.
+        Some(new KVStoreProtobufSerializer())
+      } else {
+        None
+      }
+
       val dir = diskBackend match {
         case LEVELDB => "listing.ldb"
         case ROCKSDB => "listing.rdb"
@@ -108,20 +128,20 @@ private[spark] object KVUtils extends Logging {
         conf.get(History.HISTORY_LOG_DIR))
 
       try {
-        open(dbPath, metadata, conf, Some(diskBackend))
+        open(dbPath, metadata, conf, Some(diskBackend), serializer)
       } catch {
         // If there's an error, remove the listing database and any existing UI database
         // from the store directory, since it's extremely likely that they'll all contain
         // incompatible information.
         case _: UnsupportedStoreVersionException | _: MetadataMismatchException =>
           logInfo("Detected incompatible DB versions, deleting...")
           path.listFiles().foreach(Utils.deleteRecursively)
-          open(dbPath, metadata, conf, Some(diskBackend))
+          open(dbPath, metadata, conf, Some(diskBackend), serializer)
         case dbExc @ (_: NativeDB.DBException | _: RocksDBException) =>
           // Get rid of the corrupted data and re-create it.
           logWarning(s"Failed to load disk store $dbPath :", dbExc)
           Utils.deleteRecursively(dbPath)
-          open(dbPath, metadata, conf, Some(diskBackend))
+          open(dbPath, metadata, conf, Some(diskBackend), serializer)
       }
     }.getOrElse(new InMemoryStore())
   }
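The new `live` flag encapsulates both choices that callers previously made separately: the live UI always gets RocksDB plus the Protobuf serializer, the SHS keeps its configured backend with the default serializer, and a missing store path yields an `InMemoryStore` either way. A hedged sketch of the call patterns (paths are illustrative; `KVUtils` is `private[spark]`, so this only compiles from Spark-internal code):

```scala
import java.io.File

import org.apache.spark.SparkConf
import org.apache.spark.status.KVUtils

val conf = new SparkConf()

// Live UI: RocksDB on disk with Protobuf serialization.
val liveStore = KVUtils.createKVStore(Some(new File("/tmp/live-ui")), live = true, conf)

// History server: backend from History.HYBRID_STORE_DISK_BACKEND, default serializer.
val shsStore = KVUtils.createKVStore(Some(new File("/tmp/shs")), live = false, conf)

// No store path: falls back to an in-memory store regardless of the flag.
val memStore = KVUtils.createKVStore(None, live = true, conf)
```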
core/src/main/scala/org/apache/spark/status/protobuf/JobDataWrapperSerializer.scala (new file)

Lines changed: 119 additions & 0 deletions

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.status.protobuf

import collection.JavaConverters._
import java.util.Date

import org.apache.spark.JobExecutionStatus
import org.apache.spark.status.JobDataWrapper
import org.apache.spark.status.api.v1.JobData

object JobDataWrapperSerializer {
  def serialize(j: JobDataWrapper): Array[Byte] = {
    val jobData = serializeJobData(j.info)
    val builder = StoreTypes.JobDataWrapper.newBuilder()
    builder.setInfo(jobData)
    j.skippedStages.foreach(builder.addSkippedStages)
    j.sqlExecutionId.foreach(builder.setSqlExecutionId)
    builder.build().toByteArray
  }

  def deserialize(bytes: Array[Byte]): JobDataWrapper = {
    val wrapper = StoreTypes.JobDataWrapper.parseFrom(bytes)
    val sqlExecutionId = getOptional(wrapper.hasSqlExecutionId, wrapper.getSqlExecutionId)
    new JobDataWrapper(
      deserializeJobData(wrapper.getInfo),
      wrapper.getSkippedStagesList.asScala.map(_.toInt).toSet,
      sqlExecutionId
    )
  }

  private def getOptional[T](condition: Boolean, result: () => T): Option[T] = if (condition) {
    Some(result())
  } else {
    None
  }

  private def serializeJobData(jobData: JobData): StoreTypes.JobData = {
    val jobDataBuilder = StoreTypes.JobData.newBuilder()
    jobDataBuilder.setJobId(jobData.jobId.toLong)
      .setName(jobData.name)
      .setStatus(serializeJobExecutionStatus(jobData.status))
      .setNumTasks(jobData.numTasks)
      .setNumActiveTasks(jobData.numActiveTasks)
      .setNumCompletedTasks(jobData.numCompletedTasks)
      .setNumSkippedTasks(jobData.numSkippedTasks)
      .setNumFailedTasks(jobData.numFailedTasks)
      .setNumKilledTasks(jobData.numKilledTasks)
      .setNumCompletedIndices(jobData.numCompletedIndices)
      .setNumActiveStages(jobData.numActiveStages)
      .setNumCompletedStages(jobData.numCompletedStages)
      .setNumSkippedStages(jobData.numSkippedStages)
      .setNumFailedStages(jobData.numFailedStages)

    jobData.description.foreach(jobDataBuilder.setDescription)
    jobData.submissionTime.foreach { d =>
      jobDataBuilder.setSubmissionTime(d.getTime)
    }
    jobData.completionTime.foreach { d =>
      jobDataBuilder.setCompletionTime(d.getTime)
    }
    jobData.stageIds.foreach(id => jobDataBuilder.addStageIds(id.toLong))
    jobData.jobGroup.foreach(jobDataBuilder.setJobGroup)
    jobData.killedTasksSummary.foreach { entry =>
      jobDataBuilder.putKillTasksSummary(entry._1, entry._2)
    }
    jobDataBuilder.build()
  }

  private def deserializeJobData(info: StoreTypes.JobData): JobData = {
    val description = getOptional(info.hasDescription, info.getDescription)
    val submissionTime =
      getOptional(info.hasSubmissionTime, () => new Date(info.getSubmissionTime))
    val completionTime = getOptional(info.hasCompletionTime, () => new Date(info.getCompletionTime))
    val jobGroup = getOptional(info.hasJobGroup, info.getJobGroup)
    val status = JobExecutionStatus.valueOf(info.getStatus.toString)

    new JobData(
      jobId = info.getJobId.toInt,
      name = info.getName,
      description = description,
      submissionTime = submissionTime,
      completionTime = completionTime,
      stageIds = info.getStageIdsList.asScala.map(_.toInt).toSeq,
      jobGroup = jobGroup,
      status = status,
      numTasks = info.getNumTasks,
      numActiveTasks = info.getNumActiveTasks,
      numCompletedTasks = info.getNumCompletedTasks,
      numSkippedTasks = info.getNumSkippedTasks,
      numFailedTasks = info.getNumFailedTasks,
      numKilledTasks = info.getNumKilledTasks,
      numCompletedIndices = info.getNumCompletedIndices,
      numActiveStages = info.getNumActiveStages,
      numCompletedStages = info.getNumCompletedStages,
      numSkippedStages = info.getNumSkippedStages,
      numFailedStages = info.getNumFailedStages,
      killedTasksSummary = info.getKillTasksSummaryMap.asScala.mapValues(_.toInt).toMap)
  }

  private def serializeJobExecutionStatus(j: JobExecutionStatus): StoreTypes.JobExecutionStatus = {
    StoreTypes.JobExecutionStatus.valueOf(j.toString)
  }
}
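A hedged round-trip sketch of the new serializer (field values are illustrative; the `JobData` constructor arguments are taken from the deserialization code above, and it assumes `JobDataWrapper` exposes `info`, `skippedStages`, and `sqlExecutionId` as vals):

```scala
import java.util.Date

import org.apache.spark.JobExecutionStatus
import org.apache.spark.status.JobDataWrapper
import org.apache.spark.status.api.v1.JobData
import org.apache.spark.status.protobuf.JobDataWrapperSerializer

val input = new JobData(
  jobId = 1,
  name = "count at <console>:1",
  description = Some("test description"),
  submissionTime = Some(new Date(123456L)),
  completionTime = None,
  stageIds = Seq(1, 2),
  jobGroup = Some("test group"),
  status = JobExecutionStatus.RUNNING,
  numTasks = 10,
  numActiveTasks = 2,
  numCompletedTasks = 3,
  numSkippedTasks = 0,
  numFailedTasks = 1,
  numKilledTasks = 0,
  numCompletedIndices = 3,
  numActiveStages = 1,
  numCompletedStages = 1,
  numSkippedStages = 0,
  numFailedStages = 0,
  killedTasksSummary = Map("exceeding max task failures" -> 1))

val wrapper = new JobDataWrapper(input, Set(2), Some(99L))
val bytes = JobDataWrapperSerializer.serialize(wrapper)
val restored = JobDataWrapperSerializer.deserialize(bytes)

assert(restored.info.jobId == wrapper.info.jobId)
assert(restored.skippedStages == wrapper.skippedStages)
assert(restored.sqlExecutionId == wrapper.sqlExecutionId)
```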
