Skip to content

Commit 010aeca

Browse files
committed
[SPARK-33748][K8S] Respect environment variables and configurations for Python executables
### What changes were proposed in this pull request? This PR proposes: - Respect `PYSPARK_PYTHON` and `PYSPARK_DRIVER_PYTHON` environment variables, or `spark.pyspark.python` and `spark.pyspark.driver.python` configurations in Kubernates just like other cluster types in Spark. - Depreate `spark.kubernetes.pyspark.pythonVersion` and guide users to set the environment variables and configurations for Python executables. NOTE that `spark.kubernetes.pyspark.pythonVersion` is already a no-op configuration without this PR. Default is `3` and other values are disallowed. - In order for Python executable settings to be consistently used, fix `spark.archives` option to unpack into the current working directory in the driver of Kubernates' cluster mode. This behaviour is identical with Yarn's cluster mode. By doing this, users can leverage Conda or virtuenenv in cluster mode as below: ```python conda create -y -n pyspark_conda_env -c conda-forge pyarrow pandas conda-pack conda activate pyspark_conda_env conda pack -f -o pyspark_conda_env.tar.gz PYSPARK_PYTHON=./environment/bin/python spark-submit --archives pyspark_conda_env.tar.gz#environment app.py ``` - Removed several unused or useless codes such as `extractS3Key` and `renameResourcesToLocalFS` ### Why are the changes needed? - To provide a consistent support of PySpark by using `PYSPARK_PYTHON` and `PYSPARK_DRIVER_PYTHON` environment variables, or `spark.pyspark.python` and `spark.pyspark.driver.python` configurations. - To provide Conda and virtualenv support via `spark.archives` options. ### Does this PR introduce _any_ user-facing change? Yes: - `spark.kubernetes.pyspark.pythonVersion` is deprecated. - `PYSPARK_PYTHON` and `PYSPARK_DRIVER_PYTHON` environment variables, and `spark.pyspark.python` and `spark.pyspark.driver.python` configurations are respected. ### How was this patch tested? Manually tested via: ```bash minikube delete minikube start --cpus 12 --memory 16384 kubectl create namespace spark-integration-test cat <<EOF | kubectl apply -f - apiVersion: v1 kind: ServiceAccount metadata: name: spark namespace: spark-integration-test EOF kubectl create clusterrolebinding spark-role --clusterrole=edit --serviceaccount=spark-integration-test:spark --namespace=spark-integration-test dev/make-distribution.sh --pip --tgz -Pkubernetes resource-managers/kubernetes/integration-tests/dev/dev-run-integration-tests.sh --spark-tgz `pwd`/spark-3.2.0-SNAPSHOT-bin-3.2.0.tgz --service-account spark --namespace spark-integration-test ``` Unittests were also added. Closes apache#30735 from HyukjinKwon/SPARK-33748. Authored-by: HyukjinKwon <[email protected]> Signed-off-by: HyukjinKwon <[email protected]> (cherry picked from commit a99a47c) Signed-off-by: HyukjinKwon <[email protected]>
1 parent ac4d04e commit 010aeca

14 files changed

Lines changed: 256 additions & 78 deletions

File tree

core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala

Lines changed: 29 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@ import scala.collection.JavaConverters._
3131
import scala.collection.mutable.ArrayBuffer
3232
import scala.util.{Properties, Try}
3333

34-
import org.apache.commons.io.FilenameUtils
3534
import org.apache.commons.lang3.StringUtils
3635
import org.apache.hadoop.conf.{Configuration => HadoopConfiguration}
3736
import org.apache.hadoop.fs.{FileSystem, Path}
@@ -387,20 +386,40 @@ private[spark] class SparkSubmit extends Logging {
387386
// Replace with the downloaded local jar path to avoid propagating hadoop compatible uris.
388387
// Executors will get the jars from the Spark file server.
389388
// Explicitly download the related files here
390-
args.jars = renameResourcesToLocalFS(args.jars, localJars)
389+
args.jars = localJars
391390
val filesLocalFiles = Option(args.files).map {
392391
downloadFileList(_, targetDir, sparkConf, hadoopConf, secMgr)
393392
}.orNull
394-
val archiveLocalFiles = Option(args.archives).map { uri =>
395-
val resolvedUri = Utils.resolveURI(uri)
396-
val downloadedUri = downloadFileList(
397-
UriBuilder.fromUri(resolvedUri).fragment(null).build().toString,
393+
val archiveLocalFiles = Option(args.archives).map { uris =>
394+
val resolvedUris = Utils.stringToSeq(uris).map(Utils.resolveURI)
395+
val localArchives = downloadFileList(
396+
resolvedUris.map(
397+
UriBuilder.fromUri(_).fragment(null).build().toString).mkString(","),
398398
targetDir, sparkConf, hadoopConf, secMgr)
399-
UriBuilder.fromUri(downloadedUri).fragment(resolvedUri.getFragment).build().toString
399+
400+
// SPARK-33748: this mimics the behaviour of Yarn cluster mode. If the driver is running
401+
// in cluster mode, the archives should be available in the driver's current working
402+
// directory too.
403+
Utils.stringToSeq(localArchives).map(Utils.resolveURI).zip(resolvedUris).map {
404+
case (localArchive, resolvedUri) =>
405+
val source = new File(localArchive.getPath)
406+
val dest = new File(
407+
".",
408+
if (resolvedUri.getFragment != null) resolvedUri.getFragment else source.getName)
409+
logInfo(
410+
s"Unpacking an archive $resolvedUri " +
411+
s"from ${source.getAbsolutePath} to ${dest.getAbsolutePath}")
412+
Utils.deleteRecursively(dest)
413+
Utils.unpack(source, dest)
414+
415+
// Keep the URIs of local files with the given fragments.
416+
UriBuilder.fromUri(
417+
localArchive).fragment(resolvedUri.getFragment).build().toString
418+
}.mkString(",")
400419
}.orNull
401-
args.files = renameResourcesToLocalFS(args.files, filesLocalFiles)
402-
args.archives = renameResourcesToLocalFS(args.archives, archiveLocalFiles)
403-
args.pyFiles = renameResourcesToLocalFS(args.pyFiles, localPyFiles)
420+
args.files = filesLocalFiles
421+
args.archives = archiveLocalFiles
422+
args.pyFiles = localPyFiles
404423
}
405424
}
406425

@@ -836,21 +855,6 @@ private[spark] class SparkSubmit extends Logging {
836855
(childArgs.toSeq, childClasspath.toSeq, sparkConf, childMainClass)
837856
}
838857

839-
private def renameResourcesToLocalFS(resources: String, localResources: String): String = {
840-
if (resources != null && localResources != null) {
841-
val localResourcesSeq = Utils.stringToSeq(localResources)
842-
Utils.stringToSeq(resources).map { resource =>
843-
val filenameRemote = FilenameUtils.getName(new URI(resource).getPath)
844-
localResourcesSeq.find { localUri =>
845-
val filenameLocal = FilenameUtils.getName(new URI(localUri).getPath)
846-
filenameRemote == filenameLocal
847-
}.getOrElse(resource)
848-
}.mkString(",")
849-
} else {
850-
resources
851-
}
852-
}
853-
854858
// [SPARK-20328]. HadoopRDD calls into a Hadoop library that fetches delegation tokens with
855859
// renewer set to the YARN ResourceManager. Since YARN isn't configured in Mesos or Kubernetes
856860
// mode, we must trick it into thinking we're YARN.

docs/running-on-kubernetes.md

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1087,7 +1087,10 @@ See the [configuration page](configuration.html) for information on Spark config
10871087
<td><code>spark.kubernetes.pyspark.pythonVersion</code></td>
10881088
<td><code>"3"</code></td>
10891089
<td>
1090-
This sets the major Python version of the docker image used to run the driver and executor containers. Can be 3.
1090+
This sets the major Python version of the docker image used to run the driver and executor containers.
1091+
It can be only "3". This configuration was deprecated from Spark 3.1.0, and is effectively no-op.
1092+
Users should set 'spark.pyspark.python' and 'spark.pyspark.driver.python' configurations or
1093+
'PYSPARK_PYTHON' and 'PYSPARK_DRIVER_PYTHON' environment variables.
10911094
</td>
10921095
<td>2.4.0</td>
10931096
</tr>

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import java.util.concurrent.TimeUnit
2020

2121
import org.apache.spark.deploy.k8s.Constants._
2222
import org.apache.spark.internal.Logging
23+
import org.apache.spark.internal.config.{PYSPARK_DRIVER_PYTHON, PYSPARK_PYTHON}
2324
import org.apache.spark.internal.config.ConfigBuilder
2425

2526
private[spark] object Config extends Logging {
@@ -293,12 +294,19 @@ private[spark] object Config extends Logging {
293294

294295
val PYSPARK_MAJOR_PYTHON_VERSION =
295296
ConfigBuilder("spark.kubernetes.pyspark.pythonVersion")
296-
.doc("This sets the major Python version. Only 3 is available for Python3.")
297+
.doc(
298+
s"(Deprecated since Spark 3.1, please set '${PYSPARK_PYTHON.key}' and " +
299+
s"'${PYSPARK_DRIVER_PYTHON.key}' configurations or $ENV_PYSPARK_PYTHON and " +
300+
s"$ENV_PYSPARK_DRIVER_PYTHON environment variables instead.)")
297301
.version("2.4.0")
298302
.stringConf
299-
.checkValue(pv => List("3").contains(pv),
300-
"Ensure that major Python version is Python3")
301-
.createWithDefault("3")
303+
.checkValue("3" == _,
304+
"Python 2 was dropped from Spark 3.1, and only 3 is allowed in " +
305+
"this configuration. Note that this configuration was deprecated in Spark 3.1. " +
306+
s"Please set '${PYSPARK_PYTHON.key}' and '${PYSPARK_DRIVER_PYTHON.key}' " +
307+
s"configurations or $ENV_PYSPARK_PYTHON and $ENV_PYSPARK_DRIVER_PYTHON environment " +
308+
"variables instead.")
309+
.createOptional
302310

303311
val KUBERNETES_KERBEROS_KRB5_FILE =
304312
ConfigBuilder("spark.kubernetes.kerberos.krb5.path")

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,8 @@ private[spark] object Constants {
7474
val ENV_HADOOP_TOKEN_FILE_LOCATION = "HADOOP_TOKEN_FILE_LOCATION"
7575

7676
// BINDINGS
77-
val ENV_PYSPARK_MAJOR_PYTHON_VERSION = "PYSPARK_MAJOR_PYTHON_VERSION"
77+
val ENV_PYSPARK_PYTHON = "PYSPARK_PYTHON"
78+
val ENV_PYSPARK_DRIVER_PYTHON = "PYSPARK_DRIVER_PYTHON"
7879

7980
// Pod spec templates
8081
val EXECUTOR_POD_SPEC_TEMPLATE_FILE_NAME = "pod-spec-template.yml"

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverCommandFeatureStep.scala

Lines changed: 32 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,14 +24,16 @@ import org.apache.spark.deploy.k8s._
2424
import org.apache.spark.deploy.k8s.Config._
2525
import org.apache.spark.deploy.k8s.Constants._
2626
import org.apache.spark.deploy.k8s.submit._
27+
import org.apache.spark.internal.Logging
28+
import org.apache.spark.internal.config.{PYSPARK_DRIVER_PYTHON, PYSPARK_PYTHON}
2729
import org.apache.spark.launcher.SparkLauncher
2830

2931
/**
3032
* Creates the driver command for running the user app, and propagates needed configuration so
3133
* executors can also find the app code.
3234
*/
3335
private[spark] class DriverCommandFeatureStep(conf: KubernetesDriverConf)
34-
extends KubernetesFeatureConfigStep {
36+
extends KubernetesFeatureConfigStep with Logging {
3537

3638
override def configurePod(pod: SparkPod): SparkPod = {
3739
conf.mainAppResource match {
@@ -70,12 +72,37 @@ private[spark] class DriverCommandFeatureStep(conf: KubernetesDriverConf)
7072
SparkPod(pod.pod, driverContainer)
7173
}
7274

75+
// Exposed for testing purpose.
76+
private[spark] def environmentVariables: Map[String, String] = sys.env
77+
7378
private def configureForPython(pod: SparkPod, res: String): SparkPod = {
79+
if (conf.get(PYSPARK_MAJOR_PYTHON_VERSION).isDefined) {
80+
logWarning(
81+
s"${PYSPARK_MAJOR_PYTHON_VERSION.key} was deprecated in Spark 3.1. " +
82+
s"Please set '${PYSPARK_PYTHON.key}' and '${PYSPARK_DRIVER_PYTHON.key}' " +
83+
s"configurations or $ENV_PYSPARK_PYTHON and $ENV_PYSPARK_DRIVER_PYTHON environment " +
84+
"variables instead.")
85+
}
86+
7487
val pythonEnvs =
75-
Seq(new EnvVarBuilder()
76-
.withName(ENV_PYSPARK_MAJOR_PYTHON_VERSION)
77-
.withValue(conf.get(PYSPARK_MAJOR_PYTHON_VERSION))
78-
.build())
88+
Seq(
89+
conf.get(PYSPARK_PYTHON)
90+
.orElse(environmentVariables.get(ENV_PYSPARK_PYTHON)).map { value =>
91+
new EnvVarBuilder()
92+
.withName(ENV_PYSPARK_PYTHON)
93+
.withValue(value)
94+
.build()
95+
},
96+
conf.get(PYSPARK_DRIVER_PYTHON)
97+
.orElse(conf.get(PYSPARK_PYTHON))
98+
.orElse(environmentVariables.get(ENV_PYSPARK_DRIVER_PYTHON))
99+
.orElse(environmentVariables.get(ENV_PYSPARK_PYTHON)).map { value =>
100+
new EnvVarBuilder()
101+
.withName(ENV_PYSPARK_DRIVER_PYTHON)
102+
.withValue(value)
103+
.build()
104+
}
105+
).flatten
79106

80107
// re-write primary resource to be the remote one and upload the related file
81108
val newResName = KubernetesUtils

resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverCommandFeatureStepSuite.scala

Lines changed: 50 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import org.apache.spark.{SparkConf, SparkFunSuite}
2222
import org.apache.spark.deploy.k8s._
2323
import org.apache.spark.deploy.k8s.Constants._
2424
import org.apache.spark.deploy.k8s.submit._
25+
import org.apache.spark.internal.config.{PYSPARK_DRIVER_PYTHON, PYSPARK_PYTHON}
2526

2627
class DriverCommandFeatureStepSuite extends SparkFunSuite {
2728

@@ -50,12 +51,51 @@ class DriverCommandFeatureStepSuite extends SparkFunSuite {
5051
"--properties-file", SPARK_CONF_PATH,
5152
"--class", KubernetesTestConf.MAIN_CLASS,
5253
mainResource, "5", "7", "9"))
54+
}
55+
56+
test("python executable precedence") {
57+
val mainResource = "local:/main.py"
5358

54-
val envs = spec.pod.container.getEnv.asScala
55-
.map { env => (env.getName, env.getValue) }
56-
.toMap
57-
val expected = Map(ENV_PYSPARK_MAJOR_PYTHON_VERSION -> "3")
58-
assert(envs === expected)
59+
val pythonExecutables = Seq(
60+
(Some("conf_py"), Some("conf_driver_py"), Some("env_py"), Some("env_driver_py")),
61+
(Some("conf_py"), None, Some("env_py"), Some("env_driver_py")),
62+
(None, None, Some("env_py"), Some("env_driver_py")),
63+
(None, None, Some("env_py"), None)
64+
)
65+
66+
val expectedResults = Seq(
67+
("conf_py", "conf_driver_py"),
68+
("conf_py", "conf_py"),
69+
("env_py", "env_driver_py"),
70+
("env_py", "env_py")
71+
)
72+
73+
pythonExecutables.zip(expectedResults).foreach { case (pythonExecutable, expected) =>
74+
val sparkConf = new SparkConf(false)
75+
val (confPy, confDriverPy, envPy, envDriverPy) = pythonExecutable
76+
confPy.foreach(sparkConf.set(PYSPARK_PYTHON, _))
77+
confDriverPy.foreach(sparkConf.set(PYSPARK_DRIVER_PYTHON, _))
78+
val pythonEnvs = Map(
79+
(
80+
envPy.map(v => ENV_PYSPARK_PYTHON -> v :: Nil) ++
81+
envDriverPy.map(v => ENV_PYSPARK_DRIVER_PYTHON -> v :: Nil)
82+
).flatten.toArray: _*)
83+
84+
val spec = applyFeatureStep(
85+
PythonMainAppResource(mainResource),
86+
conf = sparkConf,
87+
appArgs = Array("foo"),
88+
env = pythonEnvs)
89+
90+
val envs = spec.pod.container.getEnv.asScala
91+
.map { env => (env.getName, env.getValue) }
92+
.toMap
93+
94+
val (expectedEnvPy, expectedDriverPy) = expected
95+
assert(envs === Map(
96+
ENV_PYSPARK_PYTHON -> expectedEnvPy,
97+
ENV_PYSPARK_DRIVER_PYTHON -> expectedDriverPy))
98+
}
5999
}
60100

61101
test("R resource") {
@@ -123,13 +163,16 @@ class DriverCommandFeatureStepSuite extends SparkFunSuite {
123163
resource: MainAppResource,
124164
conf: SparkConf = new SparkConf(false),
125165
appArgs: Array[String] = Array(),
126-
proxyUser: Option[String] = None): KubernetesDriverSpec = {
166+
proxyUser: Option[String] = None,
167+
env: Map[String, String] = Map.empty[String, String]): KubernetesDriverSpec = {
127168
val kubernetesConf = KubernetesTestConf.createDriverConf(
128169
sparkConf = conf,
129170
mainAppResource = resource,
130171
appArgs = appArgs,
131172
proxyUser = proxyUser)
132-
val step = new DriverCommandFeatureStep(kubernetesConf)
173+
val step = new DriverCommandFeatureStep(kubernetesConf) {
174+
private[spark] override val environmentVariables: Map[String, String] = env
175+
}
133176
val pod = step.configurePod(SparkPod.initialPod())
134177
val props = step.getAdditionalPodSystemProperties()
135178
KubernetesDriverSpec(pod, Nil, props)

resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -44,11 +44,11 @@ if [ -n "$SPARK_EXTRA_CLASSPATH" ]; then
4444
SPARK_CLASSPATH="$SPARK_CLASSPATH:$SPARK_EXTRA_CLASSPATH"
4545
fi
4646

47-
if [ "$PYSPARK_MAJOR_PYTHON_VERSION" == "3" ]; then
48-
pyv3="$(python3 -V 2>&1)"
49-
export PYTHON_VERSION="${pyv3:7}"
50-
export PYSPARK_PYTHON="python3"
51-
export PYSPARK_DRIVER_PYTHON="python3"
47+
if ! [ -z ${PYSPARK_PYTHON+x} ]; then
48+
export PYSPARK_PYTHON
49+
fi
50+
if ! [ -z ${PYSPARK_DRIVER_PYTHON+x} ]; then
51+
export PYSPARK_DRIVER_PYTHON
5252
fi
5353

5454
# If HADOOP_HOME is set and SPARK_DIST_CLASSPATH is not set, set it here so Hadoop jars are available to the executor.

0 commit comments

Comments
 (0)