Commit aca547f

Merge remote-tracking branch 'apache-github/master' into HEAD

2 parents c20f4fe + 70c5549

32 files changed: 437 additions & 297 deletions

R/pkg/R/mllib.R

Lines changed: 9 additions & 10 deletions
@@ -712,7 +712,6 @@ setMethod("predict", signature(object = "KMeansModel"),
 #'                    of L1 and L2. Default is 0.0 which is an L2 penalty.
 #' @param maxIter maximum iteration number.
 #' @param tol convergence tolerance of iterations.
-#' @param fitIntercept whether to fit an intercept term.
 #' @param family the name of family which is a description of the label distribution to be used in the model.
 #'                 Supported options:
 #'                 \itemize{
@@ -747,11 +746,11 @@ setMethod("predict", signature(object = "KMeansModel"),
 #' \dontrun{
 #' sparkR.session()
 #' # binary logistic regression
-#' label <- c(1.0, 1.0, 1.0, 0.0, 0.0)
-#' feature <- c(1.1419053, 0.9194079, -0.9498666, -1.1069903, 0.2809776)
-#' binary_data <- as.data.frame(cbind(label, feature))
+#' label <- c(0.0, 0.0, 0.0, 1.0, 1.0)
+#' features <- c(1.1419053, 0.9194079, -0.9498666, -1.1069903, 0.2809776)
+#' binary_data <- as.data.frame(cbind(label, features))
 #' binary_df <- createDataFrame(binary_data)
-#' blr_model <- spark.logit(binary_df, label ~ feature, thresholds = 1.0)
+#' blr_model <- spark.logit(binary_df, label ~ features, thresholds = 1.0)
 #' blr_predict <- collect(select(predict(blr_model, binary_df), "prediction"))
 #'
 #' # summary of binary logistic regression
@@ -783,7 +782,7 @@ setMethod("predict", signature(object = "KMeansModel"),
 #' @note spark.logit since 2.1.0
 setMethod("spark.logit", signature(data = "SparkDataFrame", formula = "formula"),
           function(data, formula, regParam = 0.0, elasticNetParam = 0.0, maxIter = 100,
-                   tol = 1E-6, fitIntercept = TRUE, family = "auto", standardization = TRUE,
+                   tol = 1E-6, family = "auto", standardization = TRUE,
                    thresholds = 0.5, weightCol = NULL, aggregationDepth = 2,
                    probabilityCol = "probability") {
             formula <- paste(deparse(formula), collapse = "")
@@ -795,10 +794,10 @@ setMethod("spark.logit", signature(data = "SparkDataFrame", formula = "formula")
             jobj <- callJStatic("org.apache.spark.ml.r.LogisticRegressionWrapper", "fit",
                                 data@sdf, formula, as.numeric(regParam),
                                 as.numeric(elasticNetParam), as.integer(maxIter),
-                                as.numeric(tol), as.logical(fitIntercept),
-                                as.character(family), as.logical(standardization),
-                                as.array(thresholds), as.character(weightCol),
-                                as.integer(aggregationDepth), as.character(probabilityCol))
+                                as.numeric(tol), as.character(family),
+                                as.logical(standardization), as.array(thresholds),
+                                as.character(weightCol), as.integer(aggregationDepth),
+                                as.character(probabilityCol))
             new("LogisticRegressionModel", jobj = jobj)
           })

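The R-side `callJStatic` call passes its arguments positionally, so dropping `fitIntercept` means every argument after `tol` shifts by one. A minimal Scala sketch of the JVM-side signature this R call now presumably targets; the parameter names and types below are inferred from the `callJStatic` arguments above, not copied from the actual `org.apache.spark.ml.r.LogisticRegressionWrapper` source:

```scala
import org.apache.spark.sql.DataFrame

// Hypothetical sketch of the JVM-side signature the R wrapper now targets.
// Names and types are inferred from the callJStatic arguments above; the
// real LogisticRegressionWrapper may differ.
object LogisticRegressionWrapperSketch {
  def fit(
      data: DataFrame,        // data@sdf from R
      formula: String,
      regParam: Double,
      elasticNetParam: Double,
      maxIter: Int,
      tol: Double,
      family: String,         // now immediately follows tol: fitIntercept is gone
      standardization: Boolean,
      thresholds: Array[Double],
      weightCol: String,
      aggregationDepth: Int,
      probabilityCol: String): Unit = {
    // the real wrapper configures a LogisticRegression estimator from these
    // parameters and fits it against the formula-transformed data
  }
}
```
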
R/pkg/inst/tests/testthat/test_mllib.R

Lines changed: 18 additions & 8 deletions
@@ -646,30 +646,30 @@ test_that("spark.isotonicRegression", {

 test_that("spark.logit", {
   # test binary logistic regression
-  label <- c(1.0, 1.0, 1.0, 0.0, 0.0)
+  label <- c(0.0, 0.0, 0.0, 1.0, 1.0)
   feature <- c(1.1419053, 0.9194079, -0.9498666, -1.1069903, 0.2809776)
   binary_data <- as.data.frame(cbind(label, feature))
   binary_df <- createDataFrame(binary_data)

   blr_model <- spark.logit(binary_df, label ~ feature, thresholds = 1.0)
   blr_predict <- collect(select(predict(blr_model, binary_df), "prediction"))
-  expect_equal(blr_predict$prediction, c(0, 0, 0, 0, 0))
+  expect_equal(blr_predict$prediction, c("0.0", "0.0", "0.0", "0.0", "0.0"))
   blr_model1 <- spark.logit(binary_df, label ~ feature, thresholds = 0.0)
   blr_predict1 <- collect(select(predict(blr_model1, binary_df), "prediction"))
-  expect_equal(blr_predict1$prediction, c(1, 1, 1, 1, 1))
+  expect_equal(blr_predict1$prediction, c("1.0", "1.0", "1.0", "1.0", "1.0"))

   # test summary of binary logistic regression
   blr_summary <- summary(blr_model)
   blr_fmeasure <- collect(select(blr_summary$fMeasureByThreshold, "threshold", "F-Measure"))
-  expect_equal(blr_fmeasure$threshold, c(0.8221347, 0.7884005, 0.6674709, 0.3785437, 0.3434487),
+  expect_equal(blr_fmeasure$threshold, c(0.6565513, 0.6214563, 0.3325291, 0.2115995, 0.1778653),
               tolerance = 1e-4)
-  expect_equal(blr_fmeasure$"F-Measure", c(0.5000000, 0.8000000, 0.6666667, 0.8571429, 0.7500000),
+  expect_equal(blr_fmeasure$"F-Measure", c(0.6666667, 0.5000000, 0.8000000, 0.6666667, 0.5714286),
               tolerance = 1e-4)
   blr_precision <- collect(select(blr_summary$precisionByThreshold, "threshold", "precision"))
-  expect_equal(blr_precision$precision, c(1.0000000, 1.0000000, 0.6666667, 0.7500000, 0.6000000),
+  expect_equal(blr_precision$precision, c(1.0000000, 0.5000000, 0.6666667, 0.5000000, 0.4000000),
               tolerance = 1e-4)
   blr_recall <- collect(select(blr_summary$recallByThreshold, "threshold", "recall"))
-  expect_equal(blr_recall$recall, c(0.3333333, 0.6666667, 0.6666667, 1.0000000, 1.0000000),
+  expect_equal(blr_recall$recall, c(0.5000000, 0.5000000, 1.0000000, 1.0000000, 1.0000000),
               tolerance = 1e-4)

   # test model save and read
@@ -683,6 +683,16 @@ test_that("spark.logit", {
   expect_error(summary(blr_model2))
   unlink(modelPath)

+  # test prediction label as text
+  training <- suppressWarnings(createDataFrame(iris))
+  binomial_training <- training[training$Species %in% c("versicolor", "virginica"), ]
+  binomial_model <- spark.logit(binomial_training, Species ~ Sepal_Length + Sepal_Width)
+  prediction <- predict(binomial_model, binomial_training)
+  expect_equal(typeof(take(select(prediction, "prediction"), 1)$prediction), "character")
+  expected <- c("virginica", "virginica", "virginica", "versicolor", "virginica",
+                "versicolor", "virginica", "versicolor", "virginica", "versicolor")
+  expect_equal(as.list(take(select(prediction, "prediction"), 10))[[1]], expected)
+
   # test multinomial logistic regression
   label <- c(0.0, 1.0, 2.0, 0.0, 0.0)
   feature1 <- c(4.845940, 5.64480, 7.430381, 6.464263, 5.555667)
@@ -694,7 +704,7 @@ test_that("spark.logit", {

   model <- spark.logit(df, label ~., family = "multinomial", thresholds = c(0, 1, 1))
   predict1 <- collect(select(predict(model, df), "prediction"))
-  expect_equal(predict1$prediction, c(0, 0, 0, 0, 0))
+  expect_equal(predict1$prediction, c("0.0", "0.0", "0.0", "0.0", "0.0"))
   # Summary of multinomial logistic regression is not implemented yet
   expect_error(summary(model))
 })

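The updated expectations reflect that `predict()` on a `spark.logit` model now returns the original label values as strings (`"0.0"`, `"versicolor"`) rather than numeric indices. Spark ML exposes this kind of index-to-label decoding as `IndexToString`; a hedged sketch of the idea, illustrative only and not the wrapper's actual code:

```scala
import org.apache.spark.ml.feature.IndexToString
import org.apache.spark.sql.SparkSession

// Sketch: decode numeric prediction indices back to the label strings seen
// at fit time, which is why the tests above now expect "0.0"/"1.0".
// Illustrative only; the real LogisticRegressionWrapper may differ.
object LabelDecodeSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("sketch").getOrCreate()
    val predictions = spark.createDataFrame(Seq((0.0, -1.11), (1.0, 1.14)))
      .toDF("prediction", "feature")
    val decoder = new IndexToString()
      .setInputCol("prediction")
      .setOutputCol("predictedLabel")
      .setLabels(Array("0.0", "1.0")) // label strings captured during fitting
    decoder.transform(predictions).show()
    spark.stop()
  }
}
```
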
core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 1 addition & 1 deletion
@@ -19,7 +19,7 @@ package org.apache.spark

 import java.io._
 import java.lang.reflect.Constructor
-import java.net.{MalformedURLException, URI}
+import java.net.{URI}
 import java.util.{Arrays, Locale, Properties, ServiceLoader, UUID}
 import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap}
 import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger, AtomicReference}

core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala

Lines changed: 7 additions & 0 deletions
@@ -22,6 +22,7 @@ import java.io.{InputStream, IOException}
 import scala.io.Source

 import com.fasterxml.jackson.core.JsonParseException
+import com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException
 import org.json4s.jackson.JsonMethods._

 import org.apache.spark.internal.Logging
@@ -87,6 +88,12 @@ private[spark] class ReplayListenerBus extends SparkListenerBus with Logging {
           // Ignore events generated by Structured Streaming in Spark 2.0.0 and 2.0.1.
           // It's safe since no place uses them.
           logWarning(s"Dropped incompatible Structured Streaming log: $currentLine")
+        case e: UnrecognizedPropertyException if e.getMessage != null && e.getMessage.startsWith(
+            "Unrecognized field \"queryStatus\" " +
+            "(class org.apache.spark.sql.streaming.StreamingQueryListener$") =>
+          // Ignore events generated by Structured Streaming in Spark 2.0.2
+          // It's safe since no place uses them.
+          logWarning(s"Dropped incompatible Structured Streaming log: $currentLine")
         case jpe: JsonParseException =>
           // We can only ignore exception from last line of the file that might be truncated
           // the last entry may not be the very last line in the event log, but we treat it

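The new case only swallows the one known-incompatible event shape: it matches both the exception type and a message prefix, so any other `UnrecognizedPropertyException` still propagates. A self-contained sketch of that guarded-catch pattern; the parse function and exception type here are stand-ins, not Spark's real event parsing:

```scala
// Stand-alone sketch of the guarded catch used above: drop a line only when
// the exception message identifies the known-incompatible "queryStatus" field.
object GuardedCatchSketch {
  private def parse(line: String): Unit =
    if (line.contains("queryStatus")) {
      // stand-in for Jackson's UnrecognizedPropertyException
      throw new IllegalArgumentException("Unrecognized field \"queryStatus\" (class ...)")
    }

  def replay(line: String): Unit =
    try parse(line) catch {
      case e: IllegalArgumentException
          if e.getMessage != null &&
            e.getMessage.startsWith("Unrecognized field \"queryStatus\"") =>
        // known Spark 2.0.2 Structured Streaming event: warn and skip
        println(s"Dropped incompatible Structured Streaming log: $line")
      // any other exception propagates to the caller unchanged
    }

  def main(args: Array[String]): Unit = {
    replay("""{"queryStatus": {}}""") // dropped with a warning
    replay("""{"event": "ok"}""")     // parsed without error
  }
}
```
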
core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala

Lines changed: 15 additions & 34 deletions
@@ -70,8 +70,6 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
   // if we find that it's okay.
   private val MAX_TIMELINE_TASKS = parent.conf.getInt("spark.ui.timeline.tasks.maximum", 1000)

-  private val displayPeakExecutionMemory = parent.conf.getBoolean("spark.sql.unsafe.enabled", true)
-
   private def getLocalitySummaryString(stageData: StageUIData): String = {
     val localities = stageData.taskData.values.map(_.taskInfo.taskLocality)
     val localityCounts = localities.groupBy(identity).mapValues(_.size)
@@ -252,15 +250,13 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
                     <span class="additional-metric-title">Getting Result Time</span>
                   </span>
                 </li>
-                {if (displayPeakExecutionMemory) {
-                  <li>
-                    <span data-toggle="tooltip"
-                          title={ToolTips.PEAK_EXECUTION_MEMORY} data-placement="right">
-                      <input type="checkbox" name={TaskDetailsClassNames.PEAK_EXECUTION_MEMORY}/>
-                      <span class="additional-metric-title">Peak Execution Memory</span>
-                    </span>
-                  </li>
-                }}
+                <li>
+                  <span data-toggle="tooltip"
+                        title={ToolTips.PEAK_EXECUTION_MEMORY} data-placement="right">
+                    <input type="checkbox" name={TaskDetailsClassNames.PEAK_EXECUTION_MEMORY}/>
+                    <span class="additional-metric-title">Peak Execution Memory</span>
+                  </span>
+                </li>
               </ul>
             </div>
           </div>
@@ -532,13 +528,9 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
             {serializationQuantiles}
           </tr>,
           <tr class={TaskDetailsClassNames.GETTING_RESULT_TIME}>{gettingResultQuantiles}</tr>,
-          if (displayPeakExecutionMemory) {
-            <tr class={TaskDetailsClassNames.PEAK_EXECUTION_MEMORY}>
-              {peakExecutionMemoryQuantiles}
-            </tr>
-          } else {
-            Nil
-          },
+          <tr class={TaskDetailsClassNames.PEAK_EXECUTION_MEMORY}>
+            {peakExecutionMemoryQuantiles}
+          </tr>,
           if (stageData.hasInput) <tr>{inputQuantiles}</tr> else Nil,
           if (stageData.hasOutput) <tr>{outputQuantiles}</tr> else Nil,
           if (stageData.hasShuffleRead) {
@@ -1166,9 +1158,6 @@ private[ui] class TaskPagedTable(
     desc: Boolean,
     executorsListener: ExecutorsListener) extends PagedTable[TaskTableRowData] {

-  // We only track peak memory used for unsafe operators
-  private val displayPeakExecutionMemory = conf.getBoolean("spark.sql.unsafe.enabled", true)
-
   override def tableId: String = "task-table"

   override def tableCssClass: String =
@@ -1217,14 +1206,8 @@ private[ui] class TaskPagedTable(
       ("Task Deserialization Time", TaskDetailsClassNames.TASK_DESERIALIZATION_TIME),
       ("GC Time", ""),
       ("Result Serialization Time", TaskDetailsClassNames.RESULT_SERIALIZATION_TIME),
-      ("Getting Result Time", TaskDetailsClassNames.GETTING_RESULT_TIME)) ++
-      {
-        if (displayPeakExecutionMemory) {
-          Seq(("Peak Execution Memory", TaskDetailsClassNames.PEAK_EXECUTION_MEMORY))
-        } else {
-          Nil
-        }
-      } ++
+      ("Getting Result Time", TaskDetailsClassNames.GETTING_RESULT_TIME),
+      ("Peak Execution Memory", TaskDetailsClassNames.PEAK_EXECUTION_MEMORY)) ++
       {if (hasAccumulators) Seq(("Accumulators", "")) else Nil} ++
       {if (hasInput) Seq(("Input Size / Records", "")) else Nil} ++
       {if (hasOutput) Seq(("Output Size / Records", "")) else Nil} ++
@@ -1316,11 +1299,9 @@ private[ui] class TaskPagedTable(
       <td class={TaskDetailsClassNames.GETTING_RESULT_TIME}>
        {UIUtils.formatDuration(task.gettingResultTime)}
       </td>
-      {if (displayPeakExecutionMemory) {
-        <td class={TaskDetailsClassNames.PEAK_EXECUTION_MEMORY}>
-          {Utils.bytesToString(task.peakExecutionMemoryUsed)}
-        </td>
-      }}
+      <td class={TaskDetailsClassNames.PEAK_EXECUTION_MEMORY}>
+        {Utils.bytesToString(task.peakExecutionMemoryUsed)}
+      </td>
       {if (task.accumulators.nonEmpty) {
         <td>{Unparsed(task.accumulators.get)}</td>
       }}

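With the `spark.sql.unsafe.enabled` gate removed, the peak-memory column joins the unconditional headers, while genuinely optional columns keep the `if (cond) Seq(...) else Nil` idiom. A small sketch of that pattern; the column names and flags below are illustrative, not StagePage's full list:

```scala
// Sketch of the header-building idiom after the change: Peak Execution Memory
// is always present; optional columns are appended conditionally.
object HeaderSketch {
  def headers(hasAccumulators: Boolean, hasInput: Boolean): Seq[(String, String)] =
    Seq(
      ("Getting Result Time", "getting_result_time"),
      ("Peak Execution Memory", "peak_execution_memory")) ++
      (if (hasAccumulators) Seq(("Accumulators", "")) else Nil) ++
      (if (hasInput) Seq(("Input Size / Records", "")) else Nil)

  def main(args: Array[String]): Unit =
    println(headers(hasAccumulators = true, hasInput = false).map(_._1).mkString(", "))
}
```
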
core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala

Lines changed: 3 additions & 13 deletions
@@ -35,25 +35,15 @@ class StagePageSuite extends SparkFunSuite with LocalSparkContext {

   private val peakExecutionMemory = 10

-  test("peak execution memory only displayed if unsafe is enabled") {
-    val unsafeConf = "spark.sql.unsafe.enabled"
-    val conf = new SparkConf(false).set(unsafeConf, "true")
+  test("peak execution memory should displayed") {
+    val conf = new SparkConf(false)
     val html = renderStagePage(conf).toString().toLowerCase
     val targetString = "peak execution memory"
     assert(html.contains(targetString))
-    // Disable unsafe and make sure it's not there
-    val conf2 = new SparkConf(false).set(unsafeConf, "false")
-    val html2 = renderStagePage(conf2).toString().toLowerCase
-    assert(!html2.contains(targetString))
-    // Avoid setting anything; it should be displayed by default
-    val conf3 = new SparkConf(false)
-    val html3 = renderStagePage(conf3).toString().toLowerCase
-    assert(html3.contains(targetString))
   }

   test("SPARK-10543: peak execution memory should be per-task rather than cumulative") {
-    val unsafeConf = "spark.sql.unsafe.enabled"
-    val conf = new SparkConf(false).set(unsafeConf, "true")
+    val conf = new SparkConf(false)
     val html = renderStagePage(conf).toString().toLowerCase
     // verify min/25/50/75/max show task value not cumulative values
     assert(html.contains(s"<td>$peakExecutionMemory.0 b</td>" * 5))

dev/create-release/release-build.sh

Lines changed: 27 additions & 18 deletions
@@ -150,6 +150,7 @@ if [[ "$1" == "package" ]]; then
     NAME=$1
     FLAGS=$2
     ZINC_PORT=$3
+    BUILD_PIP_PACKAGE=$4
     cp -r spark spark-$SPARK_VERSION-bin-$NAME

     cd spark-$SPARK_VERSION-bin-$NAME
@@ -170,24 +171,32 @@ if [[ "$1" == "package" ]]; then
     # Get maven home set by MVN
     MVN_HOME=`$MVN -version 2>&1 | grep 'Maven home' | awk '{print $NF}'`

-    echo "Creating distribution"
-    ./dev/make-distribution.sh --name $NAME --mvn $MVN_HOME/bin/mvn --tgz --pip $FLAGS \
-      -DzincPort=$ZINC_PORT 2>&1 > ../binary-release-$NAME.log
-    cd ..

-    echo "Copying and signing python distribution"
-    PYTHON_DIST_NAME=pyspark-$PYSPARK_VERSION.tar.gz
-    cp spark-$SPARK_VERSION-bin-$NAME/python/dist/$PYTHON_DIST_NAME .
-
-    echo $GPG_PASSPHRASE | $GPG --passphrase-fd 0 --armour \
-      --output $PYTHON_DIST_NAME.asc \
-      --detach-sig $PYTHON_DIST_NAME
-    echo $GPG_PASSPHRASE | $GPG --passphrase-fd 0 --print-md \
-      MD5 $PYTHON_DIST_NAME > \
-      $PYTHON_DIST_NAME.md5
-    echo $GPG_PASSPHRASE | $GPG --passphrase-fd 0 --print-md \
-      SHA512 $PYTHON_DIST_NAME > \
-      $PYTHON_DIST_NAME.sha
+    if [ -z "$BUILD_PIP_PACKAGE" ]; then
+      echo "Creating distribution without PIP package"
+      ./dev/make-distribution.sh --name $NAME --mvn $MVN_HOME/bin/mvn --tgz $FLAGS \
+        -DzincPort=$ZINC_PORT 2>&1 > ../binary-release-$NAME.log
+      cd ..
+    else
+      echo "Creating distribution with PIP package"
+      ./dev/make-distribution.sh --name $NAME --mvn $MVN_HOME/bin/mvn --tgz --pip $FLAGS \
+        -DzincPort=$ZINC_PORT 2>&1 > ../binary-release-$NAME.log
+      cd ..
+
+      echo "Copying and signing python distribution"
+      PYTHON_DIST_NAME=pyspark-$PYSPARK_VERSION.tar.gz
+      cp spark-$SPARK_VERSION-bin-$NAME/python/dist/$PYTHON_DIST_NAME .
+
+      echo $GPG_PASSPHRASE | $GPG --passphrase-fd 0 --armour \
+        --output $PYTHON_DIST_NAME.asc \
+        --detach-sig $PYTHON_DIST_NAME
+      echo $GPG_PASSPHRASE | $GPG --passphrase-fd 0 --print-md \
+        MD5 $PYTHON_DIST_NAME > \
+        $PYTHON_DIST_NAME.md5
+      echo $GPG_PASSPHRASE | $GPG --passphrase-fd 0 --print-md \
+        SHA512 $PYTHON_DIST_NAME > \
+        $PYTHON_DIST_NAME.sha
+    fi

     echo "Copying and signing regular binary distribution"
     cp spark-$SPARK_VERSION-bin-$NAME/spark-$SPARK_VERSION-bin-$NAME.tgz .
@@ -211,7 +220,7 @@ if [[ "$1" == "package" ]]; then
   make_binary_release "hadoop2.3" "-Phadoop-2.3 $FLAGS" "3033" &
   make_binary_release "hadoop2.4" "-Phadoop-2.4 $FLAGS" "3034" &
   make_binary_release "hadoop2.6" "-Phadoop-2.6 $FLAGS" "3035" &
-  make_binary_release "hadoop2.7" "-Phadoop-2.7 $FLAGS" "3036" &
+  make_binary_release "hadoop2.7" "-Phadoop-2.7 $FLAGS" "3036" "withpip" &
   make_binary_release "hadoop2.4-without-hive" "-Psparkr -Phadoop-2.4 -Pyarn -Pmesos" "3037" &
   make_binary_release "without-hadoop" "-Psparkr -Phadoop-provided -Pyarn -Pmesos" "3038" &
   wait
