Changes from 4 commits
@@ -403,13 +403,14 @@ class CodegenContext {
   * equivalentExpressions will match the tree containing `col1 + col2` and it will only
   * be evaluated once.
   */
-  val equivalentExpressions: EquivalentExpressions = new EquivalentExpressions
+  private val equivalentExpressions: EquivalentExpressions = new EquivalentExpressions

  // Foreach expression that is participating in subexpression elimination, the state to use.
-  var subExprEliminationExprs = Map.empty[Expression, SubExprEliminationState]
+  // Visible for testing.
+  private[expressions] var subExprEliminationExprs = Map.empty[Expression, SubExprEliminationState]

  // The collection of sub-expression result resetting methods that need to be called on each row.
-  val subexprFunctions = mutable.ArrayBuffer.empty[String]
+  private val subexprFunctions = mutable.ArrayBuffer.empty[String]

  val outerClassName = "OuterClass"
@@ -993,6 +994,14 @@ class CodegenContext {
}
}

+  /**
+   * Returns the code for subexpression elimination after splitting it if necessary.
+   */
+  def subexprFunctionsCode: String = {
+    // Whole-stage codegen's subexpression elimination is handled in another code path
+    splitExpressions(subexprFunctions, "subexprFunc_split", Seq("InternalRow" -> INPUT_ROW))
Member:
  We don't need to check if it's empty?

    if (subexprFunctions.nonEmpty) {
      splitExpressions(...
    } else {
      ""
    }

Contributor Author:
  It is not necessary: if it is empty, splitExpressions returns an empty string anyway. I can add the check if you think it is clearer.

+  }

/**
* Perform a function which generates a sequence of ExprCodes with a given mapping between
* expressions and common expressions, instead of using the mapping in current context.
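The new `subexprFunctionsCode` routes the accumulated snippets through `splitExpressions`, so that when the combined subexpression code would push the enclosing method past the JVM's 64KB per-method bytecode limit, it is broken into smaller helper methods and replaced by a sequence of calls. A minimal standalone sketch of that splitting idea (the class, method names, and size threshold here are hypothetical, not Spark's actual implementation):

```java
import java.util.ArrayList;
import java.util.List;

public class ExprSplitter {
    // Groups statement strings into helper methods whose source length stays
    // under maxChunkChars, then emits the helpers followed by a call to each
    // one in order. Returns "" for an empty input, which is why the explicit
    // nonEmpty check raised in the review is not strictly required.
    public static String splitExpressions(List<String> statements,
                                          String funcName,
                                          int maxChunkChars) {
        if (statements.isEmpty()) {
            return "";
        }
        List<StringBuilder> chunks = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        chunks.add(current);
        for (String stmt : statements) {
            // Start a new chunk when adding this statement would exceed the budget.
            if (current.length() > 0 && current.length() + stmt.length() > maxChunkChars) {
                current = new StringBuilder();
                chunks.add(current);
            }
            current.append(stmt).append("\n");
        }
        StringBuilder methods = new StringBuilder();
        StringBuilder calls = new StringBuilder();
        for (int i = 0; i < chunks.size(); i++) {
            String name = funcName + "_" + i;
            methods.append("private void ").append(name).append("(InternalRow i) {\n")
                   .append(chunks.get(i))
                   .append("}\n");
            calls.append(name).append("(i);\n");
        }
        // In real codegen the helper methods are registered on the generated
        // class and only the sequence of calls is returned; here both are
        // concatenated for illustration.
        return methods.append(calls).toString();
    }
}
```

Each helper method carries only its chunk of statements, so no single generated method grows without bound as the number of subexpressions increases.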
@@ -92,7 +92,7 @@ object GenerateMutableProjection extends CodeGenerator[Seq[Expression], MutableProjection] {
}

// Evaluate all the subexpressions.
-    val evalSubexpr = ctx.subexprFunctions.mkString("\n")
+    val evalSubexpr = ctx.subexprFunctionsCode

val allProjections = ctx.splitExpressionsWithCurrentInputs(projectionCodes.map(_._1))
val allUpdates = ctx.splitExpressionsWithCurrentInputs(projectionCodes.map(_._2))
@@ -299,7 +299,7 @@ object GenerateUnsafeProjection extends CodeGenerator[Seq[Expression], UnsafeProjection] {
v => s"$v = new $rowWriterClass(${expressions.length}, ${numVarLenFields * 32});")

// Evaluate all the subexpression.
-    val evalSubexpr = ctx.subexprFunctions.mkString("\n")
+    val evalSubexpr = ctx.subexprFunctionsCode

val writeExpressions = writeExpressionsToBuffer(
ctx, ctx.INPUT_ROW, exprEvals, exprSchemas, rowWriter, isTopLevel = true)
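In both projection generators the change is the same: `evalSubexpr`, which is spliced into the generated `apply` method, now holds a short sequence of calls to the split helpers (or the empty string) instead of the concatenated function bodies. A rough sketch of what that assembled source could look like (the names and the exact generated shape are hypothetical):

```java
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class ProjectionAssembly {
    // After the change, the subexpression block is a list of calls to the
    // split helpers, so the enclosing apply() stays small no matter how many
    // subexpressions were collected. Zero chunks yields the empty string.
    public static String evalSubexprCalls(int numChunks) {
        return IntStream.range(0, numChunks)
                .mapToObj(n -> "subexprFunc_split_" + n + "(i);")
                .collect(Collectors.joining("\n"));
    }

    // Splices the subexpression calls ahead of the per-field projection code,
    // mirroring where evalSubexpr lands in the generated projection source.
    public static String applyMethod(String evalSubexpr, String projections) {
        return "public Object apply(InternalRow i) {\n"
                + evalSubexpr + "\n"
                + projections + "\n"
                + "return mutableRow;\n}";
    }
}
```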
@@ -2202,4 +2202,12 @@ class DataFrameSuite extends QueryTest with SharedSparkSession {
|*(1) Range (0, 10, step=1, splits=2)""".stripMargin))
}
}

+  test("SPARK-28916: subexpression elimination can cause 64kb code limit") {
Contributor:
  How long does this test run? We can write a unit test instead if the end-to-end test is too expensive to run.

Contributor Author:
  Yes, it takes about 3 mins. I'll try and find a way to create a UT...

+    val df = spark.range(2).selectExpr((0 to 5000).map(i => s"id as field_$i"): _*)
+    df.createOrReplaceTempView("spark64kb")
+    val data = spark.sql("select * from spark64kb limit 10")
Member:
  Is this a test for GenerateMutableProjection? How about the case GenerateUnsafeProjection?

+    // This fails if the 64KB limit is reached in code generation
+    data.describe()
Member:
  Shall we disable fallback to interpreter in this test?

Contributor Author (@mgaido91, Sep 4, 2019):
  I need to revisit it according to Wenchen's suggestion... unfortunately this might take some time as I am a bit busy these days...

+  }
 }
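The regression test builds a 5001-column row (field_0 through field_5000) so that `describe()` compiles a projection with thousands of generated statements. A back-of-the-envelope sketch of why one unsplit method overruns the JVM's 64KB per-method bytecode limit while split helpers do not (the per-expression cost and chunk budget below are invented for illustration; only the 65536-byte limit is fixed by the JVM):

```java
public class CodeSizeEstimate {
    public static void main(String[] args) {
        int columns = 5001;        // id as field_0 .. field_5000
        int bytesPerExpr = 64;     // assumed bytecode cost per generated statement
        int limit = 65536;         // JVM limit on bytecode per method (64KB)

        // One method holding every statement blows past the limit by ~5x.
        int unsplit = columns * bytesPerExpr;
        System.out.println("unsplit: ~" + unsplit + " bytes (limit " + limit + ")");

        // Splitting into bounded helpers keeps every method well under it.
        int maxPerChunk = 1024;    // assumed per-helper budget after splitting
        int exprsPerChunk = maxPerChunk / bytesPerExpr;
        int chunks = (columns + exprsPerChunk - 1) / exprsPerChunk;
        System.out.println(chunks + " helpers of <= " + maxPerChunk + " bytes each");
    }
}
```

Under these assumed numbers the unsplit method needs roughly 320KB of bytecode, which is exactly the failure mode the test exercises, while each split helper stays bounded regardless of column count.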