@@ -24,6 +24,7 @@ import org.apache.spark.sql.catalyst.analysis.{TypeCheckResult, TypeCoercion}
import org.apache.spark.sql.catalyst.expressions.codegen._
import org.apache.spark.sql.catalyst.expressions.codegen.Block._
import org.apache.spark.sql.catalyst.trees.TreeNode
+import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils

@@ -120,7 +121,8 @@ abstract class Expression extends TreeNode[Expression] {

private def reduceCodeSize(ctx: CodegenContext, eval: ExprCode): Unit = {
// TODO: support whole stage codegen too
-if (eval.code.length > 1024 && ctx.INPUT_ROW != null && ctx.currentVars == null) {
+val splitThreshold = SQLConf.get.methodSplitThreshold
+if (eval.code.length > splitThreshold && ctx.INPUT_ROW != null && ctx.currentVars == null) {
val setIsNull = if (!eval.isNull.isInstanceOf[LiteralValue]) {
val globalIsNull = ctx.addMutableState(CodeGenerator.JAVA_BOOLEAN, "globalIsNull")
val localIsNull = eval.isNull
@@ -910,12 +910,13 @@ class CodegenContext {
val blocks = new ArrayBuffer[String]()
val blockBuilder = new StringBuilder()
var length = 0
+val splitThreshold = SQLConf.get.methodSplitThreshold
for (code <- expressions) {
// We can't know how many bytecode will be generated, so use the length of source code
// as metric. A method should not go beyond 8K, otherwise it will not be JITted, should
// also not be too small, or it will have many function calls (for wide table), see the
// results in BenchmarkWideTable.
-if (length > 1024) {
+if (length > splitThreshold) {
blocks += blockBuilder.toString()
blockBuilder.clear()
length = 0
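The flush-and-accumulate loop in CodegenContext above can be modeled in isolation. A minimal sketch, assuming a hypothetical `splitByLength` helper (the real code also strips comments and extra newlines before counting the length, which is omitted here):

```scala
import scala.collection.mutable.ArrayBuffer

// Simplified model of the splitting loop: accumulate code snippets,
// and once the accumulated source length exceeds the threshold,
// flush the current block and start a new one.
object SplitSketch {
  def splitByLength(snippets: Seq[String], threshold: Int): Seq[String] = {
    val blocks = new ArrayBuffer[String]()
    val blockBuilder = new StringBuilder()
    var length = 0
    for (code <- snippets) {
      // Flush before appending, mirroring the `length > splitThreshold` check.
      if (length > threshold) {
        blocks += blockBuilder.toString()
        blockBuilder.clear()
        length = 0
      }
      blockBuilder.append(code)
      length += code.length
    }
    blocks += blockBuilder.toString()
    blocks.toSeq
  }
}
```

With a 1024 threshold, three 600-character snippets end up in two blocks: the first two accumulate to 1200 characters, the flush happens only when the third arrives, so a block can exceed the threshold by up to one snippet, just as in the real code.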
@@ -812,6 +812,18 @@ object SQLConf {
.intConf
.createWithDefault(65535)

+val CODEGEN_METHOD_SPLIT_THRESHOLD = buildConf("spark.sql.codegen.methodSplitThreshold")
+  .internal()
+  .doc("The threshold of source-code splitting in the codegen. When the number of characters " +
+    "in a single Java function (without comment) exceeds the threshold, the function will be " +
+    "automatically split into multiple smaller ones. We cannot know how much bytecode will be " +
+    "generated, so use the code length as the metric. When running on HotSpot, a function's " +
+    "bytecode should not go beyond 8KB, otherwise it will not be JITted; it also should not " +
+    "be too small, otherwise there will be many function calls.")
+  .intConf
Member:

According to the description, it seems that we had better have a checkValue here. Could you recommend reasonable min/max values, @kiszk?

Contributor Author:

To be more accurate, I think I should add "When running on HotSpot, a function's bytecode should not go beyond 8KB...".

Member:

Yep. That could be the max.

Member:

1000 is conservative. But there is no recommended value, since the bytecode size depends on the content (e.g. 0's bytecode length is 1, while 9's bytecode length is 2).

Contributor Author:

Agreed @kiszk, 1000 might not be the best; see my benchmark for the wide table: 2048 is better.

================================================================================================
projection on wide table
================================================================================================
 Java HotSpot(TM) 64-Bit Server VM 1.8.0_162-b12 on Mac OS X 10.13.6
Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz
projection on wide table:                Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
split threshold 10                            8464 / 8737          0.1        8072.0       1.0X
split threshold 100                           5959 / 6251          0.2        5683.4       1.4X
split threshold 1024                          3202 / 3248          0.3        3053.2       2.6X
split threshold 2048                          3009 / 3097          0.3        2869.2       2.8X
split threshold 4096                          3414 / 3458          0.3        3256.1       2.5X
split threshold 8196                          4095 / 4112          0.3        3905.5       2.1X
split threshold 65536                       28800 / 29705          0.0       27465.8       0.3X
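Reading the table: the Relative column appears to be the baseline's (threshold 10) best time divided by each case's best time; this is an inference from the numbers, not something stated in the output. A sketch with a hypothetical helper:

```scala
// Hypothetical helper reproducing the Relative column from best times.
object RelativeSpeedup {
  def relative(baselineBestMs: Double, caseBestMs: Double): Double =
    baselineBestMs / caseBestMs
}

// e.g. threshold 2048: 8464 / 3009 gives ~2.8X, matching the table.
```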

Member:

Could you try some very long alias names or complex expressions? You will get different numbers, right?

Contributor Author:

It seems that long alias names have no influence.

[info] Java HotSpot(TM) 64-Bit Server VM 1.8.0_162-b12 on Mac OS X 10.13.6
[info] Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz
[info] projection on wide table:                Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
[info] ------------------------------------------------------------------------------------------------
[info] split threshold 10                            6512 / 6736          0.2        6210.4       1.0X
[info] split threshold 100                           5730 / 6329          0.2        5464.9       1.1X
[info] split threshold 1024                          3119 / 3184          0.3        2974.6       2.1X
[info] split threshold 2048                          2981 / 3100          0.4        2842.9       2.2X
[info] split threshold 4096                          3289 / 3379          0.3        3136.6       2.0X
[info] split threshold 8196                          4307 / 4338          0.2        4108.0       1.5X
[info] split threshold 65536                       29147 / 30212          0.0       27797.0       0.2X

No `averylongprefixrepeatedmultipletimes` appears in the generated expression code:

/* 047 */   private void createExternalRow_0_8(InternalRow i, Object[] values_0) {
/* 048 */
/* 049 */     // input[80, bigint, false]
/* 050 */     long value_81 = i.getLong(80);
/* 051 */     if (false) {
/* 052 */       values_0[80] = null;
/* 053 */     } else {
/* 054 */       values_0[80] = value_81;
/* 055 */     }
/* 056 */
/* 057 */     // input[81, bigint, false]
/* 058 */     long value_82 = i.getLong(81);
/* 059 */     if (false) {
/* 060 */       values_0[81] = null;
/* 061 */     } else {
/* 062 */       values_0[81] = value_82;
/* 063 */     }
/* 064 */
/* 065 */     // input[82, bigint, false]
/* 066 */     long value_83 = i.getLong(82);
/* 067 */     if (false) {
/* 068 */       values_0[82] = null;
/* 069 */     } else {
/* 070 */       values_0[82] = value_83;
/* 071 */     }
/* 072 */
...

My benchmark:

object WideTableBenchmark extends SqlBasedBenchmark {

  override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
    runBenchmark("projection on wide table") {
      val N = 1 << 20
      val df = spark.range(N)
      val columns = (0 until 400).map { i => s"id as averylongprefixrepeatedmultipletimes_id$i" }
      val benchmark = new Benchmark("projection on wide table", N, output = output)
      Seq("10", "100", "1024", "2048", "4096", "8196", "65536").foreach { n =>
        benchmark.addCase(s"split threshold $n", numIters = 5) { iter =>
          withSQLConf("spark.testing.codegen.splitThreshold" -> n) {
            df.selectExpr(columns: _*).foreach(identity(_))
          }
        }
      }
      benchmark.run()
    }
  }
}

Will keep benchmarking with complex expressions.

Contributor:

The "freshNamePrefix" prefix is only applied in whole-stage codegen:
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala#L87
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala#L169

It doesn't have any effect in non-whole-stage codegen.

If you intend to stress test expression codegen but don't see the prefix being prepended, you're probably not adding it in the right place. Where did you add it?

Contributor:

Oh I see, you're using the column name... that's not the right place to put the "prefix". Column names are almost never carried over to the generated code in the current framework (the only exception is the lambda variable name).

Contributor Author:

@rednaxelafx the wide-table benchmark I used has 400 columns; whole-stage codegen is disabled by default for that many columns.

+  .checkValue(threshold => threshold > 0, "The threshold must be a positive integer.")
+  .createWithDefault(1024)
Contributor:

Let's add a checkValue to make sure the value is positive. We can figure out lower and upper bounds later.

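A sketch of what bounded validation could look like, following the discussion above. The upper bound is only an assumption motivated by HotSpot's 8KB JIT limit; the PR itself only checks that the value is positive, and `ThresholdCheck` with its constants is hypothetical, not Spark's API:

```scala
object ThresholdCheck {
  val MinThreshold = 1     // matches the PR's actual check: threshold > 0
  val MaxThreshold = 8192  // assumed cap; not part of the PR

  // Validate a candidate split threshold, returning it unchanged if legal.
  def validate(threshold: Int): Int = {
    require(threshold >= MinThreshold, "The threshold must be a positive integer.")
    require(threshold <= MaxThreshold, s"The threshold should not exceed $MaxThreshold.")
    threshold
  }
}
```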

val WHOLESTAGE_SPLIT_CONSUME_FUNC_BY_OPERATOR =
buildConf("spark.sql.codegen.splitConsumeFuncByOperator")
.internal()
@@ -1733,6 +1745,8 @@ class SQLConf extends Serializable with Logging {

def hugeMethodLimit: Int = getConf(WHOLESTAGE_HUGE_METHOD_LIMIT)

+def methodSplitThreshold: Int = getConf(CODEGEN_METHOD_SPLIT_THRESHOLD)

def wholeStageSplitConsumeFuncByOperator: Boolean =
getConf(WHOLESTAGE_SPLIT_CONSUME_FUNC_BY_OPERATOR)
