25 changes: 25 additions & 0 deletions core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala
@@ -17,9 +17,12 @@

package org.apache.spark.executor

import java.lang.management.ManagementFactory
import java.util.concurrent.ThreadPoolExecutor
import javax.management.{MBeanServer, ObjectName}

import scala.collection.JavaConverters._
import scala.util.control.NonFatal

import com.codahale.metrics.{Gauge, MetricRegistry}
import org.apache.hadoop.fs.FileSystem
@@ -73,6 +76,28 @@ class ExecutorSource(threadPool: ThreadPoolExecutor, executorId: String) extends
registerFileSystemStat(scheme, "write_ops", _.getWriteOps(), 0)
}

// Dropwizard metrics gauge measuring the executor's process CPU time.
// This gauge tries to get and return the JVM process CPU time, or returns -1 otherwise.
// The CPU time value is returned in nanoseconds.
// It will use proprietary extensions such as com.sun.management.OperatingSystemMXBean or
// com.ibm.lang.management.OperatingSystemMXBean, if available.
metricRegistry.register(MetricRegistry.name("jvmCpuTime"), new Gauge[Long] {
Member: So this isn't exposed except through Dropwizard... not plumbed through to the driver too, like some of the metrics below? Just checking that this is all that needs to happen: the metric can be used by external users but is not otherwise touched by Spark.

Contributor Author: Indeed, this is exposed only through the Dropwizard metrics system and is not used elsewhere in the Spark code. Another point worth mentioning: currently ExecutorSource is not registered when running in local mode.
On a related topic (although perhaps for a broader discussion than the scope of this PR), I was wondering whether it would make sense to introduce a few SparkConf properties to switch certain families of (Dropwizard) metrics on and off, as the list of available metrics is becoming long in recent versions.
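
(Illustration only, not part of the PR: since the gauge is published only to the Dropwizard registry, an external consumer would typically see it through a configured metrics sink. A minimal metrics.properties sketch, assuming Spark's stock ConsoleSink and a 10-second reporting period:)

    # metrics.properties: report all metric sources, including the executor's
    # jvmCpuTime gauge, to stdout every 10 seconds.
    *.sink.console.class=org.apache.spark.metrics.sink.ConsoleSink
    *.sink.console.period=10
    *.sink.console.unit=seconds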

val mBean: MBeanServer = ManagementFactory.getPlatformMBeanServer
val name = new ObjectName("java.lang", "type", "OperatingSystem")
override def getValue: Long = {
try {
val attribute = mBean.getAttribute(name, "ProcessCpuTime")
if (attribute != null) {
Member: …

Contributor Author: Indeed, good point. I'll remove this additional check for a null value.

Member: I personally don't mind the defensive checks, because who knows what to really expect from these implementations? But this is OK by me; in case of a bad implementation this would still return -1.

attribute.asInstanceOf[Long]
} else {
-1L
Member: Any reason to return -1 instead of 0?

Contributor Author: I took the idea from com.sun.management.OperatingSystemMXBean.getProcessCpuTime; according to the documentation: "Returns: the CPU time used by the process in nanoseconds, or -1 if this operation is not supported." I think it makes sense to return -1L as an invalid CPU time value if something goes wrong while gathering it, so that the error condition is evident to the end user of the metric. Returning 0 is also possible, of course.

Member: OK, thanks.
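
(Illustration only: a minimal standalone Scala sketch of the -1 convention cited above, assuming the proprietary com.sun.management extension is present at runtime; the object name CpuTimeCheck is hypothetical.)

    import java.lang.management.ManagementFactory

    object CpuTimeCheck {
      def main(args: Array[String]): Unit = {
        ManagementFactory.getOperatingSystemMXBean match {
          // The HotSpot-specific extension documents -1 as "operation not supported".
          case os: com.sun.management.OperatingSystemMXBean =>
            println(s"ProcessCpuTime (ns): ${os.getProcessCpuTime}")
          case _ =>
            println("Proprietary OperatingSystemMXBean not available")
        }
      }
    }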

}
} catch {
case NonFatal(_) => -1L
}
}
})

// Expose executor task metrics using the Dropwizard metrics system.
// The list is taken from TaskMetrics.scala
val METRIC_CPU_TIME = metricRegistry.counter(MetricRegistry.name("cpuTime"))
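
(Sketch only, not from this diff: Dropwizard counters such as METRIC_CPU_TIME are advanced with Counter.inc; how a caller might update it after a task finishes. The cpuDeltaNs value is a hypothetical placeholder.)

    // Hypothetical: increment the executor's CPU time counter by a measured delta.
    val cpuDeltaNs: Long = 1000000L  // placeholder for a task's measured CPU time, in ns
    METRIC_CPU_TIME.inc(cpuDeltaNs)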