diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala index 925d12c16bae..6020b068155f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala @@ -42,7 +42,7 @@ object CatalystTypeConverters { // Since the map values can be mutable, we explicitly import scala.collection.Map at here. import scala.collection.Map - private def isPrimitive(dataType: DataType): Boolean = { + private[sql] def isPrimitive(dataType: DataType): Boolean = { dataType match { case BooleanType => true case ByteType => true diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUDF.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUDF.scala index eb45f0806455..3274b66e9848 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUDF.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUDF.scala @@ -1003,22 +1003,38 @@ case class ScalaUDF( // such as IntegerType, its javaType is `int` and the returned type of user-defined // function is Object. Trying to convert an Object to `int` will cause casting exception. val evalCode = evals.map(_.code).mkString("\n") - val (funcArgs, initArgs) = evals.zipWithIndex.map { case (eval, i) => - val argTerm = ctx.freshName("arg") - val convert = s"$convertersTerm[$i].apply(${eval.value})" - val initArg = s"Object $argTerm = ${eval.isNull} ? null : $convert;" - (argTerm, initArg) + val (funcArgs, initArgs) = evals.zipWithIndex.zip(children.map(_.dataType)).map { + case ((eval, i), dt) => + val argTerm = ctx.freshName("arg") + val initArg = if (CatalystTypeConverters.isPrimitive(dt)) { + val convertedTerm = ctx.freshName("conv") + s""" + |${CodeGenerator.boxedType(dt)} $convertedTerm = ${eval.value}; + |Object $argTerm = ${eval.isNull} ? null : $convertedTerm; + """.stripMargin + } else { + s"Object $argTerm = ${eval.isNull} ? null : $convertersTerm[$i].apply(${eval.value});" + } + (argTerm, initArg) }.unzip val udf = ctx.addReferenceObj("udf", function, s"scala.Function${children.length}") val getFuncResult = s"$udf.apply(${funcArgs.mkString(", ")})" val resultConverter = s"$convertersTerm[${children.length}]" val boxedType = CodeGenerator.boxedType(dataType) + + val funcInvokation = if (CatalystTypeConverters.isPrimitive(dataType) + // If the output is nullable, the returned value must be unwrapped from the Option + && !nullable) { + s"$resultTerm = ($boxedType)$getFuncResult" + } else { + s"$resultTerm = ($boxedType)$resultConverter.apply($getFuncResult)" + } val callFunc = s""" |$boxedType $resultTerm = null; |try { - | $resultTerm = ($boxedType)$resultConverter.apply($getFuncResult); + | $funcInvokation; |} catch (Exception e) { | throw new org.apache.spark.SparkException($errorMsgTerm, e); |} diff --git a/sql/core/benchmarks/UDFBenchmark-results.txt b/sql/core/benchmarks/UDFBenchmark-results.txt new file mode 100644 index 000000000000..3dfd0c1caeb2 --- /dev/null +++ b/sql/core/benchmarks/UDFBenchmark-results.txt @@ -0,0 +1,59 @@ +================================================================================================ +UDF with mixed input types +================================================================================================ + +Java HotSpot(TM) 64-Bit Server VM 1.8.0_152-b16 on Mac OS X 10.13.6 +Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz +long/nullable int/string to string: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +------------------------------------------------------------------------------------------------------------------------ +long/nullable int/string to string wholestage off 194 248 76 0,5 1941,4 1,0X +long/nullable int/string to string wholestage on 127 136 8 0,8 1269,5 1,5X + +Java HotSpot(TM) 64-Bit Server VM 1.8.0_152-b16 on Mac OS X 10.13.6 +Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz +long/nullable int/string to option: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +------------------------------------------------------------------------------------------------------------------------ +long/nullable int/string to option wholestage off 91 97 8 1,1 910,1 1,0X +long/nullable int/string to option wholestage on 60 79 29 1,7 603,8 1,5X + +Java HotSpot(TM) 64-Bit Server VM 1.8.0_152-b16 on Mac OS X 10.13.6 +Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz +long/nullable int/string to primitive: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +------------------------------------------------------------------------------------------------------------------------ +long/nullable int/string to primitive wholestage off 55 63 12 1,8 547,9 1,0X +long/nullable int/string to primitive wholestage on 43 44 2 2,3 428,0 1,3X + + +================================================================================================ +UDF with primitive types +================================================================================================ + +Java HotSpot(TM) 64-Bit Server VM 1.8.0_152-b16 on Mac OS X 10.13.6 +Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz +long/nullable int to string: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +------------------------------------------------------------------------------------------------------------------------ +long/nullable int to string wholestage off 46 48 2 2,2 461,2 1,0X +long/nullable int to string wholestage on 49 56 8 2,0 488,9 0,9X + +Java HotSpot(TM) 64-Bit Server VM 1.8.0_152-b16 on Mac OS X 10.13.6 +Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz +long/nullable int to option: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +------------------------------------------------------------------------------------------------------------------------ +long/nullable int to option wholestage off 41 47 9 2,4 408,2 1,0X +long/nullable int to option wholestage on 26 28 2 3,9 256,7 1,6X + +Java HotSpot(TM) 64-Bit Server VM 1.8.0_152-b16 on Mac OS X 10.13.6 +Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz +long/nullable int to primitive: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +------------------------------------------------------------------------------------------------------------------------ +long/nullable int to primitive wholestage off 26 27 0 3,8 263,7 1,0X +long/nullable int to primitive wholestage on 26 31 5 3,8 262,2 1,0X + +Java HotSpot(TM) 64-Bit Server VM 1.8.0_152-b16 on Mac OS X 10.13.6 +Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz +UDF identity overhead: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +------------------------------------------------------------------------------------------------------------------------ +Baseline 20 22 1 4,9 204,3 1,0X +With identity UDF 24 26 2 4,1 241,3 0,8X + + diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/UDFBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/UDFBenchmark.scala new file mode 100644 index 000000000000..9cbd6423f667 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/UDFBenchmark.scala @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.benchmark + +import org.apache.spark.benchmark.Benchmark +import org.apache.spark.sql.catalyst.expressions.Literal +import org.apache.spark.sql.expressions.UserDefinedFunction +import org.apache.spark.sql.functions._ +import org.apache.spark.sql.types.{IntegerType, StringType} + +/** + * Synthetic benchmark for Scala User Defined Functions. + * To run this benchmark: + * {{{ + * 1. without sbt: + * bin/spark-submit --class --jars + * 2. build/sbt "sql/test:runMain " + * 3. generate result: + * SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "sql/test:runMain " + * Results will be written to "benchmarks/UDFBenchmark-results.txt". + * }}} + */ +object UDFBenchmark extends SqlBasedBenchmark { + + private def doRunBenchmarkWithMixedTypes(udf: UserDefinedFunction, cardinality: Int): Unit = { + val idCol = col("id") + val nullableIntCol = when( + idCol % 2 === 0, idCol.cast(IntegerType)).otherwise(Literal(null, IntegerType)) + val stringCol = idCol.cast(StringType) + spark.range(cardinality).select( + udf(idCol, nullableIntCol, stringCol)).write.format("noop").save() + } + + private def doRunBenchmarkWithPrimitiveTypes( + udf: UserDefinedFunction, cardinality: Int): Unit = { + val idCol = col("id") + val nullableIntCol = when( + idCol % 2 === 0, idCol.cast(IntegerType)).otherwise(Literal(null, IntegerType)) + spark.range(cardinality).select(udf(idCol, nullableIntCol)).write.format("noop").save() + } + + override def runBenchmarkSuite(mainArgs: Array[String]): Unit = { + val cardinality = 100000 + runBenchmark("UDF with mixed input types") { + codegenBenchmark("long/nullable int/string to string", cardinality) { + val sampleUDF = udf {(a: Long, b: java.lang.Integer, c: String) => + s"$a,$b,$c" + } + doRunBenchmarkWithMixedTypes(sampleUDF, cardinality) + } + + codegenBenchmark("long/nullable int/string to option", cardinality) { + val sampleUDF = udf {(_: Long, b: java.lang.Integer, _: String) => + Option(b) + } + doRunBenchmarkWithMixedTypes(sampleUDF, cardinality) + } + + codegenBenchmark("long/nullable int/string to primitive", cardinality) { + val sampleUDF = udf {(a: Long, b: java.lang.Integer, _: String) => + Option(b).map(_.longValue()).getOrElse(a) + } + doRunBenchmarkWithMixedTypes(sampleUDF, cardinality) + } + } + + runBenchmark("UDF with primitive types") { + codegenBenchmark("long/nullable int to string", cardinality) { + val sampleUDF = udf {(a: Long, b: java.lang.Integer) => + s"$a,$b" + } + doRunBenchmarkWithPrimitiveTypes(sampleUDF, cardinality) + } + + codegenBenchmark("long/nullable int to option", cardinality) { + val sampleUDF = udf {(_: Long, b: java.lang.Integer) => + Option(b) + } + doRunBenchmarkWithPrimitiveTypes(sampleUDF, cardinality) + } + + codegenBenchmark("long/nullable int to primitive", cardinality) { + val sampleUDF = udf {(a: Long, b: java.lang.Integer) => + Option(b).map(_.longValue()).getOrElse(a) + } + doRunBenchmarkWithPrimitiveTypes(sampleUDF, cardinality) + } + + val benchmark = new Benchmark("UDF identity overhead", cardinality, output = output) + + benchmark.addCase(s"Baseline", numIters = 5) { _ => + spark.range(cardinality).select( + col("id"), col("id") * 2, col("id") * 3).write.format("noop").save() + } + + val identityUDF = udf { x: Long => x } + benchmark.addCase(s"With identity UDF", numIters = 5) { _ => + spark.range(cardinality).select( + identityUDF(col("id")), + identityUDF(col("id") * 2), + identityUDF(col("id") * 3)).write.format("noop").save() + } + + benchmark.run() + } + } +}