Commit a28f08d

techaddict authored and gengliangwang committed
[SPARK-41432][UI][SQL] Protobuf serializer for SparkPlanGraphWrapper
### What changes were proposed in this pull request?
Add Protobuf serializer for SparkPlanGraphWrapper.

### Why are the changes needed?
Support fast and compact serialization/deserialization for SparkPlanGraphWrapper over RocksDB.

### Does this PR introduce any user-facing change?
No.

### How was this patch tested?
New UT.

Closes #39164 from techaddict/SPARK-41432-SparkPlanGraphWrapper.

Authored-by: Sandeep Singh <sandeep@techaddict.me>
Signed-off-by: Gengliang Wang <gengliang@apache.org>
1 parent 8cceb39 commit a28f08d

4 files changed

Lines changed: 293 additions & 1 deletion
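For orientation before the diffs, here is a minimal round-trip sketch of how the new serializer is exercised, mirroring the unit test added below (the no-arg KVStoreProtobufSerializer constructor and the sample data are assumptions carried over from the test suite):

import org.apache.spark.sql.execution.ui.{SparkPlanGraphEdge, SparkPlanGraphWrapper}
import org.apache.spark.status.protobuf.KVStoreProtobufSerializer

val serializer = new KVStoreProtobufSerializer()

// A trivial graph: no nodes, a single edge. (Hypothetical sample data.)
val input = new SparkPlanGraphWrapper(
  executionId = 1,
  nodes = Seq.empty,
  edges = Seq(SparkPlanGraphEdge(0, 1)))

// Round-trip through the compact Protobuf binary form that is stored in RocksDB.
val bytes: Array[Byte] = serializer.serialize(input)
val result = serializer.deserialize(bytes, classOf[SparkPlanGraphWrapper])
assert(result.executionId == input.executionId)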

core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto

Lines changed: 31 additions & 0 deletions
@@ -390,3 +390,34 @@ message SQLExecutionUIData {
   repeated int64 stages = 11;
   map<int64, string> metric_values = 12;
 }
+
+message SparkPlanGraphNode {
+  int64 id = 1;
+  string name = 2;
+  string desc = 3;
+  repeated SQLPlanMetric metrics = 4;
+}
+
+message SparkPlanGraphClusterWrapper {
+  int64 id = 1;
+  string name = 2;
+  string desc = 3;
+  repeated SparkPlanGraphNodeWrapper nodes = 4;
+  repeated SQLPlanMetric metrics = 5;
+}
+
+message SparkPlanGraphNodeWrapper {
+  SparkPlanGraphNode node = 1;
+  SparkPlanGraphClusterWrapper cluster = 2;
+}
+
+message SparkPlanGraphEdge {
+  int64 from_id = 1;
+  int64 to_id = 2;
+}
+
+message SparkPlanGraphWrapper {
+  int64 execution_id = 1;
+  repeated SparkPlanGraphNodeWrapper nodes = 2;
+  repeated SparkPlanGraphEdge edges = 3;
+}
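For a sense of the generated API, here is a sketch of how these messages nest through the Java builders that protoc emits into StoreTypes (the builder and accessor names follow standard protobuf-java conventions and match the serializer below):

import org.apache.spark.status.protobuf.StoreTypes

// Build one edge and attach it to a wrapper via the generated builders.
val edge = StoreTypes.SparkPlanGraphEdge.newBuilder()
  .setFromId(1L)
  .setToId(2L)
  .build()

val wrapper = StoreTypes.SparkPlanGraphWrapper.newBuilder()
  .setExecutionId(42L)
  .addEdges(edge)
  .build()

// toByteArray yields the compact binary encoding; parseFrom reverses it.
val parsed = StoreTypes.SparkPlanGraphWrapper.parseFrom(wrapper.toByteArray)
assert(parsed.getEdgesList.get(0).getToId == 2L)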

sql/core/src/main/resources/META-INF/services/org.apache.spark.status.protobuf.ProtobufSerDe

Lines changed: 1 addition & 0 deletions
@@ -16,3 +16,4 @@
 #
 
 org.apache.spark.status.protobuf.sql.SQLExecutionUIDataSerializer
+org.apache.spark.status.protobuf.sql.SparkPlanGraphWrapperSerializer
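This one-line registration uses Java's ServiceLoader mechanism: the file is named after the ProtobufSerDe interface, and each line names an implementation to instantiate. A sketch of how such registrations are discovered (the actual loading site in Spark's status store is not part of this diff):

import java.util.ServiceLoader
import collection.JavaConverters._

import org.apache.spark.status.protobuf.ProtobufSerDe

// Enumerate every ProtobufSerDe registered under META-INF/services.
ServiceLoader.load(classOf[ProtobufSerDe]).asScala.foreach { serDe =>
  println(s"${serDe.supportClass.getName} handled by ${serDe.getClass.getName}")
}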
sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SparkPlanGraphWrapperSerializer.scala

Lines changed: 134 additions & 0 deletions
@@ -0,0 +1,134 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.status.protobuf.sql

import collection.JavaConverters._

import org.apache.spark.sql.execution.ui.{SparkPlanGraphClusterWrapper, SparkPlanGraphEdge, SparkPlanGraphNode, SparkPlanGraphNodeWrapper, SparkPlanGraphWrapper}
import org.apache.spark.status.protobuf.ProtobufSerDe
import org.apache.spark.status.protobuf.StoreTypes

class SparkPlanGraphWrapperSerializer extends ProtobufSerDe {

  override val supportClass: Class[_] = classOf[SparkPlanGraphWrapper]

  override def serialize(input: Any): Array[Byte] = {
    val plan = input.asInstanceOf[SparkPlanGraphWrapper]
    val builder = StoreTypes.SparkPlanGraphWrapper.newBuilder()
    builder.setExecutionId(plan.executionId)
    plan.nodes.foreach { node =>
      builder.addNodes(serializeSparkPlanGraphNodeWrapper(node))
    }
    plan.edges.foreach { edge =>
      builder.addEdges(serializeSparkPlanGraphEdge(edge))
    }
    builder.build().toByteArray
  }

  def deserialize(bytes: Array[Byte]): SparkPlanGraphWrapper = {
    val wrapper = StoreTypes.SparkPlanGraphWrapper.parseFrom(bytes)
    new SparkPlanGraphWrapper(
      executionId = wrapper.getExecutionId,
      nodes = wrapper.getNodesList.asScala.map(deserializeSparkPlanGraphNodeWrapper).toSeq,
      edges = wrapper.getEdgesList.asScala.map(deserializeSparkPlanGraphEdge).toSeq
    )
  }

  private def serializeSparkPlanGraphNodeWrapper(input: SparkPlanGraphNodeWrapper):
    StoreTypes.SparkPlanGraphNodeWrapper = {

    val builder = StoreTypes.SparkPlanGraphNodeWrapper.newBuilder()
    builder.setNode(serializeSparkPlanGraphNode(input.node))
    builder.setCluster(serializeSparkPlanGraphClusterWrapper(input.cluster))
    builder.build()
  }

  private def deserializeSparkPlanGraphNodeWrapper(input: StoreTypes.SparkPlanGraphNodeWrapper):
    SparkPlanGraphNodeWrapper = {

    new SparkPlanGraphNodeWrapper(
      node = deserializeSparkPlanGraphNode(input.getNode),
      cluster = deserializeSparkPlanGraphClusterWrapper(input.getCluster)
    )
  }

  private def serializeSparkPlanGraphEdge(edge: SparkPlanGraphEdge):
    StoreTypes.SparkPlanGraphEdge = {
    val builder = StoreTypes.SparkPlanGraphEdge.newBuilder()
    builder.setFromId(edge.fromId)
    builder.setToId(edge.toId)
    builder.build()
  }

  private def deserializeSparkPlanGraphEdge(edge: StoreTypes.SparkPlanGraphEdge):
    SparkPlanGraphEdge = {
    SparkPlanGraphEdge(
      fromId = edge.getFromId,
      toId = edge.getToId)
  }

  private def serializeSparkPlanGraphNode(node: SparkPlanGraphNode):
    StoreTypes.SparkPlanGraphNode = {
    val builder = StoreTypes.SparkPlanGraphNode.newBuilder()
    builder.setId(node.id)
    builder.setName(node.name)
    builder.setDesc(node.desc)
    node.metrics.foreach { metric =>
      builder.addMetrics(SQLPlanMetricSerializer.serialize(metric))
    }
    builder.build()
  }

  private def deserializeSparkPlanGraphNode(node: StoreTypes.SparkPlanGraphNode):
    SparkPlanGraphNode = {

    new SparkPlanGraphNode(
      id = node.getId,
      name = node.getName,
      desc = node.getDesc,
      metrics = node.getMetricsList.asScala.map(SQLPlanMetricSerializer.deserialize).toSeq
    )
  }

  private def serializeSparkPlanGraphClusterWrapper(cluster: SparkPlanGraphClusterWrapper):
    StoreTypes.SparkPlanGraphClusterWrapper = {
    val builder = StoreTypes.SparkPlanGraphClusterWrapper.newBuilder()
    builder.setId(cluster.id)
    builder.setName(cluster.name)
    builder.setDesc(cluster.desc)
    cluster.nodes.foreach { node =>
      builder.addNodes(serializeSparkPlanGraphNodeWrapper(node))
    }
    cluster.metrics.foreach { metric =>
      builder.addMetrics(SQLPlanMetricSerializer.serialize(metric))
    }
    builder.build()
  }

  private def deserializeSparkPlanGraphClusterWrapper(
      cluster: StoreTypes.SparkPlanGraphClusterWrapper): SparkPlanGraphClusterWrapper = {

    new SparkPlanGraphClusterWrapper(
      id = cluster.getId,
      name = cluster.getName,
      desc = cluster.getDesc,
      nodes = cluster.getNodesList.asScala.map(deserializeSparkPlanGraphNodeWrapper).toSeq,
      metrics = cluster.getMetricsList.asScala.map(SQLPlanMetricSerializer.deserialize).toSeq
    )
  }
}
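Note the mutual recursion above: serializeSparkPlanGraphNodeWrapper serializes its cluster through serializeSparkPlanGraphClusterWrapper, which serializes its child node wrappers in turn, bottoming out at clusters with empty node lists. A direct round-trip through this class alone would look like the following sketch (in practice the class is discovered via the service registration and driven by KVStoreProtobufSerializer; the sample data is hypothetical):

import org.apache.spark.sql.execution.ui.{SparkPlanGraphEdge, SparkPlanGraphWrapper}

val serDe = new SparkPlanGraphWrapperSerializer

// Hypothetical sample graph: empty node list, one edge.
val graph = new SparkPlanGraphWrapper(
  executionId = 7,
  nodes = Seq.empty,
  edges = Seq(SparkPlanGraphEdge(3, 4)))

// serialize takes Any per the ProtobufSerDe contract; deserialize is typed here.
val restored = serDe.deserialize(serDe.serialize(graph))
assert(restored.edges == graph.edges)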

sql/core/src/test/scala/org/apache/spark/status/protobuf/sql/KVStoreProtobufSerializerSuite.scala

Lines changed: 127 additions & 1 deletion
@@ -18,7 +18,7 @@
 package org.apache.spark.status.protobuf.sql
 
 import org.apache.spark.SparkFunSuite
-import org.apache.spark.sql.execution.ui.SQLExecutionUIData
+import org.apache.spark.sql.execution.ui._
 import org.apache.spark.status.api.v1.sql.SqlResourceSuite
 import org.apache.spark.status.protobuf.KVStoreProtobufSerializer

@@ -85,4 +85,130 @@ class KVStoreProtobufSerializerSuite extends SparkFunSuite {
     // input.metricValues is null, result.metricValues is also empty map.
     assert(result2.metricValues.isEmpty)
   }
+
+  test("Spark Plan Graph") {
+    val cluster = new SparkPlanGraphClusterWrapper(
+      id = 5,
+      name = "name_5",
+      desc = "desc_5",
+      nodes = Seq(new SparkPlanGraphNodeWrapper(
+        node = new SparkPlanGraphNode(
+          id = 12,
+          name = "name_12",
+          desc = "desc_12",
+          metrics = Seq(
+            SQLPlanMetric(
+              name = "name_13",
+              accumulatorId = 13,
+              metricType = "metric_13"
+            ),
+            SQLPlanMetric(
+              name = "name_14",
+              accumulatorId = 14,
+              metricType = "metric_14"
+            )
+          )
+        ),
+        cluster = new SparkPlanGraphClusterWrapper(
+          id = 15,
+          name = "name_15",
+          desc = "desc_15",
+          nodes = Seq(),
+          metrics = Seq(
+            SQLPlanMetric(
+              name = "name_16",
+              accumulatorId = 16,
+              metricType = "metric_16"
+            ),
+            SQLPlanMetric(
+              name = "name_17",
+              accumulatorId = 17,
+              metricType = "metric_17"
+            )
+          )
+        )
+      )),
+      metrics = Seq(
+        SQLPlanMetric(
+          name = "name_6",
+          accumulatorId = 6,
+          metricType = "metric_6"
+        ),
+        SQLPlanMetric(
+          name = "name_7 d",
+          accumulatorId = 7,
+          metricType = "metric_7"
+        )
+      )
+    )
+    val node = new SparkPlanGraphNodeWrapper(
+      node = new SparkPlanGraphNode(
+        id = 2,
+        name = "name_1",
+        desc = "desc_1",
+        metrics = Seq(
+          SQLPlanMetric(
+            name = "name_2",
+            accumulatorId = 3,
+            metricType = "metric_1"
+          ),
+          SQLPlanMetric(
+            name = "name_3",
+            accumulatorId = 4,
+            metricType = "metric_2"
+          )
+        )
+      ),
+      cluster = cluster
+    )
+    val input = new SparkPlanGraphWrapper(
+      executionId = 1,
+      nodes = Seq(node),
+      edges = Seq(
+        SparkPlanGraphEdge(8, 9),
+        SparkPlanGraphEdge(10, 11)
+      )
+    )
+
+    val bytes = serializer.serialize(input)
+    val result = serializer.deserialize(bytes, classOf[SparkPlanGraphWrapper])
+    assert(result.executionId == input.executionId)
+    assert(result.nodes.size == input.nodes.size)
+
+    def compareNodes(n1: SparkPlanGraphNodeWrapper, n2: SparkPlanGraphNodeWrapper): Unit = {
+      assert(n1.node.id == n2.node.id)
+      assert(n1.node.name == n2.node.name)
+      assert(n1.node.desc == n2.node.desc)
+
+      assert(n1.node.metrics.size == n2.node.metrics.size)
+      n1.node.metrics.zip(n2.node.metrics).foreach { case (m1, m2) =>
+        assert(m1.name == m2.name)
+        assert(m1.accumulatorId == m2.accumulatorId)
+        assert(m1.metricType == m2.metricType)
+      }
+
+      assert(n1.cluster.id == n2.cluster.id)
+      assert(n1.cluster.name == n2.cluster.name)
+      assert(n1.cluster.desc == n2.cluster.desc)
+      assert(n1.cluster.nodes.size == n2.cluster.nodes.size)
+      n1.cluster.nodes.zip(n2.cluster.nodes).foreach { case (n3, n4) =>
+        compareNodes(n3, n4)
+      }
+      n1.cluster.metrics.zip(n2.cluster.metrics).foreach { case (m1, m2) =>
+        assert(m1.name == m2.name)
+        assert(m1.accumulatorId == m2.accumulatorId)
+        assert(m1.metricType == m2.metricType)
+      }
+    }
+
+    result.nodes.zip(input.nodes).foreach { case (n1, n2) =>
+      compareNodes(n1, n2)
+    }
+
+    assert(result.edges.size == input.edges.size)
+    result.edges.zip(input.edges).foreach { case (e1, e2) =>
+      assert(e1.fromId == e2.fromId)
+      assert(e1.toId == e2.toId)
+    }
+  }
 }
