Skip to content

Commit b56aaa9

Browse files
Sun Ruishivaram
authored andcommitted
[SPARK-10500][SPARKR] sparkr.zip cannot be created if /R/lib is unwritable
The basic idea is that: The archive of the SparkR package itself, that is sparkr.zip, is created during build process and is contained in the Spark binary distribution. No change to it after the distribution is installed as the directory it resides ($SPARK_HOME/R/lib) may not be writable. When there is R source code contained in jars or Spark packages specified with "--jars" or "--packages" command line option, a temporary directory is created by calling Utils.createTempDir() where the R packages built from the R source code will be installed. The temporary directory is writable, and won't interfere with each other when there are multiple SparkR sessions, and will be deleted when this SparkR session ends. The R binary packages installed in the temporary directory then are packed into an archive named rpkg.zip. sparkr.zip and rpkg.zip are distributed to the cluster in YARN modes. The distribution of rpkg.zip in Standalone modes is not supported in this PR, and will be address in another PR. Various R files are updated to accept multiple lib paths (one is for SparkR package, the other is for other R packages) so that these package can be accessed in R. Author: Sun Rui <rui.sun@intel.com> Closes #9390 from sun-rui/SPARK-10500. (cherry picked from commit 835a79d) Signed-off-by: Shivaram Venkataraman <shivaram@cs.berkeley.edu>
1 parent d981bfc commit b56aaa9

14 files changed

Lines changed: 121 additions & 36 deletions

File tree

R/install-dev.bat

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,3 +25,9 @@ set SPARK_HOME=%~dp0..
2525
MKDIR %SPARK_HOME%\R\lib
2626

2727
R.exe CMD INSTALL --library="%SPARK_HOME%\R\lib" %SPARK_HOME%\R\pkg\
28+
29+
rem Zip the SparkR package so that it can be distributed to worker nodes on YARN
30+
pushd %SPARK_HOME%\R\lib
31+
%JAVA_HOME%\bin\jar.exe cfM "%SPARK_HOME%\R\lib\sparkr.zip" SparkR
32+
popd
33+

R/install-dev.sh

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,4 +42,8 @@ Rscript -e ' if("devtools" %in% rownames(installed.packages())) { library(devtoo
4242
# Install SparkR to $LIB_DIR
4343
R CMD INSTALL --library=$LIB_DIR $FWDIR/pkg/
4444

45+
# Zip the SparkR package so that it can be distributed to worker nodes on YARN
46+
cd $LIB_DIR
47+
jar cfM "$LIB_DIR/sparkr.zip" SparkR
48+
4549
popd > /dev/null

R/pkg/R/sparkR.R

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,12 @@ sparkR.stop <- function() {
4848
}
4949
}
5050

51+
# Remove the R package lib path from .libPaths()
52+
if (exists(".libPath", envir = env)) {
53+
libPath <- get(".libPath", envir = env)
54+
.libPaths(.libPaths()[.libPaths() != libPath])
55+
}
56+
5157
if (exists(".backendLaunched", envir = env)) {
5258
callJStatic("SparkRHandler", "stopBackend")
5359
}
@@ -155,14 +161,20 @@ sparkR.init <- function(
155161
f <- file(path, open="rb")
156162
backendPort <- readInt(f)
157163
monitorPort <- readInt(f)
164+
rLibPath <- readString(f)
158165
close(f)
159166
file.remove(path)
160167
if (length(backendPort) == 0 || backendPort == 0 ||
161-
length(monitorPort) == 0 || monitorPort == 0) {
168+
length(monitorPort) == 0 || monitorPort == 0 ||
169+
length(rLibPath) != 1) {
162170
stop("JVM failed to launch")
163171
}
164172
assign(".monitorConn", socketConnection(port = monitorPort), envir = .sparkREnv)
165173
assign(".backendLaunched", 1, envir = .sparkREnv)
174+
if (rLibPath != "") {
175+
assign(".libPath", rLibPath, envir = .sparkREnv)
176+
.libPaths(c(rLibPath, .libPaths()))
177+
}
166178
}
167179

168180
.sparkREnv$backendPort <- backendPort

R/pkg/inst/profile/general.R

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
.First <- function() {
1919
packageDir <- Sys.getenv("SPARKR_PACKAGE_DIR")
20-
.libPaths(c(packageDir, .libPaths()))
20+
dirs <- strsplit(packageDir, ",")[[1]]
21+
.libPaths(c(dirs, .libPaths()))
2122
Sys.setenv(NOAWT=1)
2223
}

R/pkg/inst/worker/daemon.R

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,11 @@
1818
# Worker daemon
1919

2020
rLibDir <- Sys.getenv("SPARKR_RLIBDIR")
21-
script <- paste(rLibDir, "SparkR/worker/worker.R", sep = "/")
21+
dirs <- strsplit(rLibDir, ",")[[1]]
22+
script <- file.path(dirs[[1]], "SparkR", "worker", "worker.R")
2223

2324
# preload SparkR package, speedup worker
24-
.libPaths(c(rLibDir, .libPaths()))
25+
.libPaths(c(dirs, .libPaths()))
2526
suppressPackageStartupMessages(library(SparkR))
2627

2728
port <- as.integer(Sys.getenv("SPARKR_WORKER_PORT"))

R/pkg/inst/worker/worker.R

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,10 +35,11 @@ bootTime <- currentTimeSecs()
3535
bootElap <- elapsedSecs()
3636

3737
rLibDir <- Sys.getenv("SPARKR_RLIBDIR")
38+
dirs <- strsplit(rLibDir, ",")[[1]]
3839
# Set libPaths to include SparkR package as loadNamespace needs this
3940
# TODO: Figure out if we can avoid this by not loading any objects that require
4041
# SparkR namespace
41-
.libPaths(c(rLibDir, .libPaths()))
42+
.libPaths(c(dirs, .libPaths()))
4243
suppressPackageStartupMessages(library(SparkR))
4344

4445
port <- as.integer(Sys.getenv("SPARKR_WORKER_PORT"))

core/src/main/scala/org/apache/spark/api/r/RBackend.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,7 @@ private[spark] object RBackend extends Logging {
113113
val dos = new DataOutputStream(new FileOutputStream(f))
114114
dos.writeInt(boundPort)
115115
dos.writeInt(listenPort)
116+
SerDe.writeString(dos, RUtils.rPackages.getOrElse(""))
116117
dos.close()
117118
f.renameTo(new File(path))
118119

core/src/main/scala/org/apache/spark/api/r/RRDD.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -400,14 +400,14 @@ private[r] object RRDD {
400400

401401
val rOptions = "--vanilla"
402402
val rLibDir = RUtils.sparkRPackagePath(isDriver = false)
403-
val rExecScript = rLibDir + "/SparkR/worker/" + script
403+
val rExecScript = rLibDir(0) + "/SparkR/worker/" + script
404404
val pb = new ProcessBuilder(Arrays.asList(rCommand, rOptions, rExecScript))
405405
// Unset the R_TESTS environment variable for workers.
406406
// This is set by R CMD check as startup.Rs
407407
// (http://svn.r-project.org/R/trunk/src/library/tools/R/testing.R)
408408
// and confuses worker script which tries to load a non-existent file
409409
pb.environment().put("R_TESTS", "")
410-
pb.environment().put("SPARKR_RLIBDIR", rLibDir)
410+
pb.environment().put("SPARKR_RLIBDIR", rLibDir.mkString(","))
411411
pb.environment().put("SPARKR_WORKER_PORT", port.toString)
412412
pb.redirectErrorStream(true) // redirect stderr into stdout
413413
val proc = pb.start()

core/src/main/scala/org/apache/spark/api/r/RUtils.scala

Lines changed: 30 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,10 @@ import java.util.Arrays
2323
import org.apache.spark.{SparkEnv, SparkException}
2424

2525
private[spark] object RUtils {
26+
// Local path where R binary packages built from R source code contained in the spark
27+
// packages specified with "--packages" or "--jars" command line option reside.
28+
var rPackages: Option[String] = None
29+
2630
/**
2731
* Get the SparkR package path in the local spark distribution.
2832
*/
@@ -34,11 +38,15 @@ private[spark] object RUtils {
3438
}
3539

3640
/**
37-
* Get the SparkR package path in various deployment modes.
41+
* Get the list of paths for R packages in various deployment modes, of which the first
42+
* path is for the SparkR package itself. The second path is for R packages built as
43+
* part of Spark Packages, if any exist. Spark Packages can be provided through the
44+
* "--packages" or "--jars" command line options.
45+
*
3846
* This assumes that Spark properties `spark.master` and `spark.submit.deployMode`
3947
* and environment variable `SPARK_HOME` are set.
4048
*/
41-
def sparkRPackagePath(isDriver: Boolean): String = {
49+
def sparkRPackagePath(isDriver: Boolean): Seq[String] = {
4250
val (master, deployMode) =
4351
if (isDriver) {
4452
(sys.props("spark.master"), sys.props("spark.submit.deployMode"))
@@ -51,15 +59,30 @@ private[spark] object RUtils {
5159
val isYarnClient = master != null && master.contains("yarn") && deployMode == "client"
5260

5361
// In YARN mode, the SparkR package is distributed as an archive symbolically
54-
// linked to the "sparkr" file in the current directory. Note that this does not apply
55-
// to the driver in client mode because it is run outside of the cluster.
62+
// linked to the "sparkr" file in the current directory and additional R packages
63+
// are distributed as an archive symbolically linked to the "rpkg" file in the
64+
// current directory.
65+
//
66+
// Note that this does not apply to the driver in client mode because it is run
67+
// outside of the cluster.
5668
if (isYarnCluster || (isYarnClient && !isDriver)) {
57-
new File("sparkr").getAbsolutePath
69+
val sparkRPkgPath = new File("sparkr").getAbsolutePath
70+
val rPkgPath = new File("rpkg")
71+
if (rPkgPath.exists()) {
72+
Seq(sparkRPkgPath, rPkgPath.getAbsolutePath)
73+
} else {
74+
Seq(sparkRPkgPath)
75+
}
5876
} else {
5977
// Otherwise, assume the package is local
6078
// TODO: support this for Mesos
61-
localSparkRPackagePath.getOrElse {
62-
throw new SparkException("SPARK_HOME not set. Can't locate SparkR package.")
79+
val sparkRPkgPath = localSparkRPackagePath.getOrElse {
80+
throw new SparkException("SPARK_HOME not set. Can't locate SparkR package.")
81+
}
82+
if (!rPackages.isEmpty) {
83+
Seq(sparkRPkgPath, rPackages.get)
84+
} else {
85+
Seq(sparkRPkgPath)
6386
}
6487
}
6588
}

core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -100,20 +100,29 @@ private[deploy] object RPackageUtils extends Logging {
100100
* Runs the standard R package installation code to build the R package from source.
101101
* Multiple runs don't cause problems.
102102
*/
103-
private def rPackageBuilder(dir: File, printStream: PrintStream, verbose: Boolean): Boolean = {
103+
private def rPackageBuilder(
104+
dir: File,
105+
printStream: PrintStream,
106+
verbose: Boolean,
107+
libDir: String): Boolean = {
104108
// this code should be always running on the driver.
105-
val pathToSparkR = RUtils.localSparkRPackagePath.getOrElse(
106-
throw new SparkException("SPARK_HOME not set. Can't locate SparkR package."))
107109
val pathToPkg = Seq(dir, "R", "pkg").mkString(File.separator)
108-
val installCmd = baseInstallCmd ++ Seq(pathToSparkR, pathToPkg)
110+
val installCmd = baseInstallCmd ++ Seq(libDir, pathToPkg)
109111
if (verbose) {
110112
print(s"Building R package with the command: $installCmd", printStream)
111113
}
112114
try {
113115
val builder = new ProcessBuilder(installCmd.asJava)
114116
builder.redirectErrorStream(true)
117+
118+
// Put the SparkR package directory into R library search paths in case this R package
119+
// may depend on SparkR.
115120
val env = builder.environment()
116-
env.clear()
121+
val rPackageDir = RUtils.sparkRPackagePath(isDriver = true)
122+
env.put("SPARKR_PACKAGE_DIR", rPackageDir.mkString(","))
123+
env.put("R_PROFILE_USER",
124+
Seq(rPackageDir(0), "SparkR", "profile", "general.R").mkString(File.separator))
125+
117126
val process = builder.start()
118127
new RedirectThread(process.getInputStream, printStream, "redirect R packaging").start()
119128
process.waitFor() == 0
@@ -170,8 +179,11 @@ private[deploy] object RPackageUtils extends Logging {
170179
if (checkManifestForR(jar)) {
171180
print(s"$file contains R source code. Now installing package.", printStream, Level.INFO)
172181
val rSource = extractRFolder(jar, printStream, verbose)
182+
if (RUtils.rPackages.isEmpty) {
183+
RUtils.rPackages = Some(Utils.createTempDir().getAbsolutePath)
184+
}
173185
try {
174-
if (!rPackageBuilder(rSource, printStream, verbose)) {
186+
if (!rPackageBuilder(rSource, printStream, verbose, RUtils.rPackages.get)) {
175187
print(s"ERROR: Failed to build R package in $file.", printStream)
176188
print(RJarDoc, printStream)
177189
}
@@ -208,7 +220,7 @@ private[deploy] object RPackageUtils extends Logging {
208220
}
209221
}
210222

211-
/** Zips all the libraries found with SparkR in the R/lib directory for distribution with Yarn. */
223+
/** Zips all the R libraries built for distribution to the cluster. */
212224
private[deploy] def zipRLibraries(dir: File, name: String): File = {
213225
val filesToBundle = listFilesRecursively(dir, Seq(".zip"))
214226
// create a zip file from scratch, do not append to existing file.

0 commit comments

Comments
 (0)