
Commit 43dba95

Author: Davies Liu
Merge branch 'master' of github.com:apache/spark into viz_subquery
2 parents: cff0871 + c37bbb3

File tree: 19 files changed, +1418 −59 lines

bin/spark-submit.cmd

Lines changed: 1 addition & 1 deletion
@@ -20,4 +20,4 @@ rem
 rem This is the entry point for running Spark submit. To avoid polluting the
 rem environment, it just launches a new cmd to do the real work.
 
-cmd /V /E /C spark-submit2.cmd %*
+cmd /V /E /C "%~dp0spark-submit2.cmd" %*

core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala

Lines changed: 5 additions & 0 deletions
@@ -255,6 +255,10 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
         "either HADOOP_CONF_DIR or YARN_CONF_DIR must be set in the environment.")
       }
     }
+
+    if (proxyUser != null && principal != null) {
+      SparkSubmit.printErrorAndExit("Only one of --proxy-user or --principal can be provided.")
+    }
   }
 
   private def validateKillArguments(): Unit = {
@@ -517,6 +521,7 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
        |  --executor-memory MEM       Memory per executor (e.g. 1000M, 2G) (Default: 1G).
        |
        |  --proxy-user NAME           User to impersonate when submitting the application.
+       |                              This argument does not work with --principal / --keytab.
        |
        |  --help, -h                  Show this help message and exit
        |  --verbose, -v               Print additional debug output
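
To see the new check in isolation, here is a standalone Scala sketch of the same mutual-exclusion validation (the object, method, and exit behaviour are illustrative stand-ins invented for this note, not Spark APIs; only the flag names and error message come from the patch):

// Illustrative sketch of the --proxy-user / --principal exclusivity check added above.
object ExclusiveOptionsSketch {
  def validate(proxyUser: Option[String], principal: Option[String]): Unit = {
    if (proxyUser.isDefined && principal.isDefined) {
      // SparkSubmit.printErrorAndExit behaves similarly: report the error and abort submission.
      System.err.println("Error: Only one of --proxy-user or --principal can be provided.")
      System.exit(1)
    }
  }

  def main(args: Array[String]): Unit = {
    validate(Some("alice"), None)                    // fine: impersonation only
    validate(Some("alice"), Some("bob@EXAMPLE.COM")) // exits with the error above
  }
}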

core/src/main/scala/org/apache/spark/executor/Executor.scala

Lines changed: 22 additions & 1 deletion
@@ -114,6 +114,19 @@ private[spark] class Executor(
   private val heartbeatReceiverRef =
     RpcUtils.makeDriverRef(HeartbeatReceiver.ENDPOINT_NAME, conf, env.rpcEnv)
 
+  /**
+   * When an executor is unable to send heartbeats to the driver more than `HEARTBEAT_MAX_FAILURES`
+   * times, it should kill itself. The default value is 60. With the default 10s heartbeat
+   * interval, this means heartbeats are retried for about 10 minutes before giving up.
+   */
+  private val HEARTBEAT_MAX_FAILURES = conf.getInt("spark.executor.heartbeat.maxFailures", 60)
+
+  /**
+   * Counts consecutive heartbeat failures. It should only be accessed in the heartbeat thread.
+   * Each successful heartbeat resets it to 0.
+   */
+  private var heartbeatFailures = 0
+
   startDriverHeartbeater()
 
   def launchTask(
@@ -461,8 +474,16 @@ private[spark] class Executor(
         logInfo("Told to re-register on heartbeat")
         env.blockManager.reregister()
       }
+      heartbeatFailures = 0
     } catch {
-      case NonFatal(e) => logWarning("Issue communicating with driver in heartbeater", e)
+      case NonFatal(e) =>
+        logWarning("Issue communicating with driver in heartbeater", e)
+        heartbeatFailures += 1
+        if (heartbeatFailures >= HEARTBEAT_MAX_FAILURES) {
+          logError(s"Exit as unable to send heartbeats to driver " +
+            s"more than $HEARTBEAT_MAX_FAILURES times")
+          System.exit(ExecutorExitCode.HEARTBEAT_FAILURE)
+        }
     }
   }

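The change above is essentially a consecutive-failure counter wrapped around the heartbeat RPC. A stripped-down, standalone Scala sketch of that behaviour (the object name, `sendHeartbeat`, and the loop are placeholders invented for this note; the 60-failure default, 10s interval, and exit code 56 come from the patch):

import scala.util.control.NonFatal

// Standalone sketch of the heartbeat retry/exit behaviour introduced above.
object HeartbeatLoopSketch {
  val HEARTBEAT_FAILURE = 56   // same exit code the patch adds to ExecutorExitCode below

  def main(args: Array[String]): Unit = {
    val maxFailures = 60       // spark.executor.heartbeat.maxFailures default
    val intervalMs = 10000L    // 10s heartbeat interval => roughly 10 minutes of retries
    var failures = 0

    def sendHeartbeat(): Unit = ()   // placeholder for the real driver RPC

    while (true) {
      try {
        sendHeartbeat()
        failures = 0                 // any successful heartbeat resets the counter
      } catch {
        case NonFatal(_) =>
          failures += 1
          if (failures >= maxFailures) {
            // Assume the driver is unreachable and kill the executor.
            System.exit(HEARTBEAT_FAILURE)
          }
      }
      Thread.sleep(intervalMs)
    }
  }
}
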
core/src/main/scala/org/apache/spark/executor/ExecutorExitCode.scala

Lines changed: 8 additions & 0 deletions
@@ -39,6 +39,12 @@ object ExecutorExitCode {
   /** ExternalBlockStore failed to create a local temporary directory after many attempts. */
   val EXTERNAL_BLOCK_STORE_FAILED_TO_CREATE_DIR = 55
 
+  /**
+   * Executor is unable to send heartbeats to the driver more than
+   * "spark.executor.heartbeat.maxFailures" times.
+   */
+  val HEARTBEAT_FAILURE = 56
+
   def explainExitCode(exitCode: Int): String = {
     exitCode match {
       case UNCAUGHT_EXCEPTION => "Uncaught exception"
@@ -51,6 +57,8 @@ object ExecutorExitCode {
       // TODO: replace external block store with concrete implementation name
       case EXTERNAL_BLOCK_STORE_FAILED_TO_CREATE_DIR =>
         "ExternalBlockStore failed to create a local temporary directory."
+      case HEARTBEAT_FAILURE =>
+        "Unable to send heartbeats to driver."
       case _ =>
         "Unknown executor exit code (" + exitCode + ")" + (
           if (exitCode > 128) {

examples/src/main/java/org/apache/spark/examples/ml/JavaBisectingKMeansExample.java

Lines changed: 81 additions & 0 deletions
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.ml;
+
+import java.util.Arrays;
+
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.RowFactory;
+import org.apache.spark.sql.SQLContext;
+// $example on$
+import org.apache.spark.ml.clustering.BisectingKMeans;
+import org.apache.spark.ml.clustering.BisectingKMeansModel;
+import org.apache.spark.mllib.linalg.Vector;
+import org.apache.spark.mllib.linalg.VectorUDT;
+import org.apache.spark.mllib.linalg.Vectors;
+import org.apache.spark.sql.DataFrame;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+// $example off$
+
+
+/**
+ * An example demonstrating a bisecting k-means clustering.
+ */
+public class JavaBisectingKMeansExample {
+
+  public static void main(String[] args) {
+    SparkConf conf = new SparkConf().setAppName("JavaBisectingKMeansExample");
+    JavaSparkContext jsc = new JavaSparkContext(conf);
+    SQLContext jsql = new SQLContext(jsc);
+
+    // $example on$
+    JavaRDD<Row> data = jsc.parallelize(Arrays.asList(
+      RowFactory.create(Vectors.dense(0.1, 0.1, 0.1)),
+      RowFactory.create(Vectors.dense(0.3, 0.3, 0.25)),
+      RowFactory.create(Vectors.dense(0.1, 0.1, -0.1)),
+      RowFactory.create(Vectors.dense(20.3, 20.1, 19.9)),
+      RowFactory.create(Vectors.dense(20.2, 20.1, 19.7)),
+      RowFactory.create(Vectors.dense(18.9, 20.0, 19.7))
+    ));
+
+    StructType schema = new StructType(new StructField[]{
+      new StructField("features", new VectorUDT(), false, Metadata.empty()),
+    });
+
+    DataFrame dataset = jsql.createDataFrame(data, schema);
+
+    BisectingKMeans bkm = new BisectingKMeans().setK(2);
+    BisectingKMeansModel model = bkm.fit(dataset);
+
+    System.out.println("Compute Cost: " + model.computeCost(dataset));
+
+    Vector[] clusterCenters = model.clusterCenters();
+    for (int i = 0; i < clusterCenters.length; i++) {
+      Vector clusterCenter = clusterCenters[i];
+      System.out.println("Cluster Center " + i + ": " + clusterCenter);
+    }
+    // $example off$
+
+    jsc.stop();
+  }
+}
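
For reference, here is a rough Scala counterpart of the new Java example (an illustration, not part of this commit; it reuses only the spark.ml calls shown above and assumes the usual sqlContext.implicits._ toDF conversion; the object name is made up):

// Illustrative Scala counterpart of JavaBisectingKMeansExample (not in this commit).
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.ml.clustering.BisectingKMeans
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.sql.SQLContext

object BisectingKMeansSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("BisectingKMeansSketch")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    // Same toy data as the Java example, as a single "features" column of vectors.
    val dataset = sc.parallelize(Seq(
      Vectors.dense(0.1, 0.1, 0.1),
      Vectors.dense(0.3, 0.3, 0.25),
      Vectors.dense(0.1, 0.1, -0.1),
      Vectors.dense(20.3, 20.1, 19.9),
      Vectors.dense(20.2, 20.1, 19.7),
      Vectors.dense(18.9, 20.0, 19.7)
    )).map(Tuple1.apply).toDF("features")

    val model = new BisectingKMeans().setK(2).fit(dataset)

    println(s"Compute Cost: ${model.computeCost(dataset)}")
    model.clusterCenters.zipWithIndex.foreach { case (center, i) =>
      println(s"Cluster Center $i: $center")
    }

    sc.stop()
  }
}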

examples/src/main/java/org/apache/spark/examples/mllib/JavaBisectingKMeansExample.java

Lines changed: 2 additions & 4 deletions
@@ -33,7 +33,7 @@
 // $example off$
 
 /**
- * Java example for graph clustering using power iteration clustering (PIC).
+ * Java example for bisecting k-means clustering.
 */
 public class JavaBisectingKMeansExample {
   public static void main(String[] args) {
@@ -54,9 +54,7 @@ public static void main(String[] args) {
     BisectingKMeansModel model = bkm.run(data);
 
     System.out.println("Compute Cost: " + model.computeCost(data));
-    for (Vector center: model.clusterCenters()) {
-      System.out.println("");
-    }
+
     Vector[] clusterCenters = model.clusterCenters();
     for (int i = 0; i < clusterCenters.length; i++) {
       Vector clusterCenter = clusterCenters[i];

mllib/src/main/scala/org/apache/spark/ml/optim/WeightedLeastSquares.scala

Lines changed: 8 additions & 2 deletions
@@ -156,6 +156,12 @@ private[ml] class WeightedLeastSquares(
 
 private[ml] object WeightedLeastSquares {
 
+  /**
+   * In order to take the normal equation approach efficiently, [[WeightedLeastSquares]]
+   * supports at most 4096 features.
+   */
+  val MAX_NUM_FEATURES: Int = 4096
+
   /**
    * Aggregator to provide necessary summary statistics for solving [[WeightedLeastSquares]].
    */
@@ -174,8 +180,8 @@ private[ml] object WeightedLeastSquares {
     private var aaSum: DenseVector = _
 
     private def init(k: Int): Unit = {
-      require(k <= 4096, "In order to take the normal equation approach efficiently, " +
-        s"we set the max number of features to 4096 but got $k.")
+      require(k <= MAX_NUM_FEATURES, "In order to take the normal equation approach efficiently, " +
+        s"we set the max number of features to $MAX_NUM_FEATURES but got $k.")
       this.k = k
      triK = k * (k + 1) / 2
      count = 0L

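The 4096-feature cap is easy to sanity-check (my own back-of-the-envelope arithmetic, not part of the patch): the aggregator keeps the Gram matrix A^T A in packed upper-triangular form, so k features cost k(k + 1) / 2 doubles, and the normal-equation solve itself scales roughly as k^3.

// Rough sizing sketch behind MAX_NUM_FEATURES = 4096 (illustrative, not Spark code).
object WlsSizingSketch {
  def main(args: Array[String]): Unit = {
    val k = 4096                        // MAX_NUM_FEATURES
    val triK = k.toLong * (k + 1) / 2   // packed upper-triangular entries of A^T A
    val mb = triK * 8 / (1L << 20)      // 8 bytes per double
    println(s"packed Gram entries: $triK (~$mb MB per aggregator)")
    // ~8.39 million entries, roughly 64 MB of doubles, plus a dense solve that grows
    // cubically in k, which is why the feature count is capped rather than scaled further.
  }
}
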