Skip to content

Commit f1a99ad

Browse files
ifilonenkomccheah
authored andcommitted
[SPARK-23984][K8S][TEST] Added Integration Tests for PySpark on Kubernetes
## What changes were proposed in this pull request? I added integration tests for PySpark ( + checking JVM options + RemoteFileTest) which wasn't properly merged in the initial integration test PR. ## How was this patch tested? I tested this with integration tests using: `dev/dev-run-integration-tests.sh --spark-tgz spark-2.4.0-SNAPSHOT-bin-2.7.3.tgz` Author: Ilan Filonenko <[email protected]> Closes apache#21583 from ifilonenko/master.
1 parent a75571b commit f1a99ad

2 files changed

Lines changed: 167 additions & 32 deletions

File tree

resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala

Lines changed: 147 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ import java.util.UUID
2222
import java.util.regex.Pattern
2323

2424
import com.google.common.io.PatternFilenameFilter
25-
import io.fabric8.kubernetes.api.model.{Container, Pod}
25+
import io.fabric8.kubernetes.api.model.Pod
2626
import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
2727
import org.scalatest.concurrent.{Eventually, PatienceConfiguration}
2828
import org.scalatest.time.{Minutes, Seconds, Span}
@@ -43,6 +43,7 @@ private[spark] class KubernetesSuite extends SparkFunSuite
4343
private var kubernetesTestComponents: KubernetesTestComponents = _
4444
private var sparkAppConf: SparkAppConf = _
4545
private var image: String = _
46+
private var pyImage: String = _
4647
private var containerLocalSparkDistroExamplesJar: String = _
4748
private var appLocator: String = _
4849
private var driverPodName: String = _
@@ -65,6 +66,7 @@ private[spark] class KubernetesSuite extends SparkFunSuite
6566
val imageTag = getTestImageTag
6667
val imageRepo = getTestImageRepo
6768
image = s"$imageRepo/spark:$imageTag"
69+
pyImage = s"$imageRepo/spark-py:$imageTag"
6870

6971
val sparkDistroExamplesJarFile: File = sparkHomeDir.resolve(Paths.get("examples", "jars"))
7072
.toFile
@@ -156,46 +158,140 @@ private[spark] class KubernetesSuite extends SparkFunSuite
156158
})
157159
}
158160

159-
// TODO(ssuchter): Enable the below after debugging
160-
// test("Run PageRank using remote data file") {
161-
// sparkAppConf
162-
// .set("spark.kubernetes.mountDependencies.filesDownloadDir",
163-
// CONTAINER_LOCAL_FILE_DOWNLOAD_PATH)
164-
// .set("spark.files", REMOTE_PAGE_RANK_DATA_FILE)
165-
// runSparkPageRankAndVerifyCompletion(
166-
// appArgs = Array(CONTAINER_LOCAL_DOWNLOADED_PAGE_RANK_DATA_FILE))
167-
// }
161+
test("Run extraJVMOptions check on driver") {
162+
sparkAppConf
163+
.set("spark.driver.extraJavaOptions", "-Dspark.test.foo=spark.test.bar")
164+
runSparkJVMCheckAndVerifyCompletion(
165+
expectedJVMValue = Seq("(spark.test.foo,spark.test.bar)"))
166+
}
167+
168+
test("Run SparkRemoteFileTest using a remote data file") {
169+
sparkAppConf
170+
.set("spark.files", REMOTE_PAGE_RANK_DATA_FILE)
171+
runSparkRemoteCheckAndVerifyCompletion(
172+
appArgs = Array(REMOTE_PAGE_RANK_FILE_NAME))
173+
}
174+
175+
test("Run PySpark on simple pi.py example") {
176+
sparkAppConf
177+
.set("spark.kubernetes.container.image", s"${getTestImageRepo}/spark-py:${getTestImageTag}")
178+
runSparkApplicationAndVerifyCompletion(
179+
appResource = PYSPARK_PI,
180+
mainClass = "",
181+
expectedLogOnCompletion = Seq("Pi is roughly 3"),
182+
appArgs = Array("5"),
183+
driverPodChecker = doBasicDriverPyPodCheck,
184+
executorPodChecker = doBasicExecutorPyPodCheck,
185+
appLocator = appLocator,
186+
isJVM = false)
187+
}
188+
189+
test("Run PySpark with Python2 to test a pyfiles example") {
190+
sparkAppConf
191+
.set("spark.kubernetes.container.image", s"${getTestImageRepo}/spark-py:${getTestImageTag}")
192+
.set("spark.kubernetes.pyspark.pythonversion", "2")
193+
runSparkApplicationAndVerifyCompletion(
194+
appResource = PYSPARK_FILES,
195+
mainClass = "",
196+
expectedLogOnCompletion = Seq(
197+
"Python runtime version check is: True",
198+
"Python environment version check is: True"),
199+
appArgs = Array("python"),
200+
driverPodChecker = doBasicDriverPyPodCheck,
201+
executorPodChecker = doBasicExecutorPyPodCheck,
202+
appLocator = appLocator,
203+
isJVM = false,
204+
pyFiles = Some(PYSPARK_CONTAINER_TESTS))
205+
}
206+
207+
test("Run PySpark with Python3 to test a pyfiles example") {
208+
sparkAppConf
209+
.set("spark.kubernetes.container.image", s"${getTestImageRepo}/spark-py:${getTestImageTag}")
210+
.set("spark.kubernetes.pyspark.pythonversion", "3")
211+
runSparkApplicationAndVerifyCompletion(
212+
appResource = PYSPARK_FILES,
213+
mainClass = "",
214+
expectedLogOnCompletion = Seq(
215+
"Python runtime version check is: True",
216+
"Python environment version check is: True"),
217+
appArgs = Array("python3"),
218+
driverPodChecker = doBasicDriverPyPodCheck,
219+
executorPodChecker = doBasicExecutorPyPodCheck,
220+
appLocator = appLocator,
221+
isJVM = false,
222+
pyFiles = Some(PYSPARK_CONTAINER_TESTS))
223+
}
168224

169225
private def runSparkPiAndVerifyCompletion(
170226
appResource: String = containerLocalSparkDistroExamplesJar,
171227
driverPodChecker: Pod => Unit = doBasicDriverPodCheck,
172228
executorPodChecker: Pod => Unit = doBasicExecutorPodCheck,
173229
appArgs: Array[String] = Array.empty[String],
174-
appLocator: String = appLocator): Unit = {
230+
appLocator: String = appLocator,
231+
isJVM: Boolean = true ): Unit = {
175232
runSparkApplicationAndVerifyCompletion(
176233
appResource,
177234
SPARK_PI_MAIN_CLASS,
178235
Seq("Pi is roughly 3"),
179236
appArgs,
180237
driverPodChecker,
181238
executorPodChecker,
182-
appLocator)
239+
appLocator,
240+
isJVM)
183241
}
184242

185-
private def runSparkPageRankAndVerifyCompletion(
243+
private def runSparkRemoteCheckAndVerifyCompletion(
186244
appResource: String = containerLocalSparkDistroExamplesJar,
187245
driverPodChecker: Pod => Unit = doBasicDriverPodCheck,
188246
executorPodChecker: Pod => Unit = doBasicExecutorPodCheck,
189247
appArgs: Array[String],
190248
appLocator: String = appLocator): Unit = {
191249
runSparkApplicationAndVerifyCompletion(
192250
appResource,
193-
SPARK_PAGE_RANK_MAIN_CLASS,
194-
Seq("1 has rank", "2 has rank", "3 has rank", "4 has rank"),
251+
SPARK_REMOTE_MAIN_CLASS,
252+
Seq(s"Mounting of ${appArgs.head} was true"),
195253
appArgs,
196254
driverPodChecker,
197255
executorPodChecker,
198-
appLocator)
256+
appLocator,
257+
true)
258+
}
259+
260+
private def runSparkJVMCheckAndVerifyCompletion(
261+
appResource: String = containerLocalSparkDistroExamplesJar,
262+
mainClass: String = SPARK_DRIVER_MAIN_CLASS,
263+
driverPodChecker: Pod => Unit = doBasicDriverPodCheck,
264+
appArgs: Array[String] = Array("5"),
265+
expectedJVMValue: Seq[String]): Unit = {
266+
val appArguments = SparkAppArguments(
267+
mainAppResource = appResource,
268+
mainClass = mainClass,
269+
appArgs = appArgs)
270+
SparkAppLauncher.launch(
271+
appArguments,
272+
sparkAppConf,
273+
TIMEOUT.value.toSeconds.toInt,
274+
sparkHomeDir,
275+
true)
276+
277+
val driverPod = kubernetesTestComponents.kubernetesClient
278+
.pods()
279+
.withLabel("spark-app-locator", appLocator)
280+
.withLabel("spark-role", "driver")
281+
.list()
282+
.getItems
283+
.get(0)
284+
doBasicDriverPodCheck(driverPod)
285+
286+
Eventually.eventually(TIMEOUT, INTERVAL) {
287+
expectedJVMValue.foreach { e =>
288+
assert(kubernetesTestComponents.kubernetesClient
289+
.pods()
290+
.withName(driverPod.getMetadata.getName)
291+
.getLog
292+
.contains(e), "The application did not complete.")
293+
}
294+
}
199295
}
200296

201297
private def runSparkApplicationAndVerifyCompletion(
@@ -205,12 +301,20 @@ private[spark] class KubernetesSuite extends SparkFunSuite
205301
appArgs: Array[String],
206302
driverPodChecker: Pod => Unit,
207303
executorPodChecker: Pod => Unit,
208-
appLocator: String): Unit = {
304+
appLocator: String,
305+
isJVM: Boolean,
306+
pyFiles: Option[String] = None): Unit = {
209307
val appArguments = SparkAppArguments(
210308
mainAppResource = appResource,
211309
mainClass = mainClass,
212310
appArgs = appArgs)
213-
SparkAppLauncher.launch(appArguments, sparkAppConf, TIMEOUT.value.toSeconds.toInt, sparkHomeDir)
311+
SparkAppLauncher.launch(
312+
appArguments,
313+
sparkAppConf,
314+
TIMEOUT.value.toSeconds.toInt,
315+
sparkHomeDir,
316+
isJVM,
317+
pyFiles)
214318

215319
val driverPod = kubernetesTestComponents.kubernetesClient
216320
.pods()
@@ -248,11 +352,22 @@ private[spark] class KubernetesSuite extends SparkFunSuite
248352
assert(driverPod.getSpec.getContainers.get(0).getName === "spark-kubernetes-driver")
249353
}
250354

355+
private def doBasicDriverPyPodCheck(driverPod: Pod): Unit = {
356+
assert(driverPod.getMetadata.getName === driverPodName)
357+
assert(driverPod.getSpec.getContainers.get(0).getImage === pyImage)
358+
assert(driverPod.getSpec.getContainers.get(0).getName === "spark-kubernetes-driver")
359+
}
360+
251361
private def doBasicExecutorPodCheck(executorPod: Pod): Unit = {
252362
assert(executorPod.getSpec.getContainers.get(0).getImage === image)
253363
assert(executorPod.getSpec.getContainers.get(0).getName === "executor")
254364
}
255365

366+
private def doBasicExecutorPyPodCheck(executorPod: Pod): Unit = {
367+
assert(executorPod.getSpec.getContainers.get(0).getImage === pyImage)
368+
assert(executorPod.getSpec.getContainers.get(0).getName === "executor")
369+
}
370+
256371
private def checkCustomSettings(pod: Pod): Unit = {
257372
assert(pod.getMetadata.getLabels.get("label1") === "label1-value")
258373
assert(pod.getMetadata.getLabels.get("label2") === "label2-value")
@@ -287,14 +402,22 @@ private[spark] object KubernetesSuite {
287402
val TIMEOUT = PatienceConfiguration.Timeout(Span(2, Minutes))
288403
val INTERVAL = PatienceConfiguration.Interval(Span(2, Seconds))
289404
val SPARK_PI_MAIN_CLASS: String = "org.apache.spark.examples.SparkPi"
405+
val SPARK_REMOTE_MAIN_CLASS: String = "org.apache.spark.examples.SparkRemoteFileTest"
406+
val SPARK_DRIVER_MAIN_CLASS: String = "org.apache.spark.examples.DriverSubmissionTest"
290407
val SPARK_PAGE_RANK_MAIN_CLASS: String = "org.apache.spark.examples.SparkPageRank"
408+
val CONTAINER_LOCAL_PYSPARK: String = "local:///opt/spark/examples/src/main/python/"
409+
val PYSPARK_PI: String = CONTAINER_LOCAL_PYSPARK + "pi.py"
410+
val PYSPARK_FILES: String = CONTAINER_LOCAL_PYSPARK + "pyfiles.py"
411+
val PYSPARK_CONTAINER_TESTS: String = CONTAINER_LOCAL_PYSPARK + "py_container_checks.py"
291412

292-
// val CONTAINER_LOCAL_FILE_DOWNLOAD_PATH = "/var/spark-data/spark-files"
413+
val TEST_SECRET_NAME_PREFIX = "test-secret-"
414+
val TEST_SECRET_KEY = "test-key"
415+
val TEST_SECRET_VALUE = "test-data"
416+
val TEST_SECRET_MOUNT_PATH = "/etc/secrets"
293417

294-
// val REMOTE_PAGE_RANK_DATA_FILE =
295-
// "https://storage.googleapis.com/spark-k8s-integration-tests/files/pagerank_data.txt"
296-
// val CONTAINER_LOCAL_DOWNLOADED_PAGE_RANK_DATA_FILE =
297-
// s"$CONTAINER_LOCAL_FILE_DOWNLOAD_PATH/pagerank_data.txt"
418+
val REMOTE_PAGE_RANK_DATA_FILE =
419+
"https://storage.googleapis.com/spark-k8s-integration-tests/files/pagerank_data.txt"
420+
val REMOTE_PAGE_RANK_FILE_NAME = "pagerank_data.txt"
298421

299-
// case object ShuffleNotReadyException extends Exception
422+
case object ShuffleNotReadyException extends Exception
300423
}

resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala

Lines changed: 20 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -97,21 +97,33 @@ private[spark] case class SparkAppArguments(
9797
appArgs: Array[String])
9898

9999
private[spark] object SparkAppLauncher extends Logging {
100-
101100
def launch(
102101
appArguments: SparkAppArguments,
103102
appConf: SparkAppConf,
104103
timeoutSecs: Int,
105-
sparkHomeDir: Path): Unit = {
104+
sparkHomeDir: Path,
105+
isJVM: Boolean,
106+
pyFiles: Option[String] = None): Unit = {
106107
val sparkSubmitExecutable = sparkHomeDir.resolve(Paths.get("bin", "spark-submit"))
107108
logInfo(s"Launching a spark app with arguments $appArguments and conf $appConf")
108-
val commandLine = (Array(sparkSubmitExecutable.toFile.getAbsolutePath,
109+
val preCommandLine = if (isJVM) {
110+
mutable.ArrayBuffer(sparkSubmitExecutable.toFile.getAbsolutePath,
109111
"--deploy-mode", "cluster",
110112
"--class", appArguments.mainClass,
111-
"--master", appConf.get("spark.master")
112-
) ++ appConf.toStringArray :+
113-
appArguments.mainAppResource) ++
114-
appArguments.appArgs
115-
ProcessUtils.executeProcess(commandLine, timeoutSecs)
113+
"--master", appConf.get("spark.master"))
114+
} else {
115+
mutable.ArrayBuffer(sparkSubmitExecutable.toFile.getAbsolutePath,
116+
"--deploy-mode", "cluster",
117+
"--master", appConf.get("spark.master"))
118+
}
119+
val commandLine =
120+
pyFiles.map(s => preCommandLine ++ Array("--py-files", s)).getOrElse(preCommandLine) ++
121+
appConf.toStringArray :+ appArguments.mainAppResource
122+
123+
if (appArguments.appArgs.nonEmpty) {
124+
commandLine += appArguments.appArgs.mkString(" ")
125+
}
126+
logInfo(s"Launching a spark app with command line: ${commandLine.mkString(" ")}")
127+
ProcessUtils.executeProcess(commandLine.toArray, timeoutSecs)
116128
}
117129
}

0 commit comments

Comments
 (0)