Skip to content
Closed
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import java.nio._
import java.nio.channels._
import java.nio.channels.spi._
import java.net._
import java.util.{Timer, TimerTask}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

HashedWheelTimer performance is better

import java.util.concurrent.atomic.AtomicInteger

import java.util.concurrent.{LinkedBlockingDeque, TimeUnit, ThreadPoolExecutor}
Expand Down Expand Up @@ -72,6 +73,7 @@ private[spark] class ConnectionManager(

// default to 30 second timeout waiting for authentication
private val authTimeout = conf.getInt("spark.core.connection.auth.wait.timeout", 30)
private val ackTimeout = conf.getInt("spark.core.connection.ack.wait.timeout", 30)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

60 is better

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Year, maybe right.


private val handleMessageExecutor = new ThreadPoolExecutor(
conf.getInt("spark.core.connection.handler.threads.min", 20),
Expand Down Expand Up @@ -134,6 +136,7 @@ private[spark] class ConnectionManager(
// to be able to track asynchronous messages
private val idCount: AtomicInteger = new AtomicInteger(1)

private var isAckTimeout = false
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Logically, this seems like per-message state, so it feels wrong to have it as a ConnectionManager-wide variable.

private val selectorThread = new Thread("connection-manager-thread") {
override def run() = ConnectionManager.this.run()
}
Expand Down Expand Up @@ -652,19 +655,25 @@ private[spark] class ConnectionManager(
}
}
if (bufferMessage.hasAckId()) {
val sentMessageStatus = messageStatuses.synchronized {
messageStatuses.synchronized {
messageStatuses.get(bufferMessage.ackId) match {
case Some(status) => {
messageStatuses -= bufferMessage.ackId
status
status.markDone(Some(message))
}
case None => {
throw new Exception("Could not find reference for received ack message " +
message.id)
/**
* If isAckTimeout == true, Future returned from sendMessageReliably should fail
* and finally, FetchFailedException is thrown so in this case, we don't need
* to throw Exception here
*/
if (!isAckTimeout) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we delete the MessageStatuses of messages that fail with timeouts, I guess we have no way to distinguish between an ACK for a message that we didn't send and an ACK for a timed-out message. I guess that isAckTimeout is used to strike a reasonable compromise in which we'll detect errors if no timeouts occur.

It might be a good idea to logWarning if we receive an ACK after we've timed out; this might help when debugging: if the ack timeout is set too low, warning messages would appear in the logs.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

------------------ 原始邮件 ------------------
发件人: "Josh Rosen";[email protected];
发送时间: 2014年8月12日(星期二) 中午12:24
收件人: "apache/spark"[email protected];

主题: Re: [spark] [SPARK-2677] BasicBlockFetchIterator#next can waitforever (#1632)

In core/src/main/scala/org/apache/spark/network/ConnectionManager.scala:

           } >                case None => { > -                throw new Exception("Could not find reference for received ack message " + > -                  message.id) > +                /** > +                 * If isAckTimeout == true, Future returned from sendMessageReliably should fail > +                 * and finally, FetchFailedException is thrown so in this case, we don't need > +                 * to throw Exception here > +                 */ > +                if (!isAckTimeout) {  

Since we delete the MessageStatuses of messages that fail with timeouts, I guess we have no way to distinguish between an ACK for a message that we didn't send and an ACK for a timed-out message. I guess that isAckTimeout is used to strike a reasonable compromise in which we'll detect errors if no timeouts occur.

It might be a good idea to logWarning if we receive an ACK after we've timed out; this might help when debugging: if the ack timeout is set too low, warning messages would appear in the logs.


Reply to this email directly or view it on GitHub.

throw new Exception("Could not find reference for received ack message " +
message.id)
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think there should be an an else block here so that we throw an exception only if we haven't hit the ack timeout.

This current code looks wrong because it will fall-through and throw an exception if we receive late-arriving messages that we've already timed out on and marked as failures.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The original code (before considering ack timeout) threw Exception when a message which is not referenced is received. So , I decided to throw Exception even if Ack timeout was occurred because we can't distinguish the non-referenced message is caused by ack timeout.

On a second throug, fundamentally, is it needed throwing exception here?
When we receive non-referenced message, should we warn simply right?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Before, it was undoubtedly a bug if we received an ack for a message and didn't have a corresponding SentMessageStatus.

Now that we have timeouts, we have no way to distinguish between a late-arriving ack for a SentMessageStatus that we've already deleted and a bogus ack sent due to buggy code. As I commented upthread, one option would be to simply convert this into a warning. But another option is to keep it as an error unless we've timed out at least once, in which case we treat it as a warning.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think, the situation receiving non-referenced message is not critical, we should not throw Exception at least so I think log warn is better when receiving non-referenced message even if the message is late arriving ack or not.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, that's fine, too. Let's just drop the exception for now and remove the isAckTimeout variable.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

O.K. I'll remove it.

}
}
}
sentMessageStatus.markDone(Some(message))
} else {
var ackMessage : Option[Message] = None
try {
Expand Down Expand Up @@ -836,9 +845,14 @@ private[spark] class ConnectionManager(
def sendMessageReliably(connectionManagerId: ConnectionManagerId, message: Message)
: Future[Message] = {
val promise = Promise[Message]()

val ackTimeoutMonitor = new Timer(s"Ack Timeout Monitor-" +
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Creating a new watchdog thread for each message seems like it could be really expensive.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can create a ConnectionManager-wide Timer and submit multiple tasks to it. MessageStatus could store a reference to the TimerTasks and we could cancel those individual tasks when we receive acks.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've modified for ConnectionManager-wide timer.

"${connectionManagerId}-MessageId(${message.id})", true)

val status = new MessageStatus(message, connectionManagerId, s => {
ackTimeoutMonitor.cancel()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Even though access to isAckTimeout is guarded by the messageStatuses lock, I think there's still a race condition here. According to the Timer.cancel() docs, cancel() "does not interfere with a currently executing task (if it exists)." So, there's still the potential for the timeout task to race with the message, start running and get blocked waiting to synchronize, and eventually run and throw an exception.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On closer inspection, I guess the race only sets isAckTimeout because the foreach in messageStatuses.remove(message.id).foreach will guard against the race, so we won't end up failing the promise after calling success on it from the other thread.

s.ackMessage match {
case None => // Indicates a failure where we either never sent or never got ACK'd
case None => // Indicates a failure where we either never sent or never got ACK'd
promise.failure(new IOException("sendMessageReliably failed without being ACK'd"))
case Some(ackMessage) =>
if (ackMessage.hasError) {
Expand All @@ -852,6 +866,23 @@ private[spark] class ConnectionManager(
messageStatuses.synchronized {
messageStatuses += ((message.id, status))
}

val timeoutTask = new TimerTask {
override def run(): Unit = {
messageStatuses.synchronized {
isAckTimeout = true
messageStatuses.remove(message.id).foreach ( s => {
s.synchronized {
promise.failure(
new IOException(s"sendMessageReliably failed because ack " +
"was not received within ${ackTimeout} sec"))
}
})
}
}
}

ackTimeoutMonitor.schedule(timeoutTask, ackTimeout * 1000)
sendMessage(connectionManagerId, message)
promise.future
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,19 @@ package org.apache.spark.network

import java.io.IOException
import java.nio._
import java.util.concurrent.TimeoutException

import org.apache.spark.{SecurityManager, SparkConf}
import org.scalatest.FunSuite

import org.mockito.Mockito._
import org.mockito.Matchers._

import scala.concurrent.TimeoutException
import scala.concurrent.{Await, TimeoutException}
import scala.concurrent.duration._
import scala.language.postfixOps
import scala.util.Try
import scala.util.{Failure, Success, Try}

/**
* Test the ConnectionManager with various security settings.
Expand Down Expand Up @@ -255,5 +260,42 @@ class ConnectionManagerSuite extends FunSuite {

}

test("sendMessageRelyably timeout") {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Spelling: should be sendMessageReliably

val clientConf = new SparkConf
clientConf.set("spark.authenticate", "false")
val ackTimeout = 30
clientConf.set("spark.core.connection.ack.wait.timeout", s"${ackTimeout}")

val clientSecurityManager = new SecurityManager(clientConf)
val manager = new ConnectionManager(0, clientConf, clientSecurityManager)

val serverConf = new SparkConf
serverConf.set("spark.authenticate", "false")
val serverSecurityManager = new SecurityManager(serverConf)
val managerServer = new ConnectionManager(0, serverConf, serverSecurityManager)
managerServer.onReceiveMessage((msg: Message, id: ConnectionManagerId) => {
// sleep 60 sec > ack timeout for simulating server slow down or hang up
Thread.sleep(ackTimeout * 3 * 1000)
None
})

val size = 10 * 1024 * 1024
val buffer = ByteBuffer.allocate(size).put(Array.tabulate[Byte](size)(x => x.toByte))
buffer.flip
val bufferMessage = Message.createBufferMessage(buffer.duplicate)

val future = manager.sendMessageReliably(managerServer.id, bufferMessage)

// Future should throw IOException in 30 sec.
// Otherwise TimeoutExcepton is thrown from Await.result.
// We expect TimeoutException is not thrown.
intercept[IOException] {
Await.result(future, (ackTimeout * 2) second)
}

manager.stop()
managerServer.stop()
}

}