Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
158 changes: 137 additions & 21 deletions core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,74 +21,190 @@ import org.json4s.JsonAST.JObject
import org.json4s.JsonDSL._

import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, WorkerStateResponse}
import org.apache.spark.deploy.master.{ApplicationInfo, DriverInfo, WorkerInfo}
import org.apache.spark.deploy.master._
import org.apache.spark.deploy.master.RecoveryState.MasterState
import org.apache.spark.deploy.worker.ExecutorRunner

private[deploy] object JsonProtocol {
def writeWorkerInfo(obj: WorkerInfo): JObject = {
("id" -> obj.id) ~
("host" -> obj.host) ~
("port" -> obj.port) ~
("webuiaddress" -> obj.webUiAddress) ~
("cores" -> obj.cores) ~
("coresused" -> obj.coresUsed) ~
("coresfree" -> obj.coresFree) ~
("memory" -> obj.memory) ~
("memoryused" -> obj.memoryUsed) ~
("memoryfree" -> obj.memoryFree) ~
("state" -> obj.state.toString) ~
("lastheartbeat" -> obj.lastHeartbeat)
}
/**
* Export the [[WorkerInfo]] to a Json object. A [[WorkerInfo]] consists of the information of a
* worker.
*
* @return a Json object containing the following fields:
* `id` a string identifier of the worker
* `host` the host that the worker is running on
* `port` the port that the worker is bound to
* `webuiaddress` the address used in web UI
* `cores` total cores of the worker
* `coresused` allocated cores of the worker
* `coresfree` free cores of the worker
* `memory` total memory of the worker
* `memoryused` allocated memory of the worker
* `memoryfree` free memory of the worker
* `state` state of the worker, see [[WorkerState]]
* `lastheartbeat` time in milliseconds that the latest heart beat message from the
* worker is received
*/
def writeWorkerInfo(obj: WorkerInfo): JObject = {
("id" -> obj.id) ~
("host" -> obj.host) ~
("port" -> obj.port) ~
("webuiaddress" -> obj.webUiAddress) ~
("cores" -> obj.cores) ~
("coresused" -> obj.coresUsed) ~
("coresfree" -> obj.coresFree) ~
("memory" -> obj.memory) ~
("memoryused" -> obj.memoryUsed) ~
("memoryfree" -> obj.memoryFree) ~
("state" -> obj.state.toString) ~
("lastheartbeat" -> obj.lastHeartbeat)
}

/**
* Export the [[ApplicationInfo]] to a Json objec. An [[ApplicationInfo]] consists of the
* information of an application.
*
* @return a Json object containing the following fields:
* `id` a string identifier of the application
* `starttime` time in milliseconds that the application starts
Copy link
Member

@gatorsmile gatorsmile Jun 18, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Switch the order between id and starttime?

* `name` the description of the application
* `cores` total cores granted to the application
* `user` name of the user who submitted the application
* `memoryperslave` minimal memory in MB required to each executor
* `submitdate` time in Date that the application is submitted
* `state` state of the application, see [[ApplicationState]]
* `duration` time in milliseconds that the application has been running
*/
def writeApplicationInfo(obj: ApplicationInfo): JObject = {
("starttime" -> obj.startTime) ~
("id" -> obj.id) ~
("starttime" -> obj.startTime) ~
("name" -> obj.desc.name) ~
("cores" -> obj.desc.maxCores) ~
("cores" -> obj.coresGranted) ~
("user" -> obj.desc.user) ~
("memoryperslave" -> obj.desc.memoryPerExecutorMB) ~
("submitdate" -> obj.submitDate.toString) ~
("state" -> obj.state.toString) ~
("duration" -> obj.duration)
}

/**
* Export the [[ApplicationDescription]] to a Json object. An [[ApplicationDescription]] consists
* of the description of an application.
*
* @return a Json object containing the following fields:
* `name` the description of the application
* `cores` max cores that can be allocated to the application, 0 means unlimited
* `memoryperslave` minimal memory in MB required to each executor
* `user` name of the user who submitted the application
* `command` the command string used to submit the application
*/
def writeApplicationDescription(obj: ApplicationDescription): JObject = {
("name" -> obj.name) ~
("cores" -> obj.maxCores) ~
("cores" -> obj.maxCores.getOrElse(0)) ~
("memoryperslave" -> obj.memoryPerExecutorMB) ~
("user" -> obj.user) ~
("command" -> obj.command.toString)
}

/**
* Export the [[ExecutorRunner]] to a Json object. An [[ExecutorRunner]] consists of the
* information of an executor.
*
* @return a Json object containing the following fields:
* `id` an integer identifier of the executor
* `memory` memory in MB allocated to the executor
* `appid` a string identifier of the application that the executor is working on
* `appdesc` a Json object of the [[ApplicationDescription]] of the application that the
* executor is working on
*/
def writeExecutorRunner(obj: ExecutorRunner): JObject = {
("id" -> obj.execId) ~
("memory" -> obj.memory) ~
("appid" -> obj.appId) ~
("appdesc" -> writeApplicationDescription(obj.appDesc))
}

/**
* Export the [[DriverInfo]] to a Json object. A [[DriverInfo]] consists of the information of a
* driver.
*
* @return a Json object containing the following fields:
* `id` a string identifier of the driver
* `starttime` time in milliseconds that the driver starts
* `state` state of the driver, see [[DriverState]]
* `cores` cores allocated to the driver
* `memory` memory in MB allocated to the driver
* `submitdate` time in Date that the driver is created
* `worker` identifier of the worker that the driver is running on
* `mainclass` main class of the command string that started the driver
*/
def writeDriverInfo(obj: DriverInfo): JObject = {
("id" -> obj.id) ~
("starttime" -> obj.startTime.toString) ~
("state" -> obj.state.toString) ~
("cores" -> obj.desc.cores) ~
("memory" -> obj.desc.mem)
("memory" -> obj.desc.mem) ~
("submitdate" -> obj.submitDate.toString) ~
("worker" -> obj.worker.map(_.id).getOrElse("None")) ~
("mainclass" -> obj.desc.command.arguments(2))
}

/**
* Export the [[MasterStateResponse]] to a Json object. A [[MasterStateResponse]] consists the
* information of a master node.
*
* @return a Json object containing the following fields:
* `url` the url of the master node
* `workers` a list of Json objects of [[WorkerInfo]] of the workers allocated to the
* master
* `aliveworkers` size of alive workers allocated to the master
* `cores` total cores available of the master
* `coresused` cores used by the master
* `memory` total memory available of the master
* `memoryused` memory used by the master
* `activeapps` a list of Json objects of [[ApplicationInfo]] of the active applications
* running on the master
* `completedapps` a list of Json objects of [[ApplicationInfo]] of the applications
* completed in the master
* `activedrivers` a list of Json objects of [[DriverInfo]] of the active drivers of the
* master
* `completeddrivers` a list of Json objects of [[DriverInfo]] of the completed drivers
* of the master
* `status` status of the master, see [[MasterState]]
*/
def writeMasterState(obj: MasterStateResponse): JObject = {
val aliveWorkers = obj.workers.filter(_.isAlive())
("url" -> obj.uri) ~
("workers" -> obj.workers.toList.map(writeWorkerInfo)) ~
("aliveworkers" -> aliveWorkers.length) ~
("cores" -> aliveWorkers.map(_.cores).sum) ~
("coresused" -> aliveWorkers.map(_.coresUsed).sum) ~
("memory" -> aliveWorkers.map(_.memory).sum) ~
("memoryused" -> aliveWorkers.map(_.memoryUsed).sum) ~
("activeapps" -> obj.activeApps.toList.map(writeApplicationInfo)) ~
("completedapps" -> obj.completedApps.toList.map(writeApplicationInfo)) ~
("activedrivers" -> obj.activeDrivers.toList.map(writeDriverInfo)) ~
("completeddrivers" -> obj.completedDrivers.toList.map(writeDriverInfo)) ~
("status" -> obj.status.toString)
}

/**
* Export the [[WorkerStateResponse]] to a Json object. A [[WorkerStateResponse]] consists the
* information of a worker node.
*
* @return a Json object containing the following fields:
* `id` a string identifier of the worker node
* `masterurl` url of the master node of the worker
* `masterwebuiurl` the address used in web UI of the master node of the worker
* `cores` total cores of the worker
* `coreused` used cores of the worker
* `memory` total memory of the worker
* `memoryused` used memory of the worker
* `executors` a list of Json objects of [[ExecutorRunner]] of the executors running on
* the worker
* `finishedexecutors` a list of Json objects of [[ExecutorRunner]] of the finished
* executors of the worker
*/
def writeWorkerState(obj: WorkerStateResponse): JObject = {
("id" -> obj.workerId) ~
("masterurl" -> obj.masterUrl) ~
Expand All @@ -97,7 +213,7 @@ private[deploy] object JsonProtocol {
("coresused" -> obj.coresUsed) ~
("memory" -> obj.memory) ~
("memoryused" -> obj.memoryUsed) ~
("executors" -> obj.executors.toList.map(writeExecutorRunner)) ~
("finishedexecutors" -> obj.finishedExecutors.toList.map(writeExecutorRunner))
("executors" -> obj.executors.map(writeExecutorRunner)) ~
("finishedexecutors" -> obj.finishedExecutors.map(writeExecutorRunner))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,15 @@ private[deploy] object DeployTestUtils {
}

def createDriverCommand(): Command = new Command(
"org.apache.spark.FakeClass", Seq("some arg --and-some options -g foo"),
"org.apache.spark.FakeClass", Seq("WORKER_URL", "USER_JAR", "mainClass"),
Map(("K1", "V1"), ("K2", "V2")), Seq("cp1", "cp2"), Seq("lp1", "lp2"), Seq("-Dfoo")
)

def createDriverDesc(): DriverDescription =
new DriverDescription("hdfs://some-dir/some.jar", 100, 3, false, createDriverCommand())

def createDriverInfo(): DriverInfo = new DriverInfo(3, "driver-3",
createDriverDesc(), new Date())
createDriverDesc(), JsonConstants.submitDate)

def createWorkerInfo(): WorkerInfo = {
val workerInfo = new WorkerInfo("id", "host", 8080, 4, 1234, null, "http://publicAddress:80")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,8 @@ object JsonConstants {
val submitDate = new Date(123456789)
val appInfoJsonStr =
"""
|{"starttime":3,"id":"id","name":"name",
|"cores":4,"user":"%s",
|{"id":"id","starttime":3,"name":"name",
|"cores":0,"user":"%s",
|"memoryperslave":1234,"submitdate":"%s",
|"state":"WAITING","duration":%d}
""".format(System.getProperty("user.name", "<unknown>"),
Expand Down Expand Up @@ -134,19 +134,24 @@ object JsonConstants {

val driverInfoJsonStr =
"""
|{"id":"driver-3","starttime":"3","state":"SUBMITTED","cores":3,"memory":100}
""".stripMargin
|{"id":"driver-3","starttime":"3",
|"state":"SUBMITTED","cores":3,"memory":100,
|"submitdate":"%s","worker":"None",
|"mainclass":"mainClass"}
""".format(submitDate.toString).stripMargin

val masterStateJsonStr =
"""
|{"url":"spark://host:8080",
|"workers":[%s,%s],
|"aliveworkers":2,
|"cores":8,"coresused":0,"memory":2468,"memoryused":0,
|"activeapps":[%s],"completedapps":[],
|"activedrivers":[%s],
|"completeddrivers":[%s],
|"status":"ALIVE"}
""".format(workerInfoJsonStr, workerInfoJsonStr,
appInfoJsonStr, driverInfoJsonStr).stripMargin
appInfoJsonStr, driverInfoJsonStr, driverInfoJsonStr).stripMargin

val workerStateJsonStr =
"""
Expand Down