
Commit da857a3

Merge remote-tracking branch 'apache/master' into SPARK-11672.2

2 parents: 62b09a3 + bc09296

20 files changed: 150 additions & 164 deletions


R/pkg/inst/tests/test_sparkSQL.R

Lines changed: 2 additions & 2 deletions
@@ -1007,7 +1007,7 @@ test_that("group by, agg functions", {
   df3 <- agg(gd, age = "stddev")
   expect_is(df3, "DataFrame")
   df3_local <- collect(df3)
-  expect_equal(0, df3_local[df3_local$name == "Andy",][1, 2])
+  expect_true(is.nan(df3_local[df3_local$name == "Andy",][1, 2]))

   df4 <- agg(gd, sumAge = sum(df$age))
   expect_is(df4, "DataFrame")
@@ -1038,7 +1038,7 @@ test_that("group by, agg functions", {
   df7 <- agg(gd2, value = "stddev")
   df7_local <- collect(df7)
   expect_true(abs(df7_local[df7_local$name == "ID1",][1, 2] - 6.928203) < 1e-6)
-  expect_equal(0, df7_local[df7_local$name == "ID2",][1, 2])
+  expect_true(is.nan(df7_local[df7_local$name == "ID2",][1, 2]))

   mockLines3 <- c("{\"name\":\"Andy\", \"age\":30}",
                   "{\"name\":\"Andy\", \"age\":30}",

core/src/main/scala/org/apache/spark/launcher/LauncherBackend.scala

Lines changed: 10 additions & 2 deletions
@@ -21,7 +21,7 @@ import java.net.{InetAddress, Socket}

 import org.apache.spark.SPARK_VERSION
 import org.apache.spark.launcher.LauncherProtocol._
-import org.apache.spark.util.ThreadUtils
+import org.apache.spark.util.{ThreadUtils, Utils}

 /**
  * A class that can be used to talk to a launcher server. Users should extend this class to
@@ -88,12 +88,20 @@ private[spark] abstract class LauncherBackend {
    */
   protected def onDisconnected() : Unit = { }

+  private def fireStopRequest(): Unit = {
+    val thread = LauncherBackend.threadFactory.newThread(new Runnable() {
+      override def run(): Unit = Utils.tryLogNonFatalError {
+        onStopRequest()
+      }
+    })
+    thread.start()
+  }

   private class BackendConnection(s: Socket) extends LauncherConnection(s) {

     override protected def handle(m: Message): Unit = m match {
       case _: Stop =>
-        onStopRequest()
+        fireStopRequest()

       case _ =>
         throw new IllegalArgumentException(s"Unexpected message type: ${m.getClass().getName()}")
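
The motivation for `fireStopRequest` is that `onStopRequest()` is implemented by subclasses and may block or throw, so it should not run on the connection's message-handling thread. A rough standalone sketch of that pattern under those assumptions (`handleStop` and the callback are made-up names for illustration):

```scala
import java.util.concurrent.{Executors, ThreadFactory}

// Dispatch a possibly slow or failing callback onto its own thread so the caller
// (here standing in for the socket handler that received the Stop message) never blocks.
val threadFactory: ThreadFactory = Executors.defaultThreadFactory()

def handleStop(onStopRequest: () => Unit): Unit = {
  val worker = threadFactory.newThread(new Runnable {
    override def run(): Unit =
      try onStopRequest()
      catch { case e: Exception => Console.err.println(s"stop handler failed: $e") }
  })
  worker.start()
}

// The callback can sleep or throw without affecting the caller.
handleStop(() => Thread.sleep(1000))
```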

core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala

Lines changed: 11 additions & 9 deletions
@@ -191,17 +191,19 @@ private[spark] class SparkDeploySchedulerBackend(
   }

   private def stop(finalState: SparkAppHandle.State): Unit = synchronized {
-    stopping = true
+    try {
+      stopping = true

-    launcherBackend.setState(finalState)
-    launcherBackend.close()
+      super.stop()
+      client.stop()

-    super.stop()
-    client.stop()
-
-    val callback = shutdownCallback
-    if (callback != null) {
-      callback(this)
+      val callback = shutdownCallback
+      if (callback != null) {
+        callback(this)
+      }
+    } finally {
+      launcherBackend.setState(finalState)
+      launcherBackend.close()
     }
   }
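
The restructuring guarantees that the launcher is told the final state and the connection is closed even if `super.stop()`, `client.stop()`, or the shutdown callback throws. A toy illustration of the `finally` guarantee (all names here are hypothetical):

```scala
// Even though shutDownCluster() throws, reportFinalState() still runs, which is
// what moving launcherBackend.setState/close into the finally block buys above.
def shutDownCluster(): Unit = throw new RuntimeException("client.stop() failed")
def reportFinalState(state: String): Unit = println(s"launcher notified: $state")

try {
  try {
    shutDownCluster()
  } finally {
    reportFinalState("KILLED")
  }
} catch {
  case e: RuntimeException => println(s"original failure still propagates: ${e.getMessage}")
}
```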

core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala

Lines changed: 20 additions & 1 deletion
@@ -28,7 +28,7 @@ import org.apache.commons.lang3.StringEscapeUtils

 import org.apache.spark.{InternalAccumulator, SparkConf}
 import org.apache.spark.executor.TaskMetrics
-import org.apache.spark.scheduler.{AccumulableInfo, TaskInfo}
+import org.apache.spark.scheduler.{AccumulableInfo, TaskInfo, TaskLocality}
 import org.apache.spark.ui._
 import org.apache.spark.ui.jobs.UIData._
 import org.apache.spark.util.{Utils, Distribution}
@@ -70,6 +70,21 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {

   private val displayPeakExecutionMemory = parent.conf.getBoolean("spark.sql.unsafe.enabled", true)

+  private def getLocalitySummaryString(stageData: StageUIData): String = {
+    val localities = stageData.taskData.values.map(_.taskInfo.taskLocality)
+    val localityCounts = localities.groupBy(identity).mapValues(_.size)
+    val localityNamesAndCounts = localityCounts.toSeq.map { case (locality, count) =>
+      val localityName = locality match {
+        case TaskLocality.PROCESS_LOCAL => "Process local"
+        case TaskLocality.NODE_LOCAL => "Node local"
+        case TaskLocality.RACK_LOCAL => "Rack local"
+        case TaskLocality.ANY => "Any"
+      }
+      s"$localityName: $count"
+    }
+    localityNamesAndCounts.sorted.mkString("; ")
+  }
+
   def render(request: HttpServletRequest): Seq[Node] = {
     progressListener.synchronized {
       val parameterId = request.getParameter("id")
@@ -129,6 +144,10 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
             <strong>Total Time Across All Tasks: </strong>
             {UIUtils.formatDuration(stageData.executorRunTime)}
           </li>
+          <li>
+            <strong>Locality Level Summary: </strong>
+            {getLocalitySummaryString(stageData)}
+          </li>
           {if (stageData.hasInput) {
             <li>
               <strong>Input Size / Records: </strong>

docs/job-scheduling.md

Lines changed: 27 additions & 28 deletions
@@ -56,36 +56,32 @@ provide another approach to share RDDs.

 ## Dynamic Resource Allocation

-Spark 1.2 introduces the ability to dynamically scale the set of cluster resources allocated to
-your application up and down based on the workload. This means that your application may give
-resources back to the cluster if they are no longer used and request them again later when there
-is demand. This feature is particularly useful if multiple applications share resources in your
-Spark cluster. If a subset of the resources allocated to an application becomes idle, it can be
-returned to the cluster's pool of resources and acquired by other applications. In Spark, dynamic
-resource allocation is performed on the granularity of the executor and can be enabled through
-`spark.dynamicAllocation.enabled`.
-
-This feature is currently disabled by default and available only on [YARN](running-on-yarn.html).
-A future release will extend this to [standalone mode](spark-standalone.html) and
-[Mesos coarse-grained mode](running-on-mesos.html#mesos-run-modes). Note that although Spark on
-Mesos already has a similar notion of dynamic resource sharing in fine-grained mode, enabling
-dynamic allocation allows your Mesos application to take advantage of coarse-grained low-latency
-scheduling while sharing cluster resources efficiently.
+Spark provides a mechanism to dynamically adjust the resources your application occupies based
+on the workload. This means that your application may give resources back to the cluster if they
+are no longer used and request them again later when there is demand. This feature is particularly
+useful if multiple applications share resources in your Spark cluster.
+
+This feature is disabled by default and available on all coarse-grained cluster managers, i.e.
+[standalone mode](spark-standalone.html), [YARN mode](running-on-yarn.html), and
+[Mesos coarse-grained mode](running-on-mesos.html#mesos-run-modes).

 ### Configuration and Setup

-All configurations used by this feature live under the `spark.dynamicAllocation.*` namespace.
-To enable this feature, your application must set `spark.dynamicAllocation.enabled` to `true`.
-Other relevant configurations are described on the
-[configurations page](configuration.html#dynamic-allocation) and in the subsequent sections in
-detail.
+There are two requirements for using this feature. First, your application must set
+`spark.dynamicAllocation.enabled` to `true`. Second, you must set up an *external shuffle service*
+on each worker node in the same cluster and set `spark.shuffle.service.enabled` to true in your
+application. The purpose of the external shuffle service is to allow executors to be removed
+without deleting shuffle files written by them (more detail described
+[below](job-scheduling.html#graceful-decommission-of-executors)). The way to set up this service
+varies across cluster managers:
+
+In standalone mode, simply start your workers with `spark.shuffle.service.enabled` set to `true`.

-Additionally, your application must use an external shuffle service. The purpose of the service is
-to preserve the shuffle files written by executors so the executors can be safely removed (more
-detail described [below](job-scheduling.html#graceful-decommission-of-executors)). To enable
-this service, set `spark.shuffle.service.enabled` to `true`. In YARN, this external shuffle service
-is implemented in `org.apache.spark.yarn.network.YarnShuffleService` that runs in each `NodeManager`
-in your cluster. To start this service, follow these steps:
+In Mesos coarse-grained mode, run `$SPARK_HOME/sbin/start-mesos-shuffle-service.sh` on all
+slave nodes with `spark.shuffle.service.enabled` set to `true`. For instance, you may do so
+through Marathon.
+
+In YARN mode, start the shuffle service on each `NodeManager` as follows:

 1. Build Spark with the [YARN profile](building-spark.html). Skip this step if you are using a
 pre-packaged distribution.
@@ -95,10 +91,13 @@ pre-packaged distribution.
 2. Add this jar to the classpath of all `NodeManager`s in your cluster.
 3. In the `yarn-site.xml` on each node, add `spark_shuffle` to `yarn.nodemanager.aux-services`,
 then set `yarn.nodemanager.aux-services.spark_shuffle.class` to
-`org.apache.spark.network.yarn.YarnShuffleService`. Additionally, set all relevant
-`spark.shuffle.service.*` [configurations](configuration.html).
+`org.apache.spark.network.yarn.YarnShuffleService` and `spark.shuffle.service.enabled` to true.
 4. Restart all `NodeManager`s in your cluster.

+All other relevant configurations are optional and under the `spark.dynamicAllocation.*` and
+`spark.shuffle.service.*` namespaces. For more detail, see the
+[configurations page](configuration.html#dynamic-allocation).
+
 ### Resource Allocation Policy

 At a high level, Spark should relinquish executors when they are no longer used and acquire
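
To make the two requirements from the updated docs concrete, here is one way an application might enable dynamic allocation against an external shuffle service. Only the two `enabled` flags come from the text above; the app name and the min/max executor values are illustrative:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Required: enable dynamic allocation and point executors at the external shuffle service.
// Optional: a couple of spark.dynamicAllocation.* knobs, shown purely as example values.
val conf = new SparkConf()
  .setAppName("dynamic-allocation-example")
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.shuffle.service.enabled", "true")
  .set("spark.dynamicAllocation.minExecutors", "1")
  .set("spark.dynamicAllocation.maxExecutors", "20")

val sc = new SparkContext(conf)
```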

docs/tuning.md

Lines changed: 2 additions & 2 deletions
@@ -61,8 +61,8 @@ The [Kryo documentation](https://github.com/EsotericSoftware/kryo) describes more
 registration options, such as adding custom serialization code.

 If your objects are large, you may also need to increase the `spark.kryoserializer.buffer`
-config property. The default is 2, but this value needs to be large enough to hold the *largest*
-object you will serialize.
+[config](configuration.html#compression-and-serialization). This value needs to be large enough
+to hold the *largest* object you will serialize.

 Finally, if you don't register your custom classes, Kryo will still work, but it will have to store
 the full class name with each object, which is wasteful.
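
For example, a job serializing large objects with Kryo might register its classes and enlarge the buffer together; the `Record` class and the buffer sizes below are illustrative values, not recommendations from the docs:

```scala
import org.apache.spark.SparkConf

case class Record(id: Long, payload: Array[Byte])

// Use Kryo, register the custom class, and make sure the buffer can hold the
// largest Record that will be serialized ("64k" initial / "64m" max are examples only).
val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryoserializer.buffer", "64k")
  .set("spark.kryoserializer.buffer.max", "64m")
  .registerKryoClasses(Array(classOf[Record]))
```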

launcher/src/main/java/org/apache/spark/launcher/ChildProcAppHandle.java

Lines changed: 15 additions & 2 deletions
@@ -18,6 +18,7 @@
 package org.apache.spark.launcher;

 import java.io.IOException;
+import java.lang.reflect.Method;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.ThreadFactory;
@@ -102,8 +103,20 @@ public synchronized void kill() {
       disconnect();
     }
     if (childProc != null) {
-      childProc.destroy();
-      childProc = null;
+      try {
+        childProc.exitValue();
+      } catch (IllegalThreadStateException e) {
+        // Child is still alive. Try to use Java 8's "destroyForcibly()" if available,
+        // fall back to the old API if it's not there.
+        try {
+          Method destroy = childProc.getClass().getMethod("destroyForcibly");
+          destroy.invoke(childProc);
+        } catch (Exception inner) {
+          childProc.destroy();
+        }
+      } finally {
+        childProc = null;
+      }
     }
   }
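
Because the launcher still has to build and run on Java 7, `destroyForcibly()` (added in Java 8) is looked up reflectively rather than called directly. A Scala rendering of the same fallback, with a hypothetical `killForcibly` helper name:

```scala
import java.lang.reflect.Method

// Prefer Java 8's Process.destroyForcibly() when present; otherwise fall back to
// destroy(), mirroring the reflective lookup in ChildProcAppHandle.kill().
def killForcibly(proc: Process): Unit = {
  try {
    proc.exitValue()  // throws if the child is still running
  } catch {
    case _: IllegalThreadStateException =>
      try {
        val destroy: Method = proc.getClass.getMethod("destroyForcibly")
        destroy.invoke(proc)
      } catch {
        case _: Exception => proc.destroy()
      }
  }
}
```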

launcher/src/main/java/org/apache/spark/launcher/SparkAppHandle.java

Lines changed: 3 additions & 0 deletions
@@ -89,6 +89,9 @@ public boolean isFinal() {
    * Tries to kill the underlying application. Implies {@link #disconnect()}. This will not send
    * a {@link #stop()} message to the application, so it's recommended that users first try to
    * stop the application cleanly and only resort to this method if that fails.
+   * <p>
+   * Note that if the application is running as a child process, this method may fail to kill the
+   * process when using Java 7. This may happen if, for example, the application is deadlocked.
    */
   void kill();

python/pyspark/rdd.py

Lines changed: 0 additions & 1 deletion
@@ -1760,7 +1760,6 @@ def combineByKey(self, createCombiner, mergeValue, mergeCombiners,
         In addition, users can control the partitioning of the output RDD.

         >>> x = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
-        >>> def f(x): return x
         >>> def add(a, b): return a + str(b)
         >>> sorted(x.combineByKey(str, add, add).collect())
         [('a', '11'), ('b', '1')]

python/pyspark/sql/context.py

Lines changed: 1 addition & 1 deletion
@@ -415,7 +415,7 @@ def createDataFrame(self, data, schema=None, samplingRatio=None):

         >>> sqlContext.createDataFrame(df.toPandas()).collect()  # doctest: +SKIP
         [Row(name=u'Alice', age=1)]
-        >>> sqlContext.createDataFrame(pandas.DataFrame([[1, 2]]).collect())  # doctest: +SKIP
+        >>> sqlContext.createDataFrame(pandas.DataFrame([[1, 2]])).collect()  # doctest: +SKIP
         [Row(0=1, 1=2)]
         """
         if isinstance(data, DataFrame):
