Skip to content

Commit 810efd8

Browse files
CodingCat authored and mengxr committed
akka solution
1 parent 07d72fe commit 810efd8

3 files changed

Lines changed: 53 additions & 23 deletions

File tree

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -274,10 +274,10 @@ class SparkContext(config: SparkConf) extends Logging {
274274

275275
// Create and start the scheduler
276276
private[spark] var taskScheduler = SparkContext.createTaskScheduler(this, master)
277-
taskScheduler.start()
278-
279277
@volatile private[spark] var dagScheduler = new DAGScheduler(this)
280-
dagScheduler.start()
278+
// start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's
279+
// constructor
280+
taskScheduler.start()
281281

282282
private[spark] val cleaner: Option[ContextCleaner] = {
283283
if (conf.getBoolean("spark.cleaner.referenceTracking", true)) {
@@ -1007,6 +1007,9 @@ class SparkContext(config: SparkConf) extends Logging {
10071007
partitions: Seq[Int],
10081008
allowLocal: Boolean,
10091009
resultHandler: (Int, U) => Unit) {
1010+
if (dagScheduler == null) {
1011+
throw new SparkException("SparkContext has been shutdown")
1012+
}
10101013
partitions.foreach{ p =>
10111014
require(p >= 0 && p < rdd.partitions.size, s"Invalid partition requested: $p")
10121015
}

core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

Lines changed: 46 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -26,13 +26,14 @@ import scala.concurrent.duration._
2626
import scala.reflect.ClassTag
2727

2828
import akka.actor._
29+
import akka.actor.SupervisorStrategy.Stop
2930

3031
import org.apache.spark._
3132
import org.apache.spark.executor.TaskMetrics
3233
import org.apache.spark.partial.{ApproximateActionListener, ApproximateEvaluator, PartialResult}
3334
import org.apache.spark.rdd.RDD
3435
import org.apache.spark.storage.{BlockId, BlockManager, BlockManagerMaster, RDDBlockId}
35-
import org.apache.spark.util.Utils
36+
import org.apache.spark.util.{MetadataCleaner, MetadataCleanerType, Utils}
3637

3738
/**
3839
* The high-level scheduling layer that implements stage-oriented scheduling. It computes a DAG of
@@ -54,6 +55,7 @@ import org.apache.spark.util.Utils
5455
*/
5556
private[spark]
5657
class DAGScheduler(
58+
sc: SparkContext,
5759
taskScheduler: TaskScheduler,
5860
listenerBus: LiveListenerBus,
5961
mapOutputTracker: MapOutputTrackerMaster,
@@ -65,6 +67,7 @@ class DAGScheduler(
6567

6668
def this(sc: SparkContext, taskScheduler: TaskScheduler) = {
6769
this(
70+
sc,
6871
taskScheduler,
6972
sc.listenerBus,
7073
sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster],
@@ -116,18 +119,36 @@ class DAGScheduler(
116119
taskScheduler.setDAGScheduler(this)
117120

118121
/**
119-
* Starts the event processing actor. The actor has two responsibilities:
120-
*
121-
* 1. Waits for events like job submission, task finished, task failure etc., and calls
122-
* [[org.apache.spark.scheduler.DAGScheduler.processEvent()]] to process them.
123-
* 2. Schedules a periodical task to resubmit failed stages.
124-
*
125-
* NOTE: the actor cannot be started in the constructor, because the periodical task references
126-
* some internal states of the enclosing [[org.apache.spark.scheduler.DAGScheduler]] object, thus
127-
* cannot be scheduled until the [[org.apache.spark.scheduler.DAGScheduler]] is fully constructed.
122+
* Starts the event processing actor within the supervisor. The eventProcessingActor
123+
* waits for events like job submission, task finished, task failure etc., and calls
124+
* [[org.apache.spark.scheduler.DAGScheduler.processEvent()]] to process them.
128125
*/
129-
def start() {
130-
eventProcessActor = env.actorSystem.actorOf(Props(new Actor {
126+
env.actorSystem.actorOf(Props(new Actor {
127+
128+
override val supervisorStrategy =
129+
OneForOneStrategy() {
130+
case x: Exception => {
131+
logError("eventProcesserActor failed due to the error %s; shutting down SparkContext"
132+
.format(x.getMessage))
133+
doCancelAllJobs()
134+
sc.stop()
135+
Stop
136+
}
137+
}
138+
139+
// do nothing in supervisor
140+
def receive = {
141+
case _ =>
142+
}
143+
144+
eventProcessActor = context.actorOf(Props(new Actor {
145+
146+
override def preStart() {
147+
// set DAGScheduler for taskScheduler to ensure eventProcessActor is always
148+
// valid when the messages arrive
149+
taskScheduler.setDAGScheduler(DAGScheduler.this)
150+
}
151+
131152
/**
132153
* The main event loop of the DAG scheduler.
133154
*/
@@ -137,8 +158,8 @@ class DAGScheduler(
137158

138159
/**
139160
* All events are forwarded to `processEvent()`, so that the event processing logic can
140-
* easily tested without starting a dedicated actor. Please refer to `DAGSchedulerSuite`
141-
* for details.
161+
* be easily tested without starting a dedicated actor. Please refer to
162+
* `DAGSchedulerSuite` for details.
142163
*/
143164
if (!processEvent(event)) {
144165
submitWaitingStages()
@@ -147,7 +168,7 @@ class DAGScheduler(
147168
}
148169
}
149170
}))
150-
}
171+
}))
151172

152173
// Called by TaskScheduler to report task's starting.
153174
def taskStarted(task: Task[_], taskInfo: TaskInfo) {
@@ -511,6 +532,14 @@ class DAGScheduler(
511532
eventProcessActor ! AllJobsCancelled
512533
}
513534

535+
private def doCancelAllJobs() {
536+
// Cancel all running jobs.
537+
runningStages.map(_.jobId).foreach(handleJobCancellation(_,
538+
reason = "as part of cancellation of all jobs"))
539+
activeJobs.clear() // These should already be empty by this point,
540+
jobIdToActiveJob.clear() // but just in case we lost track of some jobs...
541+
}
542+
514543
/**
515544
* Cancel all jobs associated with a running or scheduled stage.
516545
*/
@@ -575,10 +604,7 @@ class DAGScheduler(
575604

576605
case AllJobsCancelled =>
577606
// Cancel all running jobs.
578-
runningStages.map(_.jobId).foreach(jobId => handleJobCancellation(jobId,
579-
"as part of cancellation of all jobs"))
580-
activeJobs.clear() // These should already be empty by this point,
581-
jobIdToActiveJob.clear() // but just in case we lost track of some jobs...
607+
doCancelAllJobs()
582608

583609
case ExecutorAdded(execId, host) =>
584610
handleExecutorAdded(execId, host)
@@ -821,7 +847,6 @@ class DAGScheduler(
821847
*/
822848
private def handleTaskCompletion(event: CompletionEvent) {
823849
val task = event.task
824-
825850
if (!stageIdToStage.contains(task.stageId)) {
826851
// Skip all the actions if the stage has been cancelled.
827852
return
@@ -1152,6 +1177,7 @@ class DAGScheduler(
11521177
}
11531178

11541179
def stop() {
1180+
logInfo("Stopping DAGScheduler")
11551181
if (eventProcessActor != null) {
11561182
eventProcessActor ! StopDAGScheduler
11571183
}

core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,7 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
121121
results.clear()
122122
mapOutputTracker = new MapOutputTrackerMaster(conf)
123123
scheduler = new DAGScheduler(
124+
sc,
124125
taskScheduler,
125126
sc.listenerBus,
126127
mapOutputTracker,

0 commit comments

Comments
 (0)