Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ private object FaultToleranceTest extends App with Logging {
val f = Future {
try {
val res = sc.parallelize(0 until 10).collect()
assertTrue(res.toList == (0 until 10))
assertTrue(res.toList == (0 until 10).toList)
true
} catch {
case e: Exception =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,7 @@ private[spark] class TaskSetManager(
if (TaskLocality.isAllowed(locality, TaskLocality.RACK_LOCAL)) {
for (rack <- sched.getRackForHost(host)) {
for (index <- speculatableTasks if canRunOnHost(index)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit, but I'd actually refactor this as a single for:

for {
  rack <- sched.getRackForHost(host)
  index <- speculatableTasks if canRunOnHost(index)
} {
  val racks = tasks(index).preferredLocations.map(_.host).flatMap(sched.getRackForHost)
  if (racks.contains(rack)) {
    speculatableTasks -= index
    return Some((index, TaskLocality.RACK_LOCAL))
  }
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not a big deal but I think that can happen outside of this patch since it's just cleanup

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yup, it's style, not substance

val racks = tasks(index).preferredLocations.map(_.host).map(sched.getRackForHost)
val racks = tasks(index).preferredLocations.map(_.host).flatMap(sched.getRackForHost)
if (racks.contains(rack)) {
speculatableTasks -= index
return Some((index, TaskLocality.RACK_LOCAL))
Expand Down
2 changes: 1 addition & 1 deletion core/src/test/scala/org/apache/spark/CheckpointSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ trait RDDCheckpointTester { self: SparkFunSuite =>
}

// Test whether dependencies have been changed from its earlier parent RDD
assert(operatedRDD.dependencies.head.rdd != parentRDD)
assert(operatedRDD.dependencies.head != parentRDD)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please fix the original misnomer that encouraged this bad comparison: parentRDD is not an RDD, but rather a Dependency.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1


// Test whether the partitions have been changed from its earlier partitions
assert(operatedRDD.partitions.toList != partitionsBeforeCheckpoint.toList)
Expand Down
4 changes: 2 additions & 2 deletions core/src/test/scala/org/apache/spark/PartitioningSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -163,8 +163,8 @@ class PartitioningSuite extends SparkFunSuite with SharedSparkContext with Priva
val hashP2 = new HashPartitioner(2)
assert(rangeP2 === rangeP2)
assert(hashP2 === hashP2)
assert(hashP2 != rangeP2)
assert(rangeP2 != hashP2)
assert(hashP2 !== rangeP2)
assert(rangeP2 !== hashP2)
}

test("partitioner preservation") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,11 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B
JInt(stageId) <- stage \ "stageId"
JInt(attemptId) <- stage \ "attemptId"
} {
val exp = if (attemptId == 0 && stageId == 1) StageStatus.FAILED else StageStatus.COMPLETE
val exp = if (attemptId.toInt == 0 && stageId.toInt == 1) {
StageStatus.FAILED
} else {
StageStatus.COMPLETE
}
status should be (exp.name())
}

Expand Down