From 9b7b7c164156eb8fa7f105c5777b018e0bf14771 Mon Sep 17 00:00:00 2001 From: Kousuke Saruta Date: Thu, 7 Aug 2014 23:51:17 +0900 Subject: [PATCH 1/8] (WIP) Modifying ConnectionManager.scala --- .../spark/network/ConnectionManager.scala | 18 +++++- .../network/ConnectionManagerSuite.scala | 62 ++++++++++++++++++- 2 files changed, 78 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala b/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala index 95f96b8463a01..e7eaf7bcae97a 100644 --- a/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala +++ b/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala @@ -22,6 +22,7 @@ import java.nio._ import java.nio.channels._ import java.nio.channels.spi._ import java.net._ +import java.util.{Timer, TimerTask} import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.{LinkedBlockingDeque, TimeUnit, ThreadPoolExecutor} @@ -72,6 +73,7 @@ private[spark] class ConnectionManager( // default to 30 second timeout waiting for authentication private val authTimeout = conf.getInt("spark.core.connection.auth.wait.timeout", 30) + private val ackTimeout = conf.getInt("spark.core.connection.ack.wait.timeout", 30) private val handleMessageExecutor = new ThreadPoolExecutor( conf.getInt("spark.core.connection.handler.threads.min", 20), @@ -836,9 +838,12 @@ private[spark] class ConnectionManager( def sendMessageReliably(connectionManagerId: ConnectionManagerId, message: Message) : Future[Message] = { val promise = Promise[Message]() + + val ackTimeoutMonitor = new Timer(s"Ack Timeout Monitor-${connectionManagerId}-MessageId(${message.id})", true) + val status = new MessageStatus(message, connectionManagerId, s => { s.ackMessage match { - case None => // Indicates a failure where we either never sent or never got ACK'd + case None => // Indicates a failure where we either never sent or never got ACK'd promise.failure(new IOException("sendMessageReliably failed without being ACK'd")) case Some(ackMessage) => if (ackMessage.hasError) { @@ -846,13 +851,24 @@ private[spark] class ConnectionManager( new IOException("sendMessageReliably failed with ACK that signalled a remote error")) } else { promise.success(ackMessage) + ackTimeoutMonitor.cancel() } } }) messageStatuses.synchronized { messageStatuses += ((message.id, status)) } + + val timeoutTask = new TimerTask { + override def run(): Unit = { + status.synchronized { + promise.failure(new IOException(s"sendMessageReliably failed because ack was not received within ${ackTimeout} sec")) + } + } + } + sendMessage(connectionManagerId, message) + ackTimeoutMonitor.schedule(timeoutTask, ackTimeout * 1000) promise.future } diff --git a/core/src/test/scala/org/apache/spark/network/ConnectionManagerSuite.scala b/core/src/test/scala/org/apache/spark/network/ConnectionManagerSuite.scala index 846537df003df..b3b5ebddfd8a0 100644 --- a/core/src/test/scala/org/apache/spark/network/ConnectionManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/network/ConnectionManagerSuite.scala @@ -19,14 +19,19 @@ package org.apache.spark.network import java.io.IOException import java.nio._ +import java.util.concurrent.TimeoutException import org.apache.spark.{SecurityManager, SparkConf} import org.scalatest.FunSuite +import org.mockito.Mockito._ +import org.mockito.Matchers._ + +import scala.concurrent.TimeoutException import scala.concurrent.{Await, TimeoutException} import scala.concurrent.duration._ import scala.language.postfixOps -import scala.util.Try +import scala.util.{Failure, Success, Try} /** * Test the ConnectionManager with various security settings. @@ -255,5 +260,60 @@ class ConnectionManagerSuite extends FunSuite { } + test("sendMessageRelyably timeout") { + import scala.concurrent.ExecutionContext.Implicits.global + + val clientConf = new SparkConf + clientConf.set("spark.authenticate", "false") + val ackTimeout = 30 + clientConf.set("spark.core.connection.ack.wait.timeout", s"${ackTimeout}") + + val clientSecurityManager = new SecurityManager(clientConf) + val managerClient = spy(new ConnectionManager(0, clientConf, clientSecurityManager)) + + val serverConf = new SparkConf + serverConf.set("spark.authenticate", "false") + val serverSecurityManager = new SecurityManager(serverConf) + val managerServer = new ConnectionManager(0, serverConf, serverSecurityManager) + managerServer.onReceiveMessage((msg: Message, id: ConnectionManagerId) => { + // sleep 60 sec > ack timeout for simulating server slow down or hang up + Thread.sleep(ackTimeout * 2 * 1000) + None + }) + + val size = 10 * 1024 * 1024 + val buffer = ByteBuffer.allocate(size).put(Array.tabulate[Byte](size)(x => x.toByte)) + buffer.flip + val bufferMessage = Message.createBufferMessage(buffer.duplicate) + val future = managerClient.sendMessageReliably(managerServer.id, bufferMessage) + /* future.onComplete { + case Success(_) => { + assert(false, "Ack timeout should be occurred but ack was received successfully") + } + case Failure(_) => { + // if timeout occurred, receiveMessage should not be invoked + verify(managerClient, times(0)).acceptConnection(any()) + assert(true) + } + } +*/ + try { + Await.result(future, Duration((ackTimeout * 1.5).toInt, SECONDS)) + assert(false, "Ack timeout should be occurred but ack was received successfully") + } catch { + case e: TimeoutException => { + assert(false, s"Timeout was occurred in ${ackTimeout} but timeout was not occurred") + } + case e: IOException => { + verify(managerClient, times(0)).acceptConnection(any()) + assert(true) + } + } + + managerClient.stop() + managerServer.stop() + + } + } From 0174d6a53973949f410b0e51001675f43db0b12b Mon Sep 17 00:00:00 2001 From: Kousuke Saruta Date: Sun, 10 Aug 2014 17:11:14 +0900 Subject: [PATCH 2/8] Modified ConnectionManager.scala to handle the case remote Executor cannot ack --- .../spark/network/ConnectionManager.scala | 32 ++++++++++----- .../network/ConnectionManagerSuite.scala | 40 +++++-------------- 2 files changed, 34 insertions(+), 38 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala b/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala index e7eaf7bcae97a..00f8db23446b0 100644 --- a/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala +++ b/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala @@ -136,6 +136,7 @@ private[spark] class ConnectionManager( // to be able to track asynchronous messages private val idCount: AtomicInteger = new AtomicInteger(1) + private var isAckTimeout = false private val selectorThread = new Thread("connection-manager-thread") { override def run() = ConnectionManager.this.run() } @@ -654,19 +655,25 @@ private[spark] class ConnectionManager( } } if (bufferMessage.hasAckId()) { - val sentMessageStatus = messageStatuses.synchronized { + messageStatuses.synchronized { messageStatuses.get(bufferMessage.ackId) match { case Some(status) => { messageStatuses -= bufferMessage.ackId - status + status.markDone(Some(message)) } case None => { - throw new Exception("Could not find reference for received ack message " + - message.id) + /** + * If isAckTimeout == true, Future returned from sendMessageReliably should fail + * and finally, FetchFailedException is thrown so in this case, we don't need + * to throw Exception here + */ + if (!isAckTimeout) { + throw new Exception("Could not find reference for received ack message " + + message.id) + } } } } - sentMessageStatus.markDone(Some(message)) } else { var ackMessage : Option[Message] = None try { @@ -842,6 +849,7 @@ private[spark] class ConnectionManager( val ackTimeoutMonitor = new Timer(s"Ack Timeout Monitor-${connectionManagerId}-MessageId(${message.id})", true) val status = new MessageStatus(message, connectionManagerId, s => { + ackTimeoutMonitor.cancel() s.ackMessage match { case None => // Indicates a failure where we either never sent or never got ACK'd promise.failure(new IOException("sendMessageReliably failed without being ACK'd")) @@ -851,7 +859,6 @@ private[spark] class ConnectionManager( new IOException("sendMessageReliably failed with ACK that signalled a remote error")) } else { promise.success(ackMessage) - ackTimeoutMonitor.cancel() } } }) @@ -861,14 +868,21 @@ private[spark] class ConnectionManager( val timeoutTask = new TimerTask { override def run(): Unit = { - status.synchronized { - promise.failure(new IOException(s"sendMessageReliably failed because ack was not received within ${ackTimeout} sec")) + messageStatuses.synchronized { + isAckTimeout = true + messageStatuses.remove(message.id).foreach ( s => { + s.synchronized { + promise.failure( + new IOException(s"sendMessageReliably failed because ack " + + "was not received within ${ackTimeout} sec")) + } + }) } } } - sendMessage(connectionManagerId, message) ackTimeoutMonitor.schedule(timeoutTask, ackTimeout * 1000) + sendMessage(connectionManagerId, message) promise.future } diff --git a/core/src/test/scala/org/apache/spark/network/ConnectionManagerSuite.scala b/core/src/test/scala/org/apache/spark/network/ConnectionManagerSuite.scala index b3b5ebddfd8a0..a253da83f9a43 100644 --- a/core/src/test/scala/org/apache/spark/network/ConnectionManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/network/ConnectionManagerSuite.scala @@ -261,15 +261,13 @@ class ConnectionManagerSuite extends FunSuite { } test("sendMessageRelyably timeout") { - import scala.concurrent.ExecutionContext.Implicits.global - val clientConf = new SparkConf clientConf.set("spark.authenticate", "false") val ackTimeout = 30 clientConf.set("spark.core.connection.ack.wait.timeout", s"${ackTimeout}") val clientSecurityManager = new SecurityManager(clientConf) - val managerClient = spy(new ConnectionManager(0, clientConf, clientSecurityManager)) + val manager = new ConnectionManager(0, clientConf, clientSecurityManager) val serverConf = new SparkConf serverConf.set("spark.authenticate", "false") @@ -277,7 +275,7 @@ class ConnectionManagerSuite extends FunSuite { val managerServer = new ConnectionManager(0, serverConf, serverSecurityManager) managerServer.onReceiveMessage((msg: Message, id: ConnectionManagerId) => { // sleep 60 sec > ack timeout for simulating server slow down or hang up - Thread.sleep(ackTimeout * 2 * 1000) + Thread.sleep(ackTimeout * 3 * 1000) None }) @@ -285,34 +283,18 @@ class ConnectionManagerSuite extends FunSuite { val buffer = ByteBuffer.allocate(size).put(Array.tabulate[Byte](size)(x => x.toByte)) buffer.flip val bufferMessage = Message.createBufferMessage(buffer.duplicate) - val future = managerClient.sendMessageReliably(managerServer.id, bufferMessage) - /* future.onComplete { - case Success(_) => { - assert(false, "Ack timeout should be occurred but ack was received successfully") - } - case Failure(_) => { - // if timeout occurred, receiveMessage should not be invoked - verify(managerClient, times(0)).acceptConnection(any()) - assert(true) - } - } -*/ - try { - Await.result(future, Duration((ackTimeout * 1.5).toInt, SECONDS)) - assert(false, "Ack timeout should be occurred but ack was received successfully") - } catch { - case e: TimeoutException => { - assert(false, s"Timeout was occurred in ${ackTimeout} but timeout was not occurred") - } - case e: IOException => { - verify(managerClient, times(0)).acceptConnection(any()) - assert(true) - } + + val future = manager.sendMessageReliably(managerServer.id, bufferMessage) + + // Future should throw IOException in 30 sec. + // Otherwise TimeoutExcepton is thrown from Await.result. + // We expect TimeoutException is not thrown. + intercept[IOException] { + Await.result(future, (ackTimeout * 2) second) } - managerClient.stop() + manager.stop() managerServer.stop() - } } From 7cbb8ca1337fe58808ef0b7d15b83723a628bf43 Mon Sep 17 00:00:00 2001 From: Kousuke Saruta Date: Mon, 11 Aug 2014 10:30:13 +0900 Subject: [PATCH 3/8] Modified to match with scalastyle --- .../scala/org/apache/spark/network/ConnectionManager.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala b/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala index 00f8db23446b0..b9234bd30031d 100644 --- a/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala +++ b/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala @@ -846,7 +846,8 @@ private[spark] class ConnectionManager( : Future[Message] = { val promise = Promise[Message]() - val ackTimeoutMonitor = new Timer(s"Ack Timeout Monitor-${connectionManagerId}-MessageId(${message.id})", true) + val ackTimeoutMonitor = new Timer(s"Ack Timeout Monitor-" + + "${connectionManagerId}-MessageId(${message.id})", true) val status = new MessageStatus(message, connectionManagerId, s => { ackTimeoutMonitor.cancel() From 0dd9ad38f8260237cbc55ec75a0f50b9dfa1ac62 Mon Sep 17 00:00:00 2001 From: Kousuke Saruta Date: Fri, 15 Aug 2014 10:05:48 +0900 Subject: [PATCH 4/8] Modified typo in ConnectionManagerSuite.scala --- .../scala/org/apache/spark/network/ConnectionManagerSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/test/scala/org/apache/spark/network/ConnectionManagerSuite.scala b/core/src/test/scala/org/apache/spark/network/ConnectionManagerSuite.scala index a253da83f9a43..e2f4d4c57cdb5 100644 --- a/core/src/test/scala/org/apache/spark/network/ConnectionManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/network/ConnectionManagerSuite.scala @@ -260,7 +260,7 @@ class ConnectionManagerSuite extends FunSuite { } - test("sendMessageRelyably timeout") { + test("sendMessageReliably timeout") { val clientConf = new SparkConf clientConf.set("spark.authenticate", "false") val ackTimeout = 30 From 7ed48be337f469b75a1ba0c85b6817e5beb9f3a6 Mon Sep 17 00:00:00 2001 From: Kousuke Saruta Date: Fri, 15 Aug 2014 16:07:56 +0900 Subject: [PATCH 5/8] Modified ConnectionManager to use ackTimeoutMonitor ConnectionManager-wide --- .../spark/network/ConnectionManager.scala | 48 ++++++++----------- 1 file changed, 21 insertions(+), 27 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala b/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala index b9234bd30031d..06c00a7ebcb61 100644 --- a/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala +++ b/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala @@ -70,10 +70,11 @@ private[spark] class ConnectionManager( } private val selector = SelectorProvider.provider.openSelector() + private val ackTimeoutMonitor = new Timer("AckTimeoutMonitor", true) // default to 30 second timeout waiting for authentication private val authTimeout = conf.getInt("spark.core.connection.auth.wait.timeout", 30) - private val ackTimeout = conf.getInt("spark.core.connection.ack.wait.timeout", 30) + private val ackTimeout = conf.getInt("spark.core.connection.ack.wait.timeout", 60) private val handleMessageExecutor = new ThreadPoolExecutor( conf.getInt("spark.core.connection.handler.threads.min", 20), @@ -662,15 +663,11 @@ private[spark] class ConnectionManager( status.markDone(Some(message)) } case None => { - /** - * If isAckTimeout == true, Future returned from sendMessageReliably should fail - * and finally, FetchFailedException is thrown so in this case, we don't need - * to throw Exception here - */ - if (!isAckTimeout) { - throw new Exception("Could not find reference for received ack message " + - message.id) + if (isAckTimeout) { + logWarning(s"Ack message ${message.id} maybe received after timeout") } + throw new Exception("Could not find reference for received ack message " + + message.id) } } } @@ -846,11 +843,23 @@ private[spark] class ConnectionManager( : Future[Message] = { val promise = Promise[Message]() - val ackTimeoutMonitor = new Timer(s"Ack Timeout Monitor-" + - "${connectionManagerId}-MessageId(${message.id})", true) + val timeoutTask = new TimerTask { + override def run(): Unit = { + messageStatuses.synchronized { + isAckTimeout = true + messageStatuses.remove(message.id).foreach ( s => { + s.synchronized { + promise.failure( + new IOException(s"sendMessageReliably failed because ack " + + "was not received within ${ackTimeout} sec")) + } + }) + } + } + } val status = new MessageStatus(message, connectionManagerId, s => { - ackTimeoutMonitor.cancel() + timeoutTask.cancel() s.ackMessage match { case None => // Indicates a failure where we either never sent or never got ACK'd promise.failure(new IOException("sendMessageReliably failed without being ACK'd")) @@ -867,21 +876,6 @@ private[spark] class ConnectionManager( messageStatuses += ((message.id, status)) } - val timeoutTask = new TimerTask { - override def run(): Unit = { - messageStatuses.synchronized { - isAckTimeout = true - messageStatuses.remove(message.id).foreach ( s => { - s.synchronized { - promise.failure( - new IOException(s"sendMessageReliably failed because ack " + - "was not received within ${ackTimeout} sec")) - } - }) - } - } - } - ackTimeoutMonitor.schedule(timeoutTask, ackTimeout * 1000) sendMessage(connectionManagerId, message) promise.future From e85f88bba721f6ef89521a64dd9afd78d86ef964 Mon Sep 17 00:00:00 2001 From: Kousuke Saruta Date: Sat, 16 Aug 2014 04:24:34 +0900 Subject: [PATCH 6/8] Removed useless synchronized blocks --- .../apache/spark/network/ConnectionManager.scala | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala b/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala index 06c00a7ebcb61..15c6ca8ec9d0e 100644 --- a/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala +++ b/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala @@ -62,10 +62,8 @@ private[spark] class ConnectionManager( var ackMessage: Option[Message] = None def markDone(ackMessage: Option[Message]) { - this.synchronized { - this.ackMessage = ackMessage - completionHandler(this) - } + this.ackMessage = ackMessage + completionHandler(this) } } @@ -848,11 +846,9 @@ private[spark] class ConnectionManager( messageStatuses.synchronized { isAckTimeout = true messageStatuses.remove(message.id).foreach ( s => { - s.synchronized { - promise.failure( - new IOException(s"sendMessageReliably failed because ack " + - "was not received within ${ackTimeout} sec")) - } + promise.failure( + new IOException(s"sendMessageReliably failed because ack " + + "was not received within ${ackTimeout} sec")) }) } } From d3bd2a8b0145d3c5f323d2f5988534d52e93e7a0 Mon Sep 17 00:00:00 2001 From: Kousuke Saruta Date: Sat, 16 Aug 2014 15:59:11 +0900 Subject: [PATCH 7/8] Modified configuration.md for spark.core.connection.ack.timeout --- docs/configuration.md | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/docs/configuration.md b/docs/configuration.md index c408c468dcd94..981170d8b49b7 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -884,6 +884,15 @@ Apart from these, the following properties are also available, and may be useful out and giving up. + + spark.core.connection.ack.wait.timeout + 60 + + Number of seconds for the connection to wait for ack to occur before timing + out and giving up. To avoid unwilling timeout caused by long pause like GC, + you can set larger value. + + spark.ui.filters None From cddbc7b534e68e6d4b50a1c327de5c85ac977844 Mon Sep 17 00:00:00 2001 From: Kousuke Saruta Date: Sat, 16 Aug 2014 16:24:55 +0900 Subject: [PATCH 8/8] Removed Exception throwing when ConnectionManager#handleMessage receives ack for non-referenced message --- .../spark/network/ConnectionManager.scala | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala b/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala index 15c6ca8ec9d0e..37d69a9ec4ce4 100644 --- a/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala +++ b/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala @@ -135,7 +135,6 @@ private[spark] class ConnectionManager( // to be able to track asynchronous messages private val idCount: AtomicInteger = new AtomicInteger(1) - private var isAckTimeout = false private val selectorThread = new Thread("connection-manager-thread") { override def run() = ConnectionManager.this.run() } @@ -661,11 +660,17 @@ private[spark] class ConnectionManager( status.markDone(Some(message)) } case None => { - if (isAckTimeout) { - logWarning(s"Ack message ${message.id} maybe received after timeout") - } - throw new Exception("Could not find reference for received ack message " + - message.id) + /** + * We can fall down on this code because of following 2 cases + * + * (1) Invalid ack sent due to buggy code. + * + * (2) Late-arriving ack for a SendMessageStatus + * To avoid unwilling late-arriving ack + * caused by long pause like GC, you can set + * larger value than default to spark.core.connection.ack.wait.timeout + */ + logWarning(s"Could not find reference for received ack Message ${message.id}") } } } @@ -844,7 +849,6 @@ private[spark] class ConnectionManager( val timeoutTask = new TimerTask { override def run(): Unit = { messageStatuses.synchronized { - isAckTimeout = true messageStatuses.remove(message.id).foreach ( s => { promise.failure( new IOException(s"sendMessageReliably failed because ack " +