Skip to content

Commit fef3a0d

Browse files
committed
Addressed TD's comments
1 parent af6562f commit fef3a0d

2 files changed

Lines changed: 13 additions & 34 deletions

File tree

streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala

Lines changed: 9 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -29,30 +29,17 @@ import org.apache.spark.streaming.Time
2929
* the streaming scheduler queue
3030
* @param processingStartTime Clock time of when the first job of this batch started processing
3131
* @param processingEndTime Clock time of when the last job of this batch finished processing
32-
* @param failureReasons The failure reasons if any jobs in this batch failed. The key is
33-
* `outputOpId` and the value is the failure reason.
3432
*/
3533
@DeveloperApi
3634
case class BatchInfo(
3735
batchTime: Time,
3836
streamIdToInputInfo: Map[Int, StreamInputInfo],
3937
submissionTime: Long,
4038
processingStartTime: Option[Long],
41-
processingEndTime: Option[Long],
42-
private[streaming] val failureReasons: Map[Int, String]) {
39+
processingEndTime: Option[Long]
40+
) {
4341

44-
/**
45-
* Create `BatchInfo`. This is for binary compatibility.
46-
*/
47-
def this(
48-
batchTime: Time,
49-
streamIdToInputInfo: Map[Int, StreamInputInfo],
50-
submissionTime: Long,
51-
processingStartTime: Option[Long],
52-
processingEndTime: Option[Long]) {
53-
this(batchTime, streamIdToInputInfo, submissionTime, processingStartTime, processingEndTime,
54-
Map.empty)
55-
}
42+
private var _failureReasons: Map[Int, String] = Map.empty
5643

5744
@deprecated("Use streamIdToInputInfo instead", "1.5.0")
5845
def streamIdToNumRecords: Map[Int, Long] = streamIdToInputInfo.mapValues(_.numRecords)
@@ -82,21 +69,12 @@ case class BatchInfo(
8269
* The number of records received by the receivers in this batch.
8370
*/
8471
def numRecords: Long = streamIdToInputInfo.values.map(_.numRecords).sum
85-
}
8672

87-
@DeveloperApi
88-
object BatchInfo {
89-
90-
/**
91-
* Create `BatchInfo`. This is for binary compatibility.
92-
*/
93-
def apply(
94-
batchTime: Time,
95-
streamIdToInputInfo: Map[Int, StreamInputInfo],
96-
submissionTime: Long,
97-
processingStartTime: Option[Long],
98-
processingEndTime: Option[Long]): BatchInfo = {
99-
BatchInfo(batchTime, streamIdToInputInfo, submissionTime, processingStartTime,
100-
processingEndTime, Map.empty)
73+
/** Set the failure reasons corresponding to every output op in the batch */
74+
private[streaming] def setFailureReason(reasons: Map[Int, String]): Unit = {
75+
_failureReasons = reasons
10176
}
77+
78+
/** Failure reasons corresponding to every output op in the batch */
79+
private[streaming] def failureReasons = _failureReasons
10280
}

streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -73,13 +73,14 @@ case class JobSet(
7373
Map.empty
7474
}
7575
}
76-
BatchInfo(
76+
val binfo = new BatchInfo(
7777
time,
7878
streamIdToInputInfo,
7979
submissionTime,
8080
if (processingStartTime >= 0) Some(processingStartTime) else None,
81-
if (processingEndTime >= 0) Some(processingEndTime) else None,
82-
failureReasons
81+
if (processingEndTime >= 0) Some(processingEndTime) else None
8382
)
83+
binfo.setFailureReason(failureReasons)
84+
binfo
8485
}
8586
}

0 commit comments

Comments
 (0)