Skip to content
Closed
Show file tree
Hide file tree
Changes from 18 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
05274e7
Decouple consume functions of physical operators in whole-stage codegen.
viirya Aug 13, 2017
e0e7a6e
shouldStop is called outside consume().
viirya Aug 13, 2017
413707d
Fix the condition and the case of using continue in consume.
viirya Aug 13, 2017
0bb8c0e
More comment.
viirya Aug 13, 2017
6d600d5
Fix aggregation.
viirya Aug 13, 2017
502139a
Also deal with sort case.
viirya Aug 13, 2017
5fe3762
Fix broadcasthash join.
viirya Aug 14, 2017
4bef567
Add more comments.
viirya Aug 14, 2017
1694c9b
Fix the cases where operators set up its produce framework.
viirya Aug 14, 2017
8f3b984
Fix Expand.
viirya Aug 14, 2017
c04da15
Fix BroadcastHashJoin.
viirya Aug 15, 2017
9540195
Rename variables.
viirya Aug 17, 2017
1101b2c
Don't create consume function if the number of arguments are more tha…
viirya Sep 1, 2017
ff77bfe
Merge remote-tracking branch 'upstream/master' into SPARK-21717
viirya Sep 26, 2017
e36ec3c
Remove the part of "continue" processing.
viirya Sep 26, 2017
edb73d6
Merge remote-tracking branch 'upstream/master' into SPARK-21717
viirya Oct 6, 2017
601c225
Fix test.
viirya Oct 7, 2017
476994f
More accurate calculation of valid method parameter length.
viirya Oct 11, 2017
bdc1146
Address comment.
viirya Oct 12, 2017
58eaf00
Address comments.
viirya Jan 24, 2018
2f2d1fd
Merge remote-tracking branch 'upstream/master' into SPARK-21717
viirya Jan 24, 2018
9f0d1da
Copy variables used for creating unsaferow.
viirya Jan 24, 2018
79d0106
Revert vairables copying.
viirya Jan 24, 2018
6384aec
Add final to constants.
viirya Jan 24, 2018
0c4173e
Address comments.
viirya Jan 25, 2018
c859d53
Add tests.
viirya Jan 25, 2018
11946e7
Refactor isValidParamLength a bit.
viirya Jan 25, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -149,13 +149,101 @@ trait CodegenSupport extends SparkPlan {

ctx.freshNamePrefix = parent.variablePrefix
val evaluated = evaluateRequiredVariables(output, inputVars, parent.usedInputs)

// Under certain conditions, we can put the logic to consume the rows of this operator into
Copy link
Member

@kiszk kiszk Aug 13, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you elaborate certain conditions in the comment if you have time?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added more comment to elaborate the idea.

// another function. So we can prevent a generated function too long to be optimized by JIT.
// The conditions:
// 1. The parent uses all variables in output. we can't defer variable evaluation when consume
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the concrete example when this case prevents consume functions from being split?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

E.g., a ProjectExec node doesn't necessarily evaluate all its output variables before continuing doConsume of its parent node. It can defer the evaluation until the variables are needed in the parent node's consume logic.

Once a variable's evaluation is deferred, and if we create a consume function, the variable will be evaluated in the function. But now the references of this variable is out of scope.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks for the kind explanation!

// in another function.
// 2. The output variables are not empty. If it's empty, we don't bother to do that.
// 3. We don't use row variable. The construction of row uses deferred variable evaluation. We
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto; I want to know the concrete example, too.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The same reason as above. The variables used to evaluate the row can be out of scope because row construction is deferred until it is used actually.

// can't do it.
// 4. The number of output variables must less than maximum number of parameters in Java method
// declaration.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My only concern is if we have a bunch of simple operators and we create a lot of small methods here. Maybe it's fine as optimizer would prevent such cases.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we can be super safe and only do this for certain operators, like HashAggregateExec.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

or introduce a config so that users can turn it off.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a config for it so we can turn it off.

val requireAllOutput = output.forall(parent.usedInputs.contains(_))
val consumeFunc =
if (row == null && outputVars.nonEmpty && requireAllOutput && isValidParamLength(ctx)) {
constructDoConsumeFunction(ctx, inputVars)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We always split consume functions?; we don't need to check if this consume function is too long?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinking to check it. But the whole-stage codegen is a non-breaking processing which produce/consume calls are embeded together. You don't have a break to check the function length here.

Actually I think it should have no negative effect to split consume functions always. From the benchmarking numbers, looks it shows no harm to normal queries.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should pass row to this function, if it's non-null, we can save a projection.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe we should create a method for generating rowVar, so that we can use it in both consume and constructDoConsumeFunction

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point.

} else {
parent.doConsume(ctx, inputVars, rowVar)
}
s"""
|${ctx.registerComment(s"CONSUME: ${parent.simpleString}")}
|$evaluated
|${parent.doConsume(ctx, inputVars, rowVar)}
|$consumeFunc
""".stripMargin
}

/**
* In Java, a method descriptor is valid only if it represents method parameters with a total
* length of 255 or less. `this` contributes one unit and a parameter of type long or double
* contributes two units. Besides, for nullable parameters, we also need to pass a boolean
* for the null status.
*/
private def isValidParamLength(ctx: CodegenContext): Boolean = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shall we put it into CodegenContext as a util function so that we can use it in other places in the future?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. Put it in CodegenContext.

var paramLength = 1 // for `this` parameter.
output.foreach { attr =>
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(nit: This could be written as a foldLeft and then you can eliminate the var)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks. Will follow it.

ctx.javaType(attr.dataType) match {
case (ctx.JAVA_LONG | ctx.JAVA_DOUBLE) if !attr.nullable => paramLength += 2
case ctx.JAVA_LONG | ctx.JAVA_DOUBLE => paramLength += 3
case _ if !attr.nullable => paramLength += 1
case _ => paramLength += 2
}
}
paramLength <= 255
}

/**
* To prevent concatenated function growing too long to be optimized by JIT. We can separate the
* parent's `doConsume` codes of a `CodegenSupport` operator into a function to call.
*/
private def constructDoConsumeFunction(
ctx: CodegenContext,
inputVars: Seq[ExprCode]): String = {
val (callingParams, arguList, inputVarsInFunc) =
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be good to add fall back path to the original code gen (i.e. without creating consume function) if the number of arguments is more than 254 (255 - one for non-static method) (e.g. #19082).
If the total number of arguments is more than 255, janino will fail its compilation.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good suggestion. I'll follow it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel it's cleaner to return paramNames, paramTypes, paramVars, then we can simply do

void $doConsume(paramTypes.zip(paramNames).map(i => i._1 + " " + i._2).mkString(", "))

and

doConsumeFuncName(paramNames.mkString(", "))

inside constructConsumeParameters we can just create 3 mutable collections and go through variables to fill these collections.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds cleaner. I need to change it a little because the arguments and parameters are not the same. Some variables are not able parameterized, e.g., constants or statements.

constructConsumeParameters(ctx, output, inputVars)
val rowVar = ExprCode("", "false", "unsafeRow")
val doConsume = ctx.freshName("doConsume")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shall we put the operator name in this function name?

Copy link
Member Author

@viirya viirya Jan 25, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The freshName here will add variablePrefix before doConsume. So it already has operator name, e.g., agg_doConsume.

val doConsumeFuncName = ctx.addNewFunction(doConsume,
s"""
| private void $doConsume($arguList) throws java.io.IOException {
| ${parent.doConsume(ctx, inputVarsInFunc, rowVar)}
| }
""".stripMargin)

s"""
| $doConsumeFuncName($callingParams);
""".stripMargin
}

/**
* Returns source code for calling consume function and the argument list of the consume function
* and also the `ExprCode` for the argument list.
*/
private def constructConsumeParameters(
ctx: CodegenContext,
attributes: Seq[Attribute],
variables: Seq[ExprCode]): (String, String, Seq[ExprCode]) = {
val params = variables.zipWithIndex.map { case (ev, i) =>
val arguName = ctx.freshName(s"expr_$i")
val arguType = ctx.javaType(attributes(i).dataType)

val (callingParam, funcParams, arguIsNull) = if (!attributes(i).nullable) {
// When the argument is not nullable, we don't need to pass in `isNull` param for it and
// simply give a `false`.
val arguIsNull = "false"
(ev.value, s"$arguType $arguName", arguIsNull)
} else {
val arguIsNull = ctx.freshName(s"exprIsNull_$i")
(ev.value + ", " + ev.isNull, s"$arguType $arguName, boolean $arguIsNull", arguIsNull)
}
(callingParam, funcParams, ExprCode("", arguIsNull, arguName))
}.unzip3
(params._1.mkString(", "),
params._2.mkString(", "),
params._3)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the above 3 lines can be one line?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

}

/**
* Returns source code to evaluate all the variables, and clear the code of them, to prevent
* them to be evaluated twice.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ class WholeStageCodegenSuite extends SparkPlanTest with SharedSQLContext {
val codeWithShortFunctions = genGroupByCode(3)
val (_, maxCodeSize1) = CodeGenerator.compile(codeWithShortFunctions)
assert(maxCodeSize1 < SQLConf.WHOLESTAGE_HUGE_METHOD_LIMIT.defaultValue.get)
val codeWithLongFunctions = genGroupByCode(20)
val codeWithLongFunctions = genGroupByCode(50)
Copy link
Member Author

@viirya viirya Oct 7, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We reduced the length of generated codes. So to make this test work, we increase the number of expressions.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In my pr, I changed the code to just check if long functions have the larger value of max code size:
https://github.com/apache/spark/pull/19082/files#diff-0314224342bb8c30143ab784b3805d19R185
but, just increasing the value seems better.

val (_, maxCodeSize2) = CodeGenerator.compile(codeWithLongFunctions)
assert(maxCodeSize2 > SQLConf.WHOLESTAGE_HUGE_METHOD_LIMIT.defaultValue.get)
}
Expand Down