-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-27684][SQL] Avoid conversion overhead for primitive types #24636
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 3 commits
37ced27
ad57acf
fead323
74e70f5
010b3d4
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -1003,22 +1003,38 @@ case class ScalaUDF( | |
| // such as IntegerType, its javaType is `int` and the returned type of user-defined | ||
| // function is Object. Trying to convert an Object to `int` will cause casting exception. | ||
| val evalCode = evals.map(_.code).mkString("\n") | ||
| val (funcArgs, initArgs) = evals.zipWithIndex.map { case (eval, i) => | ||
| val argTerm = ctx.freshName("arg") | ||
| val convert = s"$convertersTerm[$i].apply(${eval.value})" | ||
| val initArg = s"Object $argTerm = ${eval.isNull} ? null : $convert;" | ||
| (argTerm, initArg) | ||
| val (funcArgs, initArgs) = evals.zipWithIndex.zip(children.map(_.dataType)).map { | ||
| case ((eval, i), dt) => | ||
| val argTerm = ctx.freshName("arg") | ||
| val initArg = if (CatalystTypeConverters.isPrimitive(dt)) { | ||
| val convertedTerm = ctx.freshName("conv") | ||
| s""" | ||
| |${CodeGenerator.boxedType(dt)} $convertedTerm = ${eval.value}; | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Out of curiosity, why do we need this extra and avoid the use of an extra variable name? Or if you want more typechecking, do and used the boxed type as To avoid repetition and more tightly scope the conditional part of the argument convert logic, we even might consider something like this val boxedType = CodeGenerator.boxedType(dt)
val maybeConverted = if (CatalystTypeConverters.isPrimitive(dt)) {
eval.value
} else {
"$convertersTerm[$i].apply(${eval.value})"
}
s"$boxedType $argTerm = ${eval.isNull} ? null : $maybeConverted;"
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Well, actually my first trial was exactly what you are suggesting here, but it didn't work: indeed it can cause compilation error (the error message is something like but this fails too with a confusing error message. Honestly, I am not sure why this 2nd solution doesn't work, since I tried taking the code and compiling it with jdk and it worked. My best guess is that it is a janino bug which doesn't support it.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Interesting! I've filed a bug against Janino to report this issue: janino-compiler/janino#90 |
||
| |Object $argTerm = ${eval.isNull} ? null : $convertedTerm; | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we remove
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. IMO maybe we can do this in a separate PR? I wouldn't be surprised if there's other places in Spark (beyond this method / file) where we could apply similar fixes (and if we're going to apply this in a lot of places then it might even be nice to write some sort of helper for generating / managing null checks. Does this PR look good to you otherwise?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I agree with @JoshRosen . It would be better to define a generic approach for addressing this and there are several instances of it. I can create another JIRA and start a PR for that if you're ok with it. |
||
| """.stripMargin | ||
| } else { | ||
| s"Object $argTerm = ${eval.isNull} ? null : $convertersTerm[$i].apply(${eval.value});" | ||
| } | ||
| (argTerm, initArg) | ||
| }.unzip | ||
|
|
||
| val udf = ctx.addReferenceObj("udf", function, s"scala.Function${children.length}") | ||
| val getFuncResult = s"$udf.apply(${funcArgs.mkString(", ")})" | ||
| val resultConverter = s"$convertersTerm[${children.length}]" | ||
| val boxedType = CodeGenerator.boxedType(dataType) | ||
|
|
||
| val funcInvokation = if (CatalystTypeConverters.isPrimitive(dataType) | ||
| // If the output is nullable, the returned value must be unwrapped from the Option | ||
| && !nullable) { | ||
| s"$resultTerm = ($boxedType)$getFuncResult" | ||
| } else { | ||
| s"$resultTerm = ($boxedType)$resultConverter.apply($getFuncResult)" | ||
| } | ||
| val callFunc = | ||
| s""" | ||
| |$boxedType $resultTerm = null; | ||
| |try { | ||
| | $resultTerm = ($boxedType)$resultConverter.apply($getFuncResult); | ||
| | $funcInvokation; | ||
| |} catch (Exception e) { | ||
| | throw new org.apache.spark.SparkException($errorMsgTerm, e); | ||
| |} | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,52 @@ | ||
| ================================================================================================ | ||
| UDF with mixed input types | ||
| ================================================================================================ | ||
|
|
||
| Java HotSpot(TM) 64-Bit Server VM 1.8.0_152-b16 on Mac OS X 10.13.6 | ||
| Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz | ||
| long/nullable int/string to string: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative | ||
| ------------------------------------------------------------------------------------------------------------------------ | ||
| long/nullable int/string to string wholestage off 250 285 50 0,4 2500,0 1,0X | ||
| long/nullable int/string to string wholestage on 148 174 34 0,7 1479,1 1,7X | ||
|
|
||
| Java HotSpot(TM) 64-Bit Server VM 1.8.0_152-b16 on Mac OS X 10.13.6 | ||
| Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz | ||
| long/nullable int/string to option: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative | ||
| ------------------------------------------------------------------------------------------------------------------------ | ||
| long/nullable int/string to option wholestage off 95 97 3 1,0 952,8 1,0X | ||
| long/nullable int/string to option wholestage on 71 76 4 1,4 708,2 1,3X | ||
|
|
||
| Java HotSpot(TM) 64-Bit Server VM 1.8.0_152-b16 on Mac OS X 10.13.6 | ||
| Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz | ||
| long/nullable int to primitive: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative | ||
| ------------------------------------------------------------------------------------------------------------------------ | ||
| long/nullable int to primitive wholestage off 73 75 4 1,4 725,2 1,0X | ||
| long/nullable int to primitive wholestage on 52 62 7 1,9 523,8 1,4X | ||
|
|
||
|
|
||
| ================================================================================================ | ||
| UDF with primitive types | ||
| ================================================================================================ | ||
|
|
||
| Java HotSpot(TM) 64-Bit Server VM 1.8.0_152-b16 on Mac OS X 10.13.6 | ||
| Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz | ||
| long/nullable int to string: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative | ||
| ------------------------------------------------------------------------------------------------------------------------ | ||
| long/nullable int to string wholestage off 65 68 5 1,5 649,0 1,0X | ||
| long/nullable int to string wholestage on 49 62 16 2,0 488,6 1,3X | ||
|
|
||
| Java HotSpot(TM) 64-Bit Server VM 1.8.0_152-b16 on Mac OS X 10.13.6 | ||
| Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz | ||
| long/nullable int to option: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative | ||
| ------------------------------------------------------------------------------------------------------------------------ | ||
| long/nullable int to option wholestage off 27 31 6 3,8 266,5 1,0X | ||
| long/nullable int to option wholestage on 28 40 10 3,6 278,5 1,0X | ||
|
|
||
| Java HotSpot(TM) 64-Bit Server VM 1.8.0_152-b16 on Mac OS X 10.13.6 | ||
| Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz | ||
| long/nullable int/string to primitive: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative | ||
| ------------------------------------------------------------------------------------------------------------------------ | ||
| long/nullable int/string to primitive wholestage off 28 31 4 3,6 280,3 1,0X | ||
| long/nullable int/string to primitive wholestage on 29 31 3 3,5 285,7 1,0X | ||
|
|
||
|
|
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,104 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one or more | ||
| * contributor license agreements. See the NOTICE file distributed with | ||
| * this work for additional information regarding copyright ownership. | ||
| * The ASF licenses this file to You under the Apache License, Version 2.0 | ||
| * (the "License"); you may not use this file except in compliance with | ||
| * the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| package org.apache.spark.sql.execution.benchmark | ||
|
|
||
| import org.apache.spark.sql.catalyst.expressions.Literal | ||
| import org.apache.spark.sql.expressions.UserDefinedFunction | ||
| import org.apache.spark.sql.functions._ | ||
| import org.apache.spark.sql.types.{IntegerType, StringType} | ||
|
|
||
| /** | ||
| * Synthetic benchmark for Scala User Defined Functions. | ||
| * To run this benchmark: | ||
| * {{{ | ||
| * 1. without sbt: | ||
| * bin/spark-submit --class <this class> --jars <spark core test jar> <sql core test jar> | ||
| * 2. build/sbt "sql/test:runMain <this class>" | ||
| * 3. generate result: | ||
| * SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "sql/test:runMain <this class>" | ||
| * Results will be written to "benchmarks/UDFBenchmark-results.txt". | ||
| * }}} | ||
| */ | ||
| object UDFBenchmark extends SqlBasedBenchmark { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks @mgaido91 for your work on this performance improvement. I'm curious if you tried the JIRA test case from @JoshRosen with your changes. How close does this get us? Also do you think it might be worthwhile to add that test in this benchmark suite as well.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Well, everything can be added. If you think it is critical, we can add it. Indeed, the test reported in the description is very similar, as it is doing a
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Anyway I added it, as shown in the results the overhead is now ~ 20% instead of ~50% |
||
|
|
||
| private def doRunBenchmarkWithMixedTypes(udf: UserDefinedFunction, cardinality: Int): Unit = { | ||
| val idCol = col("id") | ||
| val nullableIntCol = when( | ||
| idCol % 2 === 0, idCol.cast(IntegerType)).otherwise(Literal(null, IntegerType)) | ||
| val stringCol = idCol.cast(StringType) | ||
| spark.range(cardinality).select( | ||
| udf(idCol, nullableIntCol, stringCol)).write.format("noop").save() | ||
| } | ||
|
|
||
| private def doRunBenchmarkWithPrimitiveTypes( | ||
| udf: UserDefinedFunction, cardinality: Int): Unit = { | ||
| val idCol = col("id") | ||
| val nullableIntCol = when( | ||
| idCol % 2 === 0, idCol.cast(IntegerType)).otherwise(Literal(null, IntegerType)) | ||
| spark.range(cardinality).select(udf(idCol, nullableIntCol)).write.format("noop").save() | ||
| } | ||
|
|
||
| override def runBenchmarkSuite(mainArgs: Array[String]): Unit = { | ||
| val cardinality = 100000 | ||
| runBenchmark("UDF with mixed input types") { | ||
| codegenBenchmark("long/nullable int/string to string", cardinality) { | ||
| val sampleUDF = udf {(a: Long, b: java.lang.Integer, c: String) => | ||
| s"$a,$b,$c" | ||
| } | ||
| doRunBenchmarkWithMixedTypes(sampleUDF, cardinality) | ||
| } | ||
|
|
||
| codegenBenchmark("long/nullable int/string to option", cardinality) { | ||
| val sampleUDF = udf {(_: Long, b: java.lang.Integer, _: String) => | ||
| Option(b) | ||
| } | ||
| doRunBenchmarkWithMixedTypes(sampleUDF, cardinality) | ||
| } | ||
|
|
||
| codegenBenchmark("long/nullable int to primitive", cardinality) { | ||
|
||
| val sampleUDF = udf {(a: Long, b: java.lang.Integer, _: String) => | ||
| Option(b).map(_.longValue()).getOrElse(a) | ||
| } | ||
| doRunBenchmarkWithMixedTypes(sampleUDF, cardinality) | ||
| } | ||
| } | ||
|
|
||
| runBenchmark("UDF with primitive types") { | ||
| codegenBenchmark("long/nullable int to string", cardinality) { | ||
| val sampleUDF = udf {(a: Long, b: java.lang.Integer) => | ||
| s"$a,$b" | ||
| } | ||
| doRunBenchmarkWithPrimitiveTypes(sampleUDF, cardinality) | ||
| } | ||
|
|
||
| codegenBenchmark("long/nullable int to option", cardinality) { | ||
| val sampleUDF = udf {(_: Long, b: java.lang.Integer) => | ||
| Option(b) | ||
| } | ||
| doRunBenchmarkWithPrimitiveTypes(sampleUDF, cardinality) | ||
| } | ||
|
|
||
| codegenBenchmark("long/nullable int/string to primitive", cardinality) { | ||
|
||
| val sampleUDF = udf {(a: Long, b: java.lang.Integer) => | ||
| Option(b).map(_.longValue()).getOrElse(a) | ||
| } | ||
| doRunBenchmarkWithPrimitiveTypes(sampleUDF, cardinality) | ||
| } | ||
| } | ||
| } | ||
| } | ||
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use
CodeGenerator.isPrimitiveType? We can save the change toCatalystTypeConverters.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it doesn't work, you can see the UT failures. For types like timestamp it is not the same.