Skip to content

Commit fdad095

Browse files
committed
[SPARK-45474][CORE][WEBUI] Support top-level filtering in MasterPage JSON API
### What changes were proposed in this pull request?

This PR aims to support `top-level` filtering in the `MasterPage` JSON API.

### Why are the changes needed?

**BEFORE**

Since Apache Spark's `MasterPage` JSON API always returns the full information, we need post-processing via `jq` like the following.

```
$ curl -s -k http://localhost:8080/json/ | jq .completedapps
[
  {
    "id": "app-20231009193946-0000",
    "starttime": 1696905586694,
    "name": "Spark shell",
    "cores": 10,
    "user": "dongjoon",
    "memoryperexecutor": 1024,
    "memoryperslave": 1024,
    "resourcesperexecutor": [],
    "resourcesperslave": [],
    "submitdate": "Mon Oct 09 19:39:46 PDT 2023",
    "state": "FINISHED",
    "duration": 115686
  }
]
```

**AFTER**

Apache Spark's `MasterPage` provides a filtered result for the top-level fields in a REST API style.

```
$ curl -s -k http://localhost:8080/json/completedapps
{
  "completedapps" : [ {
    "id" : "app-20231009193946-0000",
    "starttime" : 1696905586694,
    "name" : "Spark shell",
    "cores" : 10,
    "user" : "dongjoon",
    "memoryperexecutor" : 1024,
    "memoryperslave" : 1024,
    "resourcesperexecutor" : [ ],
    "resourcesperslave" : [ ],
    "submitdate" : "Mon Oct 09 19:39:46 PDT 2023",
    "state" : "FINISHED",
    "duration" : 115686
  } ]
}
```

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Pass the CIs with a new test case.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#43303 from dongjoon-hyun/SPARK-45474.

Authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
1 parent 0f9ed95 commit fdad095

3 files changed

Lines changed: 77 additions & 18 deletions

File tree

core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala

Lines changed: 56 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -188,7 +188,7 @@ private[deploy] object JsonProtocol {
188188
* Export the [[MasterStateResponse]] to a Json object. A [[MasterStateResponse]] consists of the
189189
* information of a master node.
190190
*
191-
* @return a Json object containing the following fields:
191+
* @return a Json object containing the following fields if `field` is None:
192192
* `url` the url of the master node
193193
* `workers` a list of Json objects of [[WorkerInfo]] of the workers allocated to the
194194
* master
@@ -208,24 +208,63 @@ private[deploy] object JsonProtocol {
208208
* `completeddrivers` a list of Json objects of [[DriverInfo]] of the completed drivers
209209
* of the master
210210
* `status` status of the master,
211-
* see [[org.apache.spark.deploy.master.RecoveryState.MasterState]]
211+
* see [[org.apache.spark.deploy.master.RecoveryState.MasterState]].
212+
* If `field` is not None, the Json object will contain the matched field.
213+
* If `field` doesn't match, the Json object `(field -> "")` is returned.
212214
*/
213-
def writeMasterState(obj: MasterStateResponse): JObject = {
215+
def writeMasterState(obj: MasterStateResponse, field: Option[String] = None): JObject = {
214216
val aliveWorkers = obj.workers.filter(_.isAlive())
215-
("url" -> obj.uri) ~
216-
("workers" -> obj.workers.toList.map(writeWorkerInfo)) ~
217-
("aliveworkers" -> aliveWorkers.length) ~
218-
("cores" -> aliveWorkers.map(_.cores).sum) ~
219-
("coresused" -> aliveWorkers.map(_.coresUsed).sum) ~
220-
("memory" -> aliveWorkers.map(_.memory).sum) ~
221-
("memoryused" -> aliveWorkers.map(_.memoryUsed).sum) ~
222-
("resources" -> aliveWorkers.map(_.resourcesInfo).toList.map(writeResourcesInfo)) ~
223-
("resourcesused" -> aliveWorkers.map(_.resourcesInfoUsed).toList.map(writeResourcesInfo)) ~
224-
("activeapps" -> obj.activeApps.toList.map(writeApplicationInfo)) ~
225-
("completedapps" -> obj.completedApps.toList.map(writeApplicationInfo)) ~
226-
("activedrivers" -> obj.activeDrivers.toList.map(writeDriverInfo)) ~
227-
("completeddrivers" -> obj.completedDrivers.toList.map(writeDriverInfo)) ~
228-
("status" -> obj.status.toString)
217+
field match {
218+
case None =>
219+
("url" -> obj.uri) ~
220+
("workers" -> obj.workers.toList.map (writeWorkerInfo) ) ~
221+
("aliveworkers" -> aliveWorkers.length) ~
222+
("cores" -> aliveWorkers.map (_.cores).sum) ~
223+
("coresused" -> aliveWorkers.map (_.coresUsed).sum) ~
224+
("memory" -> aliveWorkers.map (_.memory).sum) ~
225+
("memoryused" -> aliveWorkers.map (_.memoryUsed).sum) ~
226+
("resources" -> aliveWorkers.map (_.resourcesInfo).toList.map (writeResourcesInfo) ) ~
227+
("resourcesused" ->
228+
aliveWorkers.map (_.resourcesInfoUsed).toList.map (writeResourcesInfo) ) ~
229+
("activeapps" -> obj.activeApps.toList.map (writeApplicationInfo) ) ~
230+
("completedapps" -> obj.completedApps.toList.map (writeApplicationInfo) ) ~
231+
("activedrivers" -> obj.activeDrivers.toList.map (writeDriverInfo) ) ~
232+
("completeddrivers" -> obj.completedDrivers.toList.map (writeDriverInfo) ) ~
233+
("status" -> obj.status.toString)
234+
case Some(field) =>
235+
field match {
236+
case "url" =>
237+
("url" -> obj.uri)
238+
case "workers" =>
239+
("workers" -> obj.workers.toList.map (writeWorkerInfo) )
240+
case "aliveworkers" =>
241+
("aliveworkers" -> aliveWorkers.length)
242+
case "cores" =>
243+
("cores" -> aliveWorkers.map (_.cores).sum)
244+
case "coresused" =>
245+
("coresused" -> aliveWorkers.map (_.coresUsed).sum)
246+
case "memory" =>
247+
("memory" -> aliveWorkers.map (_.memory).sum)
248+
case "memoryused" =>
249+
("memoryused" -> aliveWorkers.map (_.memoryUsed).sum)
250+
case "resources" =>
251+
("resources" -> aliveWorkers.map (_.resourcesInfo).toList.map (writeResourcesInfo) )
252+
case "resourcesused" =>
253+
("resourcesused" ->
254+
aliveWorkers.map (_.resourcesInfoUsed).toList.map (writeResourcesInfo) )
255+
case "activeapps" =>
256+
("activeapps" -> obj.activeApps.toList.map (writeApplicationInfo) )
257+
case "completedapps" =>
258+
("completedapps" -> obj.completedApps.toList.map (writeApplicationInfo) )
259+
case "activedrivers" =>
260+
("activedrivers" -> obj.activeDrivers.toList.map (writeDriverInfo) )
261+
case "completeddrivers" =>
262+
("completeddrivers" -> obj.completedDrivers.toList.map (writeDriverInfo) )
263+
case "status" =>
264+
("status" -> obj.status.toString)
265+
case field => (field -> "")
266+
}
267+
}
229268
}
230269

231270
/**

core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,13 +32,17 @@ import org.apache.spark.util.Utils
3232

3333
private[ui] class MasterPage(parent: MasterWebUI) extends WebUIPage("") {
3434
private val master = parent.masterEndpointRef
35+
private val jsonFieldPattern = "/json/([a-zA-Z]+).*".r
3536

3637
def getMasterState: MasterStateResponse = {
3738
master.askSync[MasterStateResponse](RequestMasterState)
3839
}
3940

4041
override def renderJson(request: HttpServletRequest): JValue = {
41-
JsonProtocol.writeMasterState(getMasterState)
42+
jsonFieldPattern.findFirstMatchIn(request.getRequestURI()) match {
43+
case None => JsonProtocol.writeMasterState(getMasterState)
44+
case Some(m) => JsonProtocol.writeMasterState(getMasterState, Some(m.group(1)))
45+
}
4246
}
4347

4448
def handleAppKillRequest(request: HttpServletRequest): Unit = {

core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,22 @@ class JsonProtocolSuite extends SparkFunSuite with JsonTestUtils {
7676
assertValidDataInJson(output, JsonMethods.parse(JsonConstants.masterStateJsonStr))
7777
}
7878

79+
test("SPARK-45474: filtered writeMasterState") {
80+
val workers = Array(createWorkerInfo(), createWorkerInfo())
81+
val activeApps = Array(createAppInfo())
82+
val completedApps = Array.empty[ApplicationInfo]
83+
val activeDrivers = Array(createDriverInfo())
84+
val completedDrivers = Array(createDriverInfo())
85+
val stateResponse = new MasterStateResponse(
86+
"host", 8080, None, workers, activeApps, completedApps,
87+
activeDrivers, completedDrivers, RecoveryState.ALIVE)
88+
val output = JsonProtocol.writeMasterState(stateResponse, Some("activedrivers"))
89+
assertValidJson(output)
90+
91+
val expected = """{"activedrivers":[%s]}""".format(JsonConstants.driverInfoJsonStr).stripMargin
92+
assertValidDataInJson(output, JsonMethods.parse(expected))
93+
}
94+
7995
test("writeWorkerState") {
8096
val executors = List[ExecutorRunner]()
8197
val finishedExecutors = List[ExecutorRunner](createExecutorRunner(123, true),

0 commit comments

Comments
 (0)