-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-16929] Improve performance when check speculatable tasks. #16867
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 5 commits
09719a2
318a172
5aa2fcf
1728895
2518a95
7740d77
c13a198
617d5aa
5192a32
b9bdf44
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,95 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one or more | ||
| * contributor license agreements. See the NOTICE file distributed with | ||
| * this work for additional information regarding copyright ownership. | ||
| * The ASF licenses this file to You under the Apache License, Version 2.0 | ||
| * (the "License"); you may not use this file except in compliance with | ||
| * the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| package org.apache.spark.util.collection | ||
|
|
||
| import scala.collection.mutable.PriorityQueue | ||
|
|
||
| /** | ||
| * MedianHeap is designed to be used to quickly track the median of a group of numbers | ||
| * that may contain duplicates. Inserting a new number has O(log n) time complexity and | ||
| * determining the median has O(1) time complexity. | ||
| * The basic idea is to maintain two heaps: a smallerHalf and a largerHalf. The smallerHalf | ||
| * stores the smaller half of all numbers while the largerHalf stores the larger half. | ||
| * The sizes of the two heaps are rebalanced each time a new number is inserted so that | ||
| * they never differ by more than 1. Therefore, each time median is called we check | ||
| * whether the two heaps have the same size. If they do, we return the average of the two | ||
| * top values of the heaps. Otherwise we return the top of the heap which has one more | ||
| * element. | ||
| */ | ||
|
|
||
| private[spark] class MedianHeap(implicit val ord: Ordering[Double]) { | ||
|
|
||
| /** | ||
| * Max-heap (ordered by `ord`) holding the smaller half of the inserted numbers, so the | ||
| * largest element of that half sits at the root. The reference is never reassigned. | ||
| */ | ||
| private[this] val smallerHalf = PriorityQueue.empty[Double](ord) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. very minor -- could you make this comment a doc with |
||
|
|
||
| /** | ||
| * Min-heap (ordered by `ord.reverse`) holding the larger half of the inserted numbers, so | ||
| * the smallest element of that half sits at the root. The reference is never reassigned. | ||
| */ | ||
| private[this] val largerHalf = PriorityQueue.empty[Double](ord.reverse) | ||
|
|
||
| /** True when neither half currently holds an element. */ | ||
| def isEmpty(): Boolean = !(smallerHalf.nonEmpty || largerHalf.nonEmpty) | ||
|
|
||
| /** Total number of elements held across both halves (duplicates are counted). */ | ||
| def size(): Int = largerHalf.size + smallerHalf.size | ||
|
|
||
| /** | ||
| * Inserts `x` and then rebalances the two halves. | ||
| * | ||
| * The very first element goes into largerHalf (an arbitrary choice). Afterwards an element | ||
| * goes into largerHalf only if it is strictly greater than the current median according to | ||
| * `ord`, the same ordering the two heaps are built with; otherwise it goes into smallerHalf. | ||
| */ | ||
| def insert(x: Double): Unit = { | ||
|   // Compare through `ord` rather than the primitive `>` so that a caller-supplied ordering | ||
|   // is applied consistently by both the heaps and this routing decision. The emptiness | ||
|   // check comes first because `median` throws on an empty heap. | ||
|   if (isEmpty || ord.gt(x, median)) { | ||
|     largerHalf.enqueue(x) | ||
|   } else { | ||
|     smallerHalf.enqueue(x) | ||
|   } | ||
|   rebalance() | ||
| } | ||
|
|
||
| /** | ||
| * Restores the size invariant: when one half holds at least two more elements than the | ||
| * other, the root of the bigger half is moved into the other half. | ||
| */ | ||
| private[this] def rebalance(): Unit = { | ||
|   // Positive when largerHalf is the bigger heap, negative when smallerHalf is. | ||
|   val surplus = largerHalf.size - smallerHalf.size | ||
|   if (surplus > 1) { | ||
|     smallerHalf.enqueue(largerHalf.dequeue()) | ||
|   } else if (surplus < -1) { | ||
|     largerHalf.enqueue(smallerHalf.dequeue()) | ||
|   } | ||
| } | ||
|
|
||
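| // Returns the median of the inserted numbers by inspecting only the heap roots. | ||
| // Throws NoSuchElementException if nothing has been inserted. | ||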
| def median: Double = { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. minor: I find comments which basically just restate the method name to be pretty pointless. I'd only include them if they add something else, eg. preconditions, or complexity, etc. Mostly I'd say they're not necessary for any of the methods here. |
||
| if (isEmpty) { | ||
| throw new NoSuchElementException("MedianHeap is empty.") | ||
| } | ||
| if (largerHalf.size == smallerHalf.size) { | ||
| (largerHalf.head + smallerHalf.head) / 2.0 | ||
| } else if (largerHalf.size > smallerHalf.size) { | ||
| largerHalf.head | ||
| } else { | ||
| smallerHalf.head | ||
| } | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -893,6 +893,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg | |
| val taskSet = FakeTask.createTaskSet(4) | ||
| // Set the speculation multiplier to be 0 so speculative tasks are launched immediately | ||
| sc.conf.set("spark.speculation.multiplier", "0.0") | ||
| sc.conf.set("spark.speculation", "true") | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. you can remove this once you make the change I suggested above to eliminate the (redundant) check
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This should be set. Because the duration is inserted to
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ohhh cool that makes sense |
||
| val clock = new ManualClock() | ||
| val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock) | ||
| val accumUpdatesByTask: Array[Seq[AccumulatorV2[_, _]]] = taskSet.tasks.map { task => | ||
|
|
@@ -948,6 +949,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg | |
| // Set the speculation multiplier to be 0 so speculative tasks are launched immediately | ||
| sc.conf.set("spark.speculation.multiplier", "0.0") | ||
| sc.conf.set("spark.speculation.quantile", "0.6") | ||
| sc.conf.set("spark.speculation", "true") | ||
| val clock = new ManualClock() | ||
| val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock) | ||
| val accumUpdatesByTask: Array[Seq[AccumulatorV2[_, _]]] = taskSet.tasks.map { task => | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,67 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one or more | ||
| * contributor license agreements. See the NOTICE file distributed with | ||
| * this work for additional information regarding copyright ownership. | ||
| * The ASF licenses this file to You under the Apache License, Version 2.0 | ||
| * (the "License"); you may not use this file except in compliance with | ||
| * the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| package org.apache.spark.util.collection | ||
|
|
||
| import java.util.Arrays | ||
|
||
| import java.util.NoSuchElementException | ||
|
|
||
| import scala.collection.mutable.ArrayBuffer | ||
| import scala.util.Random | ||
|
|
||
| import org.apache.spark.SparkFunSuite | ||
|
|
||
| class MedianHeapSuite extends SparkFunSuite { | ||
|
|
||
| test("If no numbers in MedianHeap, NoSuchElementException is thrown.") { | ||
| val medianHeap = new MedianHeap() | ||
| var valid = false | ||
| try { | ||
| medianHeap.median | ||
| } catch { | ||
| case e: NoSuchElementException => | ||
| valid = true | ||
| } | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. ScalaTest has a simpler pattern for this:
intercept[NoSuchElementException] {
  medianHeap.median
}
See http://www.scalatest.org/user_guide/using_assertions (I guess you could use
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks a lot for the recommendation :) |
||
|
|
||
| assert(valid) | ||
| } | ||
|
|
||
| test("Median should be correct when size of MedianHeap is even") { | ||
|   // Ten ascending values: the expected median is the mean of the two middle elements. | ||
|   val values = Array(0, 1, 2, 3, 4, 5, 6, 7, 8, 9) | ||
|   val heap = new MedianHeap() | ||
|   for (v <- values) heap.insert(v) | ||
|   val expectedMedian = (values(4) + values(5)) / 2.0 | ||
|   assert(heap.size() === 10) | ||
|   assert(heap.median === expectedMedian) | ||
| } | ||
|
|
||
| test("Median should be correct when size of MedianHeap is odd") { | ||
|   // Nine ascending values: the expected median is the single middle element. | ||
|   val values = Array(0, 1, 2, 3, 4, 5, 6, 7, 8) | ||
|   val heap = new MedianHeap() | ||
|   for (v <- values) heap.insert(v) | ||
|   val expectedMedian = values(4) | ||
|   assert(heap.size() === 9) | ||
|   assert(heap.median === expectedMedian) | ||
| } | ||
|
|
||
| test("Size of Median should be correct though there are duplicated numbers inside.") { | ||
|   // Every value appears twice; duplicates must each count towards size and median. | ||
|   val values = Array(0, 0, 1, 1, 2, 2, 3, 3, 4, 4) | ||
|   val heap = new MedianHeap() | ||
|   for (v <- values) heap.insert(v) | ||
|   // Sort before indexing so the middle positions are the two middle values. | ||
|   Arrays.sort(values) | ||
|   val expectedMedian = (values(4) + values(5)) / 2.0 | ||
|   assert(heap.size === 10) | ||
|   assert(heap.median === expectedMedian) | ||
| } | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I know Kay asked for tests with a some hardcoded data, but I think these tests are too simplistic. All of these tests insert data in order, and none have significant skew. Can you add a test case which does something like:
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, I added this change. |
||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: delete this blank line