Merged
Changes from 5 commits
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.api.shuffle;

import org.apache.spark.annotation.Experimental;

import java.io.Serializable;

/**
* Represents metadata about where shuffle blocks were written in a single map task.
* <p>
* This is optionally returned by shuffle writers. The inner shuffle locations may
* be accessed by shuffle readers. Shuffle locations are only necessary when the
* location of shuffle blocks needs to be managed by the driver; shuffle plugins
* may choose to use an external database or other metadata management systems to
* track the locations of shuffle blocks instead.
*/
@Experimental
public interface MapShuffleLocations extends Serializable {

/**
* Get the location for a given shuffle block written by this map task.
*/
ShuffleLocation getLocationForBlock(int reduceId);
}
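For illustration only, here is a minimal sketch of how a plugin might implement `MapShuffleLocations` when each reduce partition's block can live somewhere different. `HostPortLocation` and `PerPartitionShuffleLocations` are hypothetical names that are not part of this PR, and the two interfaces are redeclared locally (without `@Experimental`) only so that the snippet compiles on its own.

```java
import java.io.Serializable;
import java.util.Objects;

// Local stand-ins for the interfaces added in this PR, so the sketch is self-contained.
interface ShuffleLocation {}

interface MapShuffleLocations extends Serializable {
  ShuffleLocation getLocationForBlock(int reduceId);
}

// Hypothetical location type: the host and port of some server holding shuffle data.
final class HostPortLocation implements ShuffleLocation, Serializable {
  final String host;
  final int port;

  HostPortLocation(String host, int port) {
    this.host = host;
    this.port = port;
  }

  @Override
  public boolean equals(Object other) {
    return other instanceof HostPortLocation
        && ((HostPortLocation) other).host.equals(host)
        && ((HostPortLocation) other).port == port;
  }

  @Override
  public int hashCode() {
    return Objects.hash(host, port);
  }

  @Override
  public String toString() {
    return host + ":" + port;
  }
}

// Hypothetical plugin implementation: one location per reduce partition of a single map task.
final class PerPartitionShuffleLocations implements MapShuffleLocations {
  private final HostPortLocation[] locations;

  PerPartitionShuffleLocations(HostPortLocation[] locations) {
    // Defensive copy so later changes to the caller's array are not visible here.
    this.locations = locations.clone();
  }

  @Override
  public ShuffleLocation getLocationForBlock(int reduceId) {
    return locations[reduceId];
  }
}

class PerPartitionLocationsDemo {
  public static void main(String[] args) {
    MapShuffleLocations locations = new PerPartitionShuffleLocations(new HostPortLocation[] {
        new HostPortLocation("store-a", 7337),
        new HostPortLocation("store-b", 7337)
    });
    // Look up where the block for reduce partition 1 was written.
    System.out.println(locations.getLocationForBlock(1));
  }
}
```

The default implementation in this PR returns the same location for every `reduceId`; the sketch only shows that the interface also leaves room for per-partition placement.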
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.api.shuffle;

/**
* Marker interface representing a location of a shuffle block. Implementations of shuffle readers

To be even more specific, it's not the location of a shuffle block, but a location from which to retrieve shuffle blocks. Trying to figure out how to word this so that people don't get this confused with a ShuffleBlockId. Maybe you can say "a location of a shuffle block manager" instead to differentiate?

Author

It's not always the location of a shuffle block manager, though, because I could choose a server that's running just a file store, for example.

* and writers are expected to cast this down to an implementation-specific representation.
*/
public interface ShuffleLocation {
}
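The Javadoc above says readers and writers are expected to cast a `ShuffleLocation` down to an implementation-specific type. A minimal sketch of what that cast could look like on the reader side follows. `FileStoreLocation`, `FileStoreReader`, and `resolveUri` are hypothetical names chosen for illustration (echoing the file-store case mentioned in the review thread), and `ShuffleLocation` is redeclared locally so the snippet compiles by itself.

```java
// Local stand-in for the marker interface added in this PR.
interface ShuffleLocation {}

// Hypothetical implementation-specific location: a URI into some external file store.
final class FileStoreLocation implements ShuffleLocation {
  final String uri;

  FileStoreLocation(String uri) {
    this.uri = uri;
  }
}

// Hypothetical reader-side helper that performs the checked downcast.
final class FileStoreReader {
  static String resolveUri(ShuffleLocation location) {
    // Fail with a descriptive message instead of a bare ClassCastException
    // if a location from a different plugin (or null) reaches this reader.
    if (!(location instanceof FileStoreLocation)) {
      String actual = location == null ? "null" : location.getClass().getName();
      throw new IllegalArgumentException("Expected FileStoreLocation but got " + actual);
    }
    return ((FileStoreLocation) location).uri;
  }
}
```

Checking the type before casting is a design choice in this sketch, not something the PR prescribes; a plugin that controls both the writer and the reader could cast directly.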
Original file line number Diff line number Diff line change
@@ -124,7 +124,10 @@ public void write(Iterator<Product2<K, V>> records) throws IOException {
if (!records.hasNext()) {
partitionLengths = new long[numPartitions];
shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, null);
mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths);
mapStatus = MapStatus$.MODULE$.apply(
blockManager.shuffleServerId(),
DefaultMapShuffleLocations.get(blockManager.shuffleServerId()),

I'm a little confused by the changes to the writers -- I guess for now you're just hard-coding it, and later on there will be a place for plugins to do something different here?

Author

Yes that's correct, this is just a placeholder.

partitionLengths);
return;
}
final SerializerInstance serInstance = serializer.newInstance();
@@ -166,7 +169,10 @@ public void write(Iterator<Product2<K, V>> records) throws IOException {
logger.error("Error while deleting temp file {}", tmp.getAbsolutePath());
}
}
mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths);
mapStatus = MapStatus$.MODULE$.apply(
blockManager.shuffleServerId(),
DefaultMapShuffleLocations.get(blockManager.shuffleServerId()),
partitionLengths);
}

@VisibleForTesting
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.shuffle.sort;

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;

import org.apache.spark.api.shuffle.MapShuffleLocations;
import org.apache.spark.api.shuffle.ShuffleLocation;
import org.apache.spark.storage.BlockManagerId;

import java.util.Objects;

public class DefaultMapShuffleLocations implements MapShuffleLocations, ShuffleLocation {

/**
* We borrow the cache size from the BlockManagerId's cache - around 1MB, which should be
* feasible.
*/
private static final LoadingCache<BlockManagerId, DefaultMapShuffleLocations>
DEFAULT_SHUFFLE_LOCATIONS_CACHE =
CacheBuilder.newBuilder()
.maximumSize(10000)
.build(new CacheLoader<BlockManagerId, DefaultMapShuffleLocations>() {
@Override
public DefaultMapShuffleLocations load(BlockManagerId blockManagerId) {
return new DefaultMapShuffleLocations(blockManagerId);
}
});

private final BlockManagerId location;

public DefaultMapShuffleLocations(BlockManagerId blockManagerId) {
this.location = blockManagerId;
}

public static DefaultMapShuffleLocations get(BlockManagerId blockManagerId) {
return DEFAULT_SHUFFLE_LOCATIONS_CACHE.getUnchecked(blockManagerId);
}

@Override
public ShuffleLocation getLocationForBlock(int reduceId) {
return this;
}

public BlockManagerId getBlockManagerId() {
return location;
}

@Override
public boolean equals(Object other) {
return other instanceof DefaultMapShuffleLocations
&& Objects.equals(((DefaultMapShuffleLocations) other).location, location);
}

@Override
public int hashCode() {
return Objects.hashCode(location);
}
}
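The static `get` factory above interns instances, so every map status that refers to the same `BlockManagerId` shares one `DefaultMapShuffleLocations` object. The sketch below shows the same interning idea using only the JDK, with `ConcurrentHashMap.computeIfAbsent` swapped in for Guava's `LoadingCache`. Note the difference: this stand-in is unbounded, whereas the cache in the PR is capped with `maximumSize(10000)`. `InternedLocation` and its `String` key are hypothetical placeholders for `DefaultMapShuffleLocations` and `BlockManagerId`.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical stand-in for DefaultMapShuffleLocations, keyed by a plain String
// instead of a BlockManagerId.
final class InternedLocation {

  // Unbounded intern table; the PR's Guava cache evicts entries beyond 10000.
  private static final ConcurrentMap<String, InternedLocation> CACHE =
      new ConcurrentHashMap<>();

  private final String executorAddress;

  private InternedLocation(String executorAddress) {
    this.executorAddress = executorAddress;
  }

  // Returns the shared instance for this key, creating it on first use.
  static InternedLocation get(String executorAddress) {
    return CACHE.computeIfAbsent(executorAddress, InternedLocation::new);
  }

  String getExecutorAddress() {
    return executorAddress;
  }
}
```

Calling `InternedLocation.get` twice with equal keys returns the same object, which is the property the factory in the PR relies on to avoid allocating one location object per map status.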
Original file line number Diff line number Diff line change
@@ -249,7 +249,10 @@ void closeAndWriteOutput() throws IOException {
logger.error("Error while deleting temp file {}", tmp.getAbsolutePath());
}
}
mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths);
mapStatus = MapStatus$.MODULE$.apply(
blockManager.shuffleServerId(),
DefaultMapShuffleLocations.get(blockManager.shuffleServerId()),
partitionLengths);
}

@VisibleForTesting
18 changes: 10 additions & 8 deletions core/src/main/scala/org/apache/spark/MapOutputTracker.scala
Original file line number Diff line number Diff line change
@@ -28,6 +28,7 @@ import scala.concurrent.duration.Duration
import scala.reflect.ClassTag
import scala.util.control.NonFatal

import org.apache.spark.api.shuffle.MapShuffleLocations
import org.apache.spark.broadcast.{Broadcast, BroadcastManager}
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
@@ -282,7 +283,7 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging

// For testing
def getMapSizesByExecutorId(shuffleId: Int, reduceId: Int)
: Iterator[(BlockManagerId, Seq[(BlockId, Long)])] = {
: Iterator[(MapShuffleLocations, Seq[(BlockId, Long)])] = {

All of these methods should probably be renamed to getMapSizesByShuffleLocation

...

or is the idea that when you use a plugin, you won't touch MapOutputTracker at all, and these changes are just necessary bookkeeping?

Author

The read code will still ask MapOutputTracker for blocks by location. If the plugin isn't using locations, it's just going to get back an iterator with a single element: the null location paired with the Seq containing all of the blocks. We then need to convert whatever iterator we get back into an appropriate request to the reader plugin. If you like, I can write a proof-of-concept PR against this code on the read side, because it's a little difficult to reason about this concretely otherwise.

getMapSizesByExecutorId(shuffleId, reduceId, reduceId + 1)
}

@@ -296,7 +297,7 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging
* describing the shuffle blocks that are stored at that block manager.
*/
def getMapSizesByExecutorId(shuffleId: Int, startPartition: Int, endPartition: Int)
: Iterator[(BlockManagerId, Seq[(BlockId, Long)])]
: Iterator[(MapShuffleLocations, Seq[(BlockId, Long)])]

/**
* Deletes map output status information for the specified shuffle stage.
@@ -646,7 +647,7 @@ private[spark] class MapOutputTrackerMaster(
// Get blocks sizes by executor Id. Note that zero-sized blocks are excluded in the result.
// This method is only called in local-mode.
def getMapSizesByExecutorId(shuffleId: Int, startPartition: Int, endPartition: Int)
: Iterator[(BlockManagerId, Seq[(BlockId, Long)])] = {
: Iterator[(MapShuffleLocations, Seq[(BlockId, Long)])] = {
logDebug(s"Fetching outputs for shuffle $shuffleId, partitions $startPartition-$endPartition")
shuffleStatuses.get(shuffleId) match {
case Some (shuffleStatus) =>
@@ -683,11 +684,12 @@ private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTr

// Get blocks sizes by executor Id. Note that zero-sized blocks are excluded in the result.
override def getMapSizesByExecutorId(shuffleId: Int, startPartition: Int, endPartition: Int)
: Iterator[(BlockManagerId, Seq[(BlockId, Long)])] = {
: Iterator[(MapShuffleLocations, Seq[(BlockId, Long)])] = {
logDebug(s"Fetching outputs for shuffle $shuffleId, partitions $startPartition-$endPartition")
val statuses = getStatuses(shuffleId)
try {
MapOutputTracker.convertMapStatuses(shuffleId, startPartition, endPartition, statuses)
MapOutputTracker.convertMapStatuses(
shuffleId, startPartition, endPartition, statuses)
} catch {
case e: MetadataFetchFailedException =>
// We experienced a fetch failure so our mapStatuses cache is outdated; clear it:
@@ -871,9 +873,9 @@ private[spark] object MapOutputTracker extends Logging {
shuffleId: Int,
startPartition: Int,
endPartition: Int,
statuses: Array[MapStatus]): Iterator[(BlockManagerId, Seq[(BlockId, Long)])] = {
statuses: Array[MapStatus]): Iterator[(MapShuffleLocations, Seq[(BlockId, Long)])] = {

I was starting the Reader code and I was thinking this could easily return Iterator[(ShuffleLocation, Seq[(BlockId, Long)])] instead of Iterator[(MapShuffleLocations, Seq[(BlockId, Long)])]. It seems strange to pass back the MapShuffleLocations for an entire map stage when the BlockId already specifies both the mapId and reduceId. So I think it would make things more clear here, and avoid an extra step in the reader, if we just did

splitsByAddress.getOrElseUpdate(status.mapShuffleLocations.getLocationForBlock(part), ListBuffer())

on line 888.

I looked at this function, and it's only being used by the tests and by the reader, which would only need the ShuffleLocation and not the MapShuffleLocations. In the tests, the results are used to verify that the BlockManagerId is correct, which seems like the wrong thing to check, because implementations of MapShuffleLocations other than the default one aren't necessarily going to reference a BlockManagerId. Perhaps it's better to just write a separate function for retrieving the BlockManagerId directly for the tests.

assert (statuses != null)
val splitsByAddress = new HashMap[BlockManagerId, ListBuffer[(BlockId, Long)]]
val splitsByAddress = new HashMap[MapShuffleLocations, ListBuffer[(BlockId, Long)]]
for ((status, mapId) <- statuses.iterator.zipWithIndex) {
if (status == null) {
val errorMessage = s"Missing an output location for shuffle $shuffleId"
@@ -883,7 +885,7 @@ private[spark] object MapOutputTracker extends Logging {
for (part <- startPartition until endPartition) {
val size = status.getSizeForBlock(part)
if (size != 0) {
splitsByAddress.getOrElseUpdate(status.location, ListBuffer()) +=
splitsByAddress.getOrElseUpdate(status.mapShuffleLocations, ListBuffer()) +=
((ShuffleBlockId(shuffleId, mapId, part), size))
}
}
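To make the review suggestion on this hunk easier to reason about, here is a rough Java sketch of the alternative grouping, keyed by the per-block `ShuffleLocation` obtained from `getLocationForBlock(part)` rather than by the whole `MapShuffleLocations`. It is not the code in this PR. `SimpleMapStatus`, `BlockRef`, `NamedLocation`, and `GroupByLocationSketch` are hypothetical simplifications of `MapStatus`, `(ShuffleBlockId, size)` pairs, and a plugin location, and the interfaces are redeclared locally so the snippet compiles on its own.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Local stand-ins for the interfaces added in this PR.
interface ShuffleLocation {}

interface MapShuffleLocations {
  ShuffleLocation getLocationForBlock(int reduceId);
}

// Hypothetical location with value equality so it can be used as a map key.
final class NamedLocation implements ShuffleLocation {
  final String name;

  NamedLocation(String name) {
    this.name = name;
  }

  @Override
  public boolean equals(Object other) {
    return other instanceof NamedLocation && ((NamedLocation) other).name.equals(name);
  }

  @Override
  public int hashCode() {
    return name.hashCode();
  }
}

// Hypothetical simplification of MapStatus: locations plus one size per reduce partition.
final class SimpleMapStatus {
  final MapShuffleLocations locations;
  final long[] sizes;

  SimpleMapStatus(MapShuffleLocations locations, long[] sizes) {
    this.locations = locations;
    this.sizes = sizes;
  }
}

// Hypothetical simplification of a (ShuffleBlockId, size) pair.
final class BlockRef {
  final int mapId;
  final int reduceId;
  final long size;

  BlockRef(int mapId, int reduceId, long size) {
    this.mapId = mapId;
    this.reduceId = reduceId;
    this.size = size;
  }
}

final class GroupByLocationSketch {
  // Groups non-empty blocks in [startPartition, endPartition) by the location of each block.
  static Map<ShuffleLocation, List<BlockRef>> group(
      List<SimpleMapStatus> statuses, int startPartition, int endPartition) {
    Map<ShuffleLocation, List<BlockRef>> byLocation = new LinkedHashMap<>();
    for (int mapId = 0; mapId < statuses.size(); mapId++) {
      SimpleMapStatus status = statuses.get(mapId);
      for (int part = startPartition; part < endPartition; part++) {
        long size = status.sizes[part];
        if (size != 0) {  // zero-sized blocks are skipped, as in convertMapStatuses
          ShuffleLocation location = status.locations.getLocationForBlock(part);
          byLocation.computeIfAbsent(location, key -> new ArrayList<>())
              .add(new BlockRef(mapId, part, size));
        }
      }
    }
    return byLocation;
  }
}
```

With this shape the reader receives one entry per distinct block location and does not need a second lookup through `MapShuffleLocations`, which is the extra step the comment proposes to avoid.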