From c39e3cfb8ee24a2479df05ba6ceffd6f872b37dd Mon Sep 17 00:00:00 2001 From: angerszhu Date: Wed, 30 Dec 2020 17:45:44 +0800 Subject: [PATCH 1/9] [SPARK-33934][SQL] Support user defined script command wrapper like hive --- .../apache/spark/sql/internal/SQLConf.scala | 8 + .../BaseScriptTransformationExec.scala | 67 ++++++- .../SparkScriptTransformationExec.scala | 2 +- sql/core/src/test/resources/test_script.py | 2 + .../BaseScriptTransformationSuite.scala | 174 ++++++++++++++++++ .../HiveScriptTransformationExec.scala | 2 +- 6 files changed, 248 insertions(+), 7 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 6fcab887dd6af..ed4e36b51b8de 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -2918,6 +2918,14 @@ object SQLConf { .checkValues(LegacyBehaviorPolicy.values.map(_.toString)) .createWithDefault(LegacyBehaviorPolicy.EXCEPTION.toString) + val SCRIPT_TRANSFORMATION_COMMAND_WRAPPER = + buildConf("spark.sql.scriptTransformation.commandWrapper") + .internal() + .doc("Command wrapper for executor to execute transformation script.") + .version("3.2.0") + .stringConf + .createWithDefault("/bin/bash -c") + val SCRIPT_TRANSFORMATION_EXIT_TIMEOUT = buildConf("spark.sql.scriptTransformation.exitTimeoutInSeconds") .internal() diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/BaseScriptTransformationExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/BaseScriptTransformationExec.scala index 1c87c48ae7cb3..8a6466bfae0f0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/BaseScriptTransformationExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/BaseScriptTransformationExec.scala @@ -17,8 +17,9 @@ package org.apache.spark.sql.execution -import java.io.{BufferedReader, InputStream, InputStreamReader, OutputStream} +import java.io._ import java.nio.charset.StandardCharsets +import java.util.ArrayList import java.util.concurrent.TimeUnit import scala.collection.JavaConverters._ @@ -26,7 +27,7 @@ import scala.util.control.NonFatal import org.apache.hadoop.conf.Configuration -import org.apache.spark.{SparkException, TaskContext} +import org.apache.spark.{SparkException, SparkFiles, TaskContext} import org.apache.spark.internal.Logging import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow} @@ -46,6 +47,8 @@ trait BaseScriptTransformationExec extends UnaryExecNode { def child: SparkPlan def ioschema: ScriptTransformationIOSchema + type ProcParameters = (OutputStream, Process, InputStream, CircularBuffer) + protected lazy val inputExpressionsWithoutSerde: Seq[Expression] = { input.map(Cast(_, StringType).withTimeZone(conf.sessionLocalTimeZone)) } @@ -55,8 +58,11 @@ trait BaseScriptTransformationExec extends UnaryExecNode { override def outputPartitioning: Partitioning = child.outputPartitioning override def doExecute(): RDD[InternalRow] = { + val hadoopConf = sqlContext.sessionState.newHadoopConf() + hadoopConf.set(SQLConf.SCRIPT_TRANSFORMATION_COMMAND_WRAPPER.key, + conf.getConf(SQLConf.SCRIPT_TRANSFORMATION_COMMAND_WRAPPER)) val broadcastedHadoopConf = - new SerializableConfiguration(sqlContext.sessionState.newHadoopConf()) + new SerializableConfiguration(hadoopConf) child.execute().mapPartitions { iter => if (iter.hasNext) { @@ -69,9 +75,11 @@ 
trait BaseScriptTransformationExec extends UnaryExecNode { } } - protected def initProc: (OutputStream, Process, InputStream, CircularBuffer) = { - val cmd = List("/bin/bash", "-c", script) + protected def initProc(hadoopConf: Configuration): ProcParameters = { + val wrapper = splitArgs(hadoopConf.get(SQLConf.SCRIPT_TRANSFORMATION_COMMAND_WRAPPER.key)) + val cmd = wrapper.toList ++ List(script) val builder = new ProcessBuilder(cmd.asJava) + .directory(new File(SparkFiles.getRootDirectory())) val proc = builder.start() val inputStream = proc.getInputStream @@ -181,6 +189,55 @@ trait BaseScriptTransformationExec extends UnaryExecNode { } } + def splitArgs(args: String): Array[String] = { + val OUTSIDE = 1 + val SINGLEQ = 2 + val DOUBLEQ = 3 + val argList = new ArrayList[String] + val ch = args.toCharArray + val clen = ch.length + var state = OUTSIDE + var argstart = 0 + var c = 0 + while (c <= clen) { + val last = c == clen + var lastState = state + var endToken = false + if (!last) { + if (ch(c) == '\'') { + if (state == OUTSIDE) { + state = SINGLEQ + } else if (state == SINGLEQ) { + state = OUTSIDE + } + endToken = state != lastState + } else if (ch(c) == '"') { + if (state == OUTSIDE) { + state = DOUBLEQ + } else if (state == DOUBLEQ) { + state = OUTSIDE + } + endToken = state != lastState + } else if (ch(c) == ' ') { + if (state == OUTSIDE) { + endToken = true + } + } + } + if (last || endToken) { + if (c == argstart) { + // unquoted space + } else { + argList.add(args.substring(argstart, c)) + } + argstart = c + 1 + lastState = state + } + c += 1 + } + argList.toArray(new Array[String](0)) + } + private lazy val outputFieldWriters: Seq[String => Any] = output.map { attr => val converter = CatalystTypeConverters.createToCatalystConverter(attr.dataType) attr.dataType match { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkScriptTransformationExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkScriptTransformationExec.scala index 75c91667012a3..77d8faa78ca53 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkScriptTransformationExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkScriptTransformationExec.scala @@ -48,7 +48,7 @@ case class SparkScriptTransformationExec( inputIterator: Iterator[InternalRow], hadoopConf: Configuration): Iterator[InternalRow] = { - val (outputStream, proc, inputStream, stderrBuffer) = initProc + val (outputStream, proc, inputStream, stderrBuffer) = initProc(hadoopConf) val outputProjection = new InterpretedProjection(inputExpressionsWithoutSerde, child.output) diff --git a/sql/core/src/test/resources/test_script.py b/sql/core/src/test/resources/test_script.py index 82ef7b38f0c1b..4c7d9b3c40837 100644 --- a/sql/core/src/test/resources/test_script.py +++ b/sql/core/src/test/resources/test_script.py @@ -1,3 +1,5 @@ +#!/usr/bin/python + # Licensed to the Apache Software Foundation (ASF) under one or more # contributor license agreements. See the NOTICE file distributed with # this work for additional information regarding copyright ownership. 
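Note: the splitArgs helper added above tokenizes the wrapper and script strings
roughly the way a shell would: an unquoted space ends a token, a single- or
double-quoted run becomes one token with the quotes stripped, and empty tokens
produced by adjacent delimiters are dropped. A minimal Scala sketch of the
expected behavior (the inputs and expected arrays below are illustrative,
derived by tracing the state machine, not taken from the patch itself):

  // Plain words split on unquoted spaces.
  splitArgs("/bin/bash -c")          // Array("/bin/bash", "-c")
  // A single-quoted argument survives as one token, quotes removed.
  splitArgs("python 'my script.py'") // Array("python", "my script.py")
  // Double quotes behave the same way; surrounding tokens are unaffected.
  splitArgs("run \"a b\" c")         // Array("run", "a b", "c")
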
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/BaseScriptTransformationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/BaseScriptTransformationSuite.scala index cf9ee1ef6db72..ec7f563c34e96 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/BaseScriptTransformationSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/BaseScriptTransformationSuite.scala @@ -470,6 +470,180 @@ abstract class BaseScriptTransformationSuite extends SparkPlanTest with SQLTestU Row("3\u00014\u00015") :: Nil) } } + + test("SPARK-33934: Check default execute command wrapper") { + assume(TestUtils.testCommandAvailable("python")) + val scriptFilePath = copyAndGetResourceFile("test_script.py", ".py").getAbsoluteFile + withTempView("v") { + val df = Seq( + (1, "1", 1.0, BigDecimal(1.0), new Timestamp(1)), + (2, "2", 2.0, BigDecimal(2.0), new Timestamp(2)), + (3, "3", 3.0, BigDecimal(3.0), new Timestamp(3)) + ).toDF("a", "b", "c", "d", "e") // Note column d's data type is Decimal(38, 18) + df.createTempView("v") + + // test '/bin/bash -c python /path/to/script.py' + checkAnswer( + sql( + s""" + |SELECT + |TRANSFORM(a, b, c, d, e) + | ROW FORMAT DELIMITED + | FIELDS TERMINATED BY '\t' + | USING 'python $scriptFilePath' AS (a, b, c, d, e) + | ROW FORMAT DELIMITED + | FIELDS TERMINATED BY '\t' + |FROM v + """.stripMargin), identity, df.select( + 'a.cast("string"), + 'b.cast("string"), + 'c.cast("string"), + 'd.cast("string"), + 'e.cast("string")).collect()) + + // test '/bin/bash -c /path/to/script.py' with script not executable + val e1 = intercept[TestFailedException] { + checkAnswer( + sql( + s""" + |SELECT + |TRANSFORM(a, b, c, d, e) + | ROW FORMAT DELIMITED + | FIELDS TERMINATED BY '\t' + | USING '$scriptFilePath' AS (a, b, c, d, e) + | ROW FORMAT DELIMITED + | FIELDS TERMINATED BY '\t' + |FROM v + """.stripMargin), identity, df.select( + 'a.cast("string"), + 'b.cast("string"), + 'c.cast("string"), + 'd.cast("string"), + 'e.cast("string")).collect()) + }.getMessage + assert(e1.contains("Permission denied")) + + // test '/bin/bash -c /path/to/script.py' with script executable + scriptFilePath.setExecutable(true) + checkAnswer( + sql( + s""" + |SELECT + |TRANSFORM(a, b, c, d, e) + | ROW FORMAT DELIMITED + | FIELDS TERMINATED BY '\t' + | USING '$scriptFilePath' AS (a, b, c, d, e) + | ROW FORMAT DELIMITED + | FIELDS TERMINATED BY '\t' + |FROM v + """.stripMargin), identity, df.select( + 'a.cast("string"), + 'b.cast("string"), + 'c.cast("string"), + 'd.cast("string"), + 'e.cast("string")).collect()) + + scriptFilePath.setExecutable(false) + sql(s"ADD FILE ${scriptFilePath.getAbsolutePath}") + + // test '/bin/bash -c script.py' + val e2 = intercept[TestFailedException] { + checkAnswer( + sql( + s""" + |SELECT TRANSFORM(a, b, c, d, e) + | ROW FORMAT DELIMITED + | FIELDS TERMINATED BY '\t' + | USING '${scriptFilePath.getName}' AS (a, b, c, d, e) + | ROW FORMAT DELIMITED + | FIELDS TERMINATED BY '\t' + |FROM v + """.stripMargin), identity, df.select( + 'a.cast("string"), + 'b.cast("string"), + 'c.cast("string"), + 'd.cast("string"), + 'e.cast("string")).collect()) + }.getMessage() + assert(e2.contains("command not found")) + } + } + + test("SPARK-33934: Check execute command wrapper is empty") { + assume(TestUtils.testCommandAvailable("python")) + val scriptFilePath = copyAndGetResourceFile( + "test_script.py", "_test_empty.py").getAbsoluteFile + withTempView("v") { + withSQLConf(SQLConf.SCRIPT_TRANSFORMATION_COMMAND_WRAPPER.key -> "") { + val df = 
Seq( + (1, "1", 1.0, BigDecimal(1.0), new Timestamp(1)), + (2, "2", 2.0, BigDecimal(2.0), new Timestamp(2)), + (3, "3", 3.0, BigDecimal(3.0), new Timestamp(3)) + ).toDF("a", "b", "c", "d", "e") // Note column d's data type is Decimal(38, 18) + df.createTempView("v") + + scriptFilePath.setExecutable(true) + sql(s"ADD FILE ${scriptFilePath.getAbsolutePath}") + sql( + s""" + |SELECT TRANSFORM(a) + | ROW FORMAT DELIMITED + | FIELDS TERMINATED BY '\t' + | USING 'pwd' AS (a) + | ROW FORMAT DELIMITED + | FIELDS TERMINATED BY '&' + |FROM (SELECT 1 AS a) TEMP + """.stripMargin).show(false) + + sql( + s""" + |SELECT TRANSFORM(a) + | ROW FORMAT DELIMITED + | FIELDS TERMINATED BY '\t' + | USING 'ls' AS (a) + | ROW FORMAT DELIMITED + | FIELDS TERMINATED BY '&' + |FROM (SELECT 1 AS a) TEMP + """.stripMargin).show(false) + + // test 'python script.py' + checkAnswer( + sql( + s""" + |SELECT TRANSFORM(a, b, c, d, e) + | ROW FORMAT DELIMITED + | FIELDS TERMINATED BY '\t' + | USING 'python ${scriptFilePath.getName}' AS (a, b, c, d, e) + | ROW FORMAT DELIMITED + | FIELDS TERMINATED BY '\t' + |FROM v + """.stripMargin), identity, df.select( + 'a.cast("string"), + 'b.cast("string"), + 'c.cast("string"), + 'd.cast("string"), + 'e.cast("string")).collect()) + + // test 'script.py' + checkAnswer( + sql( + s""" + |SELECT TRANSFORM(a, b, c, d, e) + | ROW FORMAT DELIMITED + | FIELDS TERMINATED BY '\t' + | USING '${scriptFilePath.getName}' AS (a, b, c, d, e) + | ROW FORMAT DELIMITED + | FIELDS TERMINATED BY '\t' + |FROM v + """.stripMargin), identity, df.select( + 'a.cast("string"), + 'b.cast("string"), + 'c.cast("string"), + 'd.cast("string"), + 'e.cast("string")).collect()) + } + } + } } case class ExceptionInjectingOperator(child: SparkPlan) extends UnaryExecNode { diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveScriptTransformationExec.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveScriptTransformationExec.scala index 4b03cff5e8c8e..4a202980bad9e 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveScriptTransformationExec.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveScriptTransformationExec.scala @@ -140,7 +140,7 @@ case class HiveScriptTransformationExec( inputIterator: Iterator[InternalRow], hadoopConf: Configuration): Iterator[InternalRow] = { - val (outputStream, proc, inputStream, stderrBuffer) = initProc + val (outputStream, proc, inputStream, stderrBuffer) = initProc(hadoopConf) val (inputSerde, inputSoi) = initInputSerDe(ioschema, input).getOrElse((null, null)) From 70b69a50e19b15ccf151ec336883aed7ea6a419b Mon Sep 17 00:00:00 2001 From: angerszhu Date: Wed, 30 Dec 2020 23:02:59 +0800 Subject: [PATCH 2/9] update --- .../apache/spark/sql/internal/SQLConf.scala | 2 +- .../BaseScriptTransformationExec.scala | 10 +- sql/core/src/test/resources/test_script.py | 2 +- .../BaseScriptTransformationSuite.scala | 113 ++++-------------- 4 files changed, 37 insertions(+), 90 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index ed4e36b51b8de..5325036500d97 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -2924,7 +2924,7 @@ object SQLConf { .doc("Command wrapper for executor to execute transformation script.") .version("3.2.0") .stringConf - .createWithDefault("/bin/bash -c") 
+ .createWithDefault("") val SCRIPT_TRANSFORMATION_EXIT_TIMEOUT = buildConf("spark.sql.scriptTransformation.exitTimeoutInSeconds") diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/BaseScriptTransformationExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/BaseScriptTransformationExec.scala index 8a6466bfae0f0..745f6c3530eff 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/BaseScriptTransformationExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/BaseScriptTransformationExec.scala @@ -77,7 +77,15 @@ trait BaseScriptTransformationExec extends UnaryExecNode { protected def initProc(hadoopConf: Configuration): ProcParameters = { val wrapper = splitArgs(hadoopConf.get(SQLConf.SCRIPT_TRANSFORMATION_COMMAND_WRAPPER.key)) - val cmd = wrapper.toList ++ List(script) + val cmdArgs = splitArgs(script) + val prog = cmdArgs(0) + if(!new File(prog).isAbsolute) { + val progFile = new File(SparkFiles.get(prog)) + if (progFile.exists()) { + cmdArgs(0) = progFile.getAbsolutePath + } + } + val cmd = wrapper.toList ++ cmdArgs.toList val builder = new ProcessBuilder(cmd.asJava) .directory(new File(SparkFiles.getRootDirectory())) diff --git a/sql/core/src/test/resources/test_script.py b/sql/core/src/test/resources/test_script.py index 4c7d9b3c40837..75b4f106d3a1a 100644 --- a/sql/core/src/test/resources/test_script.py +++ b/sql/core/src/test/resources/test_script.py @@ -1,4 +1,4 @@ -#!/usr/bin/python +#! /usr/bin/python # Licensed to the Apache Software Foundation (ASF) under one or more # contributor license agreements. See the NOTICE file distributed with diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/BaseScriptTransformationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/BaseScriptTransformationSuite.scala index ec7f563c34e96..50e51d50545cb 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/BaseScriptTransformationSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/BaseScriptTransformationSuite.scala @@ -482,7 +482,7 @@ abstract class BaseScriptTransformationSuite extends SparkPlanTest with SQLTestU ).toDF("a", "b", "c", "d", "e") // Note column d's data type is Decimal(38, 18) df.createTempView("v") - // test '/bin/bash -c python /path/to/script.py' + // test 'python /path/to/script.py' with local file checkAnswer( sql( s""" @@ -501,7 +501,7 @@ abstract class BaseScriptTransformationSuite extends SparkPlanTest with SQLTestU 'd.cast("string"), 'e.cast("string")).collect()) - // test '/bin/bash -c /path/to/script.py' with script not executable + // test '/path/to/script.py' with script not executable val e1 = intercept[TestFailedException] { checkAnswer( sql( @@ -523,7 +523,7 @@ abstract class BaseScriptTransformationSuite extends SparkPlanTest with SQLTestU }.getMessage assert(e1.contains("Permission denied")) - // test '/bin/bash -c /path/to/script.py' with script executable + // test `/path/to/script.py' with script executable scriptFilePath.setExecutable(true) checkAnswer( sql( @@ -546,102 +546,41 @@ abstract class BaseScriptTransformationSuite extends SparkPlanTest with SQLTestU scriptFilePath.setExecutable(false) sql(s"ADD FILE ${scriptFilePath.getAbsolutePath}") - // test '/bin/bash -c script.py' - val e2 = intercept[TestFailedException] { - checkAnswer( - sql( - s""" - |SELECT TRANSFORM(a, b, c, d, e) - | ROW FORMAT DELIMITED - | FIELDS TERMINATED BY '\t' - | USING '${scriptFilePath.getName}' AS (a, b, c, d, e) - | ROW FORMAT DELIMITED - | FIELDS 
TERMINATED BY '\t' - |FROM v - """.stripMargin), identity, df.select( - 'a.cast("string"), - 'b.cast("string"), - 'c.cast("string"), - 'd.cast("string"), - 'e.cast("string")).collect()) - }.getMessage() - assert(e2.contains("command not found")) - } - } - - test("SPARK-33934: Check execute command wrapper is empty") { - assume(TestUtils.testCommandAvailable("python")) - val scriptFilePath = copyAndGetResourceFile( - "test_script.py", "_test_empty.py").getAbsoluteFile - withTempView("v") { - withSQLConf(SQLConf.SCRIPT_TRANSFORMATION_COMMAND_WRAPPER.key -> "") { - val df = Seq( - (1, "1", 1.0, BigDecimal(1.0), new Timestamp(1)), - (2, "2", 2.0, BigDecimal(2.0), new Timestamp(2)), - (3, "3", 3.0, BigDecimal(3.0), new Timestamp(3)) - ).toDF("a", "b", "c", "d", "e") // Note column d's data type is Decimal(38, 18) - df.createTempView("v") - - scriptFilePath.setExecutable(true) - sql(s"ADD FILE ${scriptFilePath.getAbsolutePath}") + // test `script.py` when file added + checkAnswer( sql( s""" - |SELECT TRANSFORM(a) + |SELECT TRANSFORM(a, b, c, d, e) | ROW FORMAT DELIMITED | FIELDS TERMINATED BY '\t' - | USING 'pwd' AS (a) + | USING '${scriptFilePath.getName}' AS (a, b, c, d, e) | ROW FORMAT DELIMITED - | FIELDS TERMINATED BY '&' - |FROM (SELECT 1 AS a) TEMP - """.stripMargin).show(false) + | FIELDS TERMINATED BY '\t' + |FROM v + """.stripMargin), identity, df.select( + 'a.cast("string"), + 'b.cast("string"), + 'c.cast("string"), + 'd.cast("string"), + 'e.cast("string")).collect()) + // test `python script.py` when file added + checkAnswer( sql( s""" - |SELECT TRANSFORM(a) + |SELECT TRANSFORM(a, b, c, d, e) | ROW FORMAT DELIMITED | FIELDS TERMINATED BY '\t' - | USING 'ls' AS (a) + | USING 'python ${scriptFilePath.getName}' AS (a, b, c, d, e) | ROW FORMAT DELIMITED - | FIELDS TERMINATED BY '&' - |FROM (SELECT 1 AS a) TEMP - """.stripMargin).show(false) - - // test 'python script.py' - checkAnswer( - sql( - s""" - |SELECT TRANSFORM(a, b, c, d, e) - | ROW FORMAT DELIMITED - | FIELDS TERMINATED BY '\t' - | USING 'python ${scriptFilePath.getName}' AS (a, b, c, d, e) - | ROW FORMAT DELIMITED - | FIELDS TERMINATED BY '\t' - |FROM v - """.stripMargin), identity, df.select( - 'a.cast("string"), - 'b.cast("string"), - 'c.cast("string"), - 'd.cast("string"), - 'e.cast("string")).collect()) - - // test 'script.py' - checkAnswer( - sql( - s""" - |SELECT TRANSFORM(a, b, c, d, e) - | ROW FORMAT DELIMITED - | FIELDS TERMINATED BY '\t' - | USING '${scriptFilePath.getName}' AS (a, b, c, d, e) - | ROW FORMAT DELIMITED - | FIELDS TERMINATED BY '\t' - |FROM v + | FIELDS TERMINATED BY '\t' + |FROM v """.stripMargin), identity, df.select( - 'a.cast("string"), - 'b.cast("string"), - 'c.cast("string"), - 'd.cast("string"), - 'e.cast("string")).collect()) - } + 'a.cast("string"), + 'b.cast("string"), + 'c.cast("string"), + 'd.cast("string"), + 'e.cast("string")).collect()) } } } From 525b51e864deca17cc67ff16c9f3b46ed6cd83fc Mon Sep 17 00:00:00 2001 From: angerszhu Date: Wed, 30 Dec 2020 23:12:03 +0800 Subject: [PATCH 3/9] Fix UT --- .../spark/sql/execution/BaseScriptTransformationSuite.scala | 2 +- .../sql/hive/execution/HiveScriptTransformationSuite.scala | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/BaseScriptTransformationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/BaseScriptTransformationSuite.scala index 50e51d50545cb..b230f6d7970eb 100644 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/BaseScriptTransformationSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/BaseScriptTransformationSuite.scala @@ -218,7 +218,7 @@ abstract class BaseScriptTransformationSuite extends SparkPlanTest with SQLTestU ioschema = defaultIOSchema) SparkPlanTest.executePlan(plan, spark.sqlContext) } - assert(e.getMessage.contains("Subprocess exited with status")) + assert(e.getMessage.contains("No such file or directory")) assert(uncaughtExceptionHandler.exception.isEmpty) } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveScriptTransformationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveScriptTransformationSuite.scala index 3892caa51eca9..b6cf98f92dffa 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveScriptTransformationSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveScriptTransformationSuite.scala @@ -111,7 +111,7 @@ class HiveScriptTransformationSuite extends BaseScriptTransformationSuite with T ioschema = hiveIOSchema) SparkPlanTest.executePlan(plan, hiveContext) } - assert(e.getMessage.contains("Subprocess exited with status")) + assert(e.getMessage.contains("No such file or directory")) assert(uncaughtExceptionHandler.exception.isEmpty) } @@ -150,7 +150,7 @@ class HiveScriptTransformationSuite extends BaseScriptTransformationSuite with T ioschema = hiveIOSchema) SparkPlanTest.executePlan(plan, hiveContext) } - assert(e.getMessage.contains("Subprocess exited with status")) + assert(e.getMessage.contains("No such file or directory")) assert(uncaughtExceptionHandler.exception.isEmpty) } From 4268880ba428c2b4b668c4719b17a3b132a0d1ba Mon Sep 17 00:00:00 2001 From: angerszhu Date: Thu, 31 Dec 2020 16:15:34 +0800 Subject: [PATCH 4/9] Update SQLQuerySuite.scala --- .../apache/spark/sql/hive/execution/SQLQuerySuite.scala | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala index 3370695245fd0..ebb03fcdee39e 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala @@ -1032,7 +1032,12 @@ abstract class SQLQuerySuiteBase extends QueryTest with SQLTestUtils with TestHi val data = (1 to 100000).map { i => (i, i, i) } data.toDF("d1", "d2", "d3").createOrReplaceTempView("script_trans") assert(0 === - sql("SELECT TRANSFORM (d1, d2, d3) USING 'cat 1>&2' AS (a,b,c) FROM script_trans").count()) + sql( + """ + |SELECT TRANSFORM (d1, d2, d3) + | USING '\'cat 1>&2\'' AS (a,b,c) + |FROM script_trans + """.stripMargin).count()) } } From 02955011218ad485a3ea4b7a165f247948289f14 Mon Sep 17 00:00:00 2001 From: angerszhu Date: Thu, 31 Dec 2020 16:23:30 +0800 Subject: [PATCH 5/9] Update SQLQuerySuite.scala --- .../org/apache/spark/sql/hive/execution/SQLQuerySuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala index ebb03fcdee39e..0294e3047f51b 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala @@ -1035,7 +1035,7 @@ 
abstract class SQLQuerySuiteBase extends QueryTest with SQLTestUtils with TestHi
 sql(
 """
 |SELECT TRANSFORM (d1, d2, d3)
- | USING '\'cat 1>&2\'' AS (a,b,c)
+ | USING '/bin/bash -c \'cat 1>&2\'' AS (a,b,c)
 |FROM script_trans
 """.stripMargin).count())
 }
 }

From f40684488ff87aa72acc976acb651ed4091c2c41 Mon Sep 17 00:00:00 2001
From: angerszhu
Date: Sun, 3 Jan 2021 10:15:18 +0800
Subject: [PATCH 6/9] Update BaseScriptTransformationExec.scala

---
 .../BaseScriptTransformationExec.scala | 55 +++++++++++++++++--
 1 file changed, 50 insertions(+), 5 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/BaseScriptTransformationExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/BaseScriptTransformationExec.scala
index 745f6c3530eff..7db504bcad372 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/BaseScriptTransformationExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/BaseScriptTransformationExec.scala
@@ -78,11 +78,13 @@ trait BaseScriptTransformationExec extends UnaryExecNode {
 protected def initProc(hadoopConf: Configuration): ProcParameters = {
 val wrapper = splitArgs(hadoopConf.get(SQLConf.SCRIPT_TRANSFORMATION_COMMAND_WRAPPER.key))
 val cmdArgs = splitArgs(script)
- val prog = cmdArgs(0)
- if (!new File(prog).isAbsolute) {
- val progFile = new File(SparkFiles.get(prog))
- if (progFile.exists()) {
- cmdArgs(0) = progFile.getAbsolutePath
+ val program = cmdArgs(0)
+ if (!new File(program).isAbsolute) {
+ val pathFinder = new PathFinder("PATH")
+ pathFinder.prependPathComponent(SparkFiles.getRootDirectory())
+ val programFile = pathFinder.getAbsolutePath(program)
+ if (programFile.isDefined) {
+ cmdArgs(0) = programFile.get.getAbsolutePath
 }
 }
 val cmd = wrapper.toList ++ cmdArgs.toList
@@ -429,3 +431,46 @@ object ScriptTransformationIOSchema {
 input.schemaLess)
 }
 }
+
+/**
+ * Maps a relative pathname to an absolute pathname using the PATH environment.
+ */
+class PathFinder(val envPath: String) {
+ private var pathEnv = System.getenv(envPath)
+ private var pathSep = File.pathSeparator
+ private var fileSep = File.separator
+
+ /**
+ * Prepends the specified component to the path list.
+ */
+ def prependPathComponent(str: String): Unit = {
+ pathEnv = str + pathSep + pathEnv
+ }
+
+ /**
+ * Returns the full path name of this file if it is listed in the path. 
+ */ + def getAbsolutePath(filename: String): Option[File] = { + if (pathEnv == null || pathSep == null || fileSep == null) { + None + } else { + pathEnv.split(pathSep).map(entry => { + var file = new File(entry) + try { + if (file.isDirectory) { + file = new File(entry + fileSep + filename) + } + + if (file.isFile && file.canRead) { + file + } else { + null + } + } catch { + case NonFatal(f) => + null + } + }).find(Option(_).isDefined) + } + } +} From 6d305f572725414d084b356ea9f538bd7db3b416 Mon Sep 17 00:00:00 2001 From: angerszhu Date: Sun, 3 Jan 2021 12:30:11 +0800 Subject: [PATCH 7/9] follow comment --- .../sql/execution/BaseScriptTransformationExec.scala | 9 ++++----- .../sql/execution/SparkScriptTransformationExec.scala | 2 +- .../hive/execution/HiveScriptTransformationExec.scala | 2 +- 3 files changed, 6 insertions(+), 7 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/BaseScriptTransformationExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/BaseScriptTransformationExec.scala index 7db504bcad372..9e16a8e6028f7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/BaseScriptTransformationExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/BaseScriptTransformationExec.scala @@ -58,11 +58,10 @@ trait BaseScriptTransformationExec extends UnaryExecNode { override def outputPartitioning: Partitioning = child.outputPartitioning override def doExecute(): RDD[InternalRow] = { - val hadoopConf = sqlContext.sessionState.newHadoopConf() - hadoopConf.set(SQLConf.SCRIPT_TRANSFORMATION_COMMAND_WRAPPER.key, + sparkContext.setLocalProperty(SQLConf.SCRIPT_TRANSFORMATION_COMMAND_WRAPPER.key, conf.getConf(SQLConf.SCRIPT_TRANSFORMATION_COMMAND_WRAPPER)) val broadcastedHadoopConf = - new SerializableConfiguration(hadoopConf) + new SerializableConfiguration(sqlContext.sessionState.newHadoopConf()) child.execute().mapPartitions { iter => if (iter.hasNext) { @@ -75,8 +74,8 @@ trait BaseScriptTransformationExec extends UnaryExecNode { } } - protected def initProc(hadoopConf: Configuration): ProcParameters = { - val wrapper = splitArgs(hadoopConf.get(SQLConf.SCRIPT_TRANSFORMATION_COMMAND_WRAPPER.key)) + protected def initProc: ProcParameters = { + val wrapper = splitArgs(conf.getConf(SQLConf.SCRIPT_TRANSFORMATION_COMMAND_WRAPPER)) val cmdArgs = splitArgs(script) val program = cmdArgs(0) if (!new File(program).isAbsolute) { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkScriptTransformationExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkScriptTransformationExec.scala index 77d8faa78ca53..75c91667012a3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkScriptTransformationExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkScriptTransformationExec.scala @@ -48,7 +48,7 @@ case class SparkScriptTransformationExec( inputIterator: Iterator[InternalRow], hadoopConf: Configuration): Iterator[InternalRow] = { - val (outputStream, proc, inputStream, stderrBuffer) = initProc(hadoopConf) + val (outputStream, proc, inputStream, stderrBuffer) = initProc val outputProjection = new InterpretedProjection(inputExpressionsWithoutSerde, child.output) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveScriptTransformationExec.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveScriptTransformationExec.scala index 4a202980bad9e..4b03cff5e8c8e 100644 --- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveScriptTransformationExec.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveScriptTransformationExec.scala @@ -140,7 +140,7 @@ case class HiveScriptTransformationExec( inputIterator: Iterator[InternalRow], hadoopConf: Configuration): Iterator[InternalRow] = { - val (outputStream, proc, inputStream, stderrBuffer) = initProc(hadoopConf) + val (outputStream, proc, inputStream, stderrBuffer) = initProc val (inputSerde, inputSoi) = initInputSerDe(ioschema, input).getOrElse((null, null)) From e9e6984c000539a4e1e327b795995f844b9e70d3 Mon Sep 17 00:00:00 2001 From: angerszhu Date: Mon, 4 Jan 2021 10:16:52 +0800 Subject: [PATCH 8/9] follow comment --- .../apache/spark/sql/internal/SQLConf.scala | 8 -- .../BaseScriptTransformationExec.scala | 115 +----------------- .../BaseScriptTransformationSuite.scala | 2 +- .../HiveScriptTransformationSuite.scala | 4 +- .../sql/hive/execution/SQLQuerySuite.scala | 7 +- 5 files changed, 9 insertions(+), 127 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 5325036500d97..6fcab887dd6af 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -2918,14 +2918,6 @@ object SQLConf { .checkValues(LegacyBehaviorPolicy.values.map(_.toString)) .createWithDefault(LegacyBehaviorPolicy.EXCEPTION.toString) - val SCRIPT_TRANSFORMATION_COMMAND_WRAPPER = - buildConf("spark.sql.scriptTransformation.commandWrapper") - .internal() - .doc("Command wrapper for executor to execute transformation script.") - .version("3.2.0") - .stringConf - .createWithDefault("") - val SCRIPT_TRANSFORMATION_EXIT_TIMEOUT = buildConf("spark.sql.scriptTransformation.exitTimeoutInSeconds") .internal() diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/BaseScriptTransformationExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/BaseScriptTransformationExec.scala index 9e16a8e6028f7..547f29bdcdbe1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/BaseScriptTransformationExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/BaseScriptTransformationExec.scala @@ -19,7 +19,6 @@ package org.apache.spark.sql.execution import java.io._ import java.nio.charset.StandardCharsets -import java.util.ArrayList import java.util.concurrent.TimeUnit import scala.collection.JavaConverters._ @@ -47,8 +46,6 @@ trait BaseScriptTransformationExec extends UnaryExecNode { def child: SparkPlan def ioschema: ScriptTransformationIOSchema - type ProcParameters = (OutputStream, Process, InputStream, CircularBuffer) - protected lazy val inputExpressionsWithoutSerde: Seq[Expression] = { input.map(Cast(_, StringType).withTimeZone(conf.sessionLocalTimeZone)) } @@ -58,8 +55,6 @@ trait BaseScriptTransformationExec extends UnaryExecNode { override def outputPartitioning: Partitioning = child.outputPartitioning override def doExecute(): RDD[InternalRow] = { - sparkContext.setLocalProperty(SQLConf.SCRIPT_TRANSFORMATION_COMMAND_WRAPPER.key, - conf.getConf(SQLConf.SCRIPT_TRANSFORMATION_COMMAND_WRAPPER)) val broadcastedHadoopConf = new SerializableConfiguration(sqlContext.sessionState.newHadoopConf()) @@ -74,21 +69,13 @@ trait BaseScriptTransformationExec extends UnaryExecNode { } } - protected def initProc: ProcParameters = { - val wrapper = 
splitArgs(conf.getConf(SQLConf.SCRIPT_TRANSFORMATION_COMMAND_WRAPPER))
- val cmdArgs = splitArgs(script)
- val program = cmdArgs(0)
- if (!new File(program).isAbsolute) {
- val pathFinder = new PathFinder("PATH")
- pathFinder.prependPathComponent(SparkFiles.getRootDirectory())
- val programFile = pathFinder.getAbsolutePath(program)
- if (programFile.isDefined) {
- cmdArgs(0) = programFile.get.getAbsolutePath
- }
- }
- val cmd = wrapper.toList ++ cmdArgs.toList
+ protected def initProc: (OutputStream, Process, InputStream, CircularBuffer) = {
+ val cmd = List("/bin/bash", "-c", script)
 val builder = new ProcessBuilder(cmd.asJava)
 .directory(new File(SparkFiles.getRootDirectory()))
+ val path = SparkFiles.getRootDirectory() + File.pathSeparator +
+ System.getenv("PATH")
+ builder.environment().put("PATH", path)
 val proc = builder.start()

 val inputStream = proc.getInputStream
@@ -198,55 +185,6 @@ trait BaseScriptTransformationExec extends UnaryExecNode {
 }
 }

- def splitArgs(args: String): Array[String] = {
- val OUTSIDE = 1
- val SINGLEQ = 2
- val DOUBLEQ = 3
- val argList = new ArrayList[String]
- val ch = args.toCharArray
- val clen = ch.length
- var state = OUTSIDE
- var argstart = 0
- var c = 0
- while (c <= clen) {
- val last = c == clen
- var lastState = state
- var endToken = false
- if (!last) {
- if (ch(c) == '\'') {
- if (state == OUTSIDE) {
- state = SINGLEQ
- } else if (state == SINGLEQ) {
- state = OUTSIDE
- }
- endToken = state != lastState
- } else if (ch(c) == '"') {
- if (state == OUTSIDE) {
- state = DOUBLEQ
- } else if (state == DOUBLEQ) {
- state = OUTSIDE
- }
- endToken = state != lastState
- } else if (ch(c) == ' ') {
- if (state == OUTSIDE) {
- endToken = true
- }
- }
- }
- if (last || endToken) {
- if (c == argstart) {
- // unquoted space
- } else {
- argList.add(args.substring(argstart, c))
- }
- argstart = c + 1
- lastState = state
- }
- c += 1
- }
- argList.toArray(new Array[String](0))
- }
-
 private lazy val outputFieldWriters: Seq[String => Any] = output.map { attr =>
 val converter = CatalystTypeConverters.createToCatalystConverter(attr.dataType)
 attr.dataType match {
@@ -430,46 +368,3 @@ object ScriptTransformationIOSchema {
 input.schemaLess)
 }
 }
-
-/**
- * Maps a relative pathname to an absolute pathname using the PATH environment.
- */
-class PathFinder(val envPath: String) {
- private var pathEnv = System.getenv(envPath)
- private var pathSep = File.pathSeparator
- private var fileSep = File.separator
-
- /**
- * Prepends the specified component to the path list.
- */
- def prependPathComponent(str: String): Unit = {
- pathEnv = str + pathSep + pathEnv
- }
-
- /**
- * Returns the full path name of this file if it is listed in the path. 
- */ - def getAbsolutePath(filename: String): Option[File] = { - if (pathEnv == null || pathSep == null || fileSep == null) { - None - } else { - pathEnv.split(pathSep).map(entry => { - var file = new File(entry) - try { - if (file.isDirectory) { - file = new File(entry + fileSep + filename) - } - - if (file.isFile && file.canRead) { - file - } else { - null - } - } catch { - case NonFatal(f) => - null - } - }).find(Option(_).isDefined) - } - } -} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/BaseScriptTransformationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/BaseScriptTransformationSuite.scala index b230f6d7970eb..50e51d50545cb 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/BaseScriptTransformationSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/BaseScriptTransformationSuite.scala @@ -218,7 +218,7 @@ abstract class BaseScriptTransformationSuite extends SparkPlanTest with SQLTestU ioschema = defaultIOSchema) SparkPlanTest.executePlan(plan, spark.sqlContext) } - assert(e.getMessage.contains("No such file or directory")) + assert(e.getMessage.contains("Subprocess exited with status")) assert(uncaughtExceptionHandler.exception.isEmpty) } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveScriptTransformationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveScriptTransformationSuite.scala index b6cf98f92dffa..3892caa51eca9 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveScriptTransformationSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveScriptTransformationSuite.scala @@ -111,7 +111,7 @@ class HiveScriptTransformationSuite extends BaseScriptTransformationSuite with T ioschema = hiveIOSchema) SparkPlanTest.executePlan(plan, hiveContext) } - assert(e.getMessage.contains("No such file or directory")) + assert(e.getMessage.contains("Subprocess exited with status")) assert(uncaughtExceptionHandler.exception.isEmpty) } @@ -150,7 +150,7 @@ class HiveScriptTransformationSuite extends BaseScriptTransformationSuite with T ioschema = hiveIOSchema) SparkPlanTest.executePlan(plan, hiveContext) } - assert(e.getMessage.contains("No such file or directory")) + assert(e.getMessage.contains("Subprocess exited with status")) assert(uncaughtExceptionHandler.exception.isEmpty) } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala index 0294e3047f51b..3370695245fd0 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala @@ -1032,12 +1032,7 @@ abstract class SQLQuerySuiteBase extends QueryTest with SQLTestUtils with TestHi val data = (1 to 100000).map { i => (i, i, i) } data.toDF("d1", "d2", "d3").createOrReplaceTempView("script_trans") assert(0 === - sql( - """ - |SELECT TRANSFORM (d1, d2, d3) - | USING '/bin/bash -c \'cat 1>&2\'' AS (a,b,c) - |FROM script_trans - """.stripMargin).count()) + sql("SELECT TRANSFORM (d1, d2, d3) USING 'cat 1>&2' AS (a,b,c) FROM script_trans").count()) } } From 91542f9e526af6d9a1f486d7f28240aa4594ac21 Mon Sep 17 00:00:00 2001 From: angerszhu Date: Mon, 4 Jan 2021 13:06:39 +0800 Subject: [PATCH 9/9] follow comment --- .../spark/sql/execution/BaseScriptTransformationExec.scala | 4 ++-- 
.../spark/sql/execution/BaseScriptTransformationSuite.scala | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/BaseScriptTransformationExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/BaseScriptTransformationExec.scala index 547f29bdcdbe1..b66f94ae1107a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/BaseScriptTransformationExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/BaseScriptTransformationExec.scala @@ -73,8 +73,8 @@ trait BaseScriptTransformationExec extends UnaryExecNode { val cmd = List("/bin/bash", "-c", script) val builder = new ProcessBuilder(cmd.asJava) .directory(new File(SparkFiles.getRootDirectory())) - val path = SparkFiles.getRootDirectory() + File.pathSeparator + - System.getenv("PATH") + val path = System.getenv("PATH") + File.pathSeparator + + SparkFiles.getRootDirectory() builder.environment().put("PATH", path) val proc = builder.start() diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/BaseScriptTransformationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/BaseScriptTransformationSuite.scala index 50e51d50545cb..a25e4b8f8ea07 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/BaseScriptTransformationSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/BaseScriptTransformationSuite.scala @@ -471,7 +471,7 @@ abstract class BaseScriptTransformationSuite extends SparkPlanTest with SQLTestU } } - test("SPARK-33934: Check default execute command wrapper") { + test("SPARK-33934: Add SparkFile's root dir to env property PATH") { assume(TestUtils.testCommandAvailable("python")) val scriptFilePath = copyAndGetResourceFile("test_script.py", ".py").getAbsoluteFile withTempView("v") {
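
Net effect of the series: the configurable wrapper is dropped, the child
process is always launched as /bin/bash -c <script> with its working directory
set to the SparkFiles root, and that root is appended (not prepended, per
PATCH 9/9) to the inherited PATH, so system binaries still win on name
clashes. A script shipped with ADD FILE can then be invoked by bare name.
A minimal end-to-end sketch of that final behavior in Scala (the path is a
placeholder; the script must be executable and carry a shebang, as the tests
above arrange):

  // Ship the script; executors place it under SparkFiles.getRootDirectory().
  spark.sql("ADD FILE /path/to/test_script.py")

  // The bare name resolves because the SparkFiles root is on the subprocess PATH.
  spark.sql(
    """SELECT TRANSFORM(a)
      |  ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
      |  USING 'test_script.py' AS (a)
      |  ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
      |FROM (SELECT 1 AS a) t
      |""".stripMargin).show()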