[SPARK-25710][SQL] range should report metrics correctly #22698
New benchmark results file (@@ -0,0 +1,16 @@):

```
================================================================================================
range
================================================================================================

Java HotSpot(TM) 64-Bit Server VM 1.8.0_161-b12 on Mac OS X 10.13.6
Intel(R) Core(TM) i7-6920HQ CPU @ 2.90GHz

range:                             Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
full scan                              12674 / 12840         41.4          24.2       1.0X
limit after range                         33 /    37      15900.2           0.1     384.4X
filter after range                       969 /   985        541.0           1.8      13.1X
count after range                         42 /    42      12510.5           0.1     302.4X
count after limit after range             32 /    33      16337.0           0.1     394.9X
```
Contributor (author): several learnings:
```diff
@@ -452,8 +452,15 @@ case class RangeExec(range: org.apache.spark.sql.catalyst.plans.logical.Range)
     val localIdx = ctx.freshName("localIdx")
     val localEnd = ctx.freshName("localEnd")
-    val shouldStop = if (parent.needStopCheck) {
-      s"if (shouldStop()) { $nextIndex = $value + ${step}L; return; }"
+    val stopCheck = if (parent.needStopCheck) {
+      s"""
+         |if (shouldStop()) {
+         |  $nextIndex = $value + ${step}L;
+         |  $numOutput.add($localIdx + 1);
+         |  $inputMetrics.incRecordsRead($localIdx + 1);
+         |  return;
+         |}
+       """.stripMargin
     } else {
       "// shouldStop check is eliminated"
     }
```
```diff
@@ -506,18 +513,18 @@ case class RangeExec(range: org.apache.spark.sql.catalyst.plans.logical.Range)
       |     $numElementsTodo = 0;
       |     if ($nextBatchTodo == 0) break;
       |   }
-      |   $numOutput.add($nextBatchTodo);
-      |   $inputMetrics.incRecordsRead($nextBatchTodo);
       |   $batchEnd += $nextBatchTodo * ${step}L;
       | }
       |
       | int $localEnd = (int)(($batchEnd - $nextIndex) / ${step}L);
       | for (int $localIdx = 0; $localIdx < $localEnd; $localIdx++) {
       |   long $value = ((long)$localIdx * ${step}L) + $nextIndex;
       |   ${consume(ctx, Seq(ev))}
-      |   $shouldStop
+      |   $stopCheck
       | }
       | $nextIndex = $batchEnd;
+      | $numOutput.add($localEnd);
```
Contributor: this means the metrics are now updated only once the whole processing of a batch finishes, right?

Contributor (author): partially yes, when there is no stop check. If there is a stop check, we will directly

Contributor: also in that case we update the metrics after processing the rows, right? I am just wondering if we can think of updating the metrics as before but in the

Contributor (author): Is it to keep the code diff small? Otherwise I think it's always better to update the metrics only once, instead of add-then-remove.

Member: I imagine a case: range + limit + a blocking op + ... Now, as at the range there is no [...], assume a range batch is 1000. Because there is no [...]. I've not really tested it, and I'm not sure if it is really a problem. Since it is late, I may check it more tomorrow if it has not been figured out yet.

Contributor (author): that's expected, isn't it? The range operator does output 1000 rows; the limit operator takes 1000 inputs but only outputs, say, 100 rows.

Contributor (author): more background: the stop check for limit is done at batch granularity, while the stop check for the result buffer is done at row granularity. That said, even if the limit is smaller than the batch size, the range operator still outputs an entire batch, physically.

Member: If it is, then it is no problem. I was thinking that the number of output rows metric at the range operator should be 100 if it is followed by a limit(100) operator.
Diff, continued:

```diff
+      | $inputMetrics.incRecordsRead($localEnd);
       | $taskContext.killTaskIfInterrupted();
       | }
     """.stripMargin
```
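To make the metric-update strategy of this patch concrete, here is a minimal, self-contained Java sketch (hypothetical names, not the actual generated code) of the loop shape RangeExec now generates: the output metric is bumped once per fully processed batch, while the per-row stop-check path accounts for the partial batch just before returning.

```java
public class BatchMetricsSketch {
    // Simulates producing `total` rows in batches of `batchSize`. If
    // `stopAfter` >= 0, the consumer asks to stop once that many rows have
    // been produced (mimicking parent.needStopCheck). Returns the value the
    // output-rows counter would hold.
    public static long run(int total, int batchSize, int stopAfter) {
        long numOutput = 0;   // stands in for the numOutputRows SQLMetric
        int produced = 0;
        int nextIndex = 0;
        while (nextIndex < total) {
            int batchEnd = Math.min(nextIndex + batchSize, total);
            int localEnd = batchEnd - nextIndex;
            for (int localIdx = 0; localIdx < localEnd; localIdx++) {
                produced++;                      // stands in for consume(...)
                boolean shouldStop = stopAfter >= 0 && produced >= stopAfter;
                if (shouldStop) {                // the stopCheck branch
                    numOutput += localIdx + 1;   // count only the rows emitted
                    return numOutput;
                }
            }
            nextIndex = batchEnd;
            numOutput += localEnd;               // one update per whole batch
        }
        return numOutput;
    }

    public static void main(String[] args) {
        System.out.println(run(10, 4, -1));  // full scan: all rows counted
        System.out.println(run(10, 4, 6));   // early stop: partial batch counted
    }
}
```

With no stop check there is exactly one metric update per batch, matching the add-once approach the author argues for above instead of add-then-remove.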
New file, RangeBenchmark.scala (@@ -0,0 +1,65 @@):

```scala
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.execution.benchmark

import org.apache.spark.benchmark.Benchmark

/**
 * Benchmark to measure performance for range operator.
 * To run this benchmark:
 * {{{
 *   1. without sbt:
 *      bin/spark-submit --class <this class> --jars <spark core test jar> <spark sql test jar>
 *   2. build/sbt "sql/test:runMain <this class>"
 *   3. generate result: SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "sql/test:runMain <this class>"
 *      Results will be written to "benchmarks/RangeBenchmark-results.txt".
 * }}}
 */
object RangeBenchmark extends SqlBasedBenchmark {

  override def runBenchmarkSuite(): Unit = {
    import spark.implicits._

    runBenchmark("range") {
      val N = 500L << 20
      val benchmark = new Benchmark("range", N, output = output)

      benchmark.addCase("full scan", numIters = 4) { _ =>
        spark.range(N).queryExecution.toRdd.foreach(_ => ())
      }

      benchmark.addCase("limit after range", numIters = 4) { _ =>
        spark.range(N).limit(100).queryExecution.toRdd.foreach(_ => ())
      }

      benchmark.addCase("filter after range", numIters = 4) { _ =>
        spark.range(N).filter('id % 100 === 0).queryExecution.toRdd.foreach(_ => ())
      }

      benchmark.addCase("count after range", numIters = 4) { _ =>
        spark.range(N).count()
      }

      benchmark.addCase("count after limit after range", numIters = 4) { _ =>
        spark.range(N).limit(100).count()
      }

      benchmark.run()
    }
  }
}
```
I've also tried the commit before this PR; the benchmark result is almost the same.