From 86050a26efbf1759db3d45cbdd73e42ddc186a5d Mon Sep 17 00:00:00 2001 From: mcheah Date: Tue, 12 Mar 2019 18:03:33 -0700 Subject: [PATCH 01/12] Begin shuffle locations work --- .../api/shuffle/MapShuffleLocations.java | 39 ++++++++++ .../spark/api/shuffle/ShuffleLocation.java | 25 ++++++ .../sort/BypassMergeSortShuffleWriter.java | 7 +- .../sort/DefaultMapShuffleLocations.java | 76 +++++++++++++++++++ .../shuffle/sort/UnsafeShuffleWriter.java | 4 +- .../apache/spark/scheduler/MapStatus.scala | 72 ++++++++++++++---- .../shuffle/sort/SortShuffleWriter.scala | 3 +- .../apache/spark/MapOutputTrackerSuite.scala | 6 +- .../spark/scheduler/MapStatusSuite.scala | 14 ++-- 9 files changed, 218 insertions(+), 28 deletions(-) create mode 100644 core/src/main/java/org/apache/spark/api/shuffle/MapShuffleLocations.java create mode 100644 core/src/main/java/org/apache/spark/api/shuffle/ShuffleLocation.java create mode 100644 core/src/main/java/org/apache/spark/shuffle/sort/DefaultMapShuffleLocations.java diff --git a/core/src/main/java/org/apache/spark/api/shuffle/MapShuffleLocations.java b/core/src/main/java/org/apache/spark/api/shuffle/MapShuffleLocations.java new file mode 100644 index 0000000000000..cb31e57d95566 --- /dev/null +++ b/core/src/main/java/org/apache/spark/api/shuffle/MapShuffleLocations.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.api.shuffle; + +import org.apache.spark.annotation.Experimental; + +import java.io.Serializable; + +/** + * Represents metadata about where shuffle blocks were written in a single map task. + *

+ * This is optionally returned by shuffle writers. The inner shuffle locations may + * be accessed by shuffle readers. Shuffle locations are only necessary when the + * location of shuffle blocks needs to be managed by the driver; shuffle plugins + * may choose to use an external database or other metadata management systems to + * track the locations of shuffle blocks instead. + */ +@Experimental +public interface MapShuffleLocations extends Serializable { + + /** + * Get the location for a given shuffle block written by this map task. + */ + ShuffleLocation getLocationForBlock(int reduceId); +} diff --git a/core/src/main/java/org/apache/spark/api/shuffle/ShuffleLocation.java b/core/src/main/java/org/apache/spark/api/shuffle/ShuffleLocation.java new file mode 100644 index 0000000000000..87eb497098e0c --- /dev/null +++ b/core/src/main/java/org/apache/spark/api/shuffle/ShuffleLocation.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.api.shuffle; + +/** + * Marker interface representing a location of a shuffle block. Implementations of shuffle readers + * and writers are expected to cast this down to an implementation-specific representation. 
+ */ +public interface ShuffleLocation { +} diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java index 32b446785a9f0..d6745160d506b 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java @@ -124,7 +124,8 @@ public void write(Iterator> records) throws IOException { if (!records.hasNext()) { partitionLengths = new long[numPartitions]; shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, null); - mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths); + mapStatus = MapStatus$.MODULE$.apply( + DefaultMapShuffleLocations.get(blockManager.shuffleServerId()), partitionLengths); return; } final SerializerInstance serInstance = serializer.newInstance(); @@ -166,7 +167,9 @@ public void write(Iterator> records) throws IOException { logger.error("Error while deleting temp file {}", tmp.getAbsolutePath()); } } - mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths); + mapStatus = MapStatus$.MODULE$.apply( + DefaultMapShuffleLocations.get(blockManager.shuffleServerId()), + partitionLengths); } @VisibleForTesting diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/DefaultMapShuffleLocations.java b/core/src/main/java/org/apache/spark/shuffle/sort/DefaultMapShuffleLocations.java new file mode 100644 index 0000000000000..4e22f13e4a0a5 --- /dev/null +++ b/core/src/main/java/org/apache/spark/shuffle/sort/DefaultMapShuffleLocations.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.shuffle.sort; + +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; + +import org.apache.spark.api.shuffle.MapShuffleLocations; +import org.apache.spark.api.shuffle.ShuffleLocation; +import org.apache.spark.storage.BlockManagerId; + +import java.util.Objects; + +public class DefaultMapShuffleLocations implements MapShuffleLocations, ShuffleLocation { + + /** + * We borrow the cache size from the BlockManagerId's cache - around 1MB, which should be + * feasible. 
+ */ + private static final LoadingCache + DEFAULT_SHUFFLE_LOCATIONS_CACHE = + CacheBuilder.newBuilder() + .maximumSize(10000) + .build(new CacheLoader() { + @Override + public DefaultMapShuffleLocations load(BlockManagerId blockManagerId) { + return new DefaultMapShuffleLocations(blockManagerId); + } + }); + + private final BlockManagerId location; + + public DefaultMapShuffleLocations(BlockManagerId blockManagerId) { + this.location = blockManagerId; + } + + public static DefaultMapShuffleLocations get(BlockManagerId blockManagerId) { + return DEFAULT_SHUFFLE_LOCATIONS_CACHE.getUnchecked(blockManagerId); + } + + @Override + public ShuffleLocation getLocationForBlock(int reduceId) { + return this; + } + + public BlockManagerId getBlockManagerId() { + return location; + } + + @Override + public boolean equals(Object other) { + return other instanceof DefaultMapShuffleLocations + && Objects.equals(((DefaultMapShuffleLocations) other).location, location); + } + + @Override + public int hashCode() { + return Objects.hashCode(location); + } +} diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java index 36081069b0e75..cfed8549ecfc1 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java @@ -249,7 +249,9 @@ void closeAndWriteOutput() throws IOException { logger.error("Error while deleting temp file {}", tmp.getAbsolutePath()); } } - mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths); + mapStatus = MapStatus$.MODULE$.apply( + DefaultMapShuffleLocations.get(blockManager.shuffleServerId()), + partitionLengths); } @VisibleForTesting diff --git a/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala b/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala index 64f0a060a247c..50b4a8f79c9ec 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala @@ -19,12 +19,13 @@ package org.apache.spark.scheduler import java.io.{Externalizable, ObjectInput, ObjectOutput} -import scala.collection.mutable - import org.roaringbitmap.RoaringBitmap +import scala.collection.mutable import org.apache.spark.SparkEnv +import org.apache.spark.api.shuffle.MapShuffleLocations import org.apache.spark.internal.config +import org.apache.spark.shuffle.sort.DefaultMapShuffleLocations import org.apache.spark.storage.BlockManagerId import org.apache.spark.util.Utils @@ -33,8 +34,13 @@ import org.apache.spark.util.Utils * task ran on as well as the sizes of outputs for each reducer, for passing on to the reduce tasks. */ private[spark] sealed trait MapStatus { - /** Location where this task was run. */ - def location: BlockManagerId + /** + * Locations where this task stored shuffle blocks. + * + * May be null if the MapOutputTracker is not tracking the location of shuffle blocks, leaving it + * up to the implementation of shuffle plugins to do so. + */ + def location: MapShuffleLocations /** * Estimated size for the reduce block, in bytes. 
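As a minimal sketch of the plugin point introduced above (the names ExternalShuffleServerLocation and SingleServerMapShuffleLocations below are hypothetical and appear in none of these patches), a shuffle plugin that writes all of a map task's blocks to a single external shuffle server could implement the new org.apache.spark.api.shuffle interfaces roughly as follows:

import org.apache.spark.api.shuffle.{MapShuffleLocations, ShuffleLocation}

// Hypothetical location type: one external shuffle server, identified by host and port.
case class ExternalShuffleServerLocation(host: String, port: Int) extends ShuffleLocation

// Hypothetical MapShuffleLocations: every reduce partition written by this map task is
// served by the same external server, so getLocationForBlock can ignore the reduceId.
class SingleServerMapShuffleLocations(server: ExternalShuffleServerLocation)
  extends MapShuffleLocations {
  override def getLocationForBlock(reduceId: Int): ShuffleLocation = server
}

A reader for such a plugin would then downcast the returned ShuffleLocation back to its own type, in the same way that BlockStoreShuffleReader downcasts to DefaultMapShuffleLocations later in this series.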
@@ -56,7 +62,15 @@ private[spark] object MapStatus { .map(_.conf.get(config.SHUFFLE_MIN_NUM_PARTS_TO_HIGHLY_COMPRESS)) .getOrElse(config.SHUFFLE_MIN_NUM_PARTS_TO_HIGHLY_COMPRESS.defaultValue.get) - def apply(loc: BlockManagerId, uncompressedSizes: Array[Long]): MapStatus = { + // A temporary concession to the fact that we only expect implementations of shuffle provided by + // Spark to be storing shuffle locations in the driver, meaning we want to introduce as little + // serialization overhead as possible in such default cases. + // + // If more similar cases arise, consider adding a serialization API for these shuffle locations. + private val DEFAULT_MAP_SHUFFLE_LOCATIONS_ID: Byte = 0 + private val NON_DEFAULT_MAP_SHUFFLE_LOCATIONS_ID: Byte = 1 + + def apply(loc: MapShuffleLocations, uncompressedSizes: Array[Long]): MapStatus = { if (uncompressedSizes.length > minPartitionsToUseHighlyCompressMapStatus) { HighlyCompressedMapStatus(loc, uncompressedSizes) } else { @@ -91,6 +105,34 @@ private[spark] object MapStatus { math.pow(LOG_BASE, compressedSize & 0xFF).toLong } } + + def writeShuffleLocations(loc: MapShuffleLocations, out: ObjectOutput): Unit = { + if (loc != null) { + out.writeBoolean(true) + loc match { + case DefaultMapShuffleLocations => + out.writeByte(MapStatus.DEFAULT_MAP_SHUFFLE_LOCATIONS_ID) + loc.asInstanceOf[DefaultMapShuffleLocations].getBlockManagerId.writeExternal(out) + case _ => + out.writeByte(MapStatus.NON_DEFAULT_MAP_SHUFFLE_LOCATIONS_ID) + out.writeObject(loc) + } + } else { + out.writeBoolean(false) + } + } + + def readShuffleLocations(in: ObjectInput): MapShuffleLocations = { + if (in.readBoolean()) { + val locId = in.readByte() + if (locId == MapStatus.DEFAULT_MAP_SHUFFLE_LOCATIONS_ID) { + val blockManagerId = BlockManagerId(in) + DefaultMapShuffleLocations.get(blockManagerId) + } else { + in.readObject().asInstanceOf[MapShuffleLocations] + } + } else null + } } @@ -102,30 +144,30 @@ private[spark] object MapStatus { * @param compressedSizes size of the blocks, indexed by reduce partition id. */ private[spark] class CompressedMapStatus( - private[this] var loc: BlockManagerId, + private[this] var loc: MapShuffleLocations, private[this] var compressedSizes: Array[Byte]) extends MapStatus with Externalizable { protected def this() = this(null, null.asInstanceOf[Array[Byte]]) // For deserialization only - def this(loc: BlockManagerId, uncompressedSizes: Array[Long]) { + def this(loc: MapShuffleLocations, uncompressedSizes: Array[Long]) { this(loc, uncompressedSizes.map(MapStatus.compressSize)) } - override def location: BlockManagerId = loc + override def location: MapShuffleLocations = loc override def getSizeForBlock(reduceId: Int): Long = { MapStatus.decompressSize(compressedSizes(reduceId)) } override def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException { - loc.writeExternal(out) + MapStatus.writeShuffleLocations(loc, out) out.writeInt(compressedSizes.length) out.write(compressedSizes) } override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException { - loc = BlockManagerId(in) + loc = MapStatus.readShuffleLocations(in) val len = in.readInt() compressedSizes = new Array[Byte](len) in.readFully(compressedSizes) @@ -144,7 +186,7 @@ private[spark] class CompressedMapStatus( * @param hugeBlockSizes sizes of huge blocks by their reduceId. 
*/ private[spark] class HighlyCompressedMapStatus private ( - private[this] var loc: BlockManagerId, + private[this] var loc: MapShuffleLocations, private[this] var numNonEmptyBlocks: Int, private[this] var emptyBlocks: RoaringBitmap, private[this] var avgSize: Long, @@ -157,7 +199,7 @@ private[spark] class HighlyCompressedMapStatus private ( protected def this() = this(null, -1, null, -1, null) // For deserialization only - override def location: BlockManagerId = loc + override def location: MapShuffleLocations = loc override def getSizeForBlock(reduceId: Int): Long = { assert(hugeBlockSizes != null) @@ -172,7 +214,7 @@ private[spark] class HighlyCompressedMapStatus private ( } override def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException { - loc.writeExternal(out) + MapStatus.writeShuffleLocations(loc, out) emptyBlocks.writeExternal(out) out.writeLong(avgSize) out.writeInt(hugeBlockSizes.size) @@ -183,7 +225,7 @@ private[spark] class HighlyCompressedMapStatus private ( } override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException { - loc = BlockManagerId(in) + loc = MapStatus.readShuffleLocations(in) emptyBlocks = new RoaringBitmap() emptyBlocks.readExternal(in) avgSize = in.readLong() @@ -199,7 +241,7 @@ private[spark] class HighlyCompressedMapStatus private ( } private[spark] object HighlyCompressedMapStatus { - def apply(loc: BlockManagerId, uncompressedSizes: Array[Long]): HighlyCompressedMapStatus = { + def apply(loc: MapShuffleLocations, uncompressedSizes: Array[Long]): HighlyCompressedMapStatus = { // We must keep track of which blocks are empty so that we don't report a zero-sized // block as being non-empty (or vice-versa) when using the average block size. var i = 0 diff --git a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala index 16058de8bf3ff..9f6620c985625 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala @@ -70,7 +70,8 @@ private[spark] class SortShuffleWriter[K, V, C]( val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID) val partitionLengths = sorter.writePartitionedFile(blockId, tmp) shuffleBlockResolver.writeIndexFileAndCommit(dep.shuffleId, mapId, partitionLengths, tmp) - mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths) + mapStatus = MapStatus( + DefaultMapShuffleLocations.get(blockManager.shuffleServerId), partitionLengths) } finally { if (tmp.exists() && !tmp.delete()) { logError(s"Error while deleting temp file ${tmp.getAbsolutePath}") diff --git a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala index 26a0fb0657af2..c2455d536ee09 100644 --- a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala +++ b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala @@ -18,10 +18,8 @@ package org.apache.spark import scala.collection.mutable.ArrayBuffer - import org.mockito.ArgumentMatchers.any import org.mockito.Mockito._ - import org.apache.spark.LocalSparkContext._ import org.apache.spark.broadcast.BroadcastManager import org.apache.spark.internal.config._ @@ -29,6 +27,7 @@ import org.apache.spark.internal.config.Network.{RPC_ASK_TIMEOUT, RPC_MESSAGE_MA import org.apache.spark.rpc.{RpcAddress, RpcCallContext, RpcEnv} import 
org.apache.spark.scheduler.{CompressedMapStatus, MapStatus} import org.apache.spark.shuffle.FetchFailedException +import org.apache.spark.shuffle.sort.DefaultMapShuffleLocations import org.apache.spark.storage.{BlockManagerId, ShuffleBlockId} class MapOutputTrackerSuite extends SparkFunSuite { @@ -262,7 +261,8 @@ class MapOutputTrackerSuite extends SparkFunSuite { masterTracker.registerShuffle(20, 100) (0 until 100).foreach { i => masterTracker.registerMapOutput(20, i, new CompressedMapStatus( - BlockManagerId("999", "mps", 1000), Array.fill[Long](4000000)(0))) + DefaultMapShuffleLocations.get(BlockManagerId("999", "mps", 1000)), + Array.fill[Long](4000000)(0))) } val senderAddress = RpcAddress("localhost", 12345) val rpcCallContext = mock(classOf[RpcCallContext]) diff --git a/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala index c1e7fb9a1db16..9a10bef411fce 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala @@ -19,15 +19,15 @@ package org.apache.spark.scheduler import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream} -import scala.util.Random - import org.mockito.Mockito.mock import org.roaringbitmap.RoaringBitmap +import scala.util.Random import org.apache.spark.{SparkConf, SparkContext, SparkEnv, SparkFunSuite} import org.apache.spark.LocalSparkContext._ import org.apache.spark.internal.config import org.apache.spark.serializer.{JavaSerializer, KryoSerializer} +import org.apache.spark.shuffle.sort.DefaultMapShuffleLocations import org.apache.spark.storage.BlockManagerId class MapStatusSuite extends SparkFunSuite { @@ -61,7 +61,7 @@ class MapStatusSuite extends SparkFunSuite { stddev <- Seq(0.0, 0.01, 0.5, 1.0) ) { val sizes = Array.fill[Long](numSizes)(abs(round(Random.nextGaussian() * stddev)) + mean) - val status = MapStatus(BlockManagerId("a", "b", 10), sizes) + val status = MapStatus(DefaultMapShuffleLocations.get(BlockManagerId("a", "b", 10)), sizes) val status1 = compressAndDecompressMapStatus(status) for (i <- 0 until numSizes) { if (sizes(i) != 0) { @@ -86,7 +86,7 @@ class MapStatusSuite extends SparkFunSuite { test("HighlyCompressedMapStatus: estimated size should be the average non-empty block size") { val sizes = Array.tabulate[Long](3000) { i => i.toLong } val avg = sizes.sum / sizes.count(_ != 0) - val loc = BlockManagerId("a", "b", 10) + val loc = DefaultMapShuffleLocations.get(BlockManagerId("a", "b", 10)) val status = MapStatus(loc, sizes) val status1 = compressAndDecompressMapStatus(status) assert(status1.isInstanceOf[HighlyCompressedMapStatus]) @@ -108,7 +108,7 @@ class MapStatusSuite extends SparkFunSuite { val sizes = (0L to 3000L).toArray val smallBlockSizes = sizes.filter(n => n > 0 && n < threshold) val avg = smallBlockSizes.sum / smallBlockSizes.length - val loc = BlockManagerId("a", "b", 10) + val loc = DefaultMapShuffleLocations.get(BlockManagerId("a", "b", 10)) val status = MapStatus(loc, sizes) val status1 = compressAndDecompressMapStatus(status) assert(status1.isInstanceOf[HighlyCompressedMapStatus]) @@ -165,7 +165,9 @@ class MapStatusSuite extends SparkFunSuite { SparkEnv.set(env) // Value of element in sizes is equal to the corresponding index. 
val sizes = (0L to 2000L).toArray - val status1 = MapStatus(BlockManagerId("exec-0", "host-0", 100), sizes) + val status1 = MapStatus( + DefaultMapShuffleLocations.get( + BlockManagerId("exec-0", "host-0", 100)), sizes) val arrayStream = new ByteArrayOutputStream(102400) val objectOutputStream = new ObjectOutputStream(arrayStream) assert(status1.isInstanceOf[HighlyCompressedMapStatus]) From b58b6282cf63d9c9c92a6029f204ef167801d82b Mon Sep 17 00:00:00 2001 From: mcheah Date: Fri, 15 Mar 2019 14:52:20 -0700 Subject: [PATCH 02/12] [WIP] Store pluggable shuffle locations in the map output tracker. --- .../sort/BypassMergeSortShuffleWriter.java | 5 +- .../shuffle/sort/UnsafeShuffleWriter.java | 1 + .../org/apache/spark/MapOutputTracker.scala | 18 +-- .../apache/spark/scheduler/MapStatus.scala | 118 ++++++++++++------ .../spark/serializer/KryoSerializer.scala | 9 +- .../shuffle/BlockStoreShuffleReader.scala | 11 +- .../shuffle/sort/SortShuffleWriter.scala | 4 +- .../apache/spark/MapOutputTrackerSuite.scala | 22 ++-- .../spark/scheduler/DAGSchedulerSuite.scala | 58 +++++---- .../spark/scheduler/MapStatusSuite.scala | 29 +++-- .../serializer/KryoSerializerSuite.scala | 14 ++- .../BlockStoreShuffleReaderSuite.scala | 6 +- 12 files changed, 195 insertions(+), 100 deletions(-) diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java index d6745160d506b..4ad3eafe78999 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java @@ -125,7 +125,9 @@ public void write(Iterator> records) throws IOException { partitionLengths = new long[numPartitions]; shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, null); mapStatus = MapStatus$.MODULE$.apply( - DefaultMapShuffleLocations.get(blockManager.shuffleServerId()), partitionLengths); + blockManager.shuffleServerId(), + DefaultMapShuffleLocations.get(blockManager.shuffleServerId()), + partitionLengths); return; } final SerializerInstance serInstance = serializer.newInstance(); @@ -168,6 +170,7 @@ public void write(Iterator> records) throws IOException { } } mapStatus = MapStatus$.MODULE$.apply( + blockManager.shuffleServerId(), DefaultMapShuffleLocations.get(blockManager.shuffleServerId()), partitionLengths); } diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java index cfed8549ecfc1..e5bc60c687f37 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java @@ -250,6 +250,7 @@ void closeAndWriteOutput() throws IOException { } } mapStatus = MapStatus$.MODULE$.apply( + blockManager.shuffleServerId(), DefaultMapShuffleLocations.get(blockManager.shuffleServerId()), partitionLengths); } diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala index 1d4b1ef9c9a1c..05496690a3d8c 100644 --- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala +++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala @@ -28,6 +28,7 @@ import scala.concurrent.duration.Duration import scala.reflect.ClassTag import scala.util.control.NonFatal +import org.apache.spark.api.shuffle.MapShuffleLocations import 
org.apache.spark.broadcast.{Broadcast, BroadcastManager} import org.apache.spark.internal.Logging import org.apache.spark.internal.config._ @@ -282,7 +283,7 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging // For testing def getMapSizesByExecutorId(shuffleId: Int, reduceId: Int) - : Iterator[(BlockManagerId, Seq[(BlockId, Long)])] = { + : Iterator[(MapShuffleLocations, Seq[(BlockId, Long)])] = { getMapSizesByExecutorId(shuffleId, reduceId, reduceId + 1) } @@ -296,7 +297,7 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging * describing the shuffle blocks that are stored at that block manager. */ def getMapSizesByExecutorId(shuffleId: Int, startPartition: Int, endPartition: Int) - : Iterator[(BlockManagerId, Seq[(BlockId, Long)])] + : Iterator[(MapShuffleLocations, Seq[(BlockId, Long)])] /** * Deletes map output status information for the specified shuffle stage. @@ -646,7 +647,7 @@ private[spark] class MapOutputTrackerMaster( // Get blocks sizes by executor Id. Note that zero-sized blocks are excluded in the result. // This method is only called in local-mode. def getMapSizesByExecutorId(shuffleId: Int, startPartition: Int, endPartition: Int) - : Iterator[(BlockManagerId, Seq[(BlockId, Long)])] = { + : Iterator[(MapShuffleLocations, Seq[(BlockId, Long)])] = { logDebug(s"Fetching outputs for shuffle $shuffleId, partitions $startPartition-$endPartition") shuffleStatuses.get(shuffleId) match { case Some (shuffleStatus) => @@ -683,11 +684,12 @@ private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTr // Get blocks sizes by executor Id. Note that zero-sized blocks are excluded in the result. override def getMapSizesByExecutorId(shuffleId: Int, startPartition: Int, endPartition: Int) - : Iterator[(BlockManagerId, Seq[(BlockId, Long)])] = { + : Iterator[(MapShuffleLocations, Seq[(BlockId, Long)])] = { logDebug(s"Fetching outputs for shuffle $shuffleId, partitions $startPartition-$endPartition") val statuses = getStatuses(shuffleId) try { - MapOutputTracker.convertMapStatuses(shuffleId, startPartition, endPartition, statuses) + MapOutputTracker.convertMapStatuses( + shuffleId, startPartition, endPartition, statuses) } catch { case e: MetadataFetchFailedException => // We experienced a fetch failure so our mapStatuses cache is outdated; clear it: @@ -871,9 +873,9 @@ private[spark] object MapOutputTracker extends Logging { shuffleId: Int, startPartition: Int, endPartition: Int, - statuses: Array[MapStatus]): Iterator[(BlockManagerId, Seq[(BlockId, Long)])] = { + statuses: Array[MapStatus]): Iterator[(MapShuffleLocations, Seq[(BlockId, Long)])] = { assert (statuses != null) - val splitsByAddress = new HashMap[BlockManagerId, ListBuffer[(BlockId, Long)]] + val splitsByAddress = new HashMap[MapShuffleLocations, ListBuffer[(BlockId, Long)]] for ((status, mapId) <- statuses.iterator.zipWithIndex) { if (status == null) { val errorMessage = s"Missing an output location for shuffle $shuffleId" @@ -883,7 +885,7 @@ private[spark] object MapOutputTracker extends Logging { for (part <- startPartition until endPartition) { val size = status.getSizeForBlock(part) if (size != 0) { - splitsByAddress.getOrElseUpdate(status.location, ListBuffer()) += + splitsByAddress.getOrElseUpdate(status.mapShuffleLocations, ListBuffer()) += ((ShuffleBlockId(shuffleId, mapId, part), size)) } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala b/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala index 
50b4a8f79c9ec..395178c47f686 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala @@ -34,13 +34,17 @@ import org.apache.spark.util.Utils * task ran on as well as the sizes of outputs for each reducer, for passing on to the reduce tasks. */ private[spark] sealed trait MapStatus { + /** * Locations where this task stored shuffle blocks. * * May be null if the MapOutputTracker is not tracking the location of shuffle blocks, leaving it * up to the implementation of shuffle plugins to do so. */ - def location: MapShuffleLocations + def mapShuffleLocations: MapShuffleLocations + + /** Location where the task was run. */ + def location: BlockManagerId /** * Estimated size for the reduce block, in bytes. @@ -70,11 +74,23 @@ private[spark] object MapStatus { private val DEFAULT_MAP_SHUFFLE_LOCATIONS_ID: Byte = 0 private val NON_DEFAULT_MAP_SHUFFLE_LOCATIONS_ID: Byte = 1 - def apply(loc: MapShuffleLocations, uncompressedSizes: Array[Long]): MapStatus = { + /** + * Visible for testing. + */ + def apply(loc: BlockManagerId, uncompressedSizes: Array[Long]): MapStatus = { + apply(loc, DefaultMapShuffleLocations.get(loc), uncompressedSizes) + } + + def apply( + loc: BlockManagerId, + mapShuffleLocs: MapShuffleLocations, + uncompressedSizes: Array[Long]): MapStatus = { if (uncompressedSizes.length > minPartitionsToUseHighlyCompressMapStatus) { - HighlyCompressedMapStatus(loc, uncompressedSizes) + HighlyCompressedMapStatus( + loc, mapShuffleLocs, uncompressedSizes) } else { - new CompressedMapStatus(loc, uncompressedSizes) + new CompressedMapStatus( + loc, mapShuffleLocs, uncompressedSizes) } } @@ -106,68 +122,87 @@ private[spark] object MapStatus { } } - def writeShuffleLocations(loc: MapShuffleLocations, out: ObjectOutput): Unit = { - if (loc != null) { + def writeLocations( + loc: BlockManagerId, + mapShuffleLocs: MapShuffleLocations, + out: ObjectOutput): Unit = { + if (mapShuffleLocs != null) { out.writeBoolean(true) - loc match { - case DefaultMapShuffleLocations => - out.writeByte(MapStatus.DEFAULT_MAP_SHUFFLE_LOCATIONS_ID) - loc.asInstanceOf[DefaultMapShuffleLocations].getBlockManagerId.writeExternal(out) - case _ => - out.writeByte(MapStatus.NON_DEFAULT_MAP_SHUFFLE_LOCATIONS_ID) - out.writeObject(loc) + if (mapShuffleLocs.isInstanceOf[DefaultMapShuffleLocations] + && mapShuffleLocs.asInstanceOf[DefaultMapShuffleLocations].getBlockManagerId == loc) { + out.writeByte(MapStatus.DEFAULT_MAP_SHUFFLE_LOCATIONS_ID) + } else { + out.writeByte(MapStatus.NON_DEFAULT_MAP_SHUFFLE_LOCATIONS_ID) + out.writeObject(mapShuffleLocs) } } else { out.writeBoolean(false) } + loc.writeExternal(out) } - def readShuffleLocations(in: ObjectInput): MapShuffleLocations = { + def readLocations(in: ObjectInput): (BlockManagerId, MapShuffleLocations) = { if (in.readBoolean()) { val locId = in.readByte() if (locId == MapStatus.DEFAULT_MAP_SHUFFLE_LOCATIONS_ID) { val blockManagerId = BlockManagerId(in) - DefaultMapShuffleLocations.get(blockManagerId) + (blockManagerId, DefaultMapShuffleLocations.get(blockManagerId)) } else { - in.readObject().asInstanceOf[MapShuffleLocations] + val mapShuffleLocations = in.readObject().asInstanceOf[MapShuffleLocations] + val blockManagerId = BlockManagerId(in) + (blockManagerId, mapShuffleLocations) } - } else null + } else { + val blockManagerId = BlockManagerId(in) + (blockManagerId, null) + } } } - /** * A [[MapStatus]] implementation that tracks the size of each block. 
Size for each block is * represented using a single byte. * - * @param loc location where the task is being executed. + * @param mapShuffleLocs location where the task is being executed. * @param compressedSizes size of the blocks, indexed by reduce partition id. */ private[spark] class CompressedMapStatus( - private[this] var loc: MapShuffleLocations, + private[this] var loc: BlockManagerId, + private[this] var mapShuffleLocs: MapShuffleLocations, private[this] var compressedSizes: Array[Byte]) extends MapStatus with Externalizable { - protected def this() = this(null, null.asInstanceOf[Array[Byte]]) // For deserialization only - - def this(loc: MapShuffleLocations, uncompressedSizes: Array[Long]) { - this(loc, uncompressedSizes.map(MapStatus.compressSize)) + // For deserialization only + protected def this() = this(null, null, null.asInstanceOf[Array[Byte]]) + + def this( + loc: BlockManagerId, + mapShuffleLocations: MapShuffleLocations, + uncompressedSizes: Array[Long]) { + this( + loc, + mapShuffleLocations, + uncompressedSizes.map(MapStatus.compressSize)) } - override def location: MapShuffleLocations = loc + override def location: BlockManagerId = loc + + override def mapShuffleLocations: MapShuffleLocations = mapShuffleLocs override def getSizeForBlock(reduceId: Int): Long = { MapStatus.decompressSize(compressedSizes(reduceId)) } override def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException { - MapStatus.writeShuffleLocations(loc, out) + MapStatus.writeLocations(loc, mapShuffleLocs, out) out.writeInt(compressedSizes.length) out.write(compressedSizes) } override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException { - loc = MapStatus.readShuffleLocations(in) + val (deserializedLoc, deserializedMapShuffleLocs) = MapStatus.readLocations(in) + loc = deserializedLoc + mapShuffleLocs = deserializedMapShuffleLocs val len = in.readInt() compressedSizes = new Array[Byte](len) in.readFully(compressedSizes) @@ -186,7 +221,8 @@ private[spark] class CompressedMapStatus( * @param hugeBlockSizes sizes of huge blocks by their reduceId. 
*/ private[spark] class HighlyCompressedMapStatus private ( - private[this] var loc: MapShuffleLocations, + private[this] var loc: BlockManagerId, + private[this] var mapShuffleLocs: MapShuffleLocations, private[this] var numNonEmptyBlocks: Int, private[this] var emptyBlocks: RoaringBitmap, private[this] var avgSize: Long, @@ -197,9 +233,11 @@ private[spark] class HighlyCompressedMapStatus private ( require(loc == null || avgSize > 0 || hugeBlockSizes.size > 0 || numNonEmptyBlocks == 0, "Average size can only be zero for map stages that produced no output") - protected def this() = this(null, -1, null, -1, null) // For deserialization only + protected def this() = this(null, null, -1, null, -1, null) // For deserialization only + + override def location: BlockManagerId = loc - override def location: MapShuffleLocations = loc + override def mapShuffleLocations: MapShuffleLocations = mapShuffleLocs override def getSizeForBlock(reduceId: Int): Long = { assert(hugeBlockSizes != null) @@ -214,7 +252,7 @@ private[spark] class HighlyCompressedMapStatus private ( } override def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException { - MapStatus.writeShuffleLocations(loc, out) + MapStatus.writeLocations(loc, mapShuffleLocs, out) emptyBlocks.writeExternal(out) out.writeLong(avgSize) out.writeInt(hugeBlockSizes.size) @@ -225,7 +263,9 @@ private[spark] class HighlyCompressedMapStatus private ( } override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException { - loc = MapStatus.readShuffleLocations(in) + val (deserializedLoc, deserializedMapShuffleLocs) = MapStatus.readLocations(in) + loc = deserializedLoc + mapShuffleLocs = deserializedMapShuffleLocs emptyBlocks = new RoaringBitmap() emptyBlocks.readExternal(in) avgSize = in.readLong() @@ -241,7 +281,10 @@ private[spark] class HighlyCompressedMapStatus private ( } private[spark] object HighlyCompressedMapStatus { - def apply(loc: MapShuffleLocations, uncompressedSizes: Array[Long]): HighlyCompressedMapStatus = { + def apply( + loc: BlockManagerId, + mapShuffleLocs: MapShuffleLocations, + uncompressedSizes: Array[Long]): HighlyCompressedMapStatus = { // We must keep track of which blocks are empty so that we don't report a zero-sized // block as being non-empty (or vice-versa) when using the average block size. 
var i = 0 @@ -281,7 +324,12 @@ private[spark] object HighlyCompressedMapStatus { } emptyBlocks.trim() emptyBlocks.runOptimize() - new HighlyCompressedMapStatus(loc, numNonEmptyBlocks, emptyBlocks, avgSize, - hugeBlockSizes) + new HighlyCompressedMapStatus( + loc, + mapShuffleLocs, + numNonEmptyBlocks, + emptyBlocks, + avgSize, + hugeBlockSizes) } } diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala index 2df133dd2b13a..128cd30bda18c 100644 --- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala +++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala @@ -20,22 +20,21 @@ package org.apache.spark.serializer import java.io._ import java.nio.ByteBuffer import java.util.Locale + import javax.annotation.Nullable import scala.collection.JavaConverters._ import scala.collection.mutable.ArrayBuffer import scala.reflect.ClassTag import scala.util.control.NonFatal - import com.esotericsoftware.kryo.{Kryo, KryoException, Serializer => KryoClassSerializer} import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput} import com.esotericsoftware.kryo.io.{UnsafeInput => KryoUnsafeInput, UnsafeOutput => KryoUnsafeOutput} import com.esotericsoftware.kryo.pool.{KryoCallback, KryoFactory, KryoPool} -import com.esotericsoftware.kryo.serializers.{JavaSerializer => KryoJavaSerializer} +import com.esotericsoftware.kryo.serializers.{ExternalizableSerializer, JavaSerializer => KryoJavaSerializer} import com.twitter.chill.{AllScalaRegistrar, EmptyScalaKryoInstantiator} import org.apache.avro.generic.{GenericData, GenericRecord} import org.roaringbitmap.RoaringBitmap - import org.apache.spark._ import org.apache.spark.api.python.PythonBroadcast import org.apache.spark.internal.Logging @@ -152,6 +151,8 @@ class KryoSerializer(conf: SparkConf) kryo.register(classOf[SerializableConfiguration], new KryoJavaSerializer()) kryo.register(classOf[SerializableJobConf], new KryoJavaSerializer()) kryo.register(classOf[PythonBroadcast], new KryoJavaSerializer()) + kryo.register(classOf[CompressedMapStatus], new ExternalizableSerializer()) + kryo.register(classOf[HighlyCompressedMapStatus], new ExternalizableSerializer()) kryo.register(classOf[GenericRecord], new GenericAvroSerializer(avroSchemas)) kryo.register(classOf[GenericData.Record], new GenericAvroSerializer(avroSchemas)) @@ -485,8 +486,6 @@ private[serializer] object KryoSerializer { private val toRegister: Seq[Class[_]] = Seq( ByteBuffer.allocate(1).getClass, classOf[StorageLevel], - classOf[CompressedMapStatus], - classOf[HighlyCompressedMapStatus], classOf[CompactBuffer[_]], classOf[BlockManagerId], classOf[Array[Boolean]], diff --git a/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala b/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala index c5eefc7c5c049..b023d63355a46 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala @@ -18,8 +18,9 @@ package org.apache.spark.shuffle import org.apache.spark._ -import org.apache.spark.internal.{config, Logging} +import org.apache.spark.internal.{Logging, config} import org.apache.spark.serializer.SerializerManager +import org.apache.spark.shuffle.sort.DefaultMapShuffleLocations import org.apache.spark.storage.{BlockManager, ShuffleBlockFetcherIterator} import org.apache.spark.util.CompletionIterator import 
org.apache.spark.util.collection.ExternalSorter @@ -47,7 +48,13 @@ private[spark] class BlockStoreShuffleReader[K, C]( context, blockManager.shuffleClient, blockManager, - mapOutputTracker.getMapSizesByExecutorId(handle.shuffleId, startPartition, endPartition), + mapOutputTracker.getMapSizesByExecutorId(handle.shuffleId, startPartition, endPartition) + .map { case (loc, blocks) => + require( + loc.isInstanceOf[DefaultMapShuffleLocations], + "Non-default shuffle location types are currently non supported.") + (loc.asInstanceOf[DefaultMapShuffleLocations].getBlockManagerId, blocks) + }, serializerManager.wrapStream, // Note: we use getSizeAsMb when no suffix is provided for backwards compatibility SparkEnv.get.conf.get(config.REDUCER_MAX_SIZE_IN_FLIGHT) * 1024 * 1024, diff --git a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala index 9f6620c985625..0063013091f24 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala @@ -71,7 +71,9 @@ private[spark] class SortShuffleWriter[K, V, C]( val partitionLengths = sorter.writePartitionedFile(blockId, tmp) shuffleBlockResolver.writeIndexFileAndCommit(dep.shuffleId, mapId, partitionLengths, tmp) mapStatus = MapStatus( - DefaultMapShuffleLocations.get(blockManager.shuffleServerId), partitionLengths) + blockManager.shuffleServerId, + DefaultMapShuffleLocations.get(blockManager.shuffleServerId), + partitionLengths) } finally { if (tmp.exists() && !tmp.delete()) { logError(s"Error while deleting temp file ${tmp.getAbsolutePath}") diff --git a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala index f6d6a7aa7cadc..b35a5f3198b01 100644 --- a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala +++ b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala @@ -17,9 +17,10 @@ package org.apache.spark -import scala.collection.mutable.ArrayBuffer import org.mockito.ArgumentMatchers.any import org.mockito.Mockito._ +import scala.collection.mutable.ArrayBuffer + import org.apache.spark.LocalSparkContext._ import org.apache.spark.broadcast.BroadcastManager import org.apache.spark.internal.config._ @@ -68,8 +69,11 @@ class MapOutputTrackerSuite extends SparkFunSuite { Array(10000L, 1000L))) val statuses = tracker.getMapSizesByExecutorId(10, 0) assert(statuses.toSet === - Seq((BlockManagerId("a", "hostA", 1000), ArrayBuffer((ShuffleBlockId(10, 0, 0), size1000))), - (BlockManagerId("b", "hostB", 1000), ArrayBuffer((ShuffleBlockId(10, 1, 0), size10000)))) + Seq( + (DefaultMapShuffleLocations.get(BlockManagerId("a", "hostA", 1000)), + ArrayBuffer((ShuffleBlockId(10, 0, 0), size1000))), + (DefaultMapShuffleLocations.get(BlockManagerId("b", "hostB", 1000)), + ArrayBuffer((ShuffleBlockId(10, 1, 0), size10000)))) .toSet) assert(0 == tracker.getNumCachedSerializedBroadcast) tracker.stop() @@ -149,7 +153,9 @@ class MapOutputTrackerSuite extends SparkFunSuite { BlockManagerId("a", "hostA", 1000), Array(1000L))) slaveTracker.updateEpoch(masterTracker.getEpoch) assert(slaveTracker.getMapSizesByExecutorId(10, 0).toSeq === - Seq((BlockManagerId("a", "hostA", 1000), ArrayBuffer((ShuffleBlockId(10, 0, 0), size1000))))) + Seq( + (DefaultMapShuffleLocations.get(BlockManagerId("a", "hostA", 1000)), + ArrayBuffer((ShuffleBlockId(10, 0, 0), size1000))))) assert(0 == 
masterTracker.getNumCachedSerializedBroadcast) val masterTrackerEpochBeforeLossOfMapOutput = masterTracker.getEpoch @@ -260,8 +266,10 @@ class MapOutputTrackerSuite extends SparkFunSuite { // being sent. masterTracker.registerShuffle(20, 100) (0 until 100).foreach { i => + val bmId = BlockManagerId("999", "mps", 1000) masterTracker.registerMapOutput(20, i, new CompressedMapStatus( - DefaultMapShuffleLocations.get(BlockManagerId("999", "mps", 1000)), + bmId, + DefaultMapShuffleLocations.get(bmId), Array.fill[Long](4000000)(0))) } val senderAddress = RpcAddress("localhost", 12345) @@ -317,9 +325,9 @@ class MapOutputTrackerSuite extends SparkFunSuite { assert(tracker.containsShuffle(10)) assert(tracker.getMapSizesByExecutorId(10, 0, 4).toSeq === Seq( - (BlockManagerId("a", "hostA", 1000), + (DefaultMapShuffleLocations.get(BlockManagerId("a", "hostA", 1000)), Seq((ShuffleBlockId(10, 0, 1), size1000), (ShuffleBlockId(10, 0, 3), size10000))), - (BlockManagerId("b", "hostB", 1000), + (DefaultMapShuffleLocations.get(BlockManagerId("b", "hostB", 1000)), Seq((ShuffleBlockId(10, 1, 0), size10000), (ShuffleBlockId(10, 1, 2), size1000))) ) ) diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index e17d264cced9f..a0bef6fce6985 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -20,21 +20,22 @@ package org.apache.spark.scheduler import java.util.Properties import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong} +import org.scalatest.concurrent.{Signaler, ThreadSignaler, TimeLimits} +import org.scalatest.time.SpanSugar._ import scala.annotation.meta.param import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Map} import scala.language.reflectiveCalls import scala.util.control.NonFatal -import org.scalatest.concurrent.{Signaler, ThreadSignaler, TimeLimits} -import org.scalatest.time.SpanSugar._ - import org.apache.spark._ +import org.apache.spark.api.shuffle.MapShuffleLocations import org.apache.spark.broadcast.BroadcastManager import org.apache.spark.executor.ExecutorMetrics import org.apache.spark.internal.config import org.apache.spark.rdd.{DeterministicLevel, RDD} import org.apache.spark.scheduler.SchedulingMode.SchedulingMode import org.apache.spark.shuffle.{FetchFailedException, MetadataFetchFailedException} +import org.apache.spark.shuffle.sort.DefaultMapShuffleLocations import org.apache.spark.storage.{BlockId, BlockManagerId, BlockManagerMaster} import org.apache.spark.util.{AccumulatorContext, AccumulatorV2, CallSite, LongAccumulator, Utils} @@ -701,7 +702,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi (Success, makeMapStatus("hostA", 1)), (Success, makeMapStatus("hostB", 1)))) assert(mapOutputTracker.getMapSizesByExecutorId(shuffleId, 0).map(_._1).toSet === - HashSet(makeBlockManagerId("hostA"), makeBlockManagerId("hostB"))) + HashSet(makeShuffleLocation("hostA"), makeShuffleLocation("hostB"))) complete(taskSets(1), Seq((Success, 42))) assert(results === Map(0 -> 42)) assertDataStructuresEmpty() @@ -727,8 +728,10 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi // have the 2nd attempt pass complete(taskSets(2), Seq((Success, makeMapStatus("hostA", reduceRdd.partitions.length)))) // we can see both result blocks now - assert(mapOutputTracker.getMapSizesByExecutorId(shuffleId, 
0).map(_._1.host).toSet === - HashSet("hostA", "hostB")) + assert(mapOutputTracker + .getMapSizesByExecutorId(shuffleId, 0) + .map(_._1.asInstanceOf[DefaultMapShuffleLocations].getBlockManagerId.host) + .toSet === HashSet("hostA", "hostB")) complete(taskSets(3), Seq((Success, 43))) assert(results === Map(0 -> 42, 1 -> 43)) assertDataStructuresEmpty() @@ -770,7 +773,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi } } else { assert(mapOutputTracker.getMapSizesByExecutorId(shuffleId, 0).map(_._1).toSet === - HashSet(makeBlockManagerId("hostA"), makeBlockManagerId("hostB"))) + HashSet(makeShuffleLocation("hostA"), makeShuffleLocation("hostB"))) } } } @@ -1063,8 +1066,10 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi (Success, makeMapStatus("hostA", reduceRdd.partitions.length)), (Success, makeMapStatus("hostB", reduceRdd.partitions.length)))) // The MapOutputTracker should know about both map output locations. - assert(mapOutputTracker.getMapSizesByExecutorId(shuffleId, 0).map(_._1.host).toSet === - HashSet("hostA", "hostB")) + assert(mapOutputTracker + .getMapSizesByExecutorId(shuffleId, 0) + .map(_._1.asInstanceOf[DefaultMapShuffleLocations].getBlockManagerId.host) + .toSet === HashSet("hostA", "hostB")) // The first result task fails, with a fetch failure for the output from the first mapper. runEvent(makeCompletionEvent( @@ -1193,10 +1198,14 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi (Success, makeMapStatus("hostA", 2)), (Success, makeMapStatus("hostB", 2)))) // The MapOutputTracker should know about both map output locations. - assert(mapOutputTracker.getMapSizesByExecutorId(shuffleId, 0).map(_._1.host).toSet === - HashSet("hostA", "hostB")) - assert(mapOutputTracker.getMapSizesByExecutorId(shuffleId, 1).map(_._1.host).toSet === - HashSet("hostA", "hostB")) + assert(mapOutputTracker + .getMapSizesByExecutorId(shuffleId, 0) + .map(_._1.asInstanceOf[DefaultMapShuffleLocations].getBlockManagerId.host) + .toSet === HashSet("hostA", "hostB")) + assert(mapOutputTracker + .getMapSizesByExecutorId(shuffleId, 1) + .map(_._1.asInstanceOf[DefaultMapShuffleLocations].getBlockManagerId.host) + .toSet === HashSet("hostA", "hostB")) // The first result task fails, with a fetch failure for the output from the first mapper. runEvent(makeCompletionEvent( @@ -1387,7 +1396,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi makeMapStatus("hostA", reduceRdd.partitions.size))) assert(shuffleStage.numAvailableOutputs === 2) assert(mapOutputTracker.getMapSizesByExecutorId(shuffleId, 0).map(_._1).toSet === - HashSet(makeBlockManagerId("hostB"), makeBlockManagerId("hostA"))) + HashSet(makeShuffleLocation("hostB"), makeShuffleLocation("hostA"))) // finish the next stage normally, which completes the job complete(taskSets(1), Seq((Success, 42), (Success, 43))) @@ -1793,7 +1802,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi // have hostC complete the resubmitted task complete(taskSets(1), Seq((Success, makeMapStatus("hostC", 1)))) assert(mapOutputTracker.getMapSizesByExecutorId(shuffleId, 0).map(_._1).toSet === - HashSet(makeBlockManagerId("hostC"), makeBlockManagerId("hostB"))) + HashSet(makeShuffleLocation("hostC"), makeShuffleLocation("hostB"))) // Make sure that the reduce stage was now submitted. 
assert(taskSets.size === 3) @@ -2056,7 +2065,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi complete(taskSets(0), Seq( (Success, makeMapStatus("hostA", 1)))) assert(mapOutputTracker.getMapSizesByExecutorId(shuffleId, 0).map(_._1).toSet === - HashSet(makeBlockManagerId("hostA"))) + HashSet(makeShuffleLocation("hostA"))) // Reducer should run on the same host that map task ran val reduceTaskSet = taskSets(1) @@ -2102,7 +2111,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi complete(taskSets(0), Seq( (Success, makeMapStatus("hostA", 1)))) assert(mapOutputTracker.getMapSizesByExecutorId(shuffleId, 0).map(_._1).toSet === - HashSet(makeBlockManagerId("hostA"))) + HashSet(makeShuffleLocation("hostA"))) // Reducer should run where RDD 2 has preferences, even though it also has a shuffle dep val reduceTaskSet = taskSets(1) @@ -2266,7 +2275,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi (Success, makeMapStatus("hostA", rdd1.partitions.length)), (Success, makeMapStatus("hostB", rdd1.partitions.length)))) assert(mapOutputTracker.getMapSizesByExecutorId(dep1.shuffleId, 0).map(_._1).toSet === - HashSet(makeBlockManagerId("hostA"), makeBlockManagerId("hostB"))) + HashSet(makeShuffleLocation("hostA"), makeShuffleLocation("hostB"))) assert(listener1.results.size === 1) // When attempting the second stage, show a fetch failure @@ -2282,7 +2291,8 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi complete(taskSets(2), Seq( (Success, makeMapStatus("hostC", rdd2.partitions.length)))) assert(mapOutputTracker.getMapSizesByExecutorId(dep1.shuffleId, 0).map(_._1).toSet === - HashSet(makeBlockManagerId("hostC"), makeBlockManagerId("hostB"))) + HashSet(makeShuffleLocation("hostC"), makeShuffleLocation("hostB"))) + assert(listener2.results.size === 0) // Second stage listener should still not have a result // Stage 1 should now be running as task set 3; make its first task succeed @@ -2291,7 +2301,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi (Success, makeMapStatus("hostB", rdd2.partitions.length)), (Success, makeMapStatus("hostD", rdd2.partitions.length)))) assert(mapOutputTracker.getMapSizesByExecutorId(dep2.shuffleId, 0).map(_._1).toSet === - HashSet(makeBlockManagerId("hostB"), makeBlockManagerId("hostD"))) + HashSet(makeShuffleLocation("hostB"), makeShuffleLocation("hostD"))) assert(listener2.results.size === 1) // Finally, the reduce job should be running as task set 4; make it see a fetch failure, @@ -2330,7 +2340,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi (Success, makeMapStatus("hostA", rdd1.partitions.length)), (Success, makeMapStatus("hostB", rdd1.partitions.length)))) assert(mapOutputTracker.getMapSizesByExecutorId(dep1.shuffleId, 0).map(_._1).toSet === - HashSet(makeBlockManagerId("hostA"), makeBlockManagerId("hostB"))) + HashSet(makeShuffleLocation("hostA"), makeShuffleLocation("hostB"))) assert(listener1.results.size === 1) // When attempting stage1, trigger a fetch failure. 
@@ -2356,7 +2366,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi complete(taskSets(2), Seq( (Success, makeMapStatus("hostC", rdd2.partitions.length)))) assert(mapOutputTracker.getMapSizesByExecutorId(dep1.shuffleId, 0).map(_._1).toSet === - Set(makeBlockManagerId("hostC"), makeBlockManagerId("hostB"))) + Set(makeShuffleLocation("hostC"), makeShuffleLocation("hostB"))) // After stage0 is finished, stage1 will be submitted and found there is no missing // partitions in it. Then listener got triggered. @@ -2908,6 +2918,10 @@ object DAGSchedulerSuite { def makeBlockManagerId(host: String): BlockManagerId = BlockManagerId("exec-" + host, host, 12345) + + def makeShuffleLocation(host: String): MapShuffleLocations = { + DefaultMapShuffleLocations.get(makeBlockManagerId(host)) + } } object FailThisAttempt { diff --git a/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala index 9a10bef411fce..5c5651d81930c 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala @@ -61,7 +61,11 @@ class MapStatusSuite extends SparkFunSuite { stddev <- Seq(0.0, 0.01, 0.5, 1.0) ) { val sizes = Array.fill[Long](numSizes)(abs(round(Random.nextGaussian() * stddev)) + mean) - val status = MapStatus(DefaultMapShuffleLocations.get(BlockManagerId("a", "b", 10)), sizes) + val bmId = BlockManagerId("a", "b", 10) + val status = MapStatus( + bmId, + DefaultMapShuffleLocations.get(bmId), + sizes) val status1 = compressAndDecompressMapStatus(status) for (i <- 0 until numSizes) { if (sizes(i) != 0) { @@ -75,7 +79,7 @@ class MapStatusSuite extends SparkFunSuite { test("large tasks should use " + classOf[HighlyCompressedMapStatus].getName) { val sizes = Array.fill[Long](2001)(150L) - val status = MapStatus(null, sizes) + val status = MapStatus(null, null, sizes) assert(status.isInstanceOf[HighlyCompressedMapStatus]) assert(status.getSizeForBlock(10) === 150L) assert(status.getSizeForBlock(50) === 150L) @@ -86,11 +90,13 @@ class MapStatusSuite extends SparkFunSuite { test("HighlyCompressedMapStatus: estimated size should be the average non-empty block size") { val sizes = Array.tabulate[Long](3000) { i => i.toLong } val avg = sizes.sum / sizes.count(_ != 0) - val loc = DefaultMapShuffleLocations.get(BlockManagerId("a", "b", 10)) - val status = MapStatus(loc, sizes) + val bmId = BlockManagerId("a", "b", 10) + val loc = DefaultMapShuffleLocations.get(bmId) + val status = MapStatus(bmId, loc, sizes) val status1 = compressAndDecompressMapStatus(status) assert(status1.isInstanceOf[HighlyCompressedMapStatus]) - assert(status1.location == loc) + assert(status1.location == loc.getBlockManagerId) + assert(status1.mapShuffleLocations == loc) for (i <- 0 until 3000) { val estimate = status1.getSizeForBlock(i) if (sizes(i) > 0) { @@ -108,11 +114,13 @@ class MapStatusSuite extends SparkFunSuite { val sizes = (0L to 3000L).toArray val smallBlockSizes = sizes.filter(n => n > 0 && n < threshold) val avg = smallBlockSizes.sum / smallBlockSizes.length - val loc = DefaultMapShuffleLocations.get(BlockManagerId("a", "b", 10)) - val status = MapStatus(loc, sizes) + val bmId = BlockManagerId("a", "b", 10) + val loc = DefaultMapShuffleLocations.get(bmId) + val status = MapStatus(bmId, loc, sizes) val status1 = compressAndDecompressMapStatus(status) assert(status1.isInstanceOf[HighlyCompressedMapStatus]) - assert(status1.location == loc) + 
assert(status1.location === bmId) + assert(status1.mapShuffleLocations === loc) for (i <- 0 until threshold) { val estimate = status1.getSizeForBlock(i) if (sizes(i) > 0) { @@ -165,9 +173,8 @@ class MapStatusSuite extends SparkFunSuite { SparkEnv.set(env) // Value of element in sizes is equal to the corresponding index. val sizes = (0L to 2000L).toArray - val status1 = MapStatus( - DefaultMapShuffleLocations.get( - BlockManagerId("exec-0", "host-0", 100)), sizes) + val bmId = BlockManagerId("exec-0", "host-0", 100) + val status1 = MapStatus(bmId, DefaultMapShuffleLocations.get(bmId), sizes) val arrayStream = new ByteArrayOutputStream(102400) val objectOutputStream = new ObjectOutputStream(arrayStream) assert(status1.isInstanceOf[HighlyCompressedMapStatus]) diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala index 16eec7e0bea1f..ac9d23d23d4e9 100644 --- a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala +++ b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala @@ -21,23 +21,23 @@ import java.io.{ByteArrayInputStream, ByteArrayOutputStream, FileInputStream, Fi import java.nio.ByteBuffer import java.util.concurrent.Executors +import com.esotericsoftware.kryo.{Kryo, KryoException} +import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput} +import org.roaringbitmap.RoaringBitmap import scala.collection.JavaConverters._ import scala.collection.mutable import scala.concurrent.{ExecutionContext, Future} import scala.concurrent.duration._ import scala.reflect.ClassTag -import com.esotericsoftware.kryo.{Kryo, KryoException} -import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput} -import org.roaringbitmap.RoaringBitmap - import org.apache.spark.{SharedSparkContext, SparkConf, SparkFunSuite} import org.apache.spark.internal.config._ import org.apache.spark.internal.config.Kryo._ import org.apache.spark.scheduler.HighlyCompressedMapStatus import org.apache.spark.serializer.KryoTest._ +import org.apache.spark.shuffle.sort.DefaultMapShuffleLocations import org.apache.spark.storage.BlockManagerId -import org.apache.spark.util.{ThreadUtils, Utils} +import org.apache.spark.util.ThreadUtils class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext { conf.set(SERIALIZER, "org.apache.spark.serializer.KryoSerializer") @@ -350,8 +350,10 @@ class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext { val ser = new KryoSerializer(conf).newInstance() val denseBlockSizes = new Array[Long](5000) val sparseBlockSizes = Array[Long](0L, 1L, 0L, 2L) + val bmId = BlockManagerId("exec-1", "host", 1234) Seq(denseBlockSizes, sparseBlockSizes).foreach { blockSizes => - ser.serialize(HighlyCompressedMapStatus(BlockManagerId("exec-1", "host", 1234), blockSizes)) + ser.serialize(HighlyCompressedMapStatus( + bmId, DefaultMapShuffleLocations.get(bmId), blockSizes)) } } diff --git a/core/src/test/scala/org/apache/spark/shuffle/BlockStoreShuffleReaderSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/BlockStoreShuffleReaderSuite.scala index 6d2ef17a7a790..b6dc69c9c5926 100644 --- a/core/src/test/scala/org/apache/spark/shuffle/BlockStoreShuffleReaderSuite.scala +++ b/core/src/test/scala/org/apache/spark/shuffle/BlockStoreShuffleReaderSuite.scala @@ -21,11 +21,11 @@ import java.io.{ByteArrayOutputStream, InputStream} import java.nio.ByteBuffer import org.mockito.Mockito.{mock, when} - import 
org.apache.spark._ import org.apache.spark.internal.config import org.apache.spark.network.buffer.{ManagedBuffer, NioManagedBuffer} import org.apache.spark.serializer.{JavaSerializer, SerializerManager} +import org.apache.spark.shuffle.sort.DefaultMapShuffleLocations import org.apache.spark.storage.{BlockManager, BlockManagerId, ShuffleBlockId} /** @@ -109,7 +109,9 @@ class BlockStoreShuffleReaderSuite extends SparkFunSuite with LocalSparkContext val shuffleBlockId = ShuffleBlockId(shuffleId, mapId, reduceId) (shuffleBlockId, byteOutputStream.size().toLong) } - Seq((localBlockManagerId, shuffleBlockIdsAndSizes)).toIterator + Seq( + (DefaultMapShuffleLocations.get(localBlockManagerId), shuffleBlockIdsAndSizes)) + .toIterator } // Create a mocked shuffle handle to pass into HashShuffleReader. From 4f2f752f1a586d07c0cf90d9bcd77e7c9120f5fb Mon Sep 17 00:00:00 2001 From: mcheah Date: Fri, 15 Mar 2019 15:06:40 -0700 Subject: [PATCH 03/12] Fix style errors --- .../org/apache/spark/serializer/KryoSerializer.scala | 10 +++++----- .../apache/spark/shuffle/BlockStoreShuffleReader.scala | 2 +- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala index 128cd30bda18c..4c702e0389c44 100644 --- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala +++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala @@ -20,13 +20,8 @@ package org.apache.spark.serializer import java.io._ import java.nio.ByteBuffer import java.util.Locale - import javax.annotation.Nullable -import scala.collection.JavaConverters._ -import scala.collection.mutable.ArrayBuffer -import scala.reflect.ClassTag -import scala.util.control.NonFatal import com.esotericsoftware.kryo.{Kryo, KryoException, Serializer => KryoClassSerializer} import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput} import com.esotericsoftware.kryo.io.{UnsafeInput => KryoUnsafeInput, UnsafeOutput => KryoUnsafeOutput} @@ -35,6 +30,11 @@ import com.esotericsoftware.kryo.serializers.{ExternalizableSerializer, JavaSeri import com.twitter.chill.{AllScalaRegistrar, EmptyScalaKryoInstantiator} import org.apache.avro.generic.{GenericData, GenericRecord} import org.roaringbitmap.RoaringBitmap +import scala.collection.JavaConverters._ +import scala.collection.mutable.ArrayBuffer +import scala.reflect.ClassTag +import scala.util.control.NonFatal + import org.apache.spark._ import org.apache.spark.api.python.PythonBroadcast import org.apache.spark.internal.Logging diff --git a/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala b/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala index b023d63355a46..3dc0d4dae752e 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala @@ -18,7 +18,7 @@ package org.apache.spark.shuffle import org.apache.spark._ -import org.apache.spark.internal.{Logging, config} +import org.apache.spark.internal.{config, Logging} import org.apache.spark.serializer.SerializerManager import org.apache.spark.shuffle.sort.DefaultMapShuffleLocations import org.apache.spark.storage.{BlockManager, ShuffleBlockFetcherIterator} From 35c1bee56b1e1c732d659bcd187f1ef48c05e4c7 Mon Sep 17 00:00:00 2001 From: mcheah Date: Fri, 15 Mar 2019 15:20:32 -0700 Subject: [PATCH 04/12] More style --- 
.../org/apache/spark/shuffle/BlockStoreShuffleReaderSuite.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/core/src/test/scala/org/apache/spark/shuffle/BlockStoreShuffleReaderSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/BlockStoreShuffleReaderSuite.scala index b6dc69c9c5926..ba1a2dc92e23c 100644 --- a/core/src/test/scala/org/apache/spark/shuffle/BlockStoreShuffleReaderSuite.scala +++ b/core/src/test/scala/org/apache/spark/shuffle/BlockStoreShuffleReaderSuite.scala @@ -21,6 +21,7 @@ import java.io.{ByteArrayOutputStream, InputStream} import java.nio.ByteBuffer import org.mockito.Mockito.{mock, when} + import org.apache.spark._ import org.apache.spark.internal.config import org.apache.spark.network.buffer.{ManagedBuffer, NioManagedBuffer} From ad0b7a13370b06de9090e275e5193318189cdf8c Mon Sep 17 00:00:00 2001 From: mcheah Date: Fri, 15 Mar 2019 15:53:12 -0700 Subject: [PATCH 05/12] More style things --- .../api/shuffle/MapShuffleLocations.java | 8 +- .../sort/DefaultMapShuffleLocations.java | 74 +++++++++---------- 2 files changed, 41 insertions(+), 41 deletions(-) diff --git a/core/src/main/java/org/apache/spark/api/shuffle/MapShuffleLocations.java b/core/src/main/java/org/apache/spark/api/shuffle/MapShuffleLocations.java index cb31e57d95566..b0aed4d08d387 100644 --- a/core/src/main/java/org/apache/spark/api/shuffle/MapShuffleLocations.java +++ b/core/src/main/java/org/apache/spark/api/shuffle/MapShuffleLocations.java @@ -32,8 +32,8 @@ @Experimental public interface MapShuffleLocations extends Serializable { - /** - * Get the location for a given shuffle block written by this map task. - */ - ShuffleLocation getLocationForBlock(int reduceId); + /** + * Get the location for a given shuffle block written by this map task. + */ + ShuffleLocation getLocationForBlock(int reduceId); } diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/DefaultMapShuffleLocations.java b/core/src/main/java/org/apache/spark/shuffle/sort/DefaultMapShuffleLocations.java index 4e22f13e4a0a5..0426c7ac64181 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/DefaultMapShuffleLocations.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/DefaultMapShuffleLocations.java @@ -29,48 +29,48 @@ public class DefaultMapShuffleLocations implements MapShuffleLocations, ShuffleLocation { - /** - * We borrow the cache size from the BlockManagerId's cache - around 1MB, which should be - * feasible. - */ - private static final LoadingCache - DEFAULT_SHUFFLE_LOCATIONS_CACHE = - CacheBuilder.newBuilder() - .maximumSize(10000) - .build(new CacheLoader() { - @Override - public DefaultMapShuffleLocations load(BlockManagerId blockManagerId) { - return new DefaultMapShuffleLocations(blockManagerId); - } - }); + /** + * We borrow the cache size from the BlockManagerId's cache - around 1MB, which should be + * feasible. 
+ */ + private static final LoadingCache + DEFAULT_SHUFFLE_LOCATIONS_CACHE = + CacheBuilder.newBuilder() + .maximumSize(10000) + .build(new CacheLoader() { + @Override + public DefaultMapShuffleLocations load(BlockManagerId blockManagerId) { + return new DefaultMapShuffleLocations(blockManagerId); + } + }); - private final BlockManagerId location; + private final BlockManagerId location; - public DefaultMapShuffleLocations(BlockManagerId blockManagerId) { - this.location = blockManagerId; - } + public DefaultMapShuffleLocations(BlockManagerId blockManagerId) { + this.location = blockManagerId; + } - public static DefaultMapShuffleLocations get(BlockManagerId blockManagerId) { - return DEFAULT_SHUFFLE_LOCATIONS_CACHE.getUnchecked(blockManagerId); - } + public static DefaultMapShuffleLocations get(BlockManagerId blockManagerId) { + return DEFAULT_SHUFFLE_LOCATIONS_CACHE.getUnchecked(blockManagerId); + } - @Override - public ShuffleLocation getLocationForBlock(int reduceId) { - return this; - } + @Override + public ShuffleLocation getLocationForBlock(int reduceId) { + return this; + } - public BlockManagerId getBlockManagerId() { - return location; - } + public BlockManagerId getBlockManagerId() { + return location; + } - @Override - public boolean equals(Object other) { - return other instanceof DefaultMapShuffleLocations - && Objects.equals(((DefaultMapShuffleLocations) other).location, location); - } + @Override + public boolean equals(Object other) { + return other instanceof DefaultMapShuffleLocations + && Objects.equals(((DefaultMapShuffleLocations) other).location, location); + } - @Override - public int hashCode() { - return Objects.hashCode(location); - } + @Override + public int hashCode() { + return Objects.hashCode(location); + } } From 95937d1d368beaae03fbce51d9308745a2ed5241 Mon Sep 17 00:00:00 2001 From: mcheah Date: Wed, 20 Mar 2019 16:45:59 -0700 Subject: [PATCH 06/12] Address comments --- .../org/apache/spark/MapOutputTracker.scala | 10 ++--- .../apache/spark/scheduler/MapStatus.scala | 7 +++- .../spark/serializer/KryoSerializer.scala | 9 ++-- .../shuffle/BlockStoreShuffleReader.scala | 2 +- .../apache/spark/MapOutputTrackerSuite.scala | 21 +++++----- .../scala/org/apache/spark/ShuffleSuite.scala | 6 +-- .../spark/scheduler/DAGSchedulerSuite.scala | 41 ++++++++++--------- .../spark/scheduler/MapStatusSuite.scala | 3 +- .../scheduler/SchedulerIntegrationSuite.scala | 2 +- .../serializer/KryoSerializerSuite.scala | 7 ++-- .../BlockStoreShuffleReaderSuite.scala | 2 +- 11 files changed, 59 insertions(+), 51 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala index 05496690a3d8c..2fad516d9b51c 100644 --- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala +++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala @@ -282,9 +282,9 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging } // For testing - def getMapSizesByExecutorId(shuffleId: Int, reduceId: Int) + def getMapSizesByShuffleLocation(shuffleId: Int, reduceId: Int) : Iterator[(MapShuffleLocations, Seq[(BlockId, Long)])] = { - getMapSizesByExecutorId(shuffleId, reduceId, reduceId + 1) + getMapSizesByShuffleLocation(shuffleId, reduceId, reduceId + 1) } /** @@ -296,7 +296,7 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging * and the second item is a sequence of (shuffle block id, shuffle block size) tuples * describing the shuffle blocks that are 
stored at that block manager. */ - def getMapSizesByExecutorId(shuffleId: Int, startPartition: Int, endPartition: Int) + def getMapSizesByShuffleLocation(shuffleId: Int, startPartition: Int, endPartition: Int) : Iterator[(MapShuffleLocations, Seq[(BlockId, Long)])] /** @@ -646,7 +646,7 @@ private[spark] class MapOutputTrackerMaster( // Get blocks sizes by executor Id. Note that zero-sized blocks are excluded in the result. // This method is only called in local-mode. - def getMapSizesByExecutorId(shuffleId: Int, startPartition: Int, endPartition: Int) + def getMapSizesByShuffleLocation(shuffleId: Int, startPartition: Int, endPartition: Int) : Iterator[(MapShuffleLocations, Seq[(BlockId, Long)])] = { logDebug(s"Fetching outputs for shuffle $shuffleId, partitions $startPartition-$endPartition") shuffleStatuses.get(shuffleId) match { case Some (shuffleStatus) => @@ -683,7 +683,7 @@ private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTr private val fetching = new HashSet[Int] // Get blocks sizes by executor Id. Note that zero-sized blocks are excluded in the result. - override def getMapSizesByExecutorId(shuffleId: Int, startPartition: Int, endPartition: Int) + override def getMapSizesByShuffleLocation(shuffleId: Int, startPartition: Int, endPartition: Int) : Iterator[(MapShuffleLocations, Seq[(BlockId, Long)])] = { logDebug(s"Fetching outputs for shuffle $shuffleId, partitions $startPartition-$endPartition") val statuses = getStatuses(shuffleId) try { diff --git a/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala b/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala index 395178c47f686..a61f9bd14ef2f 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala @@ -19,9 +19,10 @@ package org.apache.spark.scheduler import java.io.{Externalizable, ObjectInput, ObjectOutput} -import org.roaringbitmap.RoaringBitmap import scala.collection.mutable +import org.roaringbitmap.RoaringBitmap + import org.apache.spark.SparkEnv import org.apache.spark.api.shuffle.MapShuffleLocations import org.apache.spark.internal.config @@ -163,7 +164,8 @@ private[spark] object MapStatus { * A [[MapStatus]] implementation that tracks the size of each block. Size for each block is * represented using a single byte. * - * @param mapShuffleLocs location where the task is being executed. + * @param loc location where the task is being executed. + * @param mapShuffleLocs locations where the task stored its shuffle blocks - may be null. * @param compressedSizes size of the blocks, indexed by reduce partition id. */ private[spark] class CompressedMapStatus( @@ -215,6 +217,7 @@ private[spark] class CompressedMapStatus( * plus a bitmap for tracking which blocks are empty. 
* * @param loc location where the task is being executed + * @param mapShuffleLocs location where the task stored shuffle blocks - may be null * @param numNonEmptyBlocks the number of non-empty blocks * @param emptyBlocks a bitmap tracking which blocks are empty * @param avgSize average size of the non-empty and non-huge blocks diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala index 4c702e0389c44..ba8c92518f019 100644 --- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala +++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala @@ -22,6 +22,11 @@ import java.nio.ByteBuffer import java.util.Locale import javax.annotation.Nullable +import scala.collection.JavaConverters._ +import scala.collection.mutable.ArrayBuffer +import scala.reflect.ClassTag +import scala.util.control.NonFatal + import com.esotericsoftware.kryo.{Kryo, KryoException, Serializer => KryoClassSerializer} import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput} import com.esotericsoftware.kryo.io.{UnsafeInput => KryoUnsafeInput, UnsafeOutput => KryoUnsafeOutput} @@ -30,10 +35,6 @@ import com.esotericsoftware.kryo.serializers.{ExternalizableSerializer, JavaSeri import com.twitter.chill.{AllScalaRegistrar, EmptyScalaKryoInstantiator} import org.apache.avro.generic.{GenericData, GenericRecord} import org.roaringbitmap.RoaringBitmap -import scala.collection.JavaConverters._ -import scala.collection.mutable.ArrayBuffer -import scala.reflect.ClassTag -import scala.util.control.NonFatal import org.apache.spark._ import org.apache.spark.api.python.PythonBroadcast diff --git a/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala b/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala index 3dc0d4dae752e..225e91ad6ec7e 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala @@ -48,7 +48,7 @@ private[spark] class BlockStoreShuffleReader[K, C]( context, blockManager.shuffleClient, blockManager, - mapOutputTracker.getMapSizesByExecutorId(handle.shuffleId, startPartition, endPartition) + mapOutputTracker.getMapSizesByShuffleLocation(handle.shuffleId, startPartition, endPartition) .map { case (loc, blocks) => require( loc.isInstanceOf[DefaultMapShuffleLocations], diff --git a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala index b35a5f3198b01..0a77c4f6d5838 100644 --- a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala +++ b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala @@ -17,9 +17,10 @@ package org.apache.spark +import scala.collection.mutable.ArrayBuffer + import org.mockito.ArgumentMatchers.any import org.mockito.Mockito._ -import scala.collection.mutable.ArrayBuffer import org.apache.spark.LocalSparkContext._ import org.apache.spark.broadcast.BroadcastManager @@ -67,7 +68,7 @@ class MapOutputTrackerSuite extends SparkFunSuite { Array(1000L, 10000L))) tracker.registerMapOutput(10, 1, MapStatus(BlockManagerId("b", "hostB", 1000), Array(10000L, 1000L))) - val statuses = tracker.getMapSizesByExecutorId(10, 0) + val statuses = tracker.getMapSizesByShuffleLocation(10, 0) assert(statuses.toSet === Seq( (DefaultMapShuffleLocations.get(BlockManagerId("a", "hostA", 1000)), @@ -93,11 +94,11 @@ class 
MapOutputTrackerSuite extends SparkFunSuite { tracker.registerMapOutput(10, 1, MapStatus(BlockManagerId("b", "hostB", 1000), Array(compressedSize10000, compressedSize1000))) assert(tracker.containsShuffle(10)) - assert(tracker.getMapSizesByExecutorId(10, 0).nonEmpty) + assert(tracker.getMapSizesByShuffleLocation(10, 0).nonEmpty) assert(0 == tracker.getNumCachedSerializedBroadcast) tracker.unregisterShuffle(10) assert(!tracker.containsShuffle(10)) - assert(tracker.getMapSizesByExecutorId(10, 0).isEmpty) + assert(tracker.getMapSizesByShuffleLocation(10, 0).isEmpty) tracker.stop() rpcEnv.shutdown() @@ -124,7 +125,7 @@ class MapOutputTrackerSuite extends SparkFunSuite { // The remaining reduce task might try to grab the output despite the shuffle failure; // this should cause it to fail, and the scheduler will ignore the failure due to the // stage already being aborted. - intercept[FetchFailedException] { tracker.getMapSizesByExecutorId(10, 1) } + intercept[FetchFailedException] { tracker.getMapSizesByShuffleLocation(10, 1) } tracker.stop() rpcEnv.shutdown() @@ -146,13 +147,13 @@ class MapOutputTrackerSuite extends SparkFunSuite { masterTracker.registerShuffle(10, 1) slaveTracker.updateEpoch(masterTracker.getEpoch) // This is expected to fail because no outputs have been registered for the shuffle. - intercept[FetchFailedException] { slaveTracker.getMapSizesByExecutorId(10, 0) } + intercept[FetchFailedException] { slaveTracker.getMapSizesByShuffleLocation(10, 0) } val size1000 = MapStatus.decompressSize(MapStatus.compressSize(1000L)) masterTracker.registerMapOutput(10, 0, MapStatus( BlockManagerId("a", "hostA", 1000), Array(1000L))) slaveTracker.updateEpoch(masterTracker.getEpoch) - assert(slaveTracker.getMapSizesByExecutorId(10, 0).toSeq === + assert(slaveTracker.getMapSizesByShuffleLocation(10, 0).toSeq === Seq( (DefaultMapShuffleLocations.get(BlockManagerId("a", "hostA", 1000)), ArrayBuffer((ShuffleBlockId(10, 0, 0), size1000))))) @@ -162,10 +163,10 @@ class MapOutputTrackerSuite extends SparkFunSuite { masterTracker.unregisterMapOutput(10, 0, BlockManagerId("a", "hostA", 1000)) assert(masterTracker.getEpoch > masterTrackerEpochBeforeLossOfMapOutput) slaveTracker.updateEpoch(masterTracker.getEpoch) - intercept[FetchFailedException] { slaveTracker.getMapSizesByExecutorId(10, 0) } + intercept[FetchFailedException] { slaveTracker.getMapSizesByShuffleLocation(10, 0) } // failure should be cached - intercept[FetchFailedException] { slaveTracker.getMapSizesByExecutorId(10, 0) } + intercept[FetchFailedException] { slaveTracker.getMapSizesByShuffleLocation(10, 0) } assert(0 == masterTracker.getNumCachedSerializedBroadcast) masterTracker.stop() @@ -323,7 +324,7 @@ class MapOutputTrackerSuite extends SparkFunSuite { tracker.registerMapOutput(10, 1, MapStatus(BlockManagerId("b", "hostB", 1000), Array(size10000, size0, size1000, size0))) assert(tracker.containsShuffle(10)) - assert(tracker.getMapSizesByExecutorId(10, 0, 4).toSeq === + assert(tracker.getMapSizesByShuffleLocation(10, 0, 4).toSeq === Seq( (DefaultMapShuffleLocations.get(BlockManagerId("a", "hostA", 1000)), Seq((ShuffleBlockId(10, 0, 1), size1000), (ShuffleBlockId(10, 0, 3), size10000))), diff --git a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala index 8b1084a8edc76..624ac62a6d563 100644 --- a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala +++ b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala @@ -73,7 +73,7 @@ abstract class ShuffleSuite extends 
SparkFunSuite with Matchers with LocalSparkC // All blocks must have non-zero size (0 until NUM_BLOCKS).foreach { id => - val statuses = SparkEnv.get.mapOutputTracker.getMapSizesByExecutorId(shuffleId, id) + val statuses = SparkEnv.get.mapOutputTracker.getMapSizesByShuffleLocation(shuffleId, id) assert(statuses.forall(_._2.forall(blockIdSizePair => blockIdSizePair._2 > 0))) } } @@ -112,7 +112,7 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkC assert(c.count === 4) val blockSizes = (0 until NUM_BLOCKS).flatMap { id => - val statuses = SparkEnv.get.mapOutputTracker.getMapSizesByExecutorId(shuffleId, id) + val statuses = SparkEnv.get.mapOutputTracker.getMapSizesByShuffleLocation(shuffleId, id) statuses.flatMap(_._2.map(_._2)) } val nonEmptyBlocks = blockSizes.filter(x => x > 0) @@ -137,7 +137,7 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkC assert(c.count === 4) val blockSizes = (0 until NUM_BLOCKS).flatMap { id => - val statuses = SparkEnv.get.mapOutputTracker.getMapSizesByExecutorId(shuffleId, id) + val statuses = SparkEnv.get.mapOutputTracker.getMapSizesByShuffleLocation(shuffleId, id) statuses.flatMap(_._2.map(_._2)) } val nonEmptyBlocks = blockSizes.filter(x => x > 0) diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index a0bef6fce6985..14b93957734e4 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -20,13 +20,14 @@ package org.apache.spark.scheduler import java.util.Properties import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong} -import org.scalatest.concurrent.{Signaler, ThreadSignaler, TimeLimits} -import org.scalatest.time.SpanSugar._ import scala.annotation.meta.param import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Map} import scala.language.reflectiveCalls import scala.util.control.NonFatal +import org.scalatest.concurrent.{Signaler, ThreadSignaler, TimeLimits} +import org.scalatest.time.SpanSugar._ + import org.apache.spark._ import org.apache.spark.api.shuffle.MapShuffleLocations import org.apache.spark.broadcast.BroadcastManager @@ -701,7 +702,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi complete(taskSets(0), Seq( (Success, makeMapStatus("hostA", 1)), (Success, makeMapStatus("hostB", 1)))) - assert(mapOutputTracker.getMapSizesByExecutorId(shuffleId, 0).map(_._1).toSet === + assert(mapOutputTracker.getMapSizesByShuffleLocation(shuffleId, 0).map(_._1).toSet === HashSet(makeShuffleLocation("hostA"), makeShuffleLocation("hostB"))) complete(taskSets(1), Seq((Success, 42))) assert(results === Map(0 -> 42)) @@ -729,7 +730,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi complete(taskSets(2), Seq((Success, makeMapStatus("hostA", reduceRdd.partitions.length)))) // we can see both result blocks now assert(mapOutputTracker - .getMapSizesByExecutorId(shuffleId, 0) + .getMapSizesByShuffleLocation(shuffleId, 0) .map(_._1.asInstanceOf[DefaultMapShuffleLocations].getBlockManagerId.host) .toSet === HashSet("hostA", "hostB")) complete(taskSets(3), Seq((Success, 43))) @@ -769,10 +770,10 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi runEvent(ExecutorLost("exec-hostA", event)) if (expectFileLoss) { intercept[MetadataFetchFailedException] { - 
mapOutputTracker.getMapSizesByExecutorId(shuffleId, 0) + mapOutputTracker.getMapSizesByShuffleLocation(shuffleId, 0) } } else { - assert(mapOutputTracker.getMapSizesByExecutorId(shuffleId, 0).map(_._1).toSet === + assert(mapOutputTracker.getMapSizesByShuffleLocation(shuffleId, 0).map(_._1).toSet === HashSet(makeShuffleLocation("hostA"), makeShuffleLocation("hostB"))) } } @@ -1067,7 +1068,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi (Success, makeMapStatus("hostB", reduceRdd.partitions.length)))) // The MapOutputTracker should know about both map output locations. assert(mapOutputTracker - .getMapSizesByExecutorId(shuffleId, 0) + .getMapSizesByShuffleLocation(shuffleId, 0) .map(_._1.asInstanceOf[DefaultMapShuffleLocations].getBlockManagerId.host) .toSet === HashSet("hostA", "hostB")) @@ -1199,11 +1200,11 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi (Success, makeMapStatus("hostB", 2)))) // The MapOutputTracker should know about both map output locations. assert(mapOutputTracker - .getMapSizesByExecutorId(shuffleId, 0) + .getMapSizesByShuffleLocation(shuffleId, 0) .map(_._1.asInstanceOf[DefaultMapShuffleLocations].getBlockManagerId.host) .toSet === HashSet("hostA", "hostB")) assert(mapOutputTracker - .getMapSizesByExecutorId(shuffleId, 1) + .getMapSizesByShuffleLocation(shuffleId, 1) .map(_._1.asInstanceOf[DefaultMapShuffleLocations].getBlockManagerId.host) .toSet === HashSet("hostA", "hostB")) @@ -1395,7 +1396,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi Success, makeMapStatus("hostA", reduceRdd.partitions.size))) assert(shuffleStage.numAvailableOutputs === 2) - assert(mapOutputTracker.getMapSizesByExecutorId(shuffleId, 0).map(_._1).toSet === + assert(mapOutputTracker.getMapSizesByShuffleLocation(shuffleId, 0).map(_._1).toSet === HashSet(makeShuffleLocation("hostB"), makeShuffleLocation("hostA"))) // finish the next stage normally, which completes the job @@ -1550,7 +1551,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi reduceIdx <- reduceIdxs } { // this would throw an exception if the map status hadn't been registered - val statuses = mapOutputTracker.getMapSizesByExecutorId(stage, reduceIdx) + val statuses = mapOutputTracker.getMapSizesByShuffleLocation(stage, reduceIdx) // really we should have already thrown an exception rather than fail either of these // asserts, but just to be extra defensive let's double check the statuses are OK assert(statuses != null) @@ -1602,7 +1603,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi // check that we have all the map output for stage 0 (0 until reduceRdd.partitions.length).foreach { reduceIdx => - val statuses = mapOutputTracker.getMapSizesByExecutorId(0, reduceIdx) + val statuses = mapOutputTracker.getMapSizesByShuffleLocation(0, reduceIdx) // really we should have already thrown an exception rather than fail either of these // asserts, but just to be extra defensive let's double check the statuses are OK assert(statuses != null) @@ -1801,7 +1802,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi // have hostC complete the resubmitted task complete(taskSets(1), Seq((Success, makeMapStatus("hostC", 1)))) - assert(mapOutputTracker.getMapSizesByExecutorId(shuffleId, 0).map(_._1).toSet === + assert(mapOutputTracker.getMapSizesByShuffleLocation(shuffleId, 0).map(_._1).toSet === HashSet(makeShuffleLocation("hostC"), 
makeShuffleLocation("hostB"))) // Make sure that the reduce stage was now submitted. @@ -2064,7 +2065,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi submit(reduceRdd, Array(0)) complete(taskSets(0), Seq( (Success, makeMapStatus("hostA", 1)))) - assert(mapOutputTracker.getMapSizesByExecutorId(shuffleId, 0).map(_._1).toSet === + assert(mapOutputTracker.getMapSizesByShuffleLocation(shuffleId, 0).map(_._1).toSet === HashSet(makeShuffleLocation("hostA"))) // Reducer should run on the same host that map task ran @@ -2110,7 +2111,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi submit(reduceRdd, Array(0)) complete(taskSets(0), Seq( (Success, makeMapStatus("hostA", 1)))) - assert(mapOutputTracker.getMapSizesByExecutorId(shuffleId, 0).map(_._1).toSet === + assert(mapOutputTracker.getMapSizesByShuffleLocation(shuffleId, 0).map(_._1).toSet === HashSet(makeShuffleLocation("hostA"))) // Reducer should run where RDD 2 has preferences, even though it also has a shuffle dep @@ -2274,7 +2275,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi complete(taskSets(0), Seq( (Success, makeMapStatus("hostA", rdd1.partitions.length)), (Success, makeMapStatus("hostB", rdd1.partitions.length)))) - assert(mapOutputTracker.getMapSizesByExecutorId(dep1.shuffleId, 0).map(_._1).toSet === + assert(mapOutputTracker.getMapSizesByShuffleLocation(dep1.shuffleId, 0).map(_._1).toSet === HashSet(makeShuffleLocation("hostA"), makeShuffleLocation("hostB"))) assert(listener1.results.size === 1) @@ -2290,7 +2291,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi assert(taskSets(2).stageId === 0) complete(taskSets(2), Seq( (Success, makeMapStatus("hostC", rdd2.partitions.length)))) - assert(mapOutputTracker.getMapSizesByExecutorId(dep1.shuffleId, 0).map(_._1).toSet === + assert(mapOutputTracker.getMapSizesByShuffleLocation(dep1.shuffleId, 0).map(_._1).toSet === HashSet(makeShuffleLocation("hostC"), makeShuffleLocation("hostB"))) assert(listener2.results.size === 0) // Second stage listener should still not have a result @@ -2300,7 +2301,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi complete(taskSets(3), Seq( (Success, makeMapStatus("hostB", rdd2.partitions.length)), (Success, makeMapStatus("hostD", rdd2.partitions.length)))) - assert(mapOutputTracker.getMapSizesByExecutorId(dep2.shuffleId, 0).map(_._1).toSet === + assert(mapOutputTracker.getMapSizesByShuffleLocation(dep2.shuffleId, 0).map(_._1).toSet === HashSet(makeShuffleLocation("hostB"), makeShuffleLocation("hostD"))) assert(listener2.results.size === 1) @@ -2339,7 +2340,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi complete(taskSets(0), Seq( (Success, makeMapStatus("hostA", rdd1.partitions.length)), (Success, makeMapStatus("hostB", rdd1.partitions.length)))) - assert(mapOutputTracker.getMapSizesByExecutorId(dep1.shuffleId, 0).map(_._1).toSet === + assert(mapOutputTracker.getMapSizesByShuffleLocation(dep1.shuffleId, 0).map(_._1).toSet === HashSet(makeShuffleLocation("hostA"), makeShuffleLocation("hostB"))) assert(listener1.results.size === 1) @@ -2365,7 +2366,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi assert(taskSets(2).stageId === 0) complete(taskSets(2), Seq( (Success, makeMapStatus("hostC", rdd2.partitions.length)))) - assert(mapOutputTracker.getMapSizesByExecutorId(dep1.shuffleId, 0).map(_._1).toSet === + 
assert(mapOutputTracker.getMapSizesByShuffleLocation(dep1.shuffleId, 0).map(_._1).toSet === Set(makeShuffleLocation("hostC"), makeShuffleLocation("hostB"))) // After stage0 is finished, stage1 will be submitted and found there is no missing diff --git a/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala index 5c5651d81930c..3c786c0927bc6 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala @@ -19,9 +19,10 @@ package org.apache.spark.scheduler import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream} +import scala.util.Random + import org.mockito.Mockito.mock import org.roaringbitmap.RoaringBitmap -import scala.util.Random import org.apache.spark.{SparkConf, SparkContext, SparkEnv, SparkFunSuite} import org.apache.spark.LocalSparkContext._ diff --git a/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala index aa6db8d0423a3..d273eedb8385e 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala @@ -192,7 +192,7 @@ abstract class SchedulerIntegrationSuite[T <: MockBackend: ClassTag] extends Spa shuffleId <- shuffleIds reduceIdx <- (0 until nParts) } { - val statuses = taskScheduler.mapOutputTracker.getMapSizesByExecutorId(shuffleId, reduceIdx) + val statuses = taskScheduler.mapOutputTracker.getMapSizesByShuffleLocation(shuffleId, reduceIdx) // really we should have already thrown an exception rather than fail either of these // asserts, but just to be extra defensive let's double check the statuses are OK assert(statuses != null) diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala index ac9d23d23d4e9..c523d0cb9ce80 100644 --- a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala +++ b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala @@ -21,15 +21,16 @@ import java.io.{ByteArrayInputStream, ByteArrayOutputStream, FileInputStream, Fi import java.nio.ByteBuffer import java.util.concurrent.Executors -import com.esotericsoftware.kryo.{Kryo, KryoException} -import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput} -import org.roaringbitmap.RoaringBitmap import scala.collection.JavaConverters._ import scala.collection.mutable import scala.concurrent.{ExecutionContext, Future} import scala.concurrent.duration._ import scala.reflect.ClassTag +import com.esotericsoftware.kryo.{Kryo, KryoException} +import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput} +import org.roaringbitmap.RoaringBitmap + import org.apache.spark.{SharedSparkContext, SparkConf, SparkFunSuite} import org.apache.spark.internal.config._ import org.apache.spark.internal.config.Kryo._ diff --git a/core/src/test/scala/org/apache/spark/shuffle/BlockStoreShuffleReaderSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/BlockStoreShuffleReaderSuite.scala index ba1a2dc92e23c..56828a1b93da2 100644 --- a/core/src/test/scala/org/apache/spark/shuffle/BlockStoreShuffleReaderSuite.scala +++ b/core/src/test/scala/org/apache/spark/shuffle/BlockStoreShuffleReaderSuite.scala @@ -103,7 +103,7 @@ 
class BlockStoreShuffleReaderSuite extends SparkFunSuite with LocalSparkContext // Make a mocked MapOutputTracker for the shuffle reader to use to determine what // shuffle data to read. val mapOutputTracker = mock(classOf[MapOutputTracker]) - when(mapOutputTracker.getMapSizesByExecutorId(shuffleId, reduceId, reduceId + 1)).thenReturn { + when(mapOutputTracker.getMapSizesByShuffleLocation(shuffleId, reduceId, reduceId + 1)).thenReturn { // Test a scenario where all data is local, to avoid creating a bunch of additional mocks // for the code to read data over the network. val shuffleBlockIdsAndSizes = (0 until numMaps).map { mapId => From 265e9daa579e528c4f0afd139e778f6422f48f5f Mon Sep 17 00:00:00 2001 From: mcheah Date: Wed, 3 Apr 2019 16:18:19 -0700 Subject: [PATCH 07/12] Fix build, address comments --- .../shuffle/sort/DefaultMapShuffleLocations.java | 2 +- .../spark/shuffle/BlockStoreShuffleReader.scala | 14 ++++++++------ .../org/apache/spark/storage/BlockManagerId.scala | 4 +++- .../scheduler/SchedulerIntegrationSuite.scala | 3 ++- .../shuffle/BlockStoreShuffleReaderSuite.scala | 3 ++- 5 files changed, 16 insertions(+), 10 deletions(-) diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/DefaultMapShuffleLocations.java b/core/src/main/java/org/apache/spark/shuffle/sort/DefaultMapShuffleLocations.java index 0426c7ac64181..ffd97c0f26605 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/DefaultMapShuffleLocations.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/DefaultMapShuffleLocations.java @@ -36,7 +36,7 @@ public class DefaultMapShuffleLocations implements MapShuffleLocations, ShuffleL private static final LoadingCache<BlockManagerId, DefaultMapShuffleLocations> DEFAULT_SHUFFLE_LOCATIONS_CACHE = CacheBuilder.newBuilder() - .maximumSize(10000) + .maximumSize(BlockManagerId.blockManagerIdCacheSize()) .build(new CacheLoader<BlockManagerId, DefaultMapShuffleLocations>() { @Override public DefaultMapShuffleLocations load(BlockManagerId blockManagerId) { return new DefaultMapShuffleLocations(blockManagerId); } }); diff --git a/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala b/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala index 225e91ad6ec7e..982eafad96f85 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala @@ -18,10 +18,11 @@ package org.apache.spark.shuffle import org.apache.spark._ + import org.apache.spark.internal.{config, Logging} import org.apache.spark.serializer.SerializerManager import org.apache.spark.shuffle.sort.DefaultMapShuffleLocations -import org.apache.spark.storage.{BlockManager, ShuffleBlockFetcherIterator} +import org.apache.spark.storage.{BlockId, BlockManager, BlockManagerId, ShuffleBlockFetcherIterator} import org.apache.spark.util.CompletionIterator import org.apache.spark.util.collection.ExternalSorter @@ -49,11 +50,12 @@ private[spark] class BlockStoreShuffleReader[K, C]( blockManager.shuffleClient, blockManager, mapOutputTracker.getMapSizesByShuffleLocation(handle.shuffleId, startPartition, endPartition) - .map { case (loc, blocks) => - require( - loc.isInstanceOf[DefaultMapShuffleLocations], - "Non-default shuffle location types are currently non supported.") - (loc.asInstanceOf[DefaultMapShuffleLocations].getBlockManagerId, blocks) + .map { + case (loc: DefaultMapShuffleLocations, blocks: Seq[(BlockId, Long)]) => + (loc.getBlockManagerId, blocks) + case _ => + throw new UnsupportedOperationException("Not allowed to use non-default map shuffle" + + " locations yet.") }, serializerManager.wrapStream, // 
Note: we use getSizeAsMb when no suffix is provided for backwards compatibility diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala index d4a59c33b974c..d72bd6f9af6bc 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala @@ -133,12 +133,14 @@ private[spark] object BlockManagerId { getCachedBlockManagerId(obj) } + val blockManagerIdCacheSize = 10000 + /** * The max cache size is hardcoded to 10000, since the size of a BlockManagerId * object is about 48B, the total memory cost should be below 1MB which is feasible. */ val blockManagerIdCache = CacheBuilder.newBuilder() - .maximumSize(10000) + .maximumSize(blockManagerIdCacheSize) .build(new CacheLoader[BlockManagerId, BlockManagerId]() { override def load(id: BlockManagerId) = id }) diff --git a/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala index d273eedb8385e..83305a96e6794 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala @@ -192,7 +192,8 @@ abstract class SchedulerIntegrationSuite[T <: MockBackend: ClassTag] extends Spa shuffleId <- shuffleIds reduceIdx <- (0 until nParts) } { - val statuses = taskScheduler.mapOutputTracker.getMapSizesByShuffleLocation(shuffleId, reduceIdx) + val statuses = taskScheduler.mapOutputTracker.getMapSizesByShuffleLocation( + shuffleId, reduceIdx) // really we should have already thrown an exception rather than fail either of these // asserts, but just to be extra defensive let's double check the statuses are OK assert(statuses != null) diff --git a/core/src/test/scala/org/apache/spark/shuffle/BlockStoreShuffleReaderSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/BlockStoreShuffleReaderSuite.scala index 56828a1b93da2..b3073addb7ccc 100644 --- a/core/src/test/scala/org/apache/spark/shuffle/BlockStoreShuffleReaderSuite.scala +++ b/core/src/test/scala/org/apache/spark/shuffle/BlockStoreShuffleReaderSuite.scala @@ -103,7 +103,8 @@ class BlockStoreShuffleReaderSuite extends SparkFunSuite with LocalSparkContext // Make a mocked MapOutputTracker for the shuffle reader to use to determine what // shuffle data to read. val mapOutputTracker = mock(classOf[MapOutputTracker]) - when(mapOutputTracker.getMapSizesByShuffleLocation(shuffleId, reduceId, reduceId + 1)).thenReturn { + when(mapOutputTracker.getMapSizesByShuffleLocation( + shuffleId, reduceId, reduceId + 1)).thenReturn { // Test a scenario where all data is local, to avoid creating a bunch of additional mocks // for the code to read data over the network. 
val shuffleBlockIdsAndSizes = (0 until numMaps).map { mapId => From b2ee17c123ff05ec2c1a22c5cb9eed3f2072457c Mon Sep 17 00:00:00 2001 From: mcheah Date: Wed, 3 Apr 2019 16:24:04 -0700 Subject: [PATCH 08/12] Fix style --- .../org/apache/spark/shuffle/BlockStoreShuffleReader.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala b/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala index 982eafad96f85..d6f63e71f113c 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala @@ -18,11 +18,10 @@ package org.apache.spark.shuffle import org.apache.spark._ - import org.apache.spark.internal.{config, Logging} import org.apache.spark.serializer.SerializerManager import org.apache.spark.shuffle.sort.DefaultMapShuffleLocations -import org.apache.spark.storage.{BlockId, BlockManager, BlockManagerId, ShuffleBlockFetcherIterator} +import org.apache.spark.storage.{BlockId, BlockManager, ShuffleBlockFetcherIterator} import org.apache.spark.util.CompletionIterator import org.apache.spark.util.collection.ExternalSorter From 44423b8fe9036a7e2cc35f3a9e60b425ed0767c6 Mon Sep 17 00:00:00 2001 From: mcheah Date: Mon, 8 Apr 2019 12:21:09 -0700 Subject: [PATCH 09/12] Get by shuffle location instead of mapshufflelocations --- .../org/apache/spark/MapOutputTracker.scala | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala index 2fad516d9b51c..74975019e7480 100644 --- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala +++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala @@ -28,7 +28,7 @@ import scala.concurrent.duration.Duration import scala.reflect.ClassTag import scala.util.control.NonFatal -import org.apache.spark.api.shuffle.MapShuffleLocations +import org.apache.spark.api.shuffle.{MapShuffleLocations, ShuffleLocation} import org.apache.spark.broadcast.{Broadcast, BroadcastManager} import org.apache.spark.internal.Logging import org.apache.spark.internal.config._ @@ -283,7 +283,7 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging // For testing def getMapSizesByShuffleLocation(shuffleId: Int, reduceId: Int) - : Iterator[(MapShuffleLocations, Seq[(BlockId, Long)])] = { + : Iterator[(ShuffleLocation, Seq[(BlockId, Long)])] = { getMapSizesByShuffleLocation(shuffleId, reduceId, reduceId + 1) } @@ -297,7 +297,7 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging * describing the shuffle blocks that are stored at that block manager. */ def getMapSizesByShuffleLocation(shuffleId: Int, startPartition: Int, endPartition: Int) - : Iterator[(MapShuffleLocations, Seq[(BlockId, Long)])] + : Iterator[(ShuffleLocation, Seq[(BlockId, Long)])] /** * Deletes map output status information for the specified shuffle stage. @@ -647,7 +647,7 @@ private[spark] class MapOutputTrackerMaster( // Get blocks sizes by executor Id. Note that zero-sized blocks are excluded in the result. // This method is only called in local-mode. 
def getMapSizesByShuffleLocation(shuffleId: Int, startPartition: Int, endPartition: Int) - : Iterator[(MapShuffleLocations, Seq[(BlockId, Long)])] = { + : Iterator[(ShuffleLocation, Seq[(BlockId, Long)])] = { logDebug(s"Fetching outputs for shuffle $shuffleId, partitions $startPartition-$endPartition") shuffleStatuses.get(shuffleId) match { case Some (shuffleStatus) => @@ -684,7 +684,7 @@ private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTr // Get blocks sizes by executor Id. Note that zero-sized blocks are excluded in the result. override def getMapSizesByShuffleLocation(shuffleId: Int, startPartition: Int, endPartition: Int) - : Iterator[(MapShuffleLocations, Seq[(BlockId, Long)])] = { + : Iterator[(ShuffleLocation, Seq[(BlockId, Long)])] = { logDebug(s"Fetching outputs for shuffle $shuffleId, partitions $startPartition-$endPartition") val statuses = getStatuses(shuffleId) try { @@ -873,9 +873,9 @@ private[spark] object MapOutputTracker extends Logging { shuffleId: Int, startPartition: Int, endPartition: Int, - statuses: Array[MapStatus]): Iterator[(MapShuffleLocations, Seq[(BlockId, Long)])] = { + statuses: Array[MapStatus]): Iterator[(ShuffleLocation, Seq[(BlockId, Long)])] = { assert (statuses != null) - val splitsByAddress = new HashMap[MapShuffleLocations, ListBuffer[(BlockId, Long)]] + val splitsByAddress = new HashMap[ShuffleLocation, ListBuffer[(BlockId, Long)]] for ((status, mapId) <- statuses.iterator.zipWithIndex) { if (status == null) { val errorMessage = s"Missing an output location for shuffle $shuffleId" @@ -885,7 +885,8 @@ private[spark] object MapOutputTracker extends Logging { for (part <- startPartition until endPartition) { val size = status.getSizeForBlock(part) if (size != 0) { - splitsByAddress.getOrElseUpdate(status.mapShuffleLocations, ListBuffer()) += + val shuffleLoc = status.mapShuffleLocations.getLocationForBlock(part) + splitsByAddress.getOrElseUpdate(shuffleLoc, ListBuffer()) += ((ShuffleBlockId(shuffleId, mapId, part), size)) } } From e75ce8812cc66c8c4a65c066fc04b820897751b0 Mon Sep 17 00:00:00 2001 From: mcheah Date: Thu, 18 Apr 2019 18:05:38 -0700 Subject: [PATCH 10/12] Resolve conflicts --- .../io/DefaultShuffleMapOutputWriter.java | 8 +---- .../shuffle/sort/SortShuffleWriter.scala | 36 +++---------------- ...ypassMergeSortShuffleWriterBenchmark.scala | 12 ------- 3 files changed, 5 insertions(+), 51 deletions(-) diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/io/DefaultShuffleMapOutputWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/io/DefaultShuffleMapOutputWriter.java index 80cedbdcf908d..7eb0d56776de9 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/io/DefaultShuffleMapOutputWriter.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/io/DefaultShuffleMapOutputWriter.java @@ -99,15 +99,9 @@ public ShufflePartitionWriter getNextPartitionWriter() throws IOException { @Override public Optional commitAllPartitions() throws IOException { cleanUp(); -<<<<<<< HEAD - blockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, outputTempFile); - return Optional.of(DefaultMapShuffleLocations.get(shuffleServerId)); -||||||| merged common ancestors - blockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, outputTempFile); -======= File resolvedTmp = outputTempFile != null && outputTempFile.isFile() ? 
outputTempFile : null; blockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, resolvedTmp); ->>>>>>> palantir/spark-25299 + return Optional.of(DefaultMapShuffleLocations.get(shuffleServerId)); } @Override diff --git a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala index 7d4ce4f19f0b6..6ce309aa84c21 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala @@ -64,42 +64,14 @@ private[spark] class SortShuffleWriter[K, V, C]( // Don't bother including the time to open the merged output file in the shuffle write time, // because it just opens a single file, so is typically too fast to measure accurately // (see SPARK-3570). -<<<<<<< HEAD - val output = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId) - val tmp = Utils.tempFileWith(output) - try { - val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID) - val partitionLengths = sorter.writePartitionedFile(blockId, tmp) - shuffleBlockResolver.writeIndexFileAndCommit(dep.shuffleId, mapId, partitionLengths, tmp) - mapStatus = MapStatus( - blockManager.shuffleServerId, - DefaultMapShuffleLocations.get(blockManager.shuffleServerId), - partitionLengths) - } finally { - if (tmp.exists() && !tmp.delete()) { - logError(s"Error while deleting temp file ${tmp.getAbsolutePath}") - } - } -||||||| merged common ancestors - val output = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId) - val tmp = Utils.tempFileWith(output) - try { - val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID) - val partitionLengths = sorter.writePartitionedFile(blockId, tmp) - shuffleBlockResolver.writeIndexFileAndCommit(dep.shuffleId, mapId, partitionLengths, tmp) - mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths) - } finally { - if (tmp.exists() && !tmp.delete()) { - logError(s"Error while deleting temp file ${tmp.getAbsolutePath}") - } - } -======= val mapOutputWriter = writeSupport.createMapOutputWriter( dep.shuffleId, mapId, dep.partitioner.numPartitions) val partitionLengths = sorter.writePartitionedMapOutput(dep.shuffleId, mapId, mapOutputWriter) mapOutputWriter.commitAllPartitions() - mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths) ->>>>>>> palantir/spark-25299 + mapStatus = MapStatus( + blockManager.shuffleServerId, + DefaultMapShuffleLocations.get(blockManager.shuffleServerId), + partitionLengths) } /** Close this writer, passing along whether the map completed */ diff --git a/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterBenchmark.scala b/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterBenchmark.scala index 2ced7b4c63fcf..0b3394e88d9f1 100644 --- a/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterBenchmark.scala +++ b/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterBenchmark.scala @@ -19,14 +19,8 @@ package org.apache.spark.shuffle.sort import org.apache.spark.SparkConf import org.apache.spark.benchmark.Benchmark -<<<<<<< HEAD import org.apache.spark.shuffle.sort.io.DefaultShuffleWriteSupport import org.apache.spark.storage.BlockManagerId -||||||| merged common ancestors -import org.apache.spark.shuffle.sort.io.{DefaultShuffleWriteSupport} -======= -import 
org.apache.spark.shuffle.sort.io.DefaultShuffleWriteSupport ->>>>>>> palantir/spark-25299 /** * Benchmark to measure performance for aggregate primitives. @@ -53,16 +47,10 @@ object BypassMergeSortShuffleWriterBenchmark extends ShuffleWriterBenchmarkBase def getWriter(transferTo: Boolean): BypassMergeSortShuffleWriter[String, String] = { val conf = new SparkConf(loadDefaults = false) -<<<<<<< HEAD val shuffleWriteSupport = new DefaultShuffleWriteSupport( conf, blockResolver, BlockManagerId("0", "localhost", 7090)) -||||||| merged common ancestors - val shuffleWriteSupport = new DefaultShuffleWriteSupport(conf, blockResolver) -======= ->>>>>>> palantir/spark-25299 conf.set("spark.file.transferTo", String.valueOf(transferTo)) conf.set("spark.shuffle.file.buffer", "32k") - val shuffleWriteSupport = new DefaultShuffleWriteSupport(conf, blockResolver) val shuffleWriter = new BypassMergeSortShuffleWriter[String, String]( blockManager, From e82e408056f3e77e5473b65adc9140694a0d7f70 Mon Sep 17 00:00:00 2001 From: mcheah Date: Thu, 18 Apr 2019 19:20:44 -0700 Subject: [PATCH 11/12] Use returned locations in writers --- .../spark/shuffle/sort/BypassMergeSortShuffleWriter.java | 4 ++-- .../apache/spark/shuffle/sort/UnsafeShuffleWriter.java | 7 +++++-- .../org/apache/spark/shuffle/sort/SortShuffleWriter.scala | 4 ++-- .../spark/shuffle/sort/UnsafeShuffleWriterSuite.java | 7 ++++--- .../spark/shuffle/sort/SortShuffleWriterBenchmark.scala | 8 ++++++-- .../spark/shuffle/sort/UnsafeShuffleWriterBenchmark.scala | 4 +++- 6 files changed, 22 insertions(+), 12 deletions(-) diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java index 04fddb3a44713..434286175e415 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java @@ -173,10 +173,10 @@ public void write(Iterator> records) throws IOException { } partitionLengths = writePartitionedData(mapOutputWriter); - mapOutputWriter.commitAllPartitions(); + Optional mapLocations = mapOutputWriter.commitAllPartitions(); mapStatus = MapStatus$.MODULE$.apply( blockManager.shuffleServerId(), - DefaultMapShuffleLocations.get(blockManager.shuffleServerId()), + mapLocations.orNull(), partitionLengths); } catch (Exception e) { try { diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java index b74fa0b7376a4..232b361313124 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java @@ -23,6 +23,8 @@ import java.nio.channels.WritableByteChannel; import java.util.Iterator; +import org.apache.spark.api.java.Optional; +import org.apache.spark.api.shuffle.MapShuffleLocations; import scala.Option; import scala.Product2; import scala.collection.JavaConverters; @@ -221,6 +223,7 @@ void closeAndWriteOutput() throws IOException { final ShuffleMapOutputWriter mapWriter = shuffleWriteSupport .createMapOutputWriter(shuffleId, mapId, partitioner.numPartitions()); final long[] partitionLengths; + Optional mapLocations; try { try { partitionLengths = mergeSpills(spills, mapWriter); @@ -231,7 +234,7 @@ void closeAndWriteOutput() throws IOException { } } } - mapWriter.commitAllPartitions(); + mapLocations = mapWriter.commitAllPartitions(); 
} catch (Exception e) { try { mapWriter.abort(e); @@ -242,7 +245,7 @@ void closeAndWriteOutput() throws IOException { } mapStatus = MapStatus$.MODULE$.apply( blockManager.shuffleServerId(), - DefaultMapShuffleLocations.get(blockManager.shuffleServerId()), + mapLocations.orNull(), partitionLengths); } diff --git a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala index 6ce309aa84c21..1fcae684b0052 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala @@ -67,10 +67,10 @@ private[spark] class SortShuffleWriter[K, V, C]( val mapOutputWriter = writeSupport.createMapOutputWriter( dep.shuffleId, mapId, dep.partitioner.numPartitions) val partitionLengths = sorter.writePartitionedMapOutput(dep.shuffleId, mapId, mapOutputWriter) - mapOutputWriter.commitAllPartitions() + val mapLocations = mapOutputWriter.commitAllPartitions() mapStatus = MapStatus( blockManager.shuffleServerId, - DefaultMapShuffleLocations.get(blockManager.shuffleServerId), + mapLocations.orNull(), partitionLengths) } diff --git a/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java b/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java index 012dc5d21bce4..5f0de31bd25e3 100644 --- a/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java +++ b/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java @@ -172,6 +172,8 @@ public void setUp() throws IOException { when(shuffleDep.serializer()).thenReturn(serializer); when(shuffleDep.partitioner()).thenReturn(hashPartitioner); when(taskContext.taskMemoryManager()).thenReturn(taskMemoryManager); + when(blockManager.shuffleServerId()).thenReturn(BlockManagerId.apply( + "0", "localhost", 9099, Option.empty())); TaskContext$.MODULE$.setTaskContext(taskContext); } @@ -188,8 +190,7 @@ private UnsafeShuffleWriter createWriter( taskContext, conf, taskContext.taskMetrics().shuffleWriteMetrics(), - new DefaultShuffleWriteSupport(conf, shuffleBlockResolver) - ); + new DefaultShuffleWriteSupport(conf, shuffleBlockResolver, blockManager.shuffleServerId())); } private void assertSpillFilesWereCleanedUp() { @@ -550,7 +551,7 @@ public void testPeakMemoryUsed() throws Exception { taskContext, conf, taskContext.taskMetrics().shuffleWriteMetrics(), - new DefaultShuffleWriteSupport(conf, shuffleBlockResolver)); + new DefaultShuffleWriteSupport(conf, shuffleBlockResolver, blockManager.shuffleServerId())); // Peak memory should be monotonically increasing. More specifically, every time // we allocate a new page it should increase by exactly the size of the page. 
diff --git a/core/src/test/scala/org/apache/spark/shuffle/sort/SortShuffleWriterBenchmark.scala b/core/src/test/scala/org/apache/spark/shuffle/sort/SortShuffleWriterBenchmark.scala
index 32257b0cc4b56..6892b3ce4f868 100644
--- a/core/src/test/scala/org/apache/spark/shuffle/sort/SortShuffleWriterBenchmark.scala
+++ b/core/src/test/scala/org/apache/spark/shuffle/sort/SortShuffleWriterBenchmark.scala
@@ -18,11 +18,12 @@ package org.apache.spark.shuffle.sort
 import org.mockito.Mockito.when
-
 import org.apache.spark.{Aggregator, SparkEnv, TaskContext}
+
 import org.apache.spark.benchmark.Benchmark
 import org.apache.spark.shuffle.BaseShuffleHandle
 import org.apache.spark.shuffle.sort.io.DefaultShuffleWriteSupport
+import org.apache.spark.storage.BlockManagerId
 
 /**
  * Benchmark to measure performance for aggregate primitives.
@@ -77,7 +78,10 @@ object SortShuffleWriterBenchmark extends ShuffleWriterBenchmarkBase {
     when(taskContext.taskMemoryManager()).thenReturn(taskMemoryManager)
     TaskContext.setTaskContext(taskContext)
 
-    val writeSupport = new DefaultShuffleWriteSupport(defaultConf, blockResolver)
+    val writeSupport = new DefaultShuffleWriteSupport(
+      defaultConf,
+      blockResolver,
+      BlockManagerId("0", "localhost", 9099))
 
     val shuffleWriter = new SortShuffleWriter[String, String, String](
       blockResolver,
diff --git a/core/src/test/scala/org/apache/spark/shuffle/sort/UnsafeShuffleWriterBenchmark.scala b/core/src/test/scala/org/apache/spark/shuffle/sort/UnsafeShuffleWriterBenchmark.scala
index 20bf3eac95d84..0e659ff7cc5f3 100644
--- a/core/src/test/scala/org/apache/spark/shuffle/sort/UnsafeShuffleWriterBenchmark.scala
+++ b/core/src/test/scala/org/apache/spark/shuffle/sort/UnsafeShuffleWriterBenchmark.scala
@@ -19,6 +19,7 @@ package org.apache.spark.shuffle.sort
 import org.apache.spark.{SparkConf, TaskContext}
 import org.apache.spark.benchmark.Benchmark
 import org.apache.spark.shuffle.sort.io.DefaultShuffleWriteSupport
+import org.apache.spark.storage.BlockManagerId
 
 /**
  * Benchmark to measure performance for aggregate primitives.
@@ -43,7 +44,8 @@ object UnsafeShuffleWriterBenchmark extends ShuffleWriterBenchmarkBase {
   def getWriter(transferTo: Boolean): UnsafeShuffleWriter[String, String] = {
     val conf = new SparkConf(loadDefaults = false)
     conf.set("spark.file.transferTo", String.valueOf(transferTo))
-    val shuffleWriteSupport = new DefaultShuffleWriteSupport(conf, blockResolver)
+    val shuffleWriteSupport = new DefaultShuffleWriteSupport(
+      conf, blockResolver, BlockManagerId("0", "localhost", 9099))
     TaskContext.setTaskContext(taskContext)
     new UnsafeShuffleWriter[String, String](

From 0fa63f01f1ea73ac37f3fb7609e5965d7df23065 Mon Sep 17 00:00:00 2001
From: mcheah
Date: Thu, 18 Apr 2019 20:13:50 -0700
Subject: [PATCH 12/12] Fix style

---
 .../apache/spark/shuffle/sort/SortShuffleWriterBenchmark.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/test/scala/org/apache/spark/shuffle/sort/SortShuffleWriterBenchmark.scala b/core/src/test/scala/org/apache/spark/shuffle/sort/SortShuffleWriterBenchmark.scala
index 6892b3ce4f868..b0ff15cb1f790 100644
--- a/core/src/test/scala/org/apache/spark/shuffle/sort/SortShuffleWriterBenchmark.scala
+++ b/core/src/test/scala/org/apache/spark/shuffle/sort/SortShuffleWriterBenchmark.scala
@@ -18,8 +18,8 @@ package org.apache.spark.shuffle.sort
 import org.mockito.Mockito.when
-import org.apache.spark.{Aggregator, SparkEnv, TaskContext}
 
+import org.apache.spark.{Aggregator, SparkEnv, TaskContext}
 import org.apache.spark.benchmark.Benchmark
 import org.apache.spark.shuffle.BaseShuffleHandle
 import org.apache.spark.shuffle.sort.io.DefaultShuffleWriteSupport
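
Taken together, patches 11 and 12 settle the contract this series introduces: commitAllPartitions() on the map output writer reports an Optional MapShuffleLocations, and each shuffle writer forwards mapLocations.orNull() into MapStatus instead of hard-coding DefaultMapShuffleLocations.get(blockManager.shuffleServerId()). The sketch below illustrates that contract from the plugin side. It is illustrative only and not part of the patch series: the class name LocalDiskCommitSketch and its constructor are hypothetical, only the commitAllPartitions() return type visible in the diffs above is assumed, and the rest of the map output writer API is omitted.

    // Hypothetical sketch (not part of these patches) of how a shuffle plugin's map
    // output writer could report block locations when it commits. A local-disk style
    // plugin returns the default BlockManagerId-backed locations so the driver can
    // route reducers; a plugin backed by an external metadata store returns
    // Optional.empty(), and the writers above then pass null to MapStatus via
    // mapLocations.orNull().
    import org.apache.spark.api.java.Optional;
    import org.apache.spark.api.shuffle.MapShuffleLocations;
    import org.apache.spark.shuffle.sort.DefaultMapShuffleLocations;
    import org.apache.spark.storage.BlockManagerId;

    final class LocalDiskCommitSketch {
      private final BlockManagerId shuffleServerId;
      private final boolean locationsTrackedExternally;

      LocalDiskCommitSketch(BlockManagerId shuffleServerId, boolean locationsTrackedExternally) {
        this.shuffleServerId = shuffleServerId;
        this.locationsTrackedExternally = locationsTrackedExternally;
      }

      // Mirrors the commitAllPartitions() return type used in the diffs above.
      Optional<MapShuffleLocations> commitAllPartitions() {
        if (locationsTrackedExternally) {
          // The plugin's own metadata service knows where the blocks are, so the driver
          // does not need a location and MapStatus ends up carrying null.
          return Optional.empty();
        }
        // Blocks live on this executor's shuffle server; report the default locations.
        return Optional.of(DefaultMapShuffleLocations.get(shuffleServerId));
      }
    }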