Skip to content

Commit 22a19ac

Browse files
committed
Use a semaphore to wait for backend to initalize
Also pick a random port to avoid collisions
1 parent 86fc639 commit 22a19ac

1 file changed

Lines changed: 12 additions & 1 deletion

File tree

pkg/src/src/main/scala/edu/berkeley/cs/amplab/sparkr/SparkRRunner.scala

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package edu.berkeley.cs.amplab.sparkr
22

33
import java.io._
44
import java.net.URI
5+
import java.util.concurrent.Semaphore
56

67
import scala.collection.mutable.ArrayBuffer
78
import scala.collection.JavaConversions._
@@ -17,8 +18,12 @@ object SparkRRunner {
1718
val rFile = args(0)
1819

1920
val otherArgs = args.slice(1, args.length)
21+
22+
// Pick a non-privileged port
23+
val randomPort = scala.util.Random.nextInt(65536 - 1024) + 1024
24+
2025
// TODO: Can we get this from SparkConf ?
21-
val sparkRBackendPort = sys.env.getOrElse("SPARKR_BACKEND_PORT", "12345").toInt
26+
val sparkRBackendPort = sys.env.getOrElse("SPARKR_BACKEND_PORT", randomPort.toString).toInt
2227
val rCommand = "Rscript"
2328

2429
// Check if the file path exists.
@@ -35,8 +40,12 @@ object SparkRRunner {
3540
// Java system properties etc.
3641
val sparkRBackend = new SparkRBackend()
3742
val sparkRBackendThread = new Thread() {
43+
val finishedInit = new Semaphore(1)
44+
finishedInit.acquire()
45+
3846
override def run() {
3947
sparkRBackend.init(sparkRBackendPort)
48+
finishedInit.release()
4049
sparkRBackend.run()
4150
}
4251

@@ -46,6 +55,8 @@ object SparkRRunner {
4655
}
4756

4857
sparkRBackendThread.start()
58+
// Wait for SparkRBackend initialization to finish
59+
sparkRBackendThread.finishedInit.acquire()
4960

5061
// Launch R
5162
val builder = new ProcessBuilder(Seq(rCommand, rFileNormalized) ++ otherArgs)

0 commit comments

Comments
 (0)