
Conversation

@yifeih yifeih commented Mar 26, 2019

No description provided.

@yifeih yifeih changed the title [SPARK-25299] [WIP] shuffle reader API [SPARK-25299] shuffle reader API Mar 26, 2019

yifeih commented Mar 26, 2019

@mccheah @ifilonenko for initial review

The thing I found most awkward about this PR is translating things into an API and then immediately translating them back. This happens twice: once when translating from Scala Iterables to Java Iterables just to fit the Java API, and then translating back to Scala to make it compatible with ShuffleBlockFetcherIterator because it's implemented in Java. The other is translating everything into ShuffleLocationBlocks and then back out, because certain things, like BlockId, are not part of the public API.
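For illustration, the round-trip looks roughly like this (a sketch; roundTrip and its argument are stand-in names, not code from the PR):

import java.io.InputStream
import scala.collection.JavaConverters._

// Scala -> Java at the public API boundary, then immediately Java -> Scala
// again inside the implementation.
def roundTrip(scalaIterator: Iterator[InputStream]): Iterator[InputStream] = {
  val javaIterable: java.lang.Iterable[InputStream] = scalaIterator.toIterable.asJava
  javaIterable.iterator().asScala
}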

private final ShuffleBlockInfo[] shuffleBlocks;
private final Optional<BlockManagerId> shuffleLocation;

public static final class ShuffleBlockInfo {


looks like a great candidate for a scala case class - does this need to be java code?

yifeih (Author):

The rest of the API is in Java to allow people to write the plugin in either Scala or Java, so I think it makes more sense to have Java here.

@mccheah mccheah left a comment

Will add more later today


package org.apache.spark.api.shuffle;

public final class ShuffleBlockInfo {

Nit: Spark doesn't usually use final modifiers.

when(mapOutputTracker.getMapSizesByExecutorId(shuffleId, reduceId, reduceId + 1))
  .thenAnswer(new Answer[Iterator[(BlockManagerId, Seq[(BlockId, Long)])]] {
    def answer(invocationOnMock: InvocationOnMock):
      Iterator[(BlockManagerId, Seq[(BlockId, Long)])] = {

4-space indent this from def

    serializerManager: SerializerManager,
    mapOutputTracker: MapOutputTracker) extends ShuffleReadSupport {

  val maxBytesInFlight = SparkEnv.get.conf.get(config.REDUCER_MAX_SIZE_IN_FLIGHT) * 1024 * 1024

Should these be private?

yifeih (Author):

hmmm yes xD

shuffleMetrics = TaskContext.get().taskMetrics().createTempShuffleReadMetrics()
).toCompletionIterator

new ShuffleBlockInputStreamIterator(shuffleBlockFetchIterator).toIterable.asJava

toIterable is scary because it assumes that the single iterator instance can be returned multiple times, when in fact by definition an iterator object can only be traversed once. I think we want to return an Iterable object that creates the ShuffleBlockFetcherIterator when Iterable#iterator() is called.
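A minimal sketch of that shape (hypothetical class and parameter names; the real fetcher construction takes many more arguments):

import java.io.InputStream
import scala.collection.JavaConverters._
import org.apache.spark.storage.BlockId

// Defer building the fetch iterator until iterator() is called, so each call to
// Iterable#iterator() gets a fresh, single-use iterator underneath.
class ShuffleBlockInputStreamIterable(
    createFetchIterator: () => Iterator[(BlockId, InputStream)])
  extends java.lang.Iterable[InputStream] {

  override def iterator(): java.util.Iterator[InputStream] =
    createFetchIterator().map(_._2).asJava
}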

  new ShuffleBlockInputStreamIterator(Iterator.empty).toIterable.asJava
} else {
  val minReduceId = blockMetadata.asScala.map(block => block.getReduceId).min
  val maxReduceId = blockMetadata.asScala.map(block => block.getReduceId).max

I wonder if we can derive both the minReduceId and the maxReduceId in a single pass over the blockMetadata. It would probably require a manual search that tracks both the running minimum and the running maximum over the iteration. Since the algorithm wouldn't be hard here I think it's worth doing to save some CPU cycles.
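For example, a single-pass version might look like this (a sketch, assuming blockMetadata and the JavaConverters import from the surrounding code):

// Track the running minimum and maximum in one traversal instead of two.
val (minReduceId, maxReduceId) = blockMetadata.asScala
  .foldLeft((Int.MaxValue, Int.MinValue)) { case ((curMin, curMax), block) =>
    val id = block.getReduceId
    (math.min(curMin, id), math.max(curMax, id))
  }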

}
}

private class ShuffleBlockInputStreamIterator(

Do we need the explicit iterator subclass? I'd think we can just call blockFetchIterator.map(_._2).asJava.

yifeih (Author):

Heh scala magic that I am still learning 🙃

    mapOutputTracker: MapOutputTracker) extends ShuffleReadSupport {

  private val maxBytesInFlight =
    SparkEnv.get.conf.get(config.REDUCER_MAX_SIZE_IN_FLIGHT) * 1024 * 1024

Let's just pass in the SparkConf - we can certainly do that once we have ShuffleExecutorComponents hooked up.


mccheah commented Apr 15, 2019

@squito we have plans to discuss this further offline. I'll just note for now that I posted a proof of concept implementation of the reader APIs here: https://github.com/mccheah/ignite-shuffle-service/pull/1/files. It's largely incomplete and not tested, but hopefully it illustrates some of the API decisions with another implementation.


mccheah commented Apr 19, 2019

We decided to introduce an exception API, which will probably just mean making FetchFailedException part of the plugin API as an exception the reader can throw. But we'll tackle both that and the scheduler changes in a follow-up patch.

@yifeih and I talked and we want to get the shuffle locations API merged first and to have this PR use the locations as well.


@Override
public int hashCode() {
  return Objects.hash(shuffleId, mapId, reduceId, length);

Should we hash the location?

&& shuffleId == ((ShuffleBlockInfo) other).shuffleId
&& mapId == ((ShuffleBlockInfo) other).mapId
&& reduceId == ((ShuffleBlockInfo) other).reduceId
&& length == ((ShuffleBlockInfo) other).length;

Should we check equality against the location?

package org.apache.spark.api.shuffle;

import java.util.Objects;
import java.util.Optional;

Ah, Optional. I've noticed around Spark they use their own implementation of Optional, pretty sure it's org.apache.spark.api.java.Optional. I think we're expected to use that everywhere?


The JRE's optional is not serializable, which is why the Spark one exists. Not sure whether that matters here.

 * and writers are expected to cast this down to an implementation-specific representation.
 */
public interface ShuffleLocation {
  ShuffleLocation EMPTY_LOCATION = new ShuffleLocation() {};

Why do we need this - can't we pass Optional.empty everywhere?

 *
 * @throws Exception if current block cannot be retried.
 */
default void retryLastBlock(Throwable t) throws Exception {

Should this construct still be part of the API? We talked offline about special casing the default shuffle implementation on this subject since stream corruption shouldn't be a factor in non-Spark-core plugins.

val shuffleLoc = status.mapShuffleLocations.getLocationForBlock(part)
splitsByAddress.getOrElseUpdate(shuffleLoc, ListBuffer()) +=
if (status.mapShuffleLocations == null) {
splitsByAddress.getOrElseUpdate(ShuffleLocation.EMPTY_LOCATION, ListBuffer()) +=

Can we return a map of Option[ShuffleLocation] as the key? Removes the need for this placeholder.
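A sketch of what that could look like (status, part, blockId, and size are stand-ins for values in the surrounding MapOutputTracker code, which is truncated in the excerpt above):

import scala.collection.mutable
import scala.collection.mutable.ListBuffer
import org.apache.spark.api.shuffle.ShuffleLocation
import org.apache.spark.storage.BlockId

// None stands in for a missing location, replacing the EMPTY_LOCATION placeholder.
val splitsByAddress =
  mutable.HashMap.empty[Option[ShuffleLocation], ListBuffer[(BlockId, Long)]]
val key =
  if (status.mapShuffleLocations == null) None
  else Some(status.mapShuffleLocations.getLocationForBlock(part))
splitsByAddress.getOrElseUpdate(key, ListBuffer()) += ((blockId, size))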

@Experimental
public class ShuffleReaderInputStream {

  private final ShuffleBlockInfo shuffleBlockInfo;

I thought about this a bit more, and I don't think we want to be putting the shuffle block info into this part of the API. The reader should just return plain InputStream objects without this wrapper.

We originally included the block info here because in the default implementation, we need to know the length of the block to know if we should be checking for corruption. But this is again only applicable for the default implementation. Putting the block info here would result in it being unused by other shuffle storage plugins.

What I think we want instead is for BlockStoreShuffleReader to check that each returned InputStream is an instance of some class, then have that input stream class have a method that returns which block it's reading and what its length is. Something like that, anyways.

We might want to use pattern matching instead of isInstanceOf and asInstanceOf in a lot of cases.

Can we see if we can get the reader API to return Iterable<InputStream>?
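A sketch of that shape (ShuffleBlockInputStream and blockInfoOf are hypothetical names, not code from the PR):

import java.io.InputStream
import org.apache.spark.api.shuffle.ShuffleBlockInfo

// Hypothetical marker type the default implementation could expose, letting
// BlockStoreShuffleReader recover the block and its length from a plain stream.
trait ShuffleBlockInputStream extends InputStream {
  def blockInfo: ShuffleBlockInfo
}

// Pattern matching instead of isInstanceOf/asInstanceOf: only streams from the
// default implementation carry block info and need the corruption check.
def blockInfoOf(in: InputStream): Option[ShuffleBlockInfo] = in match {
  case s: ShuffleBlockInputStream => Some(s.blockInfo)
  case _ => None
}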

yifeih (Author):

Sorry for the slow turnaround. I'm not sure that returning just Iterable<InputStream> would work, because the decryption/decompression function, serializerManager.wrapStream, must take a BlockId as one of its arguments. Every implementation needs some way of giving that information back to the BlockStoreShuffleReader for the decryption/decompression to feasibly be done outside the plugin.


Ah ok, that makes sense; let's keep it this way until we can think of something better. I don't think we necessarily need to use wrapStream directly - we can call wrapForCompression and wrapForEncryption individually, both of which only accept streams and don't require block ids. I think block ids are only required to check what type of input stream it is, but you could do that check a layer above? Regardless, this way might be fine as is, but look around and see what we can do.

yifeih (Author):

Ah ok, yea, I guess I could call those two functions directly. I think I can also move the corruption detection logic back inside ShuffleBlockFetcherIterator in that case too.

yifeih (Author):

Yea, it looks like other parts of the codebase call those two functions directly, so it seems reasonable.


Well, now that the class is no longer used, we should delete it, yeah? =)


Never mind it's gone =D

@bulldozer-bot bulldozer-bot bot merged commit b35d238 into spark-25299 Apr 30, 2019
@bulldozer-bot bulldozer-bot bot deleted the yh/reader-api branch April 30, 2019 21:07

yifeih commented May 1, 2019

ah oops thanks @mccheah!

@ifilonenko

Good work @yifeih :)
