Skip to content
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import org.apache.spark.SparkException
import org.apache.spark.scheduler.StageInfo
import org.apache.spark.status.api.v1.StageStatus._
import org.apache.spark.status.api.v1.TaskSorting._
import org.apache.spark.ui.SparkUI
import org.apache.spark.ui.{SparkUI, UIUtils}
import org.apache.spark.ui.jobs.ApiHelper._

@Produces(Array(MediaType.APPLICATION_JSON))
Expand Down Expand Up @@ -189,32 +189,41 @@ private[v1] class StagesResource extends BaseAppResource {
val taskMetricsContainsValue = (task: TaskData) => task.taskMetrics match {
case None => false
case Some(metrics) =>
(containsValue(task.taskMetrics.get.executorDeserializeTime)
|| containsValue(task.taskMetrics.get.executorRunTime)
|| containsValue(task.taskMetrics.get.jvmGcTime)
|| containsValue(task.taskMetrics.get.resultSerializationTime)
|| containsValue(task.taskMetrics.get.memoryBytesSpilled)
|| containsValue(task.taskMetrics.get.diskBytesSpilled)
|| containsValue(task.taskMetrics.get.peakExecutionMemory)
|| containsValue(task.taskMetrics.get.inputMetrics.bytesRead)
(containsValue(UIUtils.formatDuration(task.taskMetrics.get.executorDeserializeTime))
|| containsValue(UIUtils.formatDuration(task.taskMetrics.get.executorRunTime))
|| containsValue(UIUtils.formatDuration(task.taskMetrics.get.jvmGcTime))
|| containsValue(UIUtils.formatDuration(task.taskMetrics.get.resultSerializationTime))
|| containsValue(UIUtils.formatBytes(task.taskMetrics.get.memoryBytesSpilled))
|| containsValue(UIUtils.formatBytes(task.taskMetrics.get.diskBytesSpilled))
|| containsValue(UIUtils.formatBytes(task.taskMetrics.get.peakExecutionMemory))
|| containsValue(UIUtils.formatBytes(task.taskMetrics.get.inputMetrics.bytesRead))
|| containsValue(task.taskMetrics.get.inputMetrics.recordsRead)
|| containsValue(task.taskMetrics.get.outputMetrics.bytesWritten)
|| containsValue(UIUtils.formatBytes(task.taskMetrics.get.outputMetrics.bytesWritten))
|| containsValue(task.taskMetrics.get.outputMetrics.recordsWritten)
|| containsValue(task.taskMetrics.get.shuffleReadMetrics.fetchWaitTime)
|| containsValue(UIUtils.formatDuration(
task.taskMetrics.get.shuffleReadMetrics.fetchWaitTime))
|| containsValue(UIUtils.formatBytes(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This also adds new fields to the search. No big deal, but it should probably be mentioned in the PR description.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

task.taskMetrics.get.shuffleReadMetrics.remoteBytesRead))
|| containsValue(UIUtils.formatBytes(
task.taskMetrics.get.shuffleReadMetrics.localBytesRead +
task.taskMetrics.get.shuffleReadMetrics.remoteBytesRead))
|| containsValue(task.taskMetrics.get.shuffleReadMetrics.recordsRead)
|| containsValue(task.taskMetrics.get.shuffleWriteMetrics.bytesWritten)
|| containsValue(UIUtils.formatBytes(
task.taskMetrics.get.shuffleWriteMetrics.bytesWritten))
|| containsValue(task.taskMetrics.get.shuffleWriteMetrics.recordsWritten)
|| containsValue(task.taskMetrics.get.shuffleWriteMetrics.writeTime))
|| containsValue(UIUtils.formatDuration(
task.taskMetrics.get.shuffleWriteMetrics.writeTime / 1000000)))
}
val filteredTaskDataSequence: Seq[TaskData] = taskDataList.filter(f =>
(containsValue(f.taskId) || containsValue(f.index) || containsValue(f.attempt)
|| containsValue(f.launchTime)
|| containsValue(UIUtils.formatDate(f.launchTime))
|| containsValue(f.resultFetchStart.getOrElse(defaultOptionString))
|| containsValue(f.executorId) || containsValue(f.host) || containsValue(f.status)
|| containsValue(f.taskLocality) || containsValue(f.speculative)
|| containsValue(f.errorMessage.getOrElse(defaultOptionString))
|| taskMetricsContainsValue(f)
|| containsValue(f.schedulerDelay) || containsValue(f.gettingResultTime)))
|| containsValue(UIUtils.formatDuration(f.schedulerDelay))
|| containsValue(UIUtils.formatDuration(f.gettingResultTime))))
filteredTaskDataSequence
}

Expand Down
10 changes: 10 additions & 0 deletions core/src/main/scala/org/apache/spark/ui/UIUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,16 @@ private[spark] object UIUtils extends Logging {
}
}

/**
 * Formats a byte count as a human-readable string using binary (1024-based) units,
 * e.g. 1536 -> "1.5 KiB". Intended to match how sizes are rendered in the web UI so
 * that UI-style search strings can be compared against formatted metric values.
 * (Note: `Utils.bytesToString` exists but uses decimal units, hence this helper.)
 *
 * @param bytes the raw byte count; 0 is special-cased to "0.0 B"
 * @return the count scaled to the largest unit with magnitude >= 1, one decimal place
 */
def formatBytes(bytes: Long): String = {
  if (bytes == 0) {
    "0.0 B"
  } else {
    val base = 1024
    val unitLabels = Array("B", "KiB", "MiB", "GiB", "TiB", "PiB", "EiB", "ZiB", "YiB")
    // Largest power of 1024 not exceeding the value picks the unit.
    val exponent = Math.floor(Math.log(bytes.toDouble) / Math.log(base.toDouble)).toInt
    // .toFloat kept deliberately: output must match the UI's existing rendering.
    val scaled = (bytes / Math.pow(base, exponent)).toFloat
    f"$scaled%.1f ${unitLabels(exponent)}"
  }
}

// Yarn has to go through a proxy so the base uri is provided and has to be on all links
def uiRoot(request: HttpServletRequest): String = {
// Knox uses X-Forwarded-Context to notify the application the base path
Expand Down