Skip to content

Commit 3f33b8d

Browse files
JoshRosen authored and nemccarthy committed
[SPARK-6629] cancelJobGroup() may not work for jobs whose job groups are inherited from parent threads
When a job is submitted with a job group and that job group is inherited from a parent thread, there are multiple bugs that may prevent this job from being cancelable via `SparkContext.cancelJobGroup()`: - When filtering jobs based on their job group properties, DAGScheduler calls `get()` instead of `getProperty()`, which does not respect inheritance, so it will skip over jobs whose job group properties were inherited. - `Properties` objects are mutable, but we do not make defensive copies / snapshots, so modifications of the parent thread's job group will cause running jobs' groups to change; this also breaks cancelation. Both of these issues are easy to fix: use `getProperty()` and perform defensive copying. Author: Josh Rosen <joshrosen@databricks.com> Closes apache#5288 from JoshRosen/localProperties-mutability-race and squashes the following commits: 9e29654 [Josh Rosen] Fix style issue 5d90750 [Josh Rosen] Merge remote-tracking branch 'origin/master' into localProperties-mutability-race 3f7b9e8 [Josh Rosen] Add JIRA reference; move clone into DAGScheduler 707e417 [Josh Rosen] Clone local properties to prevent mutations from breaking job cancellation. b376114 [Josh Rosen] Fix bug that prevented jobs with inherited job group properties from being cancelled.
1 parent 6a0e55f commit 3f33b8d

3 files changed

Lines changed: 91 additions & 5 deletions

File tree

core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

Lines changed: 11 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -28,6 +28,8 @@ import scala.language.existentials
2828
import scala.language.postfixOps
2929
import scala.util.control.NonFatal
3030

31+
import org.apache.commons.lang3.SerializationUtils
32+
3133
import org.apache.spark._
3234
import org.apache.spark.broadcast.Broadcast
3335
import org.apache.spark.executor.TaskMetrics
@@ -510,7 +512,8 @@ class DAGScheduler(
510512
val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
511513
val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
512514
eventProcessLoop.post(JobSubmitted(
513-
jobId, rdd, func2, partitions.toArray, allowLocal, callSite, waiter, properties))
515+
jobId, rdd, func2, partitions.toArray, allowLocal, callSite, waiter,
516+
SerializationUtils.clone(properties)))
514517
waiter
515518
}
516519

@@ -547,7 +550,8 @@ class DAGScheduler(
547550
val partitions = (0 until rdd.partitions.size).toArray
548551
val jobId = nextJobId.getAndIncrement()
549552
eventProcessLoop.post(JobSubmitted(
550-
jobId, rdd, func2, partitions, allowLocal = false, callSite, listener, properties))
553+
jobId, rdd, func2, partitions, allowLocal = false, callSite, listener,
554+
SerializationUtils.clone(properties)))
551555
listener.awaitResult() // Will throw an exception if the job fails
552556
}
553557

@@ -704,8 +708,11 @@ class DAGScheduler(
704708
private[scheduler] def handleJobGroupCancelled(groupId: String) {
705709
// Cancel all jobs belonging to this job group.
706710
// First finds all active jobs with this group id, and then kill stages for them.
707-
val activeInGroup = activeJobs.filter(activeJob =>
708-
Option(activeJob.properties).exists(_.get(SparkContext.SPARK_JOB_GROUP_ID) == groupId))
711+
val activeInGroup = activeJobs.filter { activeJob =>
712+
Option(activeJob.properties).exists {
713+
_.getProperty(SparkContext.SPARK_JOB_GROUP_ID) == groupId
714+
}
715+
}
709716
val jobIds = activeInGroup.map(_.jobId)
710717
jobIds.foreach(handleJobCancellation(_, "part of cancelled job group %s".format(groupId)))
711718
submitWaitingStages()

core/src/test/scala/org/apache/spark/JobCancellationSuite.scala

Lines changed: 35 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -141,6 +141,41 @@ class JobCancellationSuite extends FunSuite with Matchers with BeforeAndAfter
141141
assert(jobB.get() === 100)
142142
}
143143

144+
test("inherited job group (SPARK-6629)") {
145+
sc = new SparkContext("local[2]", "test")
146+
147+
// Add a listener to release the semaphore once any tasks are launched.
148+
val sem = new Semaphore(0)
149+
sc.addSparkListener(new SparkListener {
150+
override def onTaskStart(taskStart: SparkListenerTaskStart) {
151+
sem.release()
152+
}
153+
})
154+
155+
sc.setJobGroup("jobA", "this is a job to be cancelled")
156+
@volatile var exception: Exception = null
157+
val jobA = new Thread() {
158+
// The job group should be inherited by this thread
159+
override def run(): Unit = {
160+
exception = intercept[SparkException] {
161+
sc.parallelize(1 to 10000, 2).map { i => Thread.sleep(10); i }.count()
162+
}
163+
}
164+
}
165+
jobA.start()
166+
167+
// Block until both tasks of job A have started and cancel job A.
168+
sem.acquire(2)
169+
sc.cancelJobGroup("jobA")
170+
jobA.join(10000)
171+
assert(!jobA.isAlive)
172+
assert(exception.getMessage contains "cancel")
173+
174+
// Once A is cancelled, job B should finish fairly quickly.
175+
val jobB = sc.parallelize(1 to 100, 2).countAsync()
176+
assert(jobB.get() === 100)
177+
}
178+
144179
test("job group with interruption") {
145180
sc = new SparkContext("local[2]", "test")
146181

core/src/test/scala/org/apache/spark/ThreadingSuite.scala

Lines changed: 45 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -17,10 +17,11 @@
1717

1818
package org.apache.spark
1919

20-
import java.util.concurrent.Semaphore
20+
import java.util.concurrent.{TimeUnit, Semaphore}
2121
import java.util.concurrent.atomic.AtomicBoolean
2222
import java.util.concurrent.atomic.AtomicInteger
2323

24+
import org.apache.spark.scheduler._
2425
import org.scalatest.FunSuite
2526

2627
/**
@@ -189,4 +190,47 @@ class ThreadingSuite extends FunSuite with LocalSparkContext {
189190
assert(sc.getLocalProperty("test") === "parent")
190191
assert(sc.getLocalProperty("Foo") === null)
191192
}
193+
194+
test("mutations to local properties should not affect submitted jobs (SPARK-6629)") {
195+
val jobStarted = new Semaphore(0)
196+
val jobEnded = new Semaphore(0)
197+
@volatile var jobResult: JobResult = null
198+
199+
sc = new SparkContext("local", "test")
200+
sc.setJobGroup("originalJobGroupId", "description")
201+
sc.addSparkListener(new SparkListener {
202+
override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
203+
jobStarted.release()
204+
}
205+
override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
206+
jobResult = jobEnd.jobResult
207+
jobEnded.release()
208+
}
209+
})
210+
211+
// Create a new thread which will inherit the current thread's properties
212+
val thread = new Thread() {
213+
override def run(): Unit = {
214+
assert(sc.getLocalProperty(SparkContext.SPARK_JOB_GROUP_ID) === "originalJobGroupId")
215+
// Sleeps for a total of 10 seconds, but allows cancellation to interrupt the task
216+
try {
217+
sc.parallelize(1 to 100).foreach { x =>
218+
Thread.sleep(100)
219+
}
220+
} catch {
221+
case s: SparkException => // ignored so that we don't print noise in test logs
222+
}
223+
}
224+
}
225+
thread.start()
226+
// Wait for the job to start, then mutate the original properties, which should have been
227+
// inherited by the running job but hopefully defensively copied or snapshotted:
228+
jobStarted.tryAcquire(10, TimeUnit.SECONDS)
229+
sc.setJobGroup("modifiedJobGroupId", "description")
230+
// Canceling the original job group should cancel the running job. In other words, the
231+
// modification of the properties object should not affect the properties of running jobs
232+
sc.cancelJobGroup("originalJobGroupId")
233+
jobEnded.tryAcquire(10, TimeUnit.SECONDS)
234+
assert(jobResult.isInstanceOf[JobFailed])
235+
}
192236
}

0 commit comments

Comments (0)