Skip to content

Commit 9f3a1cd

Browse files
committed
fix.
1 parent 522e1f8 commit 9f3a1cd

7 files changed

Lines changed: 23 additions & 8 deletions

File tree

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -551,9 +551,9 @@ object SQLConf {
551551
.intConf
552552
.createWithDefault(100)
553553

554-
val WHOLESTAGE_FALLBACK = buildConf("spark.sql.codegen.fallback")
554+
val CODEGEN_FALLBACK = buildConf("spark.sql.codegen.fallback")
555555
.internal()
556-
.doc("When true, whole stage codegen could be temporary disabled for the part of query that" +
556+
.doc("When true, (whole stage) codegen could be temporary disabled for the part of query that" +
557557
" fail to compile generated code")
558558
.booleanConf
559559
.createWithDefault(true)
@@ -1041,7 +1041,7 @@ class SQLConf extends Serializable with Logging {
10411041

10421042
def wholeStageMaxNumFields: Int = getConf(WHOLESTAGE_MAX_NUM_FIELDS)
10431043

1044-
def wholeStageFallback: Boolean = getConf(WHOLESTAGE_FALLBACK)
1044+
def codegenFallback: Boolean = getConf(CODEGEN_FALLBACK)
10451045

10461046
def maxCaseBranchesForCodegen: Int = getConf(MAX_CASES_BRANCHES)
10471047

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,9 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
5454
@transient
5555
final val sqlContext = SparkSession.getActiveSession.map(_.sqlContext).orNull
5656

57+
// whether we should fallback when hitting compilation errors caused by codegen
58+
private val codeGenFallBack = sqlContext == null || sqlContext.conf.codegenFallback
59+
5760
protected def sparkContext = sqlContext.sparkContext
5861

5962
// sqlContext will be null when we are being deserialized on the slaves. In this instance
@@ -370,8 +373,7 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
370373
try {
371374
GeneratePredicate.generate(expression, inputSchema)
372375
} catch {
373-
case e @ (_: JaninoRuntimeException | _: CompileException)
374-
if sqlContext == null || sqlContext.conf.wholeStageFallback =>
376+
case _ @ (_: JaninoRuntimeException | _: CompileException) if codeGenFallBack =>
375377
genInterpretedPredicate(expression, inputSchema)
376378
}
377379
}

sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -382,7 +382,7 @@ case class WholeStageCodegenExec(child: SparkPlan) extends UnaryExecNode with Co
382382
try {
383383
CodeGenerator.compile(cleanedSource)
384384
} catch {
385-
case e: Exception if !Utils.isTesting && sqlContext.conf.wholeStageFallback =>
385+
case _: Exception if !Utils.isTesting && sqlContext.conf.codegenFallback =>
386386
// We should already saw the error message
387387
logWarning(s"Whole-stage codegen disabled for this plan:\n $treeString")
388388
return child.execute()

sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -422,7 +422,7 @@ class DataFrameFunctionsSuite extends QueryTest with SharedSQLContext {
422422
v
423423
}
424424
withSQLConf(
425-
(SQLConf.WHOLESTAGE_FALLBACK.key, codegenFallback.toString),
425+
(SQLConf.CODEGEN_FALLBACK.key, codegenFallback.toString),
426426
(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key, wholeStage.toString)) {
427427
val df = spark.range(0, 4, 1, 4).withColumn("c", c)
428428
val rows = df.collect()

sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2011,7 +2011,17 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
20112011

20122012
val filter = (0 until N)
20132013
.foldLeft(lit(false))((e, index) => e.or(df.col(df.columns(index)) =!= "string"))
2014-
df.filter(filter).count
2014+
2015+
withSQLConf(SQLConf.CODEGEN_FALLBACK.key -> "true") {
2016+
df.filter(filter).count()
2017+
}
2018+
2019+
withSQLConf(SQLConf.CODEGEN_FALLBACK.key -> "false") {
2020+
val e = intercept[SparkException] {
2021+
df.filter(filter).count()
2022+
}.getMessage
2023+
assert(e.contains("grows beyond 64 KB"))
2024+
}
20152025
}
20162026

20172027
test("SPARK-20897: cached self-join should not fail") {

sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import org.scalatest.concurrent.Eventually
2424

2525
import org.apache.spark.{DebugFilesystem, SparkConf}
2626
import org.apache.spark.sql.{SparkSession, SQLContext}
27+
import org.apache.spark.sql.internal.SQLConf
2728

2829
/**
2930
* Helper trait for SQL test suites where all tests share a single [[TestSparkSession]].
@@ -34,6 +35,7 @@ trait SharedSQLContext extends SQLTestUtils with BeforeAndAfterEach with Eventua
3435
new SparkConf()
3536
.set("spark.hadoop.fs.file.impl", classOf[DebugFilesystem].getName)
3637
.set("spark.unsafe.exceptionOnMemoryLeak", "true")
38+
.set(SQLConf.CODEGEN_FALLBACK.key, "false")
3739
}
3840

3941
/**

sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ object TestHive
5151
"TestSQLContext",
5252
new SparkConf()
5353
.set("spark.sql.test", "")
54+
.set(SQLConf.CODEGEN_FALLBACK.key, "false")
5455
.set("spark.sql.hive.metastore.barrierPrefixes",
5556
"org.apache.spark.sql.hive.execution.PairSerDe")
5657
.set("spark.sql.warehouse.dir", TestHiveContext.makeWarehouseDir().toURI.getPath)

0 commit comments

Comments (0)