Skip to content

Commit 6441f06

Browse files
committed
Re-use same instance for empty metrics UI data objects.
1 parent 37d9522 commit 6441f06

1 file changed

Lines changed: 60 additions & 19 deletions

File tree

core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala

Lines changed: 60 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ import scala.collection.mutable
2121
import scala.collection.mutable.{HashMap, LinkedHashMap}
2222

2323
import org.apache.spark.JobExecutionStatus
24-
import org.apache.spark.executor.{ShuffleReadMetrics, ShuffleWriteMetrics, TaskMetrics}
24+
import org.apache.spark.executor._
2525
import org.apache.spark.scheduler.{AccumulableInfo, TaskInfo}
2626
import org.apache.spark.util.AccumulatorContext
2727
import org.apache.spark.util.collection.OpenHashSet
@@ -147,9 +147,8 @@ private[spark] object UIData {
147147
memoryBytesSpilled = m.memoryBytesSpilled,
148148
diskBytesSpilled = m.diskBytesSpilled,
149149
peakExecutionMemory = m.peakExecutionMemory,
150-
inputMetrics = InputMetricsUIData(m.inputMetrics.bytesRead, m.inputMetrics.recordsRead),
151-
outputMetrics =
152-
OutputMetricsUIData(m.outputMetrics.bytesWritten, m.outputMetrics.recordsWritten),
150+
inputMetrics = InputMetricsUIData(m.inputMetrics),
151+
outputMetrics = OutputMetricsUIData(m.outputMetrics),
153152
shuffleReadMetrics = ShuffleReadMetricsUIData(m.shuffleReadMetrics),
154153
shuffleWriteMetrics = ShuffleWriteMetricsUIData(m.shuffleWriteMetrics))
155154
}
@@ -197,8 +196,32 @@ private[spark] object UIData {
197196
shuffleWriteMetrics: ShuffleWriteMetricsUIData)
198197

199198
case class InputMetricsUIData(bytesRead: Long, recordsRead: Long)
199+
object InputMetricsUIData {
200+
def apply(metrics: InputMetrics): InputMetricsUIData = {
201+
if (metrics.bytesRead == 0 && metrics.recordsRead == 0) {
202+
EMPTY
203+
} else {
204+
new InputMetricsUIData(
205+
bytesRead = metrics.bytesRead,
206+
recordsRead = metrics.recordsRead)
207+
}
208+
}
209+
val EMPTY = InputMetricsUIData(0, 0)
210+
}
200211

201212
case class OutputMetricsUIData(bytesWritten: Long, recordsWritten: Long)
213+
object OutputMetricsUIData {
214+
def apply(metrics: OutputMetrics): OutputMetricsUIData = {
215+
if (metrics.bytesWritten == 0 && metrics.recordsWritten == 0) {
216+
EMPTY
217+
} else {
218+
new OutputMetricsUIData(
219+
bytesWritten = metrics.bytesWritten,
220+
recordsWritten = metrics.recordsWritten)
221+
}
222+
}
223+
val EMPTY = OutputMetricsUIData(0, 0)
224+
}
202225

203226
case class ShuffleReadMetricsUIData(
204227
remoteBlocksFetched: Long,
@@ -212,17 +235,30 @@ private[spark] object UIData {
212235

213236
object ShuffleReadMetricsUIData {
214237
def apply(metrics: ShuffleReadMetrics): ShuffleReadMetricsUIData = {
215-
new ShuffleReadMetricsUIData(
216-
remoteBlocksFetched = metrics.remoteBlocksFetched,
217-
localBlocksFetched = metrics.localBlocksFetched,
218-
remoteBytesRead = metrics.remoteBytesRead,
219-
localBytesRead = metrics.localBytesRead,
220-
fetchWaitTime = metrics.fetchWaitTime,
221-
recordsRead = metrics.recordsRead,
222-
totalBytesRead = metrics.totalBytesRead,
223-
totalBlocksFetched = metrics.totalBlocksFetched
224-
)
238+
if (
239+
metrics.remoteBlocksFetched == 0 &&
240+
metrics.localBlocksFetched == 0 &&
241+
metrics.remoteBytesRead == 0 &&
242+
metrics.localBytesRead == 0 &&
243+
metrics.fetchWaitTime == 0 &&
244+
metrics.recordsRead == 0 &&
245+
metrics.totalBytesRead == 0 &&
246+
metrics.totalBlocksFetched == 0) {
247+
EMPTY
248+
} else {
249+
new ShuffleReadMetricsUIData(
250+
remoteBlocksFetched = metrics.remoteBlocksFetched,
251+
localBlocksFetched = metrics.localBlocksFetched,
252+
remoteBytesRead = metrics.remoteBytesRead,
253+
localBytesRead = metrics.localBytesRead,
254+
fetchWaitTime = metrics.fetchWaitTime,
255+
recordsRead = metrics.recordsRead,
256+
totalBytesRead = metrics.totalBytesRead,
257+
totalBlocksFetched = metrics.totalBlocksFetched
258+
)
259+
}
225260
}
261+
val EMPTY = ShuffleReadMetricsUIData(0, 0, 0, 0, 0, 0, 0, 0)
226262
}
227263

228264
case class ShuffleWriteMetricsUIData(
@@ -232,12 +268,17 @@ private[spark] object UIData {
232268

233269
object ShuffleWriteMetricsUIData {
234270
def apply(metrics: ShuffleWriteMetrics): ShuffleWriteMetricsUIData = {
235-
new ShuffleWriteMetricsUIData(
236-
bytesWritten = metrics.bytesWritten,
237-
recordsWritten = metrics.recordsWritten,
238-
writeTime = metrics.writeTime
239-
)
271+
if (metrics.bytesWritten == 0 && metrics.recordsWritten == 0 && metrics.writeTime == 0) {
272+
EMPTY
273+
} else {
274+
new ShuffleWriteMetricsUIData(
275+
bytesWritten = metrics.bytesWritten,
276+
recordsWritten = metrics.recordsWritten,
277+
writeTime = metrics.writeTime
278+
)
279+
}
240280
}
281+
val EMPTY = ShuffleWriteMetricsUIData(0, 0, 0)
241282
}
242283

243284
}

0 commit comments

Comments
 (0)