Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import java.nio._
import java.nio.channels._
import java.nio.channels.spi._
import java.net._
import java.util.{Timer, TimerTask}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

HashedWheelTimer performance is better

import java.util.concurrent.atomic.AtomicInteger

import java.util.concurrent.{LinkedBlockingDeque, TimeUnit, ThreadPoolExecutor}
Expand Down Expand Up @@ -61,17 +62,17 @@ private[spark] class ConnectionManager(
var ackMessage: Option[Message] = None

def markDone(ackMessage: Option[Message]) {
this.synchronized {
this.ackMessage = ackMessage
completionHandler(this)
}
this.ackMessage = ackMessage
completionHandler(this)
}
}

private val selector = SelectorProvider.provider.openSelector()
private val ackTimeoutMonitor = new Timer("AckTimeoutMonitor", true)

// default to 30 second timeout waiting for authentication
private val authTimeout = conf.getInt("spark.core.connection.auth.wait.timeout", 30)
private val ackTimeout = conf.getInt("spark.core.connection.ack.wait.timeout", 60)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be documented in doc/configuration.md. When users expect long GC pauses, they may wish to increase timeouts across the board, so it's important that all timeout options are documented.


private val handleMessageExecutor = new ThreadPoolExecutor(
conf.getInt("spark.core.connection.handler.threads.min", 20),
Expand Down Expand Up @@ -652,19 +653,27 @@ private[spark] class ConnectionManager(
}
}
if (bufferMessage.hasAckId()) {
val sentMessageStatus = messageStatuses.synchronized {
messageStatuses.synchronized {
messageStatuses.get(bufferMessage.ackId) match {
case Some(status) => {
messageStatuses -= bufferMessage.ackId
status
status.markDone(Some(message))
}
case None => {
throw new Exception("Could not find reference for received ack message " +
message.id)
/**
* We can fall down on this code because of following 2 cases
*
* (1) Invalid ack sent due to buggy code.
*
* (2) Late-arriving ack for a SendMessageStatus
* To avoid unwilling late-arriving ack
* caused by long pause like GC, you can set
* larger value than default to spark.core.connection.ack.wait.timeout
*/
logWarning(s"Could not find reference for received ack Message ${message.id}")
}
}
}
sentMessageStatus.markDone(Some(message))
} else {
var ackMessage : Option[Message] = None
try {
Expand Down Expand Up @@ -836,9 +845,23 @@ private[spark] class ConnectionManager(
def sendMessageReliably(connectionManagerId: ConnectionManagerId, message: Message)
: Future[Message] = {
val promise = Promise[Message]()

val timeoutTask = new TimerTask {
override def run(): Unit = {
messageStatuses.synchronized {
messageStatuses.remove(message.id).foreach ( s => {
promise.failure(
new IOException(s"sendMessageReliably failed because ack " +
"was not received within ${ackTimeout} sec"))
})
}
}
}

val status = new MessageStatus(message, connectionManagerId, s => {
timeoutTask.cancel()
s.ackMessage match {
case None => // Indicates a failure where we either never sent or never got ACK'd
case None => // Indicates a failure where we either never sent or never got ACK'd
promise.failure(new IOException("sendMessageReliably failed without being ACK'd"))
case Some(ackMessage) =>
if (ackMessage.hasError) {
Expand All @@ -852,6 +875,8 @@ private[spark] class ConnectionManager(
messageStatuses.synchronized {
messageStatuses += ((message.id, status))
}

ackTimeoutMonitor.schedule(timeoutTask, ackTimeout * 1000)
sendMessage(connectionManagerId, message)
promise.future
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,19 @@ package org.apache.spark.network

import java.io.IOException
import java.nio._
import java.util.concurrent.TimeoutException

import org.apache.spark.{SecurityManager, SparkConf}
import org.scalatest.FunSuite

import org.mockito.Mockito._
import org.mockito.Matchers._

import scala.concurrent.TimeoutException
import scala.concurrent.{Await, TimeoutException}
import scala.concurrent.duration._
import scala.language.postfixOps
import scala.util.Try
import scala.util.{Failure, Success, Try}

/**
* Test the ConnectionManager with various security settings.
Expand Down Expand Up @@ -255,5 +260,42 @@ class ConnectionManagerSuite extends FunSuite {

}

test("sendMessageReliably timeout") {
val clientConf = new SparkConf
clientConf.set("spark.authenticate", "false")
val ackTimeout = 30
clientConf.set("spark.core.connection.ack.wait.timeout", s"${ackTimeout}")

val clientSecurityManager = new SecurityManager(clientConf)
val manager = new ConnectionManager(0, clientConf, clientSecurityManager)

val serverConf = new SparkConf
serverConf.set("spark.authenticate", "false")
val serverSecurityManager = new SecurityManager(serverConf)
val managerServer = new ConnectionManager(0, serverConf, serverSecurityManager)
managerServer.onReceiveMessage((msg: Message, id: ConnectionManagerId) => {
// sleep 60 sec > ack timeout for simulating server slow down or hang up
Thread.sleep(ackTimeout * 3 * 1000)
None
})

val size = 10 * 1024 * 1024
val buffer = ByteBuffer.allocate(size).put(Array.tabulate[Byte](size)(x => x.toByte))
buffer.flip
val bufferMessage = Message.createBufferMessage(buffer.duplicate)

val future = manager.sendMessageReliably(managerServer.id, bufferMessage)

// Future should throw IOException in 30 sec.
// Otherwise TimeoutExcepton is thrown from Await.result.
// We expect TimeoutException is not thrown.
intercept[IOException] {
Await.result(future, (ackTimeout * 2) second)
}

manager.stop()
managerServer.stop()
}

}

9 changes: 9 additions & 0 deletions docs/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -884,6 +884,15 @@ Apart from these, the following properties are also available, and may be useful
out and giving up.
</td>
</tr>
<tr>
<td><code>spark.core.connection.ack.wait.timeout</code></td>
<td>60</td>
<td>
Number of seconds for the connection to wait for ack to occur before timing
out and giving up. To avoid unwilling timeout caused by long pause like GC,
you can set larger value.
</td>
</tr>
<tr>
<td><code>spark.ui.filters</code></td>
<td>None</td>
Expand Down