1717
1818package org .apache .spark .util
1919
20+ import java .util .concurrent .TimeoutException
21+
2022import scala .collection .JavaConversions .mapAsJavaMap
2123import scala .concurrent .Await
2224import scala .concurrent .duration .FiniteDuration
@@ -29,6 +31,48 @@ import org.apache.log4j.{Level, Logger}
2931
3032import org .apache .spark .{Logging , SecurityManager , SparkConf , SparkEnv , SparkException }
3133
34+ /**
35+ * Binds a timeout to a configuration property so that a thrown akka timeout exception can be
36+ * traced back to the originating value. The main constructor takes a generic timeout and
37+ * description while the auxilary constructor uses a specific property defined in the
38+ * configuration.
39+ * @param timeout_duration timeout duration in milliseconds
40+ * @param timeout_description description to be displayed in a timeout exception
41+ */
42+ class ConfiguredTimeout (timeout_duration : FiniteDuration , timeout_description : String = null ) {
43+
44+ /**
45+ * Specialized constructor to lookup the timeout property in the configuration and construct
46+ * a FiniteDuration timeout with the property key as the description
47+ * @param conf configuration properties containing the timeout
48+ * @param timeout_prop property key for the timeout
49+ */
50+ def this (conf : SparkConf , timeout_prop : String ) = {
51+ this (FiniteDuration (conf.getInt(timeout_prop, - 1 ), " millis" ), timeout_prop)
52+ require(timeout_duration.toMillis >= 0 , " invalid property string: " + timeout_prop)
53+ }
54+
55+ val timeout = timeout_duration
56+ val description = timeout_description
57+ }
58+
59+ object ConfiguredTimeout {
60+
61+ /**
62+ * Implicit conversion to allow for a simple FiniteDuration timeout to be used instead of a
63+ * ConfiguredTimeout when the description is not needed.
64+ * @param timeout_duration timeout duration in milliseconds
65+ * @return ConfiguredTimeout object
66+ */
67+ implicit def toConfiguredTimeout (timeout_duration : FiniteDuration ): ConfiguredTimeout =
68+ new ConfiguredTimeout (timeout_duration)
69+
70+ def apply (conf : SparkConf , timeout_prop : String ): ConfiguredTimeout =
71+ new ConfiguredTimeout (conf, timeout_prop)
72+ def apply (timeout_duration : FiniteDuration , timeout_description : String ): ConfiguredTimeout =
73+ new ConfiguredTimeout (timeout_duration, timeout_description)
74+ }
75+
3276/**
3377 * Various utility classes for working with Akka.
3478 */
@@ -147,8 +191,8 @@ private[spark] object AkkaUtils extends Logging {
147191 def askWithReply [T ](
148192 message : Any ,
149193 actor : ActorRef ,
150- timeout : FiniteDuration ): T = {
151- askWithReply[T ](message, actor, maxAttempts = 1 , retryInterval = Int .MaxValue , timeout )
194+ confTimeout : ConfiguredTimeout ): T = {
195+ askWithReply[T ](message, actor, maxAttempts = 1 , retryInterval = Int .MaxValue , confTimeout )
152196 }
153197
154198 /**
@@ -160,7 +204,7 @@ private[spark] object AkkaUtils extends Logging {
160204 actor : ActorRef ,
161205 maxAttempts : Int ,
162206 retryInterval : Long ,
163- timeout : FiniteDuration ): T = {
207+ confTimeout : ConfiguredTimeout ): T = {
164208 // TODO: Consider removing multiple attempts
165209 if (actor == null ) {
166210 throw new SparkException (s " Error sending message [message = $message] " +
@@ -171,14 +215,20 @@ private[spark] object AkkaUtils extends Logging {
171215 while (attempts < maxAttempts) {
172216 attempts += 1
173217 try {
174- val future = actor.ask(message)(timeout)
175- val result = Await .result(future, timeout)
218+ val future = actor.ask(message)(confTimeout. timeout)
219+ val result = Await .result(future, confTimeout. timeout)
176220 if (result == null ) {
177221 throw new SparkException (" Actor returned null" )
178222 }
179223 return result.asInstanceOf [T ]
180224 } catch {
181225 case ie : InterruptedException => throw ie
226+ case te : TimeoutException =>
227+ var msg = te.toString()
228+ if (confTimeout.description != null ) {
229+ msg += " with [" + confTimeout.description + " ]"
230+ }
231+ lastException = new TimeoutException (msg)
182232 case e : Exception =>
183233 lastException = e
184234 logWarning(s " Error sending message [message = $message] in $attempts attempts " , e)
0 commit comments