-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-2670] FetchFailedException should be thrown when local fetch has failed #1578
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
e310c0b
460dc01
a3a9be1
b7b8250
4fca130
5d05855
03bcb02
d353984
e8713cc
85c8938
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -200,14 +200,21 @@ object BlockFetcherIterator { | |
| // these all at once because they will just memory-map some files, so they won't consume | ||
| // any memory that might exceed our maxBytesInFlight | ||
| for (id <- localBlocksToFetch) { | ||
| getLocalFromDisk(id, serializer) match { | ||
| case Some(iter) => { | ||
| // Pass 0 as size since it's not in flight | ||
| results.put(new FetchResult(id, 0, () => iter)) | ||
| logDebug("Got local block " + id) | ||
| try{ | ||
| getLocalFromDisk(id, serializer) match { | ||
| case Some(iter) => { | ||
| // Pass 0 as size since it's not in flight | ||
| results.put(new FetchResult(id, 0, () => iter)) | ||
| logDebug("Got local block " + id) | ||
| } | ||
| case None => { | ||
| throw new BlockException(id, "Could not get block " + id + " from local machine") | ||
| } | ||
| } | ||
| case None => { | ||
| throw new BlockException(id, "Could not get block " + id + " from local machine") | ||
| } catch { | ||
| case e: Exception => { | ||
| logError(s"Error occurred while fetch local block $id", e) | ||
| results.put(new FetchResult(id, -1, null)) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I wouldn't do drop and such on a ConcurrentQueue, since it might drop stuff other threads were adding. Just do a results.put on the failed block and don't worry about dropping other ones. You can actually move the try/catch into the for loop and add a "return" at the bottom of the catch after adding this failing FetchResult.
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thank you for your comment, @mateiz .
But, if it returns from getLocalBlocks immediately rest of FetchResults is not set to results, and we waits on results.take() in next method forever right? results is a instance of LinkedBlockingQueue and take method is blocking method.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I thought next() would return a failure block, and then the caller of BlockFetcherIterator will just stop. Did you see it not doing that? I think all you have to do is put one FetchResult with size = -1 in the queue and return, and everything will be fine.
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I thought wrong. Exactly, in current usage of BlockFetcherIterator, next() is not invoked after FetchFailedException has been thrown. |
||
| } | ||
|
||
| } | ||
| } | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Small code style thing, add a space before the {