Skip to content

Commit 4117b8f

Browse files
committed
Modified ConnectionManager to be alble to handle error during processing message
1 parent 717c9c3 commit 4117b8f

3 files changed

Lines changed: 33 additions & 32 deletions

File tree

core/src/main/scala/org/apache/spark/network/ConnectionManager.scala

Lines changed: 28 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -657,26 +657,37 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf,
657657
sentMessageStatus.markDone()
658658
}
659659
} else {
660-
val ackMessage = if (onReceiveCallback != null) {
661-
logDebug("Calling back")
662-
onReceiveCallback(bufferMessage, connectionManagerId)
663-
} else {
664-
logDebug("Not calling back as callback is null")
665-
None
666-
}
660+
var ackMessage : Option[Message] = None
661+
try {
662+
ackMessage = if (onReceiveCallback != null) {
663+
logDebug("Calling back")
664+
onReceiveCallback(bufferMessage, connectionManagerId)
665+
} else {
666+
logDebug("Not calling back as callback is null")
667+
None
668+
}
667669

668-
if (ackMessage.isDefined) {
669-
if (!ackMessage.get.isInstanceOf[BufferMessage]) {
670-
logDebug("Response to " + bufferMessage + " is not a buffer message, it is of type "
671-
+ ackMessage.get.getClass)
672-
} else if (!ackMessage.get.asInstanceOf[BufferMessage].hasAckId) {
673-
logDebug("Response to " + bufferMessage + " does not have ack id set")
674-
ackMessage.get.asInstanceOf[BufferMessage].ackId = bufferMessage.id
670+
if (ackMessage.isDefined) {
671+
if (!ackMessage.get.isInstanceOf[BufferMessage]) {
672+
logDebug("Response to " + bufferMessage + " is not a buffer message, it is of type "
673+
+ ackMessage.get.getClass)
674+
} else if (!ackMessage.get.asInstanceOf[BufferMessage].hasAckId) {
675+
logDebug("Response to " + bufferMessage + " does not have ack id set")
676+
ackMessage.get.asInstanceOf[BufferMessage].ackId = bufferMessage.id
677+
}
675678
}
679+
} catch {
680+
case e: Exception => {
681+
logError(s"Exception was thrown during processing message", e)
682+
val m = Message.createBufferMessage(bufferMessage.id)
683+
m.hasError = true
684+
ackMessage = Some(m)
685+
}
686+
} finally {
687+
sendMessage(connectionManagerId, ackMessage.getOrElse {
688+
Message.createBufferMessage(bufferMessage.id)
689+
})
676690
}
677-
sendMessage(connectionManagerId, ackMessage.getOrElse {
678-
Message.createBufferMessage(bufferMessage.id)
679-
})
680691
}
681692
}
682693
case _ => throw new Exception("Unknown type message received")

core/src/main/scala/org/apache/spark/network/MessageChunkHeader.scala

Lines changed: 2 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -42,13 +42,7 @@ private[spark] class MessageChunkHeader(
4242
putInt(totalSize).
4343
putInt(chunkSize).
4444
putInt(other).
45-
put{
46-
if (hasError) {
47-
1.asInstanceOf[Byte]
48-
} else {
49-
0.asInstanceOf[Byte]
50-
}
51-
}.
45+
put(if (hasError) 1.asInstanceOf[Byte] else 0.asInstanceOf[Byte]).
5246
putInt(securityNeg).
5347
putInt(ip.size).
5448
put(ip).
@@ -75,13 +69,7 @@ private[spark] object MessageChunkHeader {
7569
val totalSize = buffer.getInt()
7670
val chunkSize = buffer.getInt()
7771
val other = buffer.getInt()
78-
val hasError = {
79-
if (buffer.get() == 0) {
80-
false
81-
} else {
82-
true
83-
}
84-
}
72+
val hasError = buffer.get() != 0
8573
val securityNeg = buffer.getInt()
8674
val ipSize = buffer.getInt()
8775
val ipBytes = new Array[Byte](ipSize)

core/src/main/scala/org/apache/spark/storage/BlockManagerWorker.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,9 @@ private[spark] class BlockManagerWorker(val blockManager: BlockManager) extends
5454
}
5555
case otherMessage: Any => {
5656
logError("Unknown type message received: " + otherMessage)
57-
None
57+
val errorMessage = Message.createBufferMessage(msg.id)
58+
errorMessage.hasError = true
59+
Some(errorMessage)
5860
}
5961
}
6062
}

0 commit comments

Comments
 (0)