Skip to content

Commit 910e3be

Browse files
committed
Add a timeout for initialization
Also move sparkRBackend.stop into a finally block
1 parent bf52b17 commit 910e3be

1 file changed

Lines changed: 23 additions & 14 deletions

File tree

pkg/src/src/main/scala/edu/berkeley/cs/amplab/sparkr/SparkRRunner.scala

Lines changed: 23 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package edu.berkeley.cs.amplab.sparkr
33
import java.io._
44
import java.net.URI
55
import java.util.concurrent.Semaphore
6+
import java.util.concurrent.TimeUnit
67

78
import scala.collection.mutable.ArrayBuffer
89
import scala.collection.JavaConversions._
@@ -19,6 +20,8 @@ object SparkRRunner {
1920

2021
val otherArgs = args.slice(1, args.length)
2122

23+
// Time to wait for SparkR backend to initialize in seconds
24+
val backendTimeout = sys.env.getOrElse("SPARKR_BACKEND_TIMEOUT", "120").toInt
2225
// TODO: Can we get this from SparkConf ?
2326
val sparkRBackendPort = sys.env.getOrElse("SPARKR_BACKEND_PORT", "12345").toInt
2427
val rCommand = "Rscript"
@@ -52,20 +55,26 @@ object SparkRRunner {
5255

5356
sparkRBackendThread.start()
5457
// Wait for SparkRBackend initialization to finish
55-
sparkRBackendThread.finishedInit.acquire()
56-
57-
// Launch R
58-
val builder = new ProcessBuilder(Seq(rCommand, rFileNormalized) ++ otherArgs)
59-
val env = builder.environment()
60-
env.put("EXISTING_SPARKR_BACKEND_PORT", sparkRBackendPort.toString)
61-
builder.redirectErrorStream(true) // Ugly but needed for stdout and stderr to synchronize
62-
val process = builder.start()
63-
64-
new RedirectThread(process.getInputStream, System.out, "redirect output").start()
65-
66-
val returnCode = process.waitFor()
67-
sparkRBackendThread.stopBackend()
68-
System.exit(returnCode)
58+
if (sparkRBackendThread.finishedInit.tryAcquire(backendTimeout, TimeUnit.SECONDS)) {
59+
// Launch R
60+
val returnCode = try {
61+
val builder = new ProcessBuilder(Seq(rCommand, rFileNormalized) ++ otherArgs)
62+
val env = builder.environment()
63+
env.put("EXISTING_SPARKR_BACKEND_PORT", sparkRBackendPort.toString)
64+
builder.redirectErrorStream(true) // Ugly but needed for stdout and stderr to synchronize
65+
val process = builder.start()
66+
67+
new RedirectThread(process.getInputStream, System.out, "redirect output").start()
68+
69+
process.waitFor()
70+
} finally {
71+
sparkRBackendThread.stopBackend()
72+
}
73+
System.exit(returnCode)
74+
} else {
75+
System.err.println("SparkR backend did not initialize in " + backendTimeout + " seconds")
76+
System.exit(-1)
77+
}
6978
}
7079

7180
private class RedirectThread(

0 commit comments

Comments
 (0)