[WIP] Implement newer APIs for shuffle read and write #1
Conversation
```java
private final String appId;
private final Map<SparkShufflePartitionBlock, byte[]> currentBlocks = Maps.newHashMap();
private final LinkedBlockingDeque<IgniteFuture<Map<SparkShufflePartitionBlock, byte[]>>> runningTasks =
```
@squito @yifeih so this is an example of how we could implement the shuffle reader APIs from palantir/spark#523 using a different storage backend, in this case Apache Ignite. I think this branch is out of date, though, so I need to publish Yifei's latest iterations to my local maven repository to work against it.

Here are some of the highlights I would note from this thought experiment:
- Pipelining can be implemented behind the API, not in front of it. I think the notion of parallel fetches, and whether to fetch to disk vs. buffer blocks in memory, should be left up to the plugin to decide. (A sketch of what that could look like follows this list.)
- Stream corruption should more or less be impossible given the guarantees of Ignite as a system. I wonder if we care about stream corruption in any case other than when we're using Netty as the transfer protocol. We could treat stream corruption checks as a special case, e.g. check whether the shuffle reader is of type DefaultShuffleBlockReader or ShuffleBlockFetcherIterator and add an additional check layer, etc.
- We rely on Ignite to be fully reliable - there isn't any retry logic here for any block fetches.
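To make the pipelining point concrete, here is a minimal sketch (not this branch's actual code) of how pipelining could live entirely behind the reader API using Ignite's async gets. It assumes the SparkShufflePartitionBlock key type from the diff above and an already-configured IgniteCache; the class name, batch bound, and priming strategy are all illustrative.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.ignite.IgniteCache;
import org.apache.ignite.lang.IgniteFuture;

final class PipelinedIgniteBlockFetcher {
  private static final int MAX_IN_FLIGHT_BATCHES = 4;  // illustrative bound, not a real config

  private final IgniteCache<SparkShufflePartitionBlock, byte[]> dataCache;
  private final Deque<Set<SparkShufflePartitionBlock>> pendingBatches;
  private final Deque<IgniteFuture<Map<SparkShufflePartitionBlock, byte[]>>> runningTasks =
      new ArrayDeque<>();

  PipelinedIgniteBlockFetcher(
      IgniteCache<SparkShufflePartitionBlock, byte[]> dataCache,
      List<Set<SparkShufflePartitionBlock>> batches) {
    this.dataCache = dataCache;
    this.pendingBatches = new ArrayDeque<>(batches);
    // Prime the pipeline: start several async fetches before the reader asks for anything.
    while (runningTasks.size() < MAX_IN_FLIGHT_BATCHES && !pendingBatches.isEmpty()) {
      runningTasks.add(dataCache.getAllAsync(pendingBatches.poll()));
    }
  }

  boolean hasNext() {
    return !runningTasks.isEmpty();
  }

  /** Block on the oldest in-flight batch, then refill the pipeline with the next pending batch. */
  Map<SparkShufflePartitionBlock, byte[]> next() {
    Map<SparkShufflePartitionBlock, byte[]> fetched = runningTasks.poll().get();
    if (!pendingBatches.isEmpty()) {
      runningTasks.add(dataCache.getAllAsync(pendingBatches.poll()));
    }
    return fetched;
  }
}
```

The point of the sketch is only that the caller of the reader API never sees the parallelism - it just consumes batches in order while the plugin keeps a bounded number of fetches in flight.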
> Pipelining can be implemented behind the API, not in front of it. I think the notion of parallel fetches, and whether to fetch to disk vs. buffer blocks in memory, should be left up to the plugin to decide.

I agree that it is fine to leave that up to the plugin. (I can't think of a case where you wouldn't want pipelining at the moment, but there is probably some case ...) I do think it would be nice if there were some reusable code to let plugins get pipelining without having to implement it themselves from scratch.

I feel a little more strongly that fetch-to-disk should not be part of the plugin, since it has more to do with Spark's memory management, but again it's fine if it's behind the API; it would just be nice if there were reusable code for it.

> We rely on Ignite to be fully reliable - there isn't any retry logic here for any block fetches.

I'm not certain I know what this means. (Caveat: I don't really know anything about Ignite; I'm viewing it as having similar properties to HDFS for these purposes.) It's certainly possible that an executor fails to read data from Ignite: something is wrong with the executor, or the entire Ignite cluster is unreachable or overloaded, or somebody accidentally wipes data from Ignite.

I guess you are saying you have no explicit retries within one task from the Ignite plugin, nor any special failure handling. But does that mean a failure is treated as a regular exception, so the task gets retried 4 times and then the job is failed? That means that a failure to read from Ignite is catastrophic and the Spark app cannot recover -- that may be fine given your expectations of Ignite. Or will it get translated to a FetchFailedException (by some intermediate part of Spark's code that surrounds this plugin), which will lead the scheduler to mark some data as missing and regenerate it? If so, are we sure it's marking the right data as missing?
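To illustrate the two outcomes being asked about, here is a hypothetical sketch: either the Ignite read failure propagates as an ordinary task failure, or it is translated into a fetch-failure signal the scheduler can act on. None of these names are the plugin's real API, and ShuffleBlockFetchFailure is only a stand-in for Spark's FetchFailedException, which in reality carries shuffle/map/reduce coordinates.

```java
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteException;

final class IgniteShuffleReadFailureSketch {

  /**
   * Option 1: let the IgniteException propagate. The task fails like any other exception,
   * gets retried up to the task-failure limit, and then the whole job is aborted.
   */
  static byte[] readOrFailTask(
      IgniteCache<SparkShufflePartitionBlock, byte[]> cache, SparkShufflePartitionBlock block) {
    return cache.get(block);
  }

  /**
   * Option 2: translate the failure into a signal the scheduler could use to mark the
   * corresponding shuffle data as missing and regenerate it.
   */
  static byte[] readOrReportFetchFailure(
      IgniteCache<SparkShufflePartitionBlock, byte[]> cache, SparkShufflePartitionBlock block) {
    try {
      return cache.get(block);
    } catch (IgniteException e) {
      // In real Spark this would be a FetchFailedException; the open question above is
      // whether the data it marks as missing is actually the right data.
      throw new ShuffleBlockFetchFailure(block, e);
    }
  }

  /** Illustrative stand-in for a fetch-failure signal; not a real Spark class. */
  static final class ShuffleBlockFetchFailure extends RuntimeException {
    ShuffleBlockFetchFailure(SparkShufflePartitionBlock block, Throwable cause) {
      super("Failed to fetch shuffle block " + block, cause);
    }
  }
}
```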
> I feel a little more strongly that fetch-to-disk should not be part of the plugin, since it has more to do with Spark's memory management, but again it's fine if it's behind the API; it would just be nice if there were reusable code for it.

The problem is that the benefit of fetch-to-disk is that the data doesn't have to be copied into memory at all. But I think for blob stores that don't support direct network-to-disk download, this is more or less impossible. For example, in Ignite we'd have to get the byte[] into memory at some point, so here we say that we don't want to both transfer into memory and also transfer to disk (a sketch follows after the list below). We might as well keep all the data in one of three places:
- Memory of the executor
- Memory or disk of the Ignite store (that's up to Ignite's implementation and configuration)
- Over the network
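As a sketch of why fetch-to-disk adds little here, assuming the byte[]-returning Ignite get and the SparkShufflePartitionBlock key type from the diff above (the class and method names are illustrative): the value has to be fully materialized in executor memory anyway, so spilling it to a local file only adds a second copy.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

import org.apache.ignite.IgniteCache;

final class FetchToDiskSketch {
  /** With a byte[]-only store API, "fetch to disk" still pulls the whole value into memory first. */
  static Path fetchToDisk(
      IgniteCache<SparkShufflePartitionBlock, byte[]> cache,
      SparkShufflePartitionBlock block,
      Path spillDir) throws IOException {
    byte[] value = cache.get(block);                       // already fully in executor memory
    Path spillFile = Files.createTempFile(spillDir, "shuffle-", ".block");
    Files.write(spillFile, value);                         // second copy, now on local disk too
    return spillFile;
  }
}
```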
> I guess you are saying you have no explicit retries within one task from the Ignite plugin, nor any special failure handling. But does that mean a failure is treated as a regular exception, so the task gets retried 4 times and then the job is failed? That means that a failure to read from Ignite is catastrophic and the Spark app cannot recover -- that may be fine given your expectations of Ignite.

Yup, this is the case - failing to read from Ignite should be such a rare case that it's probably fine to assume the job is not recoverable if the cluster's blocks can't be fetched that many times. But I also have a lot to learn about the specific QoS guarantees of Ignite. Can we have similar expectations for HDFS? We're thinking about trying to implement this plugin as an experiment against S3 - I'd expect S3 to have similar kinds of guarantees, especially since S3 is also a persistent data store for many Spark applications. For example, failing to read from S3 as the source dataset for a data frame would certainly cause a job failure anyway.
ah good point, I can see how a lot of these would not allow a zero-copy transfer to disk. But I'd be surprised if they don't give you an InputStream.
So the Ignite implementation here is effectively doing such in-memory block buffering as well. The availability of streaming depends on the key-value store in question; the Ignite API requires getting all of the bytes for a given key. Here we simulate a chunked stream of bytes by splitting up the cache entries and splicing them together on the client (executor).
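A hedged sketch of what that chunking could look like: the writer splits a partition's bytes into fixed-size cache entries keyed by chunk index, and the reading executor fetches them in order and splices them back into one InputStream. The String-based chunk keys, chunk size, and class name are illustrative, not this branch's actual layout.

```java
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.io.SequenceInputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

import org.apache.ignite.IgniteCache;

final class ChunkedBlockSketch {
  private static final int CHUNK_SIZE = 4 * 1024 * 1024;  // illustrative 4 MiB chunks

  /** Writer side: store the block as numbered chunks so no single cache value grows unbounded. */
  static int writeChunks(IgniteCache<String, byte[]> cache, String blockId, byte[] data) {
    int chunkCount = 0;
    for (int offset = 0; offset < data.length; offset += CHUNK_SIZE) {
      int end = Math.min(offset + CHUNK_SIZE, data.length);
      cache.put(blockId + "#" + chunkCount, Arrays.copyOfRange(data, offset, end));
      chunkCount++;
    }
    return chunkCount;
  }

  /** Reader side: fetch the chunks in order and splice them into a single stream. */
  static InputStream readChunks(IgniteCache<String, byte[]> cache, String blockId, int chunkCount) {
    List<InputStream> chunkStreams = new ArrayList<>(chunkCount);
    for (int i = 0; i < chunkCount; i++) {
      chunkStreams.add(new ByteArrayInputStream(cache.get(blockId + "#" + i)));
    }
    return new SequenceInputStream(Collections.enumeration(chunkStreams));
  }
}
```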
Those sound like conflicting statements to me. But from this usage, I can see what you mean that you just get the entire value as a byte[]:

```java
IgniteFuture<Map<SparkShufflePartitionBlock, byte[]>> fetchTask = dataCache.getAllAsync(currentBatch);
```

If that's really the only API you have, then you are right, fetch-to-disk is pretty pointless. I have a feeling that most implementations would provide an InputStream API, though.
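For contrast, if a backing store did expose a streaming read as speculated here, the plugin could copy straight to local disk without ever materializing the whole value in executor memory. The BlobStore interface below is hypothetical, not an Ignite or Spark API.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

interface BlobStore {
  InputStream open(String blockId) throws IOException;  // hypothetical streaming read
}

final class StreamingFetchToDiskSketch {
  static void fetchToDisk(BlobStore store, String blockId, Path target) throws IOException {
    try (InputStream in = store.open(blockId)) {
      // Copies in small buffers, so the block never has to fit in executor memory all at once.
      Files.copy(in, target, StandardCopyOption.REPLACE_EXISTING);
    }
  }
}
```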