11package spark .deploy .master
22
3- import scala .collection .mutable .{ArrayBuffer , HashMap , HashSet }
4-
53import akka .actor ._
6- import spark .{Logging , Utils }
7- import spark .util .AkkaUtils
4+ import akka .actor .Terminated
5+ import akka .remote .{RemoteClientLifeCycleEvent , RemoteClientDisconnected , RemoteClientShutdown }
6+
87import java .text .SimpleDateFormat
98import java .util .Date
10- import akka .remote .RemoteClientLifeCycleEvent
9+
10+ import scala .collection .mutable .{ArrayBuffer , HashMap , HashSet }
11+
1112import spark .deploy ._
12- import akka .remote .RemoteClientShutdown
13- import akka .remote .RemoteClientDisconnected
14- import spark .deploy .RegisterWorker
15- import spark .deploy .RegisterWorkerFailed
16- import akka .actor .Terminated
13+ import spark .{Logging , SparkException , Utils }
14+ import spark .util .AkkaUtils
15+
1716
1817class Master (ip : String , port : Int , webUiPort : Int ) extends Actor with Logging {
1918 val DATE_FORMAT = new SimpleDateFormat (" yyyyMMddHHmmss" ) // For job IDs
@@ -81,12 +80,22 @@ class Master(ip: String, port: Int, webUiPort: Int) extends Actor with Logging {
8180 exec.state = state
8281 exec.job.actor ! ExecutorUpdated (execId, state, message)
8382 if (ExecutorState .isFinished(state)) {
83+ val jobInfo = idToJob(jobId)
8484 // Remove this executor from the worker and job
8585 logInfo(" Removing executor " + exec.fullId + " because it is " + state)
86- idToJob(jobId) .removeExecutor(exec)
86+ jobInfo .removeExecutor(exec)
8787 exec.worker.removeExecutor(exec)
88- // TODO: the worker would probably want to restart the executor a few times
89- schedule()
88+
89+ // Only retry certain number of times so we don't go into an infinite loop.
90+ if (jobInfo.incrementRetryCount <= JobState .MAX_NUM_RETRY ) {
91+ schedule()
92+ } else {
93+ val e = new SparkException (" Job %s wth ID %s failed %d times." .format(
94+ jobInfo.desc.name, jobInfo.id, jobInfo.retryCount))
95+ logError(e.getMessage, e)
96+ throw e
97+ // System.exit(1)
98+ }
9099 }
91100 }
92101 case None =>
@@ -112,7 +121,7 @@ class Master(ip: String, port: Int, webUiPort: Int) extends Actor with Logging {
112121 addressToWorker.get(address).foreach(removeWorker)
113122 addressToJob.get(address).foreach(removeJob)
114123 }
115-
124+
116125 case RequestMasterState => {
117126 sender ! MasterState (ip + " :" + port, workers.toList, jobs.toList, completedJobs.toList)
118127 }
0 commit comments