Skip to content

Commit 0d028ce

Browse files
kiszk authored and lycplus committed
[SPARK-19372][SQL] Fix throwing a Java exception at df.filter() due to 64KB bytecode size limit
## What changes were proposed in this pull request?

When an expression for `df.filter()` has many nodes (e.g. 400), the size of the Java bytecode for the generated Java code exceeds 64KB. This produces a Java exception, and as a result, the execution fails. This PR allows execution to continue by disabling code generation and calling `Expression.eval()` instead when such an exception has been caught.

## How was this patch tested?

Added a test suite to `DataFrameSuite`.

Author: Kazuaki Ishizaki <[email protected]>

Closes apache#17087 from kiszk/SPARK-19372.
1 parent a61e607 commit 0d028ce

7 files changed

Lines changed: 59 additions & 12 deletions

File tree

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,7 @@ object ExternalCatalogUtils {
155155
})
156156

157157
inputPartitions.filter { p =>
158-
boundPredicate(p.toRow(partitionSchema, defaultTimeZoneId))
158+
boundPredicate.eval(p.toRow(partitionSchema, defaultTimeZoneId))
159159
}
160160
}
161161
}

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,10 @@ import scala.language.existentials
2727
import scala.util.control.NonFatal
2828

2929
import com.google.common.cache.{CacheBuilder, CacheLoader}
30-
import org.codehaus.janino.{ByteArrayClassLoader, ClassBodyEvaluator, SimpleCompiler}
30+
import com.google.common.util.concurrent.{ExecutionError, UncheckedExecutionException}
31+
import org.apache.commons.lang3.exception.ExceptionUtils
32+
import org.codehaus.commons.compiler.CompileException
33+
import org.codehaus.janino.{ByteArrayClassLoader, ClassBodyEvaluator, JaninoRuntimeException, SimpleCompiler}
3134
import org.codehaus.janino.util.ClassFile
3235

3336
import org.apache.spark.{SparkEnv, TaskContext, TaskKilledException}
@@ -899,8 +902,14 @@ object CodeGenerator extends Logging {
899902
/**
900903
* Compile the Java source code into a Java class, using Janino.
901904
*/
902-
def compile(code: CodeAndComment): GeneratedClass = {
905+
def compile(code: CodeAndComment): GeneratedClass = try {
903906
cache.get(code)
907+
} catch {
908+
// Cache.get() may wrap the original exception. See the following URL
909+
// http://google.github.io/guava/releases/14.0/api/docs/com/google/common/cache/
910+
// Cache.html#get(K,%20java.util.concurrent.Callable)
911+
case e @ (_: UncheckedExecutionException | _: ExecutionError) =>
912+
throw e.getCause
904913
}
905914

906915
/**
@@ -951,10 +960,14 @@ object CodeGenerator extends Logging {
951960
evaluator.cook("generated.java", code.body)
952961
recordCompilationStats(evaluator)
953962
} catch {
954-
case e: Exception =>
963+
case e: JaninoRuntimeException =>
955964
val msg = s"failed to compile: $e\n$formatted"
956965
logError(msg, e)
957-
throw new Exception(msg, e)
966+
throw new JaninoRuntimeException(msg, e)
967+
case e: CompileException =>
968+
val msg = s"failed to compile: $e\n$formatted"
969+
logError(msg, e)
970+
throw new CompileException(msg, e.getLocation)
958971
}
959972
evaluator.getClazz().newInstance().asInstanceOf[GeneratedClass]
960973
}

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,20 +20,22 @@ package org.apache.spark.sql.catalyst.expressions
2020
import org.apache.spark.sql.catalyst.InternalRow
2121
import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
2222
import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
23+
import org.apache.spark.sql.catalyst.expressions.codegen.{Predicate => BasePredicate}
2324
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
2425
import org.apache.spark.sql.catalyst.util.TypeUtils
2526
import org.apache.spark.sql.types._
2627

2728

2829
object InterpretedPredicate {
29-
def create(expression: Expression, inputSchema: Seq[Attribute]): (InternalRow => Boolean) =
30+
def create(expression: Expression, inputSchema: Seq[Attribute]): InterpretedPredicate =
3031
create(BindReferences.bindReference(expression, inputSchema))
3132

32-
def create(expression: Expression): (InternalRow => Boolean) = {
33-
(r: InternalRow) => expression.eval(r).asInstanceOf[Boolean]
34-
}
33+
def create(expression: Expression): InterpretedPredicate = new InterpretedPredicate(expression)
3534
}
3635

36+
case class InterpretedPredicate(expression: Expression) extends BasePredicate {
37+
override def eval(r: InternalRow): Boolean = expression.eval(r).asInstanceOf[Boolean]
38+
}
3739

3840
/**
3941
* An [[Expression]] that returns a boolean value.

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,9 @@ import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, Da
2222
import scala.collection.mutable.ArrayBuffer
2323
import scala.concurrent.ExecutionContext
2424

25+
import org.codehaus.commons.compiler.CompileException
26+
import org.codehaus.janino.JaninoRuntimeException
27+
2528
import org.apache.spark.{broadcast, SparkEnv}
2629
import org.apache.spark.internal.Logging
2730
import org.apache.spark.io.CompressionCodec
@@ -353,9 +356,27 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
353356
GenerateMutableProjection.generate(expressions, inputSchema, useSubexprElimination)
354357
}
355358

359+
private def genInterpretedPredicate(
360+
expression: Expression, inputSchema: Seq[Attribute]): InterpretedPredicate = {
361+
val str = expression.toString
362+
val logMessage = if (str.length > 256) {
363+
str.substring(0, 256 - 3) + "..."
364+
} else {
365+
str
366+
}
367+
logWarning(s"Codegen disabled for this expression:\n $logMessage")
368+
InterpretedPredicate.create(expression, inputSchema)
369+
}
370+
356371
protected def newPredicate(
357372
expression: Expression, inputSchema: Seq[Attribute]): GenPredicate = {
358-
GeneratePredicate.generate(expression, inputSchema)
373+
try {
374+
GeneratePredicate.generate(expression, inputSchema)
375+
} catch {
376+
case e @ (_: JaninoRuntimeException | _: CompileException)
377+
if sqlContext == null || sqlContext.conf.wholeStageFallback =>
378+
genInterpretedPredicate(expression, inputSchema)
379+
}
359380
}
360381

361382
protected def newOrdering(

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,7 @@ abstract class PartitioningAwareFileIndex(
177177
})
178178

179179
val selected = partitions.filter {
180-
case PartitionPath(values, _) => boundPredicate(values)
180+
case PartitionPath(values, _) => boundPredicate.eval(values)
181181
}
182182
logInfo {
183183
val total = partitions.length

sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1844,4 +1844,15 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
18441844
.filter($"x1".isNotNull || !$"y".isin("a!"))
18451845
.count
18461846
}
1847+
1848+
test("SPARK-19372: Filter can be executed w/o generated code due to JVM code size limit") {
1849+
val N = 400
1850+
val rows = Seq(Row.fromSeq(Seq.fill(N)("string")))
1851+
val schema = StructType(Seq.tabulate(N)(i => StructField(s"_c$i", StringType)))
1852+
val df = spark.createDataFrame(spark.sparkContext.makeRDD(rows), schema)
1853+
1854+
val filter = (0 until N)
1855+
.foldLeft(lit(false))((e, index) => e.or(df.col(df.columns(index)) =!= "string"))
1856+
df.filter(filter).count
1857+
}
18471858
}

sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ class SimpleTextSource extends TextBasedFileFormat with DataSourceRegister {
103103
// `Cast`ed values are always of internal types (e.g. UTF8String instead of String)
104104
Cast(Literal(value), dataType).eval()
105105
})
106-
}.filter(predicate).map(projection)
106+
}.filter(predicate.eval).map(projection)
107107

108108
// Appends partition values
109109
val fullOutput = requiredSchema.toAttributes ++ partitionSchema.toAttributes

0 commit comments

Comments (0)