@@ -65,6 +65,8 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
6565 private val maxNumWorkerFailures = sparkConf.getInt(" spark.yarn.max.worker.failures" ,
6666 math.max(args.numWorkers * 2 , 3 ))
6767
68+ private var registered = false
69+
6870 def run () {
6971 // Setup the directories so things go to yarn approved directories rather
7072 // then user specified and /tmp.
@@ -110,7 +112,12 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
110112 waitForSparkContextInitialized()
111113
112114 // Do this after spark master is up and SparkContext is created so that we can register UI Url
113- val appMasterResponse : RegisterApplicationMasterResponse = registerApplicationMaster()
115+ synchronized {
116+ if (! isFinished) {
117+ registerApplicationMaster()
118+ registered = true
119+ }
120+ }
114121
115122 // Allocate all containers
116123 allocateWorkers()
@@ -208,7 +215,8 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
208215 var count = 0
209216 val waitTime = 10000L
210217 val numTries = sparkConf.getInt(" spark.yarn.ApplicationMaster.waitTries" , 10 )
211- while (ApplicationMaster .sparkContextRef.get() == null && count < numTries) {
218+ while (ApplicationMaster .sparkContextRef.get() == null && count < numTries
219+ && ! isFinished) {
212220 logInfo(" Waiting for spark context initialization ... " + count)
213221 count = count + 1
214222 ApplicationMaster .sparkContextRef.wait(waitTime)
@@ -341,17 +349,19 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
341349 return
342350 }
343351 isFinished = true
352+
353+ logInfo(" finishApplicationMaster with " + status)
354+ if (registered) {
355+ val finishReq = Records .newRecord(classOf [FinishApplicationMasterRequest ])
356+ .asInstanceOf [FinishApplicationMasterRequest ]
357+ finishReq.setAppAttemptId(appAttemptId)
358+ finishReq.setFinishApplicationStatus(status)
359+ finishReq.setDiagnostics(diagnostics)
360+ // Set tracking url to empty since we don't have a history server.
361+ finishReq.setTrackingUrl(" " )
362+ resourceManager.finishApplicationMaster(finishReq)
363+ }
344364 }
345-
346- logInfo(" finishApplicationMaster with " + status)
347- val finishReq = Records .newRecord(classOf [FinishApplicationMasterRequest ])
348- .asInstanceOf [FinishApplicationMasterRequest ]
349- finishReq.setAppAttemptId(appAttemptId)
350- finishReq.setFinishApplicationStatus(status)
351- finishReq.setDiagnostics(diagnostics)
352- // Set tracking url to empty since we don't have a history server.
353- finishReq.setTrackingUrl(" " )
354- resourceManager.finishApplicationMaster(finishReq)
355365 }
356366
357367 /**
0 commit comments