Commits
41 commits
97523e0
[SPARK-6980] Akka ask timeout description refactored to RPC layer
BryanCutler May 15, 2015
78a2c0a
[SPARK-6980] Using RpcTimeout.awaitResult for future in AppClient now
BryanCutler May 16, 2015
5b59a44
[SPARK-6980] Added some RpcTimeout unit tests
BryanCutler May 16, 2015
49f9f04
[SPARK-6980] Minor cleanup and scala style fix
BryanCutler May 16, 2015
23d2f26
[SPARK-6980] Fixed await result not being handled by RpcTimeout
BryanCutler May 18, 2015
a294569
[SPARK-6980] Added creation of RpcTimeout with Seq of property keys
BryanCutler May 19, 2015
f74064d
Retrieving properties from property list using iterator and while loo…
May 21, 2015
0ee5642
Changing the loop condition to halt at the first match in the propert…
May 21, 2015
4be3a8d
Modifying loop condition to find property match
May 24, 2015
b7fb99f
Merge pull request #2 from hardmettle/configTimeoutUpdates_6980
BryanCutler May 24, 2015
c07d05c
Merge branch 'master' into configTimeout-6980-tmp
BryanCutler Jun 3, 2015
235919b
[SPARK-6980] Resolved conflicts after master merge
BryanCutler Jun 3, 2015
2f94095
[SPARK-6980] Added addMessageIfTimeout for when a Future is completed…
BryanCutler Jun 4, 2015
1607a5f
[SPARK-6980] Changed addMessageIfTimeout to PartialFunction, cleanup …
BryanCutler Jun 8, 2015
4351c48
[SPARK-6980] Added UT for addMessageIfTimeout, cleaned up UTs
BryanCutler Jun 10, 2015
7774d56
[SPARK-6980] Cleaned up UT imports
BryanCutler Jun 11, 2015
995d196
[SPARK-6980] Cleaned up import ordering, comments, spacing from PR fe…
BryanCutler Jun 11, 2015
d3754d1
[SPARK-6980] Added akkaConf to prevent dead letter logging
BryanCutler Jun 11, 2015
08f5afc
[SPARK-6980] Added UT for constructing RpcTimeout with default value
BryanCutler Jun 11, 2015
1b9beab
[SPARK-6980] Cleaned up import ordering
BryanCutler Jun 12, 2015
2206b4d
[SPARK-6980] Added unit test for ask then immediate awaitReply
BryanCutler Jun 12, 2015
1517721
[SPARK-6980] RpcTimeout object scope should be private[spark]
BryanCutler Jun 15, 2015
1394de6
[SPARK-6980] Moved MessagePrefix to createRpcTimeoutException directly
BryanCutler Jun 15, 2015
c6cfd33
[SPARK-6980] Changed UT ask message timeout to explicitly intercept a…
BryanCutler Jun 23, 2015
b05d449
[SPARK-6980] Changed constructor to use val duration instead of gette…
BryanCutler Jun 23, 2015
fa6ed82
[SPARK-6980] Had to increase timeout on positive test case because a …
BryanCutler Jun 23, 2015
fadaf6f
[SPARK-6980] Put back in deprecated RpcUtils askTimeout and lookupTim…
BryanCutler Jun 24, 2015
218aa50
[SPARK-6980] Corrected issues from feedback
BryanCutler Jun 24, 2015
039afed
[SPARK-6980] Corrected import organization
BryanCutler Jun 24, 2015
be11c4e
Merge branch 'master' into configTimeout-6980
BryanCutler Jun 24, 2015
7636189
[SPARK-6980] Fixed call to askWithReply in DAGScheduler to use RpcTim…
BryanCutler Jun 26, 2015
3a168c7
[SPARK-6980] Rewrote Akka RpcTimeout UTs in RpcEnvSuite
BryanCutler Jun 26, 2015
3d8b1ff
[SPARK-6980] Cleaned up imports in AkkaRpcEnvSuite
BryanCutler Jun 26, 2015
287059a
[SPARK-6980] Removed extra import in AkkaRpcEnvSuite
BryanCutler Jun 26, 2015
7f4d78e
[SPARK-6980] Fixed scala style checks
BryanCutler Jun 26, 2015
6a1c50d
[SPARK-6980] Minor cleanup of test case
BryanCutler Jun 27, 2015
4e89c75
[SPARK-6980] Missed one usage of deprecated RpcUtils.askTimeout in Ya…
BryanCutler Jun 30, 2015
dbd5f73
[SPARK-6980] Changed RpcUtils askRpcTimeout and lookupRpcTimeout scop…
BryanCutler Jul 1, 2015
7bb70f1
Merge branch 'master' into configTimeout-6980
BryanCutler Jul 1, 2015
06afa53
[SPARK-6980] RpcTimeout class extends Serializable, was causing error…
BryanCutler Jul 2, 2015
46c8d48
[SPARK-6980] Changed RpcEnvSuite test to never reply instead of just …
BryanCutler Jul 2, 2015
6 changes: 3 additions & 3 deletions core/src/main/scala/org/apache/spark/deploy/Client.scala
@@ -45,7 +45,7 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf)
private val lostMasters = new HashSet[Address]
private var activeMasterActor: ActorSelection = null

val timeout = RpcUtils.askTimeout(conf)
val timeout = RpcUtils.askRpcTimeout(conf)

override def preStart(): Unit = {
context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
@@ -102,9 +102,9 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf)
println("... waiting before polling master for driver state")
Thread.sleep(5000)
println("... polling master for driver state")
val statusFuture = (activeMasterActor ? RequestDriverStatus(driverId))(timeout)
val statusFuture = (activeMasterActor ? RequestDriverStatus(driverId))(timeout.duration)
.mapTo[DriverStatusResponse]
val statusResponse = Await.result(statusFuture, timeout)
val statusResponse = timeout.awaitResult(statusFuture)
statusResponse.found match {
case false =>
println(s"ERROR: Cluster master did not recognize $driverId")
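The pattern above repeats throughout this PR: Await.result(future, timeout) becomes timeout.awaitResult(future), so that a timeout failure names the configuration property that controls it. A minimal self-contained sketch of the idea (DemoRpcTimeout is an illustrative stand-in, not the class this PR adds to RpcEnv.scala):

import java.util.concurrent.TimeoutException
import scala.concurrent.{Await, Awaitable}
import scala.concurrent.duration._

// Sketch only: a timeout that remembers which config property it came from
// and says so when the wait fails.
class DemoRpcTimeout(val duration: FiniteDuration, val timeoutProp: String) {
  def awaitResult[T](awaitable: Awaitable[T]): T =
    try {
      Await.result(awaitable, duration)
    } catch {
      case te: TimeoutException =>
        throw new TimeoutException(
          te.getMessage + s". This timeout is controlled by $timeoutProp")
    }
}

// Usage: the failure now reads "... This timeout is controlled by
// spark.rpc.askTimeout" instead of a bare "Futures timed out".
// val timeout = new DemoRpcTimeout(30.seconds, "spark.rpc.askTimeout")
// val statusResponse = timeout.awaitResult(statusFuture)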
core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
@@ -193,9 +193,9 @@ private[spark] class AppClient(
def stop() {
if (actor != null) {
try {
val timeout = RpcUtils.askTimeout(conf)
val future = actor.ask(StopAppClient)(timeout)
Await.result(future, timeout)
val timeout = RpcUtils.askRpcTimeout(conf)
val future = actor.ask(StopAppClient)(timeout.duration)
timeout.awaitResult(future)
} catch {
case e: TimeoutException =>
logInfo("Stop request to Master timed out; it may already be shut down.")
core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -23,7 +23,6 @@ import java.text.SimpleDateFormat
import java.util.Date

import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
import scala.concurrent.Await
import scala.concurrent.duration._
import scala.language.postfixOps
import scala.util.Random
@@ -939,9 +938,9 @@ private[deploy] object Master extends Logging {
securityManager = securityMgr)
val actor = actorSystem.actorOf(
Props(classOf[Master], host, boundPort, webUiPort, securityMgr, conf), actorName)
val timeout = RpcUtils.askTimeout(conf)
val portsRequest = actor.ask(BoundPortsRequest)(timeout)
val portsResponse = Await.result(portsRequest, timeout).asInstanceOf[BoundPortsResponse]
val timeout = RpcUtils.askRpcTimeout(conf)
val portsRequest = actor.ask(BoundPortsRequest)(timeout.duration)
val portsResponse = timeout.awaitResult(portsRequest).asInstanceOf[BoundPortsResponse]
(actorSystem, boundPort, portsResponse.webUIPort, portsResponse.restPort)
}
}
core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala
@@ -19,7 +19,6 @@ package org.apache.spark.deploy.master.ui

import javax.servlet.http.HttpServletRequest

import scala.concurrent.Await
import scala.xml.Node

import akka.pattern.ask
@@ -38,8 +37,8 @@ private[ui] class ApplicationPage(parent: MasterWebUI) extends WebUIPage("app")
/** Executor details for a particular application */
def render(request: HttpServletRequest): Seq[Node] = {
val appId = request.getParameter("appId")
val stateFuture = (master ? RequestMasterState)(timeout).mapTo[MasterStateResponse]
val state = Await.result(stateFuture, timeout)
val stateFuture = (master ? RequestMasterState)(timeout.duration).mapTo[MasterStateResponse]
val state = timeout.awaitResult(stateFuture)
val app = state.activeApps.find(_.id == appId).getOrElse({
state.completedApps.find(_.id == appId).getOrElse(null)
})
core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala
@@ -19,7 +19,6 @@ package org.apache.spark.deploy.master.ui

import javax.servlet.http.HttpServletRequest

import scala.concurrent.Await
import scala.xml.Node

import akka.pattern.ask
@@ -36,8 +35,8 @@ private[ui] class MasterPage(parent: MasterWebUI) extends WebUIPage("") {
private val timeout = parent.timeout

def getMasterState: MasterStateResponse = {
val stateFuture = (master ? RequestMasterState)(timeout).mapTo[MasterStateResponse]
Await.result(stateFuture, timeout)
val stateFuture = (master ? RequestMasterState)(timeout.duration).mapTo[MasterStateResponse]
timeout.awaitResult(stateFuture)
}

override def renderJson(request: HttpServletRequest): JValue = {
core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
@@ -34,7 +34,7 @@ class MasterWebUI(val master: Master, requestedPort: Int)
with UIRoot {

val masterActorRef = master.self
val timeout = RpcUtils.askTimeout(master.conf)
val timeout = RpcUtils.askRpcTimeout(master.conf)
val killEnabled = master.conf.getBoolean("spark.ui.killEnabled", true)

val masterPage = new MasterPage(this)
core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala
@@ -71,7 +71,7 @@ private[rest] class StandaloneKillRequestServlet(masterActor: ActorRef, conf: SparkConf)
extends KillRequestServlet {

protected def handleKill(submissionId: String): KillSubmissionResponse = {
val askTimeout = RpcUtils.askTimeout(conf)
val askTimeout = RpcUtils.askRpcTimeout(conf)
val response = AkkaUtils.askWithReply[DeployMessages.KillDriverResponse](
DeployMessages.RequestKillDriver(submissionId), masterActor, askTimeout)
val k = new KillSubmissionResponse
@@ -90,7 +90,7 @@ private[rest] class StandaloneStatusRequestServlet(masterActor: ActorRef, conf: SparkConf)
extends StatusRequestServlet {

protected def handleStatus(submissionId: String): SubmissionStatusResponse = {
val askTimeout = RpcUtils.askTimeout(conf)
val askTimeout = RpcUtils.askRpcTimeout(conf)
val response = AkkaUtils.askWithReply[DeployMessages.DriverStatusResponse](
DeployMessages.RequestDriverStatus(submissionId), masterActor, askTimeout)
val message = response.exception.map { s"Exception from the cluster:\n" + formatException(_) }
@@ -175,7 +175,7 @@ private[rest] class StandaloneSubmitRequestServlet(
responseServlet: HttpServletResponse): SubmitRestProtocolResponse = {
requestMessage match {
case submitRequest: CreateSubmissionRequest =>
val askTimeout = RpcUtils.askTimeout(conf)
val askTimeout = RpcUtils.askRpcTimeout(conf)
val driverDescription = buildDriverDescription(submitRequest)
val response = AkkaUtils.askWithReply[DeployMessages.SubmitDriverResponse](
DeployMessages.RequestSubmitDriver(driverDescription), masterActor, askTimeout)
core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerPage.scala
@@ -17,7 +17,6 @@

package org.apache.spark.deploy.worker.ui

import scala.concurrent.Await
import scala.xml.Node

import akka.pattern.ask
@@ -36,14 +35,16 @@ private[ui] class WorkerPage(parent: WorkerWebUI) extends WebUIPage("") {
private val timeout = parent.timeout

override def renderJson(request: HttpServletRequest): JValue = {
val stateFuture = (workerActor ? RequestWorkerState)(timeout).mapTo[WorkerStateResponse]
val workerState = Await.result(stateFuture, timeout)
val stateFuture = (workerActor ? RequestWorkerState)(timeout.duration).
mapTo[WorkerStateResponse]
val workerState = timeout.awaitResult(stateFuture)
JsonProtocol.writeWorkerState(workerState)
}

def render(request: HttpServletRequest): Seq[Node] = {
val stateFuture = (workerActor ? RequestWorkerState)(timeout).mapTo[WorkerStateResponse]
val workerState = Await.result(stateFuture, timeout)
val stateFuture = (workerActor ? RequestWorkerState)(timeout.duration).
mapTo[WorkerStateResponse]
val workerState = timeout.awaitResult(stateFuture)

val executorHeaders = Seq("ExecutorID", "Cores", "State", "Memory", "Job Details", "Logs")
val runningExecutors = workerState.executors
core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
@@ -38,7 +38,7 @@ class WorkerWebUI(
extends WebUI(worker.securityMgr, requestedPort, worker.conf, name = "WorkerUI")
with Logging {

private[ui] val timeout = RpcUtils.askTimeout(worker.conf)
private[ui] val timeout = RpcUtils.askRpcTimeout(worker.conf)

initialize()

17 changes: 10 additions & 7 deletions core/src/main/scala/org/apache/spark/rpc/RpcEndpointRef.scala
@@ -17,8 +17,7 @@

package org.apache.spark.rpc

import scala.concurrent.{Await, Future}
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.Future
import scala.reflect.ClassTag

import org.apache.spark.util.RpcUtils
@@ -32,7 +31,7 @@ private[spark] abstract class RpcEndpointRef(@transient conf: SparkConf)

private[this] val maxRetries = RpcUtils.numRetries(conf)
private[this] val retryWaitMs = RpcUtils.retryWaitMs(conf)
private[this] val defaultAskTimeout = RpcUtils.askTimeout(conf)
private[this] val defaultAskTimeout = RpcUtils.askRpcTimeout(conf)

/**
* return the address for the [[RpcEndpointRef]]
@@ -52,7 +51,7 @@ private[spark] abstract class RpcEndpointRef(@transient conf: SparkConf)
*
* This method only sends the message once and never retries.
*/
def ask[T: ClassTag](message: Any, timeout: FiniteDuration): Future[T]
def ask[T: ClassTag](message: Any, timeout: RpcTimeout): Future[T]

/**
* Send a message to the corresponding [[RpcEndpoint.receiveAndReply)]] and return a [[Future]] to
@@ -91,15 +90,15 @@ private[spark] abstract class RpcEndpointRef(@transient conf: SparkConf)
* @tparam T type of the reply message
* @return the reply message from the corresponding [[RpcEndpoint]]
*/
def askWithRetry[T: ClassTag](message: Any, timeout: FiniteDuration): T = {
def askWithRetry[T: ClassTag](message: Any, timeout: RpcTimeout): T = {
// TODO: Consider removing multiple attempts
var attempts = 0
var lastException: Exception = null
while (attempts < maxRetries) {
attempts += 1
try {
val future = ask[T](message, timeout)
val result = Await.result(future, timeout)
val result = timeout.awaitResult(future)
if (result == null) {
throw new SparkException("Actor returned null")
}
Expand All @@ -110,10 +109,14 @@ private[spark] abstract class RpcEndpointRef(@transient conf: SparkConf)
lastException = e
logWarning(s"Error sending message [message = $message] in $attempts attempts", e)
}
Thread.sleep(retryWaitMs)

if (attempts < maxRetries) {
Thread.sleep(retryWaitMs)
}
}

throw new SparkException(
s"Error sending message [message = $message]", lastException)
}

}
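One behavioral fix in the hunk above, separate from the RpcTimeout migration: the retry loop no longer sleeps after the final failed attempt. A standalone sketch of that loop shape, with hypothetical stand-ins for the RpcUtils-derived maxRetries and retryWaitMs:

// Sketch of the retry shape used above: sleep between attempts only,
// never after the last one. Names here are illustrative.
def retry[T](maxRetries: Int, retryWaitMs: Long)(op: => T): T = {
  var attempts = 0
  var lastException: Exception = null
  while (attempts < maxRetries) {
    attempts += 1
    try {
      return op
    } catch {
      case e: Exception => lastException = e
    }
    if (attempts < maxRetries) {
      Thread.sleep(retryWaitMs)   // skip the pointless sleep after the final try
    }
  }
  throw new RuntimeException(s"Failed after $attempts attempts", lastException)
}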
111 changes: 108 additions & 3 deletions core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala
@@ -18,8 +18,10 @@
package org.apache.spark.rpc

import java.net.URI
import java.util.concurrent.TimeoutException

import scala.concurrent.{Await, Future}
import scala.concurrent.{Awaitable, Await, Future}
Contributor:
nit: you don't need to import FiniteDuration explicitly since you're importing duration._. Also you are supposed to order direct class imports before package imports (not just alphabetically), so it should be:

import scala.concurrent.{Awaitable, Await, Future}
import scala.concurrent.duration._

the intellij import organizer will get this wrong, but aaron davidson wrote a plugin which does it right -- there are instructions for using it here: https://cwiki.apache.org/confluence/display/SPARK/Spark+Code+Style+Guide#SparkCodeStyleGuide-Imports

Member (Author):
Thanks, using the plugin now

import scala.concurrent.duration._
import scala.language.postfixOps

import org.apache.spark.{SecurityManager, SparkConf}
@@ -66,7 +68,7 @@ private[spark] object RpcEnv {
*/
private[spark] abstract class RpcEnv(conf: SparkConf) {

private[spark] val defaultLookupTimeout = RpcUtils.lookupTimeout(conf)
private[spark] val defaultLookupTimeout = RpcUtils.lookupRpcTimeout(conf)

/**
* Return RpcEndpointRef of the registered [[RpcEndpoint]]. Will be used to implement
@@ -94,7 +96,7 @@ private[spark] abstract class RpcEnv(conf: SparkConf) {
* Retrieve the [[RpcEndpointRef]] represented by `uri`. This is a blocking action.
*/
def setupEndpointRefByURI(uri: String): RpcEndpointRef = {
Await.result(asyncSetupEndpointRefByURI(uri), defaultLookupTimeout)
defaultLookupTimeout.awaitResult(asyncSetupEndpointRefByURI(uri))
}

/**
@@ -182,3 +184,106 @@ private[spark] object RpcAddress {
RpcAddress(host, port)
}
}


/**
* An exception thrown if RpcTimeout modifies a [[TimeoutException]].
*/
private[rpc] class RpcTimeoutException(message: String, cause: TimeoutException)
extends TimeoutException(message) { initCause(cause) }


/**
* Associates a timeout with a description so that when a TimeoutException occurs, additional
* context about the timeout can be amended to the exception message.
* @param duration timeout duration in seconds
* @param timeoutProp the configuration property that controls this timeout
*/
private[spark] class RpcTimeout(val duration: FiniteDuration, val timeoutProp: String) {

/** Amends the standard message of TimeoutException to include the description */
private def createRpcTimeoutException(te: TimeoutException): RpcTimeoutException = {
new RpcTimeoutException(te.getMessage() + ". This timeout is controlled by " + timeoutProp, te)
}

/**
* PartialFunction to match a TimeoutException and add the timeout description to the message
*
* @note This can be used in the recover callback of a Future to add to a TimeoutException
* Example:
* val timeout = new RpcTimeout(5 millis, "short timeout")
* Future(throw new TimeoutException).recover(timeout.addMessageIfTimeout)
*/
def addMessageIfTimeout[T]: PartialFunction[Throwable, T] = {
// The exception has already been converted to a RpcTimeoutException so just raise it
case rte: RpcTimeoutException => throw rte
// Any other TimeoutException gets converted to a RpcTimeoutException with modified message
case te: TimeoutException => throw createRpcTimeoutException(te)
}

/**
* Wait for the completed result and return it. If the result is not available within this
* timeout, throw a [[RpcTimeoutException]] to indicate which configuration controls the timeout.
* @param awaitable the `Awaitable` to be awaited
* @throws RpcTimeoutException if after waiting for the specified time `awaitable`
* is still not ready
*/
def awaitResult[T](awaitable: Awaitable[T]): T = {
try {
Await.result(awaitable, duration)
} catch addMessageIfTimeout
}
}


private[spark] object RpcTimeout {

/**
* Lookup the timeout property in the configuration and create
* a RpcTimeout with the property key in the description.
* @param conf configuration properties containing the timeout
* @param timeoutProp property key for the timeout in seconds
* @throws NoSuchElementException if property is not set
*/
def apply(conf: SparkConf, timeoutProp: String): RpcTimeout = {
val timeout = { conf.getTimeAsSeconds(timeoutProp) seconds }
new RpcTimeout(timeout, timeoutProp)
}

/**
* Lookup the timeout property in the configuration and create
* a RpcTimeout with the property key in the description.
* Uses the given default value if property is not set
* @param conf configuration properties containing the timeout
* @param timeoutProp property key for the timeout in seconds
* @param defaultValue default timeout value in seconds if property not found
*/
def apply(conf: SparkConf, timeoutProp: String, defaultValue: String): RpcTimeout = {
val timeout = { conf.getTimeAsSeconds(timeoutProp, defaultValue) seconds }
Member (Author):
Should this be conf.getTimeAsMs instead?

Contributor:
as mentioned above, I don't think so -- lets stick w/ the current behavior of using conf.getTimeAsSeconds for now.

new RpcTimeout(timeout, timeoutProp)
}

/**
* Lookup prioritized list of timeout properties in the configuration
* and create a RpcTimeout with the first set property key in the
* description.
* Uses the given default value if property is not set
* @param conf configuration properties containing the timeout
* @param timeoutPropList prioritized list of property keys for the timeout in seconds
* @param defaultValue default timeout value in seconds if no properties found
*/
def apply(conf: SparkConf, timeoutPropList: Seq[String], defaultValue: String): RpcTimeout = {
Member (Author):
Creating RpcTimeout from a prioritized list of property keys, from @squito previous request

require(timeoutPropList.nonEmpty)

// Find the first set property or use the default value with the first property
val itr = timeoutPropList.iterator
var foundProp: Option[(String, String)] = None
while (itr.hasNext && foundProp.isEmpty) {
val propKey = itr.next()
conf.getOption(propKey).foreach { prop => foundProp = Some(propKey, prop) }
}
val finalProp = foundProp.getOrElse(timeoutPropList.head, defaultValue)
Member:
The above lines can be replaced by a single line: val finalProp = timeoutPropList.flatMap(key => conf.getOption(key).map(value => key -> value)).headOption.getOrElse(timeoutPropList.head -> defaultValue).

Member:
Never mind. Looks like it's too long too.

val timeout = { Utils.timeStringAsSeconds(finalProp._2) seconds }
new RpcTimeout(timeout, finalProp._1)
}
}
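For reference, the prioritized lookup performed by the while loop above can be written as a small standalone function; this sketch substitutes a plain Map for SparkConf, and the key names in the example are illustrative:

// Returns (key, value) for the first key present in conf, else
// (first key, default) -- mirroring the loop in RpcTimeout.apply above.
def firstSetProperty(conf: Map[String, String],
                     keys: Seq[String],
                     default: String): (String, String) = {
  require(keys.nonEmpty)
  // collectFirst stops at the first key that is actually set
  keys.collectFirst { case k if conf.contains(k) => k -> conf(k) }
    .getOrElse(keys.head -> default)
}

// firstSetProperty(Map("spark.network.timeout" -> "240s"),
//   Seq("spark.rpc.askTimeout", "spark.network.timeout"), "120s")
// returns ("spark.network.timeout", "240s")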