Skip to content
25 changes: 25 additions & 0 deletions core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@

package org.apache.spark.executor

import java.lang.management.ManagementFactory
import java.util.concurrent.ThreadPoolExecutor
import javax.management.{MBeanServer, ObjectName}

import scala.collection.JavaConverters._

Expand Down Expand Up @@ -73,6 +75,29 @@ class ExecutorSource(threadPool: ThreadPoolExecutor, executorId: String) extends
registerFileSystemStat(scheme, "write_ops", _.getWriteOps(), 0)
}

/** Dropwizard metrics gauge measuring the executor's process CPU time.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: the comments should begin on the next line. But this is scaladoc syntax, and inside a code block, normally we just use // block comments.

* This code will try to get JVM Process CPU time or return -1 otherwise.
* The CPU time value is returned in nanoseconds.
* It will use proprietary extensions as com.sun.management.OperatingSystemMXBean or
* com.ibm.lang.management.OperatingSystemMXBean if available
*/
val mBean: MBeanServer = ManagementFactory.getPlatformMBeanServer
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The problem here is that these become fields in the parent object. These should go inside the new Gauge... { I think?

val name = new ObjectName("java.lang", "type", "OperatingSystem")
metricRegistry.register(MetricRegistry.name("executorCPUTime" ), new Gauge[Long] {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

a little confused with the exsiting cpuTime. How about jvmCpuTime?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: name("executorCPUTime" ) -> name("executorCPUTime")

override def getValue: Long = {
try {
val attribute = mBean.getAttribute(name, "ProcessCpuTime")
if (attribute != null) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Indeed good point. I'll remove this additional check for null value.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I personally don't mind the defensive checks, because who knows what to really expect from these implementations? but this is OK by me. In case of a bad impl this would still return -1.

attribute.asInstanceOf[Long]
} else {
-1L
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any reason to return -1 instead of 0?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I took the idea from com.sun.management.OperatingSystemMXBean.getProcessCpuTime, according to the documentation: "Returns: the CPU time used by the process in nanoseconds, or -1 if this operation is not supported."
I guess it makes sense to return an invalid value as -1L for the CPU time if something goes wrong with gathering CPU Time values, so the error condition will appear evident to the end user of the metric. Returning 0 is also possible, of course.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, thanks.

}
} catch {
case _ : Exception => -1L
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

case NonFatal(_) => -1?

}
}
})

// Expose executor task metrics using the Dropwizard metrics system.
// The list is taken from TaskMetrics.scala
val METRIC_CPU_TIME = metricRegistry.counter(MetricRegistry.name("cpuTime"))
Expand Down