Skip to content
Closed
Show file tree
Hide file tree
Changes from 24 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
05274e7
Decouple consume functions of physical operators in whole-stage codegen.
viirya Aug 13, 2017
e0e7a6e
shouldStop is called outside consume().
viirya Aug 13, 2017
413707d
Fix the condition and the case of using continue in consume.
viirya Aug 13, 2017
0bb8c0e
More comment.
viirya Aug 13, 2017
6d600d5
Fix aggregation.
viirya Aug 13, 2017
502139a
Also deal with sort case.
viirya Aug 13, 2017
5fe3762
Fix broadcasthash join.
viirya Aug 14, 2017
4bef567
Add more comments.
viirya Aug 14, 2017
1694c9b
Fix the cases where operators set up its produce framework.
viirya Aug 14, 2017
8f3b984
Fix Expand.
viirya Aug 14, 2017
c04da15
Fix BroadcastHashJoin.
viirya Aug 15, 2017
9540195
Rename variables.
viirya Aug 17, 2017
1101b2c
Don't create consume function if the number of arguments are more tha…
viirya Sep 1, 2017
ff77bfe
Merge remote-tracking branch 'upstream/master' into SPARK-21717
viirya Sep 26, 2017
e36ec3c
Remove the part of "continue" processing.
viirya Sep 26, 2017
edb73d6
Merge remote-tracking branch 'upstream/master' into SPARK-21717
viirya Oct 6, 2017
601c225
Fix test.
viirya Oct 7, 2017
476994f
More accurate calculation of valid method parameter length.
viirya Oct 11, 2017
bdc1146
Address comment.
viirya Oct 12, 2017
58eaf00
Address comments.
viirya Jan 24, 2018
2f2d1fd
Merge remote-tracking branch 'upstream/master' into SPARK-21717
viirya Jan 24, 2018
9f0d1da
Copy variables used for creating unsaferow.
viirya Jan 24, 2018
79d0106
Revert vairables copying.
viirya Jan 24, 2018
6384aec
Add final to constants.
viirya Jan 24, 2018
0c4173e
Address comments.
viirya Jan 25, 2018
c859d53
Add tests.
viirya Jan 25, 2018
11946e7
Refactor isValidParamLength a bit.
viirya Jan 25, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1245,6 +1245,24 @@ class CodegenContext {
""
}
}

/**
* In Java, a method descriptor is valid only if it represents method parameters with a total
* length of 255 or less. `this` contributes one unit and a parameter of type long or double
* contributes two units. Besides, for nullable parameters, we also need to pass a boolean
* for the null status.
*/
def isValidParamLength(params: Seq[Expression]): Boolean = {
def calculateParamLength(input: Expression): Int = {
// For a nullable expression, we need to pass in an extra boolean parameter.
(if (input.nullable) 1 else 0) + javaType(input.dataType) match {
case JAVA_LONG | JAVA_DOUBLE => 2
case _ => 1
}
}
// Initial value is 1 for `this`.
1 + params.map(calculateParamLength(_)).sum <= CodeGenerator.MAX_JVM_METHOD_PARAMS_LENGTH
}
}

/**
Expand Down Expand Up @@ -1311,26 +1329,29 @@ abstract class CodeGenerator[InType <: AnyRef, OutType <: AnyRef] extends Loggin
object CodeGenerator extends Logging {

// This is the value of HugeMethodLimit in the OpenJDK JVM settings
val DEFAULT_JVM_HUGE_METHOD_LIMIT = 8000
final val DEFAULT_JVM_HUGE_METHOD_LIMIT = 8000

// The max valid length of method parameters in JVM.
final val MAX_JVM_METHOD_PARAMS_LENGTH = 255

// This is the threshold over which the methods in an inner class are grouped in a single
// method which is going to be called by the outer class instead of the many small ones
val MERGE_SPLIT_METHODS_THRESHOLD = 3
final val MERGE_SPLIT_METHODS_THRESHOLD = 3

// The number of named constants that can exist in the class is limited by the Constant Pool
// limit, 65,536. We cannot know how many constants will be inserted for a class, so we use a
// threshold of 1000k bytes to determine when a function should be inlined to a private, inner
// class.
val GENERATED_CLASS_SIZE_THRESHOLD = 1000000
final val GENERATED_CLASS_SIZE_THRESHOLD = 1000000

// This is the threshold for the number of global variables, whose types are primitive type or
// complex type (e.g. more than one-dimensional array), that will be placed at the outer class
val OUTER_CLASS_VARIABLES_THRESHOLD = 10000
final val OUTER_CLASS_VARIABLES_THRESHOLD = 10000

// This is the maximum number of array elements to keep global variables in one Java array
// 32767 is the maximum integer value that does not require a constant pool entry in a Java
// bytecode instruction
val MUTABLESTATEARRAY_SIZE_LIMIT = 32768
final val MUTABLESTATEARRAY_SIZE_LIMIT = 32768

/**
* Compile the Java source code into a Java class, using Janino.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -661,6 +661,14 @@ object SQLConf {
.intConf
.createWithDefault(CodeGenerator.DEFAULT_JVM_HUGE_METHOD_LIMIT)

val DECOUPLE_OPERATOR_CONSUME_FUNCTIONS = buildConf("spark.sql.codegen.decoupleOperatorConsume")
Copy link
Contributor

@cloud-fan cloud-fan Jan 24, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

decoupleOperatorConsume looks weird, how about splitConsumeFuncByOperator?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

DECOUPLE_OPERATOR_CONSUME_FUNCTIONS -> WHOLESTAGE_SPLIT_CONSUME_FUNC_BY_OPERATOR

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good to me.

.internal()
.doc("When true, whole stage codegen would put the logic of consuming rows of each physical " +
"operator into individual methods, instead of a single big method. This can be used to " +
"avoid oversized function that can miss the opportunity of JIT optimization.")
.booleanConf
.createWithDefault(true)

val FILES_MAX_PARTITION_BYTES = buildConf("spark.sql.files.maxPartitionBytes")
.doc("The maximum number of bytes to pack into a single partition when reading files.")
.longConf
Expand Down Expand Up @@ -1263,6 +1271,8 @@ class SQLConf extends Serializable with Logging {

def hugeMethodLimit: Int = getConf(WHOLESTAGE_HUGE_METHOD_LIMIT)

def decoupleOperatorConsumeFuncs: Boolean = getConf(DECOUPLE_OPERATOR_CONSUME_FUNCTIONS)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add the wholeStage prefix for such flag names.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure. Done.


def tableRelationCacheSize: Int =
getConf(StaticSQLConf.FILESOURCE_TABLE_RELATION_CACHE_SIZE)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,13 +156,94 @@ trait CodegenSupport extends SparkPlan {
ctx.INPUT_ROW = null
ctx.freshNamePrefix = parent.variablePrefix
val evaluated = evaluateRequiredVariables(output, inputVars, parent.usedInputs)

// Under certain conditions, we can put the logic to consume the rows of this operator into
Copy link
Member

@kiszk kiszk Aug 13, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you elaborate certain conditions in the comment if you have time?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added more comment to elaborate the idea.

// another function. So we can prevent a generated function too long to be optimized by JIT.
// The conditions:
// 1. The config "SQLConf.DECOUPLE_OPERATOR_CONSUME_FUNCTIONS" is enabled.
// 2. The parent uses all variables in output. we can't defer variable evaluation when consume
// in another function.
// 3. The output variables are not empty. If it's empty, we don't bother to do that.
Copy link
Contributor

@cloud-fan cloud-fan Jan 24, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why this? Logically an operator can still have complex consume method even if it doesn't have output.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds correct to me, logically, although I have no clear idea about the actual operator can be.

// 4. We don't use row variable. The construction of row uses deferred variable evaluation. We
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think what we need is inputVars are all materialized, which can be guaranteed by requireAllOutput and outputVars != null

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems to me outputVars != null isn't necessary too. When it is null, row can't be null. inputVars will bind on row columns and be evaluated before calling created method.

// can't do it.
// 5. The number of output variables must less than maximum number of parameters in Java method
// declaration.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My only concern is if we have a bunch of simple operators and we create a lot of small methods here. Maybe it's fine as optimizer would prevent such cases.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we can be super safe and only do this for certain operators, like HashAggregateExec.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

or introduce a config so that users can turn it off.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a config for it so we can turn it off.

val requireAllOutput = output.forall(parent.usedInputs.contains(_))
val consumeFunc =
if (SQLConf.get.decoupleOperatorConsumeFuncs && row == null && outputVars.nonEmpty &&
requireAllOutput && ctx.isValidParamLength(output)) {
constructDoConsumeFunction(ctx, inputVars)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We always split consume functions?; we don't need to check if this consume function is too long?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinking to check it. But the whole-stage codegen is a non-breaking processing which produce/consume calls are embeded together. You don't have a break to check the function length here.

Actually I think it should have no negative effect to split consume functions always. From the benchmarking numbers, looks it shows no harm to normal queries.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should pass row to this function, if it's non-null, we can save a projection.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe we should create a method for generating rowVar, so that we can use it in both consume and constructDoConsumeFunction

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point.

} else {
parent.doConsume(ctx, inputVars, rowVar)
}
s"""
|${ctx.registerComment(s"CONSUME: ${parent.simpleString}")}
|$evaluated
|${parent.doConsume(ctx, inputVars, rowVar)}
|$consumeFunc
""".stripMargin
}

/**
* To prevent concatenated function growing too long to be optimized by JIT. We can separate the
* parent's `doConsume` codes of a `CodegenSupport` operator into a function to call.
*/
private def constructDoConsumeFunction(
ctx: CodegenContext,
inputVars: Seq[ExprCode]): String = {
val (callingParams, arguList, inputVarsInFunc) =
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be good to add fall back path to the original code gen (i.e. without creating consume function) if the number of arguments is more than 254 (255 - one for non-static method) (e.g. #19082).
If the total number of arguments is more than 255, janino will fail its compilation.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good suggestion. I'll follow it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel it's cleaner to return paramNames, paramTypes, paramVars, then we can simply do

void $doConsume(paramTypes.zip(paramNames).map(i => i._1 + " " + i._2).mkString(", "))

and

doConsumeFuncName(paramNames.mkString(", "))

inside constructConsumeParameters we can just create 3 mutable collections and go through variables to fill these collections.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds cleaner. I need to change it a little because the arguments and parameters are not the same. Some variables are not able parameterized, e.g., constants or statements.

constructConsumeParameters(ctx, output, inputVars)

// Set up rowVar because parent plan can possibly consume UnsafeRow instead of variables.
val colExprs = output.zipWithIndex.map { case (attr, i) =>
BoundReference(i, attr.dataType, attr.nullable)
}
// Don't need to copy the variables because they're already evaluated before entering function.
ctx.INPUT_ROW = null
ctx.currentVars = inputVarsInFunc
val ev = GenerateUnsafeProjection.createCode(ctx, colExprs, false)
val rowVar = ExprCode(ev.code.trim, "false", ev.value)

val doConsume = ctx.freshName("doConsume")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shall we put the operator name in this function name?

Copy link
Member Author

@viirya viirya Jan 25, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The freshName here will add variablePrefix before doConsume. So it already has operator name, e.g., agg_doConsume.

ctx.currentVars = inputVarsInFunc
ctx.INPUT_ROW = null
val doConsumeFuncName = ctx.addNewFunction(doConsume,
s"""
| private void $doConsume($arguList) throws java.io.IOException {
| ${parent.doConsume(ctx, inputVarsInFunc, rowVar)}
| }
""".stripMargin)

s"""
| $doConsumeFuncName($callingParams);
""".stripMargin
}

/**
* Returns source code for calling consume function and the argument list of the consume function
* and also the `ExprCode` for the argument list.
*/
private def constructConsumeParameters(
ctx: CodegenContext,
attributes: Seq[Attribute],
variables: Seq[ExprCode]): (String, String, Seq[ExprCode]) = {
val params = variables.zipWithIndex.map { case (ev, i) =>
val arguName = ctx.freshName(s"expr_$i")
val arguType = ctx.javaType(attributes(i).dataType)

val (callingParam, funcParams, arguIsNull) = if (!attributes(i).nullable) {
// When the argument is not nullable, we don't need to pass in `isNull` param for it and
// simply give a `false`.
val arguIsNull = "false"
(ev.value, s"$arguType $arguName", arguIsNull)
} else {
val arguIsNull = ctx.freshName(s"exprIsNull_$i")
(ev.value + ", " + ev.isNull, s"$arguType $arguName, boolean $arguIsNull", arguIsNull)
}
(callingParam, funcParams, ExprCode("", arguIsNull, arguName))
}.unzip3
(params._1.mkString(", "), params._2.mkString(", "), params._3)
}

/**
* Returns source code to evaluate all the variables, and clear the code of them, to prevent
* them to be evaluated twice.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ class WholeStageCodegenSuite extends QueryTest with SharedSQLContext {
val codeWithShortFunctions = genGroupByCode(3)
val (_, maxCodeSize1) = CodeGenerator.compile(codeWithShortFunctions)
assert(maxCodeSize1 < SQLConf.WHOLESTAGE_HUGE_METHOD_LIMIT.defaultValue.get)
val codeWithLongFunctions = genGroupByCode(20)
val codeWithLongFunctions = genGroupByCode(50)
Copy link
Member Author

@viirya viirya Oct 7, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We reduced the length of generated codes. So to make this test work, we increase the number of expressions.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In my pr, I changed the code to just check if long functions have the larger value of max code size:
https://github.com/apache/spark/pull/19082/files#diff-0314224342bb8c30143ab784b3805d19R185
but, just increasing the value seems better.

val (_, maxCodeSize2) = CodeGenerator.compile(codeWithLongFunctions)
assert(maxCodeSize2 > SQLConf.WHOLESTAGE_HUGE_METHOD_LIMIT.defaultValue.get)
}
Expand Down