From 1d28d879572aa958b169acc5e1a48e52cced4c26 Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Mon, 26 Nov 2018 10:56:18 -0800 Subject: [PATCH] ShuffleReadMetricsReporter --- .../spark/executor/ShuffleReadMetrics.scala | 4 +-- .../shuffle/BlockStoreShuffleReader.scala | 2 +- .../apache/spark/shuffle/ShuffleManager.scala | 2 +- .../shuffle/ShuffleMetricsReporter.scala | 33 ------------------- .../shuffle/sort/SortShuffleManager.scala | 2 +- .../storage/ShuffleBlockFetcherIterator.scala | 4 +-- 6 files changed, 7 insertions(+), 40 deletions(-) delete mode 100644 core/src/main/scala/org/apache/spark/shuffle/ShuffleMetricsReporter.scala diff --git a/core/src/main/scala/org/apache/spark/executor/ShuffleReadMetrics.scala b/core/src/main/scala/org/apache/spark/executor/ShuffleReadMetrics.scala index 2f97e969d2dd2..12c4b8f67f71c 100644 --- a/core/src/main/scala/org/apache/spark/executor/ShuffleReadMetrics.scala +++ b/core/src/main/scala/org/apache/spark/executor/ShuffleReadMetrics.scala @@ -18,7 +18,7 @@ package org.apache.spark.executor import org.apache.spark.annotation.DeveloperApi -import org.apache.spark.shuffle.ShuffleMetricsReporter +import org.apache.spark.shuffle.ShuffleReadMetricsReporter import org.apache.spark.util.LongAccumulator @@ -130,7 +130,7 @@ class ShuffleReadMetrics private[spark] () extends Serializable { * shuffle dependency, and all temporary metrics will be merged into the [[ShuffleReadMetrics]] at * last. */ -private[spark] class TempShuffleReadMetrics extends ShuffleMetricsReporter { +private[spark] class TempShuffleReadMetrics extends ShuffleReadMetricsReporter { private[this] var _remoteBlocksFetched = 0L private[this] var _localBlocksFetched = 0L private[this] var _remoteBytesRead = 0L diff --git a/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala b/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala index 7cb031ce318b7..27e2f98c58f0c 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala @@ -33,7 +33,7 @@ private[spark] class BlockStoreShuffleReader[K, C]( startPartition: Int, endPartition: Int, context: TaskContext, - readMetrics: ShuffleMetricsReporter, + readMetrics: ShuffleReadMetricsReporter, serializerManager: SerializerManager = SparkEnv.get.serializerManager, blockManager: BlockManager = SparkEnv.get.blockManager, mapOutputTracker: MapOutputTracker = SparkEnv.get.mapOutputTracker) diff --git a/core/src/main/scala/org/apache/spark/shuffle/ShuffleManager.scala b/core/src/main/scala/org/apache/spark/shuffle/ShuffleManager.scala index d1061d83cb85a..df601cbdb2050 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/ShuffleManager.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/ShuffleManager.scala @@ -49,7 +49,7 @@ private[spark] trait ShuffleManager { startPartition: Int, endPartition: Int, context: TaskContext, - metrics: ShuffleMetricsReporter): ShuffleReader[K, C] + metrics: ShuffleReadMetricsReporter): ShuffleReader[K, C] /** * Remove a shuffle's metadata from the ShuffleManager. diff --git a/core/src/main/scala/org/apache/spark/shuffle/ShuffleMetricsReporter.scala b/core/src/main/scala/org/apache/spark/shuffle/ShuffleMetricsReporter.scala deleted file mode 100644 index 32865149c97c2..0000000000000 --- a/core/src/main/scala/org/apache/spark/shuffle/ShuffleMetricsReporter.scala +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.shuffle - -/** - * An interface for reporting shuffle information, for each shuffle. This interface assumes - * all the methods are called on a single-threaded, i.e. concrete implementations would not need - * to synchronize anything. - */ -private[spark] trait ShuffleMetricsReporter { - def incRemoteBlocksFetched(v: Long): Unit - def incLocalBlocksFetched(v: Long): Unit - def incRemoteBytesRead(v: Long): Unit - def incRemoteBytesReadToDisk(v: Long): Unit - def incLocalBytesRead(v: Long): Unit - def incFetchWaitTime(v: Long): Unit - def incRecordsRead(v: Long): Unit -} diff --git a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala index 57c3150e5a697..4f8be198e4a72 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala @@ -115,7 +115,7 @@ private[spark] class SortShuffleManager(conf: SparkConf) extends ShuffleManager startPartition: Int, endPartition: Int, context: TaskContext, - metrics: ShuffleMetricsReporter): ShuffleReader[K, C] = { + metrics: ShuffleReadMetricsReporter): ShuffleReader[K, C] = { new BlockStoreShuffleReader( handle.asInstanceOf[BaseShuffleHandle[K, _, C]], startPartition, endPartition, context, metrics) diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala index a2e0713e70b04..86f7c08eddcb5 100644 --- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala +++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala @@ -30,7 +30,7 @@ import org.apache.spark.internal.Logging import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer} import org.apache.spark.network.shuffle._ import org.apache.spark.network.util.TransportConf -import org.apache.spark.shuffle.{FetchFailedException, ShuffleMetricsReporter} +import org.apache.spark.shuffle.{FetchFailedException, ShuffleReadMetricsReporter} import org.apache.spark.util.Utils import org.apache.spark.util.io.ChunkedByteBufferOutputStream @@ -73,7 +73,7 @@ final class ShuffleBlockFetcherIterator( maxBlocksInFlightPerAddress: Int, maxReqSizeShuffleToMem: Long, detectCorrupt: Boolean, - shuffleMetrics: ShuffleMetricsReporter) + shuffleMetrics: ShuffleReadMetricsReporter) extends Iterator[(BlockId, InputStream)] with DownloadFileManager with Logging { import ShuffleBlockFetcherIterator._