Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.api.shuffle;

import org.apache.spark.annotation.Experimental;

import java.io.Serializable;

/**
* Represents metadata about where shuffle blocks were written in a single map task.
* <p>
* This is optionally returned by shuffle writers. The inner shuffle locations may
* be accessed by shuffle readers. Shuffle locations are only necessary when the
* location of shuffle blocks needs to be managed by the driver; shuffle plugins
* may choose to use an external database or other metadata management systems to
* track the locations of shuffle blocks instead.
*/
@Experimental
public interface MapShuffleLocations extends Serializable {

/**
* Get the location for a given shuffle block written by this map task.
*/
ShuffleLocation getLocationForBlock(int reduceId);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.api.shuffle;

/**
* Marker interface representing a location of a shuffle block. Implementations of shuffle readers
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To be even more specific, it's not the location of a shuffle block, but a location from which to retrieve shuffle blocks. Trying to figure out how to word this so that people don't get this confused with a ShuffleBlockId. Maybe you can say "a location of a shuffle block manager" instead to differentiate?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not always the location of a shuffle block manager though because I can choose a server that's running just a file store for example.

* and writers are expected to cast this down to an implementation-specific representation.
*/
public interface ShuffleLocation {
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.io.IOException;

import org.apache.spark.annotation.Experimental;
import org.apache.spark.api.java.Optional;

/**
* :: Experimental ::
Expand All @@ -31,7 +32,7 @@
public interface ShuffleMapOutputWriter {
ShufflePartitionWriter getNextPartitionWriter() throws IOException;

void commitAllPartitions() throws IOException;
Optional<MapShuffleLocations> commitAllPartitions() throws IOException;

void abort(Throwable error) throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import java.nio.channels.WritableByteChannel;
import javax.annotation.Nullable;

import org.apache.spark.api.java.Optional;
import org.apache.spark.api.shuffle.MapShuffleLocations;
import scala.None$;
import scala.Option;
import scala.Product2;
Expand Down Expand Up @@ -134,8 +136,11 @@ public void write(Iterator<Product2<K, V>> records) throws IOException {
try {
if (!records.hasNext()) {
partitionLengths = new long[numPartitions];
mapOutputWriter.commitAllPartitions();
mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths);
Optional<MapShuffleLocations> blockLocs = mapOutputWriter.commitAllPartitions();
mapStatus = MapStatus$.MODULE$.apply(
blockManager.shuffleServerId(),
blockLocs.orNull(),
partitionLengths);
return;
}
final SerializerInstance serInstance = serializer.newInstance();
Expand Down Expand Up @@ -168,8 +173,11 @@ public void write(Iterator<Product2<K, V>> records) throws IOException {
}

partitionLengths = writePartitionedData(mapOutputWriter);
mapOutputWriter.commitAllPartitions();
mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths);
Optional<MapShuffleLocations> mapLocations = mapOutputWriter.commitAllPartitions();
mapStatus = MapStatus$.MODULE$.apply(
blockManager.shuffleServerId(),
mapLocations.orNull(),
partitionLengths);
} catch (Exception e) {
try {
mapOutputWriter.abort(e);
Expand All @@ -178,6 +186,10 @@ public void write(Iterator<Product2<K, V>> records) throws IOException {
}
throw e;
}
mapStatus = MapStatus$.MODULE$.apply(
blockManager.shuffleServerId(),
DefaultMapShuffleLocations.get(blockManager.shuffleServerId()),
partitionLengths);
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.shuffle.sort;

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;

import org.apache.spark.api.shuffle.MapShuffleLocations;
import org.apache.spark.api.shuffle.ShuffleLocation;
import org.apache.spark.storage.BlockManagerId;

import java.util.Objects;

public class DefaultMapShuffleLocations implements MapShuffleLocations, ShuffleLocation {

/**
* We borrow the cache size from the BlockManagerId's cache - around 1MB, which should be
* feasible.
*/
private static final LoadingCache<BlockManagerId, DefaultMapShuffleLocations>
DEFAULT_SHUFFLE_LOCATIONS_CACHE =
CacheBuilder.newBuilder()
.maximumSize(BlockManagerId.blockManagerIdCacheSize())
.build(new CacheLoader<BlockManagerId, DefaultMapShuffleLocations>() {
@Override
public DefaultMapShuffleLocations load(BlockManagerId blockManagerId) {
return new DefaultMapShuffleLocations(blockManagerId);
}
});

private final BlockManagerId location;

public DefaultMapShuffleLocations(BlockManagerId blockManagerId) {
this.location = blockManagerId;
}

public static DefaultMapShuffleLocations get(BlockManagerId blockManagerId) {
return DEFAULT_SHUFFLE_LOCATIONS_CACHE.getUnchecked(blockManagerId);
}

@Override
public ShuffleLocation getLocationForBlock(int reduceId) {
return this;
}

public BlockManagerId getBlockManagerId() {
return location;
}

@Override
public boolean equals(Object other) {
return other instanceof DefaultMapShuffleLocations
&& Objects.equals(((DefaultMapShuffleLocations) other).location, location);
}

@Override
public int hashCode() {
return Objects.hashCode(location);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import java.nio.channels.WritableByteChannel;
import java.util.Iterator;

import org.apache.spark.api.java.Optional;
import org.apache.spark.api.shuffle.MapShuffleLocations;
import scala.Option;
import scala.Product2;
import scala.collection.JavaConverters;
Expand Down Expand Up @@ -221,6 +223,7 @@ void closeAndWriteOutput() throws IOException {
final ShuffleMapOutputWriter mapWriter = shuffleWriteSupport
.createMapOutputWriter(shuffleId, mapId, partitioner.numPartitions());
final long[] partitionLengths;
Optional<MapShuffleLocations> mapLocations;
try {
try {
partitionLengths = mergeSpills(spills, mapWriter);
Expand All @@ -231,7 +234,7 @@ void closeAndWriteOutput() throws IOException {
}
}
}
mapWriter.commitAllPartitions();
mapLocations = mapWriter.commitAllPartitions();
} catch (Exception e) {
try {
mapWriter.abort(e);
Expand All @@ -240,7 +243,10 @@ void closeAndWriteOutput() throws IOException {
}
throw e;
}
mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths);
mapStatus = MapStatus$.MODULE$.apply(
blockManager.shuffleServerId(),
mapLocations.orNull(),
partitionLengths);
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,6 @@ public ShuffleWriteSupport writes() {
throw new IllegalStateException(
"Executor components must be initialized before getting writers.");
}
return new DefaultShuffleWriteSupport(sparkConf, blockResolver);
return new DefaultShuffleWriteSupport(sparkConf, blockResolver, blockManager.shuffleServerId());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@
import java.io.OutputStream;
import java.nio.channels.FileChannel;

import org.apache.spark.api.java.Optional;
import org.apache.spark.api.shuffle.MapShuffleLocations;
import org.apache.spark.shuffle.sort.DefaultMapShuffleLocations;
import org.apache.spark.storage.BlockManagerId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -49,6 +53,7 @@ public class DefaultShuffleMapOutputWriter implements ShuffleMapOutputWriter {
private final int bufferSize;
private int currPartitionId = 0;
private long currChannelPosition;
private final BlockManagerId shuffleServerId;

private final File outputFile;
private File outputTempFile;
Expand All @@ -61,11 +66,13 @@ public DefaultShuffleMapOutputWriter(
int shuffleId,
int mapId,
int numPartitions,
BlockManagerId shuffleServerId,
ShuffleWriteMetricsReporter metrics,
IndexShuffleBlockResolver blockResolver,
SparkConf sparkConf) {
this.shuffleId = shuffleId;
this.mapId = mapId;
this.shuffleServerId = shuffleServerId;
this.metrics = metrics;
this.blockResolver = blockResolver;
this.bufferSize =
Expand All @@ -90,10 +97,11 @@ public ShufflePartitionWriter getNextPartitionWriter() throws IOException {
}

@Override
public void commitAllPartitions() throws IOException {
public Optional<MapShuffleLocations> commitAllPartitions() throws IOException {
cleanUp();
File resolvedTmp = outputTempFile != null && outputTempFile.isFile() ? outputTempFile : null;
blockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, resolvedTmp);
return Optional.of(DefaultMapShuffleLocations.get(shuffleServerId));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,21 @@
import org.apache.spark.api.shuffle.ShuffleMapOutputWriter;
import org.apache.spark.api.shuffle.ShuffleWriteSupport;
import org.apache.spark.shuffle.IndexShuffleBlockResolver;
import org.apache.spark.storage.BlockManagerId;

public class DefaultShuffleWriteSupport implements ShuffleWriteSupport {

private final SparkConf sparkConf;
private final IndexShuffleBlockResolver blockResolver;
private final BlockManagerId shuffleServerId;

public DefaultShuffleWriteSupport(
SparkConf sparkConf,
IndexShuffleBlockResolver blockResolver) {
IndexShuffleBlockResolver blockResolver,
BlockManagerId shuffleServerId) {
this.sparkConf = sparkConf;
this.blockResolver = blockResolver;
this.shuffleServerId = shuffleServerId;
}

@Override
Expand All @@ -41,7 +45,7 @@ public ShuffleMapOutputWriter createMapOutputWriter(
int mapId,
int numPartitions) {
return new DefaultShuffleMapOutputWriter(
shuffleId, mapId, numPartitions,
shuffleId, mapId, numPartitions, shuffleServerId,
TaskContext.get().taskMetrics().shuffleWriteMetrics(), blockResolver, sparkConf);
}
}
Loading