[SPARK-2677] BasicBlockFetchIterator#next can wait forever #1632
Changes from 9 commits
9b7b7c1
a454239
0174d6a
ade279a
8a73974
7cbb8ca
0dd9ad3
9b620a6
7ed48be
e85f88b
d3bd2a8
cddbc7b
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -22,6 +22,7 @@ import java.nio._ | |
| import java.nio.channels._ | ||
| import java.nio.channels.spi._ | ||
| import java.net._ | ||
| import java.util.{Timer, TimerTask} | ||
| import java.util.concurrent.atomic.AtomicInteger | ||
|
|
||
| import java.util.concurrent.{LinkedBlockingDeque, TimeUnit, ThreadPoolExecutor} | ||
|
|
@@ -69,9 +70,11 @@ private[spark] class ConnectionManager( | |
| } | ||
|
|
||
| private val selector = SelectorProvider.provider.openSelector() | ||
| private val ackTimeoutMonitor = new Timer("AckTimeoutMonitor", true) | ||
|
|
||
| // default to 30 second timeout waiting for authentication | ||
| private val authTimeout = conf.getInt("spark.core.connection.auth.wait.timeout", 30) | ||
| private val ackTimeout = conf.getInt("spark.core.connection.ack.wait.timeout", 60) | ||
|
Contributor
This should be documented in doc/configuration.md. When users expect long GC pauses, they may wish to increase timeouts across the board, so it's important that all timeout options are documented.
||
|
|
||
| private val handleMessageExecutor = new ThreadPoolExecutor( | ||
| conf.getInt("spark.core.connection.handler.threads.min", 20), | ||
|
|
@@ -134,6 +137,7 @@ private[spark] class ConnectionManager( | |
| // to be able to track asynchronous messages | ||
| private val idCount: AtomicInteger = new AtomicInteger(1) | ||
|
|
||
| private var isAckTimeout = false | ||
|
Contributor
Logically, this seems like per-message state, so it feels wrong to have it as a ConnectionManager-wide variable.
||
| private val selectorThread = new Thread("connection-manager-thread") { | ||
| override def run() = ConnectionManager.this.run() | ||
| } | ||
|
|
@@ -652,19 +656,21 @@ private[spark] class ConnectionManager( | |
| } | ||
| } | ||
| if (bufferMessage.hasAckId()) { | ||
| val sentMessageStatus = messageStatuses.synchronized { | ||
| messageStatuses.synchronized { | ||
| messageStatuses.get(bufferMessage.ackId) match { | ||
| case Some(status) => { | ||
| messageStatuses -= bufferMessage.ackId | ||
| status | ||
| status.markDone(Some(message)) | ||
| } | ||
| case None => { | ||
| if (isAckTimeout) { | ||
| logWarning(s"Ack message ${message.id} maybe received after timeout") | ||
| } | ||
|
Contributor
I think there should be an […] here. This current code looks wrong because it will fall through and throw an exception if we receive late-arriving messages that we've already timed out on and marked as failures.
Member
Author
The original code (before ack timeouts were considered) threw an exception when it received a message with no matching reference. So I decided to keep throwing the exception even after an ack timeout has occurred, because we can't tell whether the unreferenced message was caused by the ack timeout. On second thought, is it fundamentally necessary to throw an exception here?
Contributor
Before, it was undoubtedly a bug if we received an ack for a message and didn't have a corresponding SentMessageStatus. Now that we have timeouts, we have no way to distinguish between a late-arriving ack for a SentMessageStatus that we've already deleted and a bogus ack sent due to buggy code. As I commented upthread, one option would be to simply convert this into a warning. But another option is to keep it as an error unless we've timed out at least once, in which case we treat it as a warning.
Member
Author
I think receiving an unreferenced message is not critical, so at the very least we should not throw an exception. Logging a warning is better, whether or not the message is a late-arriving ack.
Contributor
Yeah, that's fine, too. Let's just drop the exception for now and remove the
Member
Author
O.K. I'll remove it.
||
| throw new Exception("Could not find reference for received ack message " + | ||
| message.id) | ||
| } | ||
| } | ||
| } | ||
| sentMessageStatus.markDone(Some(message)) | ||
| } else { | ||
| var ackMessage : Option[Message] = None | ||
| try { | ||
|
|
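The review thread in this hunk settles on logging a warning instead of throwing when an ack arrives with no matching status. A minimal sketch of that lookup follows. It assumes a simplified stand-in `Status` class and a `handleAck` helper, both hypothetical and not the actual `ConnectionManager` types, and it returns the warning text rather than calling `logWarning` so that the behavior is easy to check.

```scala
import scala.collection.mutable

// Hypothetical stand-in for the per-message status tracked by ConnectionManager.
final class Status(val id: Int) {
  @volatile var done = false
  def markDone(): Unit = { done = true }
}

object LateAckSketch {
  private val messageStatuses = mutable.HashMap[Int, Status]()

  // Records a pending message that is waiting for its ack.
  def register(id: Int): Status = messageStatuses.synchronized {
    val status = new Status(id)
    messageStatuses += ((id, status))
    status
  }

  // Returns None when the ack matched a pending status, or Some(warning) when
  // no status exists (for example, it was already removed by a timeout).
  // Nothing is thrown in either case.
  def handleAck(ackId: Int): Option[String] = messageStatuses.synchronized {
    messageStatuses.remove(ackId) match {
      case Some(status) =>
        status.markDone()
        None
      case None =>
        Some(s"Could not find reference for received ack message $ackId")
    }
  }
}
```

Because the lookup and removal happen under the same lock, a late ack for an already-removed entry takes the `None` branch and is only reported, which is the behavior the reviewers agreed on.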
@@ -836,9 +842,26 @@ private[spark] class ConnectionManager( | |
| def sendMessageReliably(connectionManagerId: ConnectionManagerId, message: Message) | ||
| : Future[Message] = { | ||
| val promise = Promise[Message]() | ||
|
|
||
| val timeoutTask = new TimerTask { | ||
| override def run(): Unit = { | ||
| messageStatuses.synchronized { | ||
| isAckTimeout = true | ||
| messageStatuses.remove(message.id).foreach ( s => { | ||
| s.synchronized { | ||
|
Contributor
Why is this
||
| promise.failure( | ||
| new IOException(s"sendMessageReliably failed because ack " + | ||
| s"was not received within ${ackTimeout} sec")) | ||
| } | ||
| }) | ||
| } | ||
| } | ||
| } | ||
|
|
||
| val status = new MessageStatus(message, connectionManagerId, s => { | ||
| timeoutTask.cancel() | ||
| s.ackMessage match { | ||
| case None => // Indicates a failure where we either never sent or never got ACK'd | ||
| case None => // Indicates a failure where we either never sent or never got ACK'd | ||
| promise.failure(new IOException("sendMessageReliably failed without being ACK'd")) | ||
| case Some(ackMessage) => | ||
| if (ackMessage.hasError) { | ||
|
|
@@ -852,6 +875,8 @@ private[spark] class ConnectionManager( | |
| messageStatuses.synchronized { | ||
| messageStatuses += ((message.id, status)) | ||
| } | ||
|
|
||
| ackTimeoutMonitor.schedule(timeoutTask, ackTimeout * 1000) | ||
| sendMessage(connectionManagerId, message) | ||
| promise.future | ||
| } | ||
|
|
||
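The timeout pattern in `sendMessageReliably` can be sketched in isolation with only the standard library. This is a rough illustration rather than the PR's code: `AckTimeoutSketch` and `withAckTimeout` are hypothetical names, the payload is a plain `String` instead of a `Message`, and the timeout is passed in milliseconds rather than read from configuration.

```scala
import java.io.IOException
import java.util.{Timer, TimerTask}
import scala.concurrent.{ExecutionContext, Promise}

object AckTimeoutSketch {
  // Daemon timer thread, so it does not keep the JVM alive on shutdown.
  private val ackTimeoutMonitor = new Timer("AckTimeoutMonitor", true)

  // Hypothetical helper: returns a promise that the caller completes when the
  // ack arrives. If no ack arrives within timeoutMs, the promise is failed.
  def withAckTimeout(timeoutMs: Long): Promise[String] = {
    val promise = Promise[String]()
    val timeoutTask = new TimerTask {
      override def run(): Unit = {
        // tryFailure does nothing if the ack already completed the promise.
        promise.tryFailure(
          new IOException(s"ack was not received within $timeoutMs ms"))
      }
    }
    ackTimeoutMonitor.schedule(timeoutTask, timeoutMs)
    // Cancel the pending timer task once the promise completes either way.
    promise.future.onComplete(_ => timeoutTask.cancel())(ExecutionContext.global)
    promise
  }
}
```

Here the timeout state lives in the per-message promise, so no manager-wide flag is needed. The sketch uses `tryFailure` and `trySuccess` so that a race between the timer and a late ack cannot raise an `IllegalStateException` from completing the promise twice.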
HashedWheelTimer performance is better