@@ -39,6 +39,8 @@ case class BatchInfo(
processingEndTime: Option[Long]
) {

private var _failureReasons: Map[Int, String] = Map.empty

@deprecated("Use streamIdToInputInfo instead", "1.5.0")
def streamIdToNumRecords: Map[Int, Long] = streamIdToInputInfo.mapValues(_.numRecords)

@@ -67,4 +69,12 @@ case class BatchInfo(
* The number of records received by the receivers in this batch.
*/
def numRecords: Long = streamIdToInputInfo.values.map(_.numRecords).sum

/** Set the failure reasons corresponding to each output op in the batch */
private[streaming] def setFailureReason(reasons: Map[Int, String]): Unit = {
_failureReasons = reasons
}

/** Failure reasons corresponding to each output op in the batch */
private[streaming] def failureReasons = _failureReasons
}
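
Illustrative sketch (not part of the PR): because both setFailureReason and failureReasons are private[streaming], they can only be exercised from code living under org.apache.spark.streaming. The timestamps and error message below are made up; this assumes the five-parameter BatchInfo constructor shown in the hunk above.

package org.apache.spark.streaming.scheduler

import org.apache.spark.streaming.Time

object BatchInfoFailureReasonsDemo {
  def main(args: Array[String]): Unit = {
    // Positional args: batch time, stream input info, submission time,
    // processing start, processing end (all values are illustrative).
    val info = BatchInfo(Time(1000L), Map.empty, 1000L, Some(1005L), Some(1042L))
    // Each key is an output op id, each value a formatted exception string.
    info.setFailureReason(Map(0 -> "java.lang.RuntimeException: output op 0 failed"))
    println(info.failureReasons(0))
  }
}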
@@ -166,22 +166,22 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
}

private def handleJobCompletion(job: Job) {
val jobSet = jobSets.get(job.time)
jobSet.handleJobCompletion(job)
logInfo("Finished job " + job.id + " from job set of time " + jobSet.time)
if (jobSet.hasCompleted) {
jobSets.remove(jobSet.time)
jobGenerator.onBatchCompletion(jobSet.time)
logInfo("Total delay: %.3f s for time %s (execution: %.3f s)".format(
jobSet.totalDelay / 1000.0, jobSet.time.toString,
jobSet.processingDelay / 1000.0
))
listenerBus.post(StreamingListenerBatchCompleted(jobSet.toBatchInfo))
}
job.result match {
case Success(_) =>
val jobSet = jobSets.get(job.time)
jobSet.handleJobCompletion(job)
logInfo("Finished job " + job.id + " from job set of time " + jobSet.time)
if (jobSet.hasCompleted) {
jobSets.remove(jobSet.time)
jobGenerator.onBatchCompletion(jobSet.time)
logInfo("Total delay: %.3f s for time %s (execution: %.3f s)".format(
jobSet.totalDelay / 1000.0, jobSet.time.toString,
jobSet.processingDelay / 1000.0
))
listenerBus.post(StreamingListenerBatchCompleted(jobSet.toBatchInfo))
}
case Failure(e) =>
reportError("Error running job " + job, e)
case _ =>
}
}
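
The rework above hinges on job.result being a scala.util.Try: the Success branch now owns the job-set book-keeping, while the Failure branch reports the error. A standalone illustration of that pattern (not Spark code, values made up):

import scala.util.{Failure, Success, Try}

object TryMatchDemo {
  // Mirrors the shape of the new handleJobCompletion: act on success, report on failure.
  def handle(result: Try[Long]): String = result match {
    case Success(count) => s"job finished, $count records"
    case Failure(e)     => s"job failed: ${e.getMessage}"
  }

  def main(args: Array[String]): Unit = {
    println(handle(Try(42L)))
    println(handle(Try(throw new RuntimeException("boom"))))
  }
}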

@@ -18,8 +18,10 @@
package org.apache.spark.streaming.scheduler

import scala.collection.mutable.HashSet
import scala.util.Failure

import org.apache.spark.streaming.Time
import org.apache.spark.util.Utils

/** Class representing a set of Jobs
* that belong to the same batch.
@@ -62,12 +64,23 @@ case class JobSet(
}

def toBatchInfo: BatchInfo = {
new BatchInfo(
val failureReasons: Map[Int, String] = {
if (hasCompleted) {
jobs.filter(_.result.isFailure).map { job =>
(job.outputOpId, Utils.exceptionString(job.result.asInstanceOf[Failure[_]].exception))
}.toMap
} else {
Map.empty
}
}
val binfo = new BatchInfo(
time,
streamIdToInputInfo,
submissionTime,
if (processingStartTime >= 0 ) Some(processingStartTime) else None,
if (processingEndTime >= 0 ) Some(processingEndTime) else None
if (processingStartTime >= 0) Some(processingStartTime) else None,
if (processingEndTime >= 0) Some(processingEndTime) else None
)
binfo.setFailureReason(failureReasons)
binfo
}
}
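
A self-contained sketch (not Spark code) of how the failure-reason map above is built from a batch's jobs: keep only the failed ones, key by output op id, and render the exception as a string. FakeJob stands in for Spark's Job class, and plain toString stands in for Utils.exceptionString:

import scala.util.{Failure, Success, Try}

case class FakeJob(outputOpId: Int, result: Try[Unit])

object FailureReasonsDemo {
  def failureReasons(jobs: Seq[FakeJob]): Map[Int, String] =
    jobs.filter(_.result.isFailure).map { job =>
      (job.outputOpId, job.result.asInstanceOf[Failure[_]].exception.toString)
    }.toMap

  def main(args: Array[String]): Unit = {
    val jobs = Seq(
      FakeJob(0, Success(())),
      FakeJob(1, Failure(new RuntimeException("output op 1 failed")))
    )
    // Prints: Map(1 -> java.lang.RuntimeException: output op 1 failed)
    println(failureReasons(jobs))
  }
}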
@@ -140,6 +140,69 @@ class StreamingListenerSuite extends TestSuiteBase with Matchers {
}
}

test("onBatchCompleted with successful batch") {
ssc = new StreamingContext("local[2]", "test", Milliseconds(1000))
val inputStream = ssc.receiverStream(new StreamingListenerSuiteReceiver)
inputStream.foreachRDD(_.count)

val failureReasons = startStreamingContextAndCollectFailureReasons(ssc)
assert(failureReasons != null && failureReasons.isEmpty,
"A successful batch should not set errorMessage")
}

test("onBatchCompleted with failed batch and one failed job") {
ssc = new StreamingContext("local[2]", "test", Milliseconds(1000))
val inputStream = ssc.receiverStream(new StreamingListenerSuiteReceiver)
inputStream.foreachRDD { _ =>
throw new RuntimeException("This is a failed job")
}

// Check if failureReasons contains the correct error message
val failureReasons = startStreamingContextAndCollectFailureReasons(ssc, isFailed = true)
assert(failureReasons != null)
assert(failureReasons.size === 1)
assert(failureReasons.contains(0))
assert(failureReasons(0).contains("This is a failed job"))
}

test("onBatchCompleted with failed batch and multiple failed jobs") {
ssc = new StreamingContext("local[2]", "test", Milliseconds(1000))
val inputStream = ssc.receiverStream(new StreamingListenerSuiteReceiver)
inputStream.foreachRDD { _ =>
throw new RuntimeException("This is a failed job")
}
inputStream.foreachRDD { _ =>
throw new RuntimeException("This is another failed job")
}

// Check if failureReasons contains the correct error messages
val failureReasons =
startStreamingContextAndCollectFailureReasons(ssc, isFailed = true)
assert(failureReasons != null)
assert(failureReasons.size === 2)
assert(failureReasons.contains(0))
assert(failureReasons.contains(1))
assert(failureReasons(0).contains("This is a failed job"))
assert(failureReasons(1).contains("This is another failed job"))
}

private def startStreamingContextAndCollectFailureReasons(
_ssc: StreamingContext, isFailed: Boolean = false): Map[Int, String] = {
val failureReasonsCollector = new FailureReasonsCollector()
_ssc.addStreamingListener(failureReasonsCollector)
val batchCounter = new BatchCounter(_ssc)
_ssc.start()
// Make sure at least one batch has run
batchCounter.waitUntilBatchesCompleted(expectedNumCompletedBatches = 1, timeout = 10000)
if (isFailed) {
intercept[RuntimeException] {
_ssc.awaitTerminationOrTimeout(10000)
}
}
_ssc.stop()
failureReasonsCollector.failureReasons
}

/** Check if a sequence of numbers is in increasing order */
def isInIncreasingOrder(seq: Seq[Long]): Boolean = {
for (i <- 1 until seq.size) {
@@ -205,3 +268,16 @@ class StreamingListenerSuiteReceiver extends Receiver[Any](StorageLevel.MEMORY_O
}
def onStop() { }
}

/**
* A StreamingListener that saves the latest `failureReasons` in `BatchInfo` to the `failureReasons`
* field.
*/
class FailureReasonsCollector extends StreamingListener {

@volatile var failureReasons: Map[Int, String] = null

override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
failureReasons = batchCompleted.batchInfo.failureReasons
}
}