[SPARK-32916][SHUFFLE][test-maven][test-hadoop2.7] Ensure the number of chunks in meta file and index file are equal #30433
Conversation
@tgravescs @Ngone51 @attilapiros @mridulm @Victsm

Scratch that, missed it in the description.
Given that the focus here is error handling, how are we handling a seek failure in the catch block? (Here and elsewhere in this PR.)
This is a good point. I had overlooked IOExceptions from the seeks. An IOException from a seek does not guarantee that the position was updated, so further updates to the file may not overwrite the corrupt data, which means the files remain corrupted.
One way of handling this, I think, is to maintain the expected length of both the index and meta files in AppShufflePartitionInfo instead of relying on the FilePointer. Updates to these files would then be similar to writing to the merged data file, as below:
if (partitionInfo.isEncounteredFailure()) {
  long updatedPos = partitionInfo.getPosition() + length;
  logger.debug(
    "{} shuffleId {} reduceId {} encountered failure current pos {} updated pos {}",
    partitionInfo.appShuffleId.appId, partitionInfo.appShuffleId.shuffleId,
    partitionInfo.reduceId, partitionInfo.getPosition(), updatedPos);
  length += partitionInfo.dataChannel.write(buf, updatedPos);
} else {
  length += partitionInfo.dataChannel.write(buf);
}
Please let me know if there are any other suggestions.
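For illustration only, here is a minimal, self-contained sketch of this idea: tracking the index file's expected length explicitly and using positional writes, so the channel's own pointer no longer matters after a failure. The class and member names (PartitionIndexWriter, indexPos, writeChunkOffset) are hypothetical and this is not the PR's actual code.

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

class PartitionIndexWriter {
  private final FileChannel indexChannel;
  // Expected length of the index file; advanced only after a fully successful write.
  private long indexPos;

  PartitionIndexWriter(Path indexFile) throws IOException {
    this.indexChannel = FileChannel.open(indexFile,
      StandardOpenOption.CREATE, StandardOpenOption.WRITE);
    this.indexPos = indexChannel.size();
  }

  // Appends one chunk offset. If the write fails midway, indexPos is not
  // advanced, so the next attempt overwrites any partially written bytes
  // instead of relying on the channel's (possibly stale) file pointer.
  void writeChunkOffset(long chunkOffset) throws IOException {
    ByteBuffer buf = ByteBuffer.allocate(Long.BYTES);
    buf.putLong(chunkOffset);
    buf.flip();
    int written = 0;
    while (buf.hasRemaining()) {
      // Positional write: does not read or mutate the channel's own position.
      written += indexChannel.write(buf, indexPos + written);
    }
    indexPos += written;
  }
}

The same pattern would apply to the meta file: on any failure, the tracked position simply stays where the last fully successful write left it.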
On second thought, if there was an exception while writing and then another exception during the seek, is there any possibility that further updates to these files will succeed?
Usually these exceptions wouldn't indicate a transient error with the filesystem.
Wouldn't it be better to just stop merging any more blocks of this shuffle partition?
I don't think we should assume that.
We could have taken an approach where we give up on writing the merged shuffle data file as well.
The issue I see here is that we are mixing block-level exceptions with chunk-level exceptions.
Right now, both throw exceptions that trigger the onFailure handling.
However, the current onFailure handling is meant for block-level failures only.
I think we should catch the chunk-level IOExceptions inside onComplete, setting a flag so that the onFailure handling logic knows whether we encountered a block-level failure or a chunk-level failure.
For a chunk-level failure, we shouldn't overwrite the previous block; we should effectively only delay the closure of the chunk.
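For illustration, a minimal sketch of the flag-based approach described above. The names (PushBlockStreamCallbackSketch, updateChunkInfo, indexMetaUpdateFailed) are hypothetical; this is only an outline of the idea, not the PR's actual code.

import java.io.IOException;

class PushBlockStreamCallbackSketch {
  // Set when updating the index/meta files for the current chunk fails
  // (a chunk-level failure), as opposed to a failure while streaming the
  // block's data (a block-level failure).
  private boolean indexMetaUpdateFailed = false;

  public void onComplete() {
    try {
      updateChunkInfo();   // write the index/meta entries that close this chunk
      indexMetaUpdateFailed = false;
    } catch (IOException e) {
      // Chunk-level failure: keep the merged data and only delay closing the chunk.
      indexMetaUpdateFailed = true;
    }
  }

  public void onFailure(Throwable cause) {
    if (indexMetaUpdateFailed) {
      // Chunk-level failure: do not mark the previous block for overwriting;
      // the data already merged stays valid.
    } else {
      // Block-level failure: existing handling, e.g. allow the next pushed
      // block to overwrite the partially written data.
    }
  }

  private void updateChunkInfo() throws IOException {
    // Placeholder for the real index/meta update.
  }
}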
> I don't think we should assume that.
> We could have taken an approach where we give up on writing the merged shuffle data file as well.

Here the shuffle service is writing to a local file. It's not a network filesystem/HDFS where transient exceptions can occur because of the network. If, while updating the local index/meta file, the shuffle service sees an IOException and then another IOException while seeking to a position, the chances that further updates to the same file will fail are very high. If the index/meta files are not successfully updated, then those merged blocks cannot be served to the executors when they read/fetch them.
Since IOExceptions while writing to the local filesystem are most often caused by issues like file permissions, a full disk, etc., I don't see the advantage of still trying to keep merging blocks into this shuffle partition.
It might be better for the server to reply to the executor to stop pushing in this case. Otherwise, they keep pushing more blocks, and in most cases those will fail as well.
OpenJDK's seek seems to use lseek64:

// move file pointer to the specified offset
jlong os::seek_to_file_offset(int fd, jlong offset) {
  return (jlong)::lseek64(fd, (off64_t)offset, SEEK_SET);
}

lseek64 is just a variation of lseek that uses a 64-bit file offset.
Below are the errors from https://linux.die.net/man/2/lseek:
EBADF: fd is not an open file descriptor.
EINVAL: whence is not valid. Or: the resulting file offset would be negative, or beyond the end of a seekable device.
EOVERFLOW: The resulting file offset cannot be represented in an off_t.
ESPIPE: fd is associated with a pipe, socket, or FIFO.
ENXIO: whence is SEEK_DATA or SEEK_HOLE, and the current file offset is beyond the end of the file.
In our case, we always seek to a previous point in the file, and none of these errors would be recoverable in that situation.
So any exception during seek should be treated as unrecoverable.
@mridulm @Victsm
Thanks for confirming @otterc. This means seek should not throw an exception in almost all cases for us.
We have two paths here:
a) Ignore a seek exception as unexpected.
b) Abort the merge, assuming EBADF/etc., as it is an unrecoverable error.
I am partial towards (b), as we cannot recover from a seek exception. Thoughts?
Just want to reiterate the changes I am working on:
- Since a seek failure is not recoverable, we let the clients know to stop pushing to that particular shuffle partition.
- Have a threshold on the number of IOExceptions from writes; when this threshold is reached for a single partition, inform the client to stop pushing and stop merging this partition (a rough sketch of such a threshold check follows below).
- When the update to metadata fails, do not propagate this exception back to the client, so that they don't push the block again. The size of the current chunk may grow, but with (2) in place it will still be of a manageable size.
These changes will impact #30312. Right now the client stops pushing for the entire shuffle, not a particular shuffle partition. The support needs to be extended to stop pushing for a particular shuffle partition.
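For illustration, a rough sketch of the per-partition IOException threshold mentioned in the second item. The class name, method names, and threshold value below are assumptions, not the identifiers or the default used in the actual change.

import java.io.IOException;

class ShufflePartitionStateSketch {
  // Assumed value; in practice this would come from configuration.
  private static final int IO_EXCEPTIONS_THRESHOLD = 4;
  private int ioExceptionCount = 0;

  // Called from the write path when updating the data/index/meta file fails.
  synchronized void recordIOException() {
    ioExceptionCount++;
  }

  // Called before attempting further writes for this partition; once the
  // threshold is crossed, the client is told to stop pushing to it.
  synchronized void abortIfExceptionsExceededThreshold() throws IOException {
    if (ioExceptionCount > IO_EXCEPTIONS_THRESHOLD) {
      throw new IOException("IOExceptions exceeded the threshold");
    }
  }
}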
Could you file a JIRA ticket for the corresponding work for the client?
I created this Jira:
https://issues.apache.org/jira/browse/SPARK-33665
ok to test
While making these changes and adding the unit tests, I think it's better not to rely on the file pointer from RandomAccessFile and to maintain our own pointer instead, and thus not use RandomAccessFile at all. Let me know if you think otherwise.
@mridulm @Victsm @Ngone51 @tgravescs @attilapiros Please take a look when you get a chance.
try {
  writeAnyDeferredBufs();
  if (deferredBufs != null && !deferredBufs.isEmpty()) {
    abortIfNecessary();
Why does this need to be inside the if block, instead of keeping writeDeferredBufs as is and just calling abortIfNecessary before writeDeferredBufs?
I was thinking that if a stream is completing but doesn't need to write any deferred bufs, then we should let it complete without failures.
  deferredBufs = null;
  return;
}
abortIfNecessary();
I don't think we should invoke this here.
Invoking inside onComplete should be sufficient.
The purpose is to only do this check once per block instead of once per buf, and to avoid throwing unnecessary exceptions from onData, which leads to channel close.
@Victsm This change is to address this comment:
#30433 (comment)
The scenario is that there can be pending streams waiting on the lock for the partitionInfo while the exception threshold is met. When a pending stream acquires the lock, it will attempt to write to the data file even though the exception threshold has been reached.
I have added the UT testPendingBlockIsAbortedImmediately to verify this.

> Purpose is to only do this check once per block instead of once per buf and avoid throwing unnecessary exceptions from onData which leads to channel close.

We already throw IOExceptions when a write to the data file fails. I don't see how throwing "IOExceptions exceeded threshold" makes it any worse.
The way I see the issue mentioned in that comment is that we are not preventing new blocks from being merged when the IOException threshold is reached.
To do that, we only need to invoke abortIfNecessary inside onComplete, whether or not we still have any deferredBuf to write at that point.
This way, for the normal case without IOExceptions, we only invoke abortIfNecessary once per block.
By invoking it here, we would invoke it once per buf in the normal case.
Of course, if we only check inside onComplete, we would delay the rejection of these pending blocks until we reach their stream's end.
I think this is a reasonable tradeoff, considering that the majority of the time the code is executing the normal case rather than the exception case.
Hmm, I think the intention is for the server not to attempt any writes once the threshold is reached for the partition. The check here probably makes that behavior more accurate.
However, I don't have a strong opinion on this, since the assumption is that once this number of IOExceptions has been reached, any further writes will result in an IOException as well. With that assumption, a write in onData after the threshold is met will very likely throw an IOException anyway, and since the threshold is already met, the server will instead throw "IOExceptions exceeded threshold".
I can remove it from onData. @Ngone51 What do you think?
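For reference, a minimal sketch of the control flow being debated, with the threshold check invoked both in onData (once per buf, before writing) and in onComplete (only when deferred bufs remain). All names and method bodies here are hypothetical placeholders, not the PR's code.

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.LinkedList;
import java.util.List;

class StreamCallbackFlowSketch {
  private List<ByteBuffer> deferredBufs = new LinkedList<>();

  public void onData(ByteBuffer buf) throws IOException {
    // ... if another stream holds the partition lock, defer the buf and return ...
    abortIfNecessary();   // reject further writes once the threshold is reached
    writeBuf(buf);
  }

  public void onComplete() throws IOException {
    if (deferredBufs != null && !deferredBufs.isEmpty()) {
      // Only streams that still have something to write are rejected here;
      // a completing stream with nothing deferred is allowed to finish.
      abortIfNecessary();
      writeDeferredBufs();
    }
    deferredBufs = null;
    // ... close the chunk and update the index/meta files ...
  }

  private void abortIfNecessary() throws IOException { /* threshold check, as sketched earlier */ }
  private void writeBuf(ByteBuffer buf) throws IOException { /* write to the merged data file */ }
  private void writeDeferredBufs() throws IOException { /* flush deferred bufs, then clear them */ }
}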
Had an offline discussion with @mridulm and @otterc. The concern about closing channels by throwing an exception mid-stream seems negligible, since it only happens after reaching the max IOException count.
Calling abortIfNecessary inside onData should also have negligible performance implications.
With that, I think it should be fine to keep this part of the code as is.
I'm wondering whether the server closes the channel, or the client stops streaming the remaining data, when we throw the exception in onData. Otherwise, we'd keep receiving the following data from the client and throw the exception multiple times.
(I didn't find anywhere that we close the channel, or where the client gets a chance to stop streaming. I may have missed it somewhere.)
@Ngone51 I think that's existing behavior in Spark, given how it uses StreamInterceptor to stream data coming from an RPC message that's outside of the frame.
This is also documented with SPARK-6237 in #21346:
spark/common/network-common/src/main/java/org/apache/spark/network/server/RpcHandler.java, lines 62 to 66 in 82aca7e:

 * An error while reading data from the stream
 * ({@link org.apache.spark.network.client.StreamCallback#onData(String, ByteBuffer)})
 * will fail the entire channel. A failure in "post-processing" the stream in
 * {@link org.apache.spark.network.client.StreamCallback#onComplete(String)} will result in an
 * rpcFailure, but the channel will remain active.
It is the server that closes the channel if an exception is thrown from onData.
Once an exception is thrown from onData while StreamInterceptor hasn't finished processing all the out-of-frame bytes for a given RPC message, the TransportFrameDecoder will no longer be able to successfully decode subsequent RPC messages from this channel.
Thus, the server needs to close the channel at this point.
Once the channel is closed, the client can no longer transfer any more data to the server using the same channel.
The connection needs to be re-established at that point, resetting state on the client side.
The client does receive the exception thrown from onData. I simulated an exception from onData at the server for a particular push block.
Below are the logs from the client.
Note: I am running an older version of Magnet which still uses ShuffleBlockId for shuffle push, and some other classes are old as well.
20/12/11 19:14:48 INFO TransportClientFactory: Successfully created connection to ltx1-hcl3213.grid.linkedin.com/10.150.24.33:7337 after 12 ms (5 ms spent in bootstraps)
20/12/11 19:14:56 ERROR RetryingBlockFetcher: Failed to fetch block shuffle_1_7_7, and will not retry (1 retries)
org.apache.spark.network.shuffle.BlockPushException: ^H^@^@^@^_application_1602506816280_53624^@^@^@
shuffle_1_7_7^@^@^@^Gjava.io.IOException: Destination failed while reading stream
at org.apache.spark.network.server.TransportRequestHandler$3.onFailure(TransportRequestHandler.java:244)
at org.apache.spark.network.client.StreamInterceptor.exceptionCaught(StreamInterceptor.java:58)
at org.apache.spark.network.util.TransportFrameDecoder.exceptionCaught(TransportFrameDecoder.java:188)
at org.spark_project.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:285)
at org.spark_project.io.netty.channel.AbstractChannelHandlerContext.notifyHandlerException(AbstractChannelHandlerContext.java:850)
at org.spark_project.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:364)
at org.spark_project.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at org.spark_project.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
at org.spark_project.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1359)
at org.spark_project.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at org.spark_project.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at org.spark_project.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:935)
at org.spark_project.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:138)
at org.spark_project.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:645)
at org.spark_project.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)
at org.spark_project.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)
at org.spark_project.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)
at org.spark_project.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
at org.spark_project.io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: FAILING this stream
After this, the client logs show that the server has terminated the connection.
20/12/11 19:14:57 WARN TransportChannelHandler: Exception in connection from ltx1-hcl3412.grid.linkedin.com/10.150.55.78:7337
java.io.IOException: Connection reset by peer
at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
at sun.nio.ch.IOUtil.read(IOUtil.java:192)
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
at io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:288)
at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1106)
at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:343)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:123)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:645)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
at java.lang.Thread.run(Thread.java:748)
20/12/11 19:14:57 ERROR TransportResponseHandler: Still have 38 requests outstanding when connection from ltx1-hcl3412.grid.linkedin.com/10.150.55.78:7337 is closed
The changes look good to me. Is there anything else pending here @otterc?

Current change looks good to me as well.

ok to test

Kicked off the test once more, will merge once it goes through.
The failure is unrelated.
…of chunks in meta file and index file are equal

### What changes were proposed in this pull request?

1. Fixes for bugs in `RemoteBlockPushResolver` where the number of chunks in the meta file and index file are inconsistent due to exceptions while writing to either the index file or the meta file. This Java class was introduced in #30062.
   - If writing to the index file fails, the position of the meta file is not reset. This means that the number of chunks in the meta file is inconsistent with the index file.
   - During the exception handling while writing to the index/meta file, we just set the pointer back to the start position. If the files are closed just after this, that does not get rid of any of the extra bytes written to them.
2. Adds an IOException threshold. If the `RemoteBlockPushResolver` encounters more IOExceptions than this threshold while updating the data/meta/index file of a shuffle partition, then it responds to the client with the exception `IOExceptions exceeded the threshold` so that the client can stop pushing data for this shuffle partition.
3. When the update to metadata fails, the exception is not propagated back to the client. This results in an increased size of the current chunk. However, with (2) in place, the current chunk will still be of a manageable size.

### Why are the changes needed?

This fix is needed for the bugs mentioned above.

1. Moved writing to the meta file after the index file. This fixes the issue because if there is an exception writing to the meta file, then the index file position is not updated. With this change, if there is an exception writing to the index file, then none of the files are effectively updated, and the same is true vice versa.
2. Truncating the lengths of the data/index/meta files when the partition is finalized.
3. When the IOExceptions have reached the threshold, it is most likely that future blocks will also face the issue. So it is better to let the clients know so that they can stop pushing blocks for that partition.
4. When just the meta update fails, the client retries pushing the block which was successfully merged into the data file. This can be avoided by letting the chunk grow slightly.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Added unit tests for all the bugs and the threshold.

Closes #30433 from otterc/SPARK-32916-followup.

Authored-by: Chandni Singh <[email protected]>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
(cherry picked from commit 0677c39)
Signed-off-by: Mridul Muralidharan <mridulatgmail.com>
Thanks @mridulm!

Thanks for letting me know @mridulm.
What changes were proposed in this pull request?

1. Fixes for bugs in RemoteBlockPushResolver where the number of chunks in the meta file and index file are inconsistent due to exceptions while writing to either the index file or the meta file. This Java class was introduced in [SPARK-32916][SHUFFLE] Implementation of shuffle service that leverages push-based shuffle in YARN deployment mode #30062.
   - If writing to the index file fails, the position of the meta file is not reset. This means that the number of chunks in the meta file is inconsistent with the index file.
   - During the exception handling while writing to the index/meta file, we just set the pointer back to the start position. If the files are closed just after this, that does not get rid of any of the extra bytes written to them.
2. Adds an IOException threshold. If the RemoteBlockPushResolver encounters more IOExceptions than this threshold while updating the data/meta/index file of a shuffle partition, then it responds to the client with the exception "IOExceptions exceeded the threshold" so that the client can stop pushing data for this shuffle partition.
3. When the update to metadata fails, the exception is not propagated back to the client. This results in an increased size of the current chunk. However, with (2) in place, the current chunk will still be of a manageable size.

Why are the changes needed?

This fix is needed for the bugs mentioned above.

1. Moved writing to the meta file after the index file. This fixes the issue because if there is an exception writing to the meta file, then the index file position is not updated. With this change, if there is an exception writing to the index file, then none of the files are effectively updated, and the same is true vice versa.
2. Truncating the lengths of the data/index/meta files when the partition is finalized.
3. When the IOExceptions have reached the threshold, it is most likely that future blocks will also face the issue. So it is better to let the clients know so that they can stop pushing blocks for that partition.
4. When just the meta update fails, the client retries pushing the block which was successfully merged into the data file. This can be avoided by letting the chunk grow slightly.

Does this PR introduce any user-facing change?

No

How was this patch tested?

Added unit tests for all the bugs and the threshold.