@@ -20,11 +20,14 @@ package org.apache.spark.network.nio
2020import java .net ._
2121import java .nio ._
2222import java .nio .channels ._
23+ import java .util .concurrent .ConcurrentLinkedQueue
2324import java .util .LinkedList
2425
2526import org .apache .spark ._
2627
28+ import scala .collection .JavaConversions ._
2729import scala .collection .mutable .{ArrayBuffer , HashMap }
30+ import scala .util .control .NonFatal
2831
2932private [nio]
3033abstract class Connection (val channel : SocketChannel , val selector : Selector ,
@@ -51,7 +54,7 @@ abstract class Connection(val channel: SocketChannel, val selector: Selector,
5154
5255 @ volatile private var closed = false
5356 var onCloseCallback : Connection => Unit = null
54- var onExceptionCallback : (Connection , Exception ) => Unit = null
57+ val onExceptionCallbacks = new ConcurrentLinkedQueue [ (Connection , Throwable ) => Unit ]
5558 var onKeyInterestChangeCallback : (Connection , Int ) => Unit = null
5659
5760 val remoteAddress = getRemoteAddress()
@@ -130,20 +133,24 @@ abstract class Connection(val channel: SocketChannel, val selector: Selector,
130133 onCloseCallback = callback
131134 }
132135
133- def onException (callback : (Connection , Exception ) => Unit ) {
134- onExceptionCallback = callback
136+ def onException (callback : (Connection , Throwable ) => Unit ) {
137+ onExceptionCallbacks.add( callback)
135138 }
136139
137140 def onKeyInterestChange (callback : (Connection , Int ) => Unit ) {
138141 onKeyInterestChangeCallback = callback
139142 }
140143
141- def callOnExceptionCallback (e : Exception ) {
142- if (onExceptionCallback != null ) {
143- onExceptionCallback(this , e)
144- } else {
145- logError(" Error in connection to " + getRemoteConnectionManagerId() +
146- " and OnExceptionCallback not registered" , e)
144+ def callOnExceptionCallbacks (e : Throwable ) {
145+ onExceptionCallbacks foreach {
146+ callback =>
147+ try {
148+ callback(this , e)
149+ } catch {
150+ case NonFatal (e) => {
151+ logWarning(" Ignored error in onExceptionCallback" , e)
152+ }
153+ }
147154 }
148155 }
149156
@@ -323,7 +330,7 @@ class SendingConnection(val address: InetSocketAddress, selector_ : Selector,
323330 } catch {
324331 case e : Exception => {
325332 logError(" Error connecting to " + address, e)
326- callOnExceptionCallback (e)
333+ callOnExceptionCallbacks (e)
327334 }
328335 }
329336 }
@@ -348,7 +355,7 @@ class SendingConnection(val address: InetSocketAddress, selector_ : Selector,
348355 } catch {
349356 case e : Exception => {
350357 logWarning(" Error finishing connection to " + address, e)
351- callOnExceptionCallback (e)
358+ callOnExceptionCallbacks (e)
352359 }
353360 }
354361 true
@@ -393,7 +400,7 @@ class SendingConnection(val address: InetSocketAddress, selector_ : Selector,
393400 } catch {
394401 case e : Exception => {
395402 logWarning(" Error writing in connection to " + getRemoteConnectionManagerId(), e)
396- callOnExceptionCallback (e)
403+ callOnExceptionCallbacks (e)
397404 close()
398405 return false
399406 }
@@ -420,7 +427,7 @@ class SendingConnection(val address: InetSocketAddress, selector_ : Selector,
420427 case e : Exception =>
421428 logError(" Exception while reading SendingConnection to " + getRemoteConnectionManagerId(),
422429 e)
423- callOnExceptionCallback (e)
430+ callOnExceptionCallbacks (e)
424431 close()
425432 }
426433
@@ -577,7 +584,7 @@ private[spark] class ReceivingConnection(
577584 } catch {
578585 case e : Exception => {
579586 logWarning(" Error reading from connection to " + getRemoteConnectionManagerId(), e)
580- callOnExceptionCallback (e)
587+ callOnExceptionCallbacks (e)
581588 close()
582589 return false
583590 }
0 commit comments