
Commit 9129f72

Merge remote-tracking branch 'apache/master' into ratefix
2 parents 2fec90b + b348901

12 files changed

Lines changed: 192 additions & 14 deletions


R/pkg/NAMESPACE

Lines changed: 1 addition & 0 deletions
@@ -151,6 +151,7 @@ exportMethods("arrange",
               "registerTempTable",
               "rename",
               "repartition",
+              "repartitionByRange",
               "rollup",
               "sample",
               "sample_frac",

R/pkg/R/DataFrame.R

Lines changed: 63 additions & 2 deletions
@@ -687,7 +687,7 @@ setMethod("storageLevel",
 #' @rdname coalesce
 #' @name coalesce
 #' @aliases coalesce,SparkDataFrame-method
-#' @seealso \link{repartition}
+#' @seealso \link{repartition}, \link{repartitionByRange}
 #' @examples
 #'\dontrun{
 #' sparkR.session()
@@ -723,7 +723,7 @@ setMethod("coalesce",
 #' @rdname repartition
 #' @name repartition
 #' @aliases repartition,SparkDataFrame-method
-#' @seealso \link{coalesce}
+#' @seealso \link{coalesce}, \link{repartitionByRange}
 #' @examples
 #'\dontrun{
 #' sparkR.session()
@@ -759,6 +759,67 @@ setMethod("repartition",
             dataFrame(sdf)
           })

+
+#' Repartition by range
+#'
+#' The following options for repartition by range are possible:
+#' \itemize{
+#'  \item{1.} {Return a new SparkDataFrame range partitioned by
+#'             the given columns into \code{numPartitions}.}
+#'  \item{2.} {Return a new SparkDataFrame range partitioned by the given column(s),
+#'             using \code{spark.sql.shuffle.partitions} as number of partitions.}
+#'}
+#'
+#' @param x a SparkDataFrame.
+#' @param numPartitions the number of partitions to use.
+#' @param col the column by which the range partitioning will be performed.
+#' @param ... additional column(s) to be used in the range partitioning.
+#'
+#' @family SparkDataFrame functions
+#' @rdname repartitionByRange
+#' @name repartitionByRange
+#' @aliases repartitionByRange,SparkDataFrame-method
+#' @seealso \link{repartition}, \link{coalesce}
+#' @examples
+#'\dontrun{
+#' sparkR.session()
+#' path <- "path/to/file.json"
+#' df <- read.json(path)
+#' newDF <- repartitionByRange(df, col = df$col1, df$col2)
+#' newDF <- repartitionByRange(df, 3L, col = df$col1, df$col2)
+#'}
+#' @note repartitionByRange since 2.4.0
+setMethod("repartitionByRange",
+          signature(x = "SparkDataFrame"),
+          function(x, numPartitions = NULL, col = NULL, ...) {
+            if (!is.null(numPartitions) && !is.null(col)) {
+              # number of partitions and columns both are specified
+              if (is.numeric(numPartitions) && class(col) == "Column") {
+                cols <- list(col, ...)
+                jcol <- lapply(cols, function(c) { c@jc })
+                sdf <- callJMethod(x@sdf, "repartitionByRange", numToInt(numPartitions), jcol)
+              } else {
+                stop(paste("numPartitions and col must be numeric and Column; however, got",
+                           class(numPartitions), "and", class(col)))
+              }
+            } else if (!is.null(col)) {
+              # only columns are specified
+              if (class(col) == "Column") {
+                cols <- list(col, ...)
+                jcol <- lapply(cols, function(c) { c@jc })
+                sdf <- callJMethod(x@sdf, "repartitionByRange", jcol)
+              } else {
+                stop(paste("col must be Column; however, got", class(col)))
+              }
+            } else if (!is.null(numPartitions)) {
+              # only numPartitions is specified
+              stop("At least one partition-by column must be specified.")
+            } else {
+              stop("Please, specify a column(s) or the number of partitions with a column(s)")
+            }
+            dataFrame(sdf)
+          })
+
 #' toJSON
 #'
 #' Converts a SparkDataFrame into a SparkDataFrame of JSON string.
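
For context, the R wrapper above dispatches through callJMethod to the JVM-side Dataset.repartitionByRange, which exposes the same two call shapes. A minimal Scala sketch of those shapes (the object name, data, and column names are invented for illustration):

import org.apache.spark.sql.SparkSession

object RepartitionByRangeSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("sketch").getOrCreate()
    import spark.implicits._

    val df = Seq((3, "c"), (1, "a"), (2, "b")).toDF("col1", "col2")

    // Shape 1: explicit partition count plus partitioning columns.
    val withCount = df.repartitionByRange(3, $"col1", $"col2")
    println(withCount.rdd.getNumPartitions) // 3

    // Shape 2: columns only; spark.sql.shuffle.partitions supplies the count.
    val withConf = df.repartitionByRange($"col1", $"col2")
    println(withConf.rdd.getNumPartitions)

    spark.stop()
  }
}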

R/pkg/R/generics.R

Lines changed: 3 additions & 0 deletions
@@ -531,6 +531,9 @@ setGeneric("rename", function(x, ...) { standardGeneric("rename") })
 #' @rdname repartition
 setGeneric("repartition", function(x, ...) { standardGeneric("repartition") })

+#' @rdname repartitionByRange
+setGeneric("repartitionByRange", function(x, ...) { standardGeneric("repartitionByRange") })
+
 #' @rdname sample
 setGeneric("sample",
            function(x, withReplacement = FALSE, fraction, seed) {

R/pkg/tests/fulltests/test_sparkSQL.R

Lines changed: 45 additions & 0 deletions
@@ -3104,6 +3104,51 @@ test_that("repartition by columns on DataFrame", {
   })
 })

+test_that("repartitionByRange on a DataFrame", {
+  # The tasks here launch R workers with shuffles. So, we decrease the number of shuffle
+  # partitions to reduce the number of the tasks to speed up the test. This is particularly
+  # slow on Windows because the R workers are unable to be forked. See also SPARK-21693.
+  conf <- callJMethod(sparkSession, "conf")
+  shufflepartitionsvalue <- callJMethod(conf, "get", "spark.sql.shuffle.partitions")
+  callJMethod(conf, "set", "spark.sql.shuffle.partitions", "5")
+  tryCatch({
+    df <- createDataFrame(mtcars)
+    expect_error(repartitionByRange(df, "haha", df$mpg),
+                 "numPartitions and col must be numeric and Column.*")
+    expect_error(repartitionByRange(df),
+                 ".*specify a column.*or the number of partitions with a column.*")
+    expect_error(repartitionByRange(df, col = "haha"),
+                 "col must be Column; however, got.*")
+    expect_error(repartitionByRange(df, 3),
+                 "At least one partition-by column must be specified.")
+
+    # The order of rows should be different with a normal repartition.
+    actual <- repartitionByRange(df, 3, df$mpg)
+    expect_equal(getNumPartitions(actual), 3)
+    expect_false(identical(collect(actual), collect(repartition(df, 3, df$mpg))))
+
+    actual <- repartitionByRange(df, col = df$mpg)
+    expect_false(identical(collect(actual), collect(repartition(df, col = df$mpg))))
+
+    # They should have same data.
+    actual <- collect(repartitionByRange(df, 3, df$mpg))
+    actual <- actual[order(actual$mpg), ]
+    expected <- collect(repartition(df, 3, df$mpg))
+    expected <- expected[order(expected$mpg), ]
+    expect_true(all(actual == expected))
+
+    actual <- collect(repartitionByRange(df, col = df$mpg))
+    actual <- actual[order(actual$mpg), ]
+    expected <- collect(repartition(df, col = df$mpg))
+    expected <- expected[order(expected$mpg), ]
+    expect_true(all(actual == expected))
+  },
+  finally = {
+    # Resetting the conf back to default value
+    callJMethod(conf, "set", "spark.sql.shuffle.partitions", shufflepartitionsvalue)
+  })
+})
+
 test_that("coalesce, repartition, numPartitions", {
   df <- as.DataFrame(cars, numPartitions = 5)
   expect_equal(getNumPartitions(df), 5)
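
The ordering assertions above lean on what range partitioning guarantees: each output partition holds a contiguous range of the sort key, so row order differs from a hash-based repartition even though the data is identical. A standalone Scala sketch of one way to observe those ranges (the dataset and object name are invented; spark_partition_id is the built-in SQL function):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{max, min, spark_partition_id}

object ObserveRangesSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("sketch").getOrCreate()
    import spark.implicits._

    val df = (1 to 100).toDF("mpg")

    // After repartitionByRange, per-partition min/max form disjoint, ordered ranges.
    df.repartitionByRange(3, $"mpg")
      .withColumn("pid", spark_partition_id())
      .groupBy("pid")
      .agg(min("mpg"), max("mpg"))
      .orderBy("pid")
      .show()

    spark.stop()
  }
}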

core/src/main/scala/org/apache/spark/deploy/security/HiveDelegationTokenProvider.scala

Lines changed: 4 additions & 4 deletions
@@ -36,7 +36,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config.KEYTAB
 import org.apache.spark.util.Utils

-private[security] class HiveDelegationTokenProvider
+private[spark] class HiveDelegationTokenProvider
     extends HadoopDelegationTokenProvider with Logging {

   override def serviceName: String = "hive"
@@ -124,9 +124,9 @@ private[security] class HiveDelegationTokenProvider
     val currentUser = UserGroupInformation.getCurrentUser()
     val realUser = Option(currentUser.getRealUser()).getOrElse(currentUser)

-     // For some reason the Scala-generated anonymous class ends up causing an
-     // UndeclaredThrowableException, even if you annotate the method with @throws.
-     try {
+    // For some reason the Scala-generated anonymous class ends up causing an
+    // UndeclaredThrowableException, even if you annotate the method with @throws.
+    try {
       realUser.doAs(new PrivilegedExceptionAction[T]() {
         override def run(): T = fn
       })
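
The only functional change here is the visibility widening: private[security] limits the class to its own package, while private[spark] opens it to everything under org.apache.spark. A toy sketch of how Scala's package-qualified private works (package and class names are invented):

package org.example.outer {
  package inner {
    // Visible anywhere under org.example.outer, not just inner.
    private[outer] class Widened
    // Visible only under org.example.outer.inner.
    private[inner] class Narrow
  }

  object Elsewhere {
    // Compiles: Widened is outer-scoped.
    val ok = new inner.Widened
    // new inner.Narrow here would not compile: Narrow is inner-only.
  }
}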

core/src/main/scala/org/apache/spark/launcher/LauncherBackend.scala

Lines changed: 3 additions & 3 deletions
@@ -67,13 +67,13 @@ private[spark] abstract class LauncherBackend {
   }

   def setAppId(appId: String): Unit = {
-    if (connection != null) {
+    if (connection != null && isConnected) {
       connection.send(new SetAppId(appId))
     }
   }

   def setState(state: SparkAppHandle.State): Unit = {
-    if (connection != null && lastState != state) {
+    if (connection != null && isConnected && lastState != state) {
       connection.send(new SetState(state))
       lastState = state
     }
@@ -114,10 +114,10 @@

   override def close(): Unit = {
     try {
+      _isConnected = false
       super.close()
     } finally {
       onDisconnected()
-      _isConnected = false
     }
   }
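
The close() reordering pairs with the new isConnected guards: the flag is flipped before the connection is torn down, so a racing setAppId or setState observes a disconnected backend instead of writing to a dying socket. A minimal standalone sketch of the guard pattern (class and member names are invented, not Spark's):

class FlaggedConnection {
  @volatile private var _isConnected = true

  def isConnected: Boolean = _isConnected

  def send(msg: String): Unit = {
    // Checking the flag first keeps racing senders off a closing connection.
    if (isConnected) println(s"send: $msg")
  }

  def close(): Unit = {
    // Flip the flag before tearing anything down, mirroring the diff above.
    _isConnected = false
    // ... release the underlying socket here ...
  }
}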

core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala

Lines changed: 9 additions & 5 deletions
@@ -192,11 +192,15 @@ class BlockManagerMasterEndpoint(
     val requiredBlockManagers = blockManagerInfo.values.filter { info =>
       removeFromDriver || !info.blockManagerId.isDriver
     }
-    Future.sequence(
-      requiredBlockManagers.map { bm =>
-        bm.slaveEndpoint.ask[Int](removeMsg)
-      }.toSeq
-    )
+    val futures = requiredBlockManagers.map { bm =>
+      bm.slaveEndpoint.ask[Int](removeMsg).recover {
+        case e: IOException =>
+          logWarning(s"Error trying to remove broadcast $broadcastId", e)
+          0 // zero blocks were removed
+      }
+    }.toSeq
+
+    Future.sequence(futures)
   }

   private def removeBlockManager(blockManagerId: BlockManagerId) {
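
The rework illustrates a general Scala futures pattern: recover each future individually before Future.sequence, so a single unreachable block manager produces a logged warning and a zero count instead of failing the whole aggregate. A standalone sketch with plain futures (names and values are invented):

import java.io.IOException
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object RecoverSketch {
  def main(args: Array[String]): Unit = {
    // Simulated per-endpoint removal calls; one fails with IOException.
    val asks: Seq[Future[Int]] = Seq(
      Future.successful(2),
      Future.failed(new IOException("endpoint unreachable")),
      Future.successful(1))

    // Without recover, Future.sequence fails as soon as any element fails.
    // Recovering each element to 0 ("zero blocks removed") keeps the aggregate alive.
    val recovered = asks.map(_.recover { case _: IOException => 0 })

    println(Await.result(Future.sequence(recovered), 5.seconds)) // List(2, 0, 1)
  }
}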

core/src/main/scala/org/apache/spark/ui/UIUtils.scala

Lines changed: 2 additions & 0 deletions
@@ -224,6 +224,7 @@ private[spark] object UIUtils extends Logging {
         {commonHeaderNodes}
         {if (showVisualization) vizHeaderNodes else Seq.empty}
         {if (useDataTables) dataTablesHeaderNodes else Seq.empty}
+        <link rel="shortcut icon" href={prependBaseUri("/static/spark-logo-77x50px-hd.png")}></link>
         <title>{appName} - {title}</title>
       </head>
       <body>
@@ -265,6 +266,7 @@
       <head>
         {commonHeaderNodes}
         {if (useDataTables) dataTablesHeaderNodes else Seq.empty}
+        <link rel="shortcut icon" href={prependBaseUri("/static/spark-logo-77x50px-hd.png")}></link>
         <title>{title}</title>
       </head>
       <body>

launcher/src/test/java/org/apache/spark/launcher/LauncherServerSuite.java

Lines changed: 20 additions & 0 deletions
@@ -185,6 +185,26 @@ public void testStreamFiltering() throws Exception {
     }
   }

+  @Test
+  public void testAppHandleDisconnect() throws Exception {
+    LauncherServer server = LauncherServer.getOrCreateServer();
+    ChildProcAppHandle handle = new ChildProcAppHandle(server);
+    String secret = server.registerHandle(handle);
+
+    TestClient client = null;
+    try {
+      Socket s = new Socket(InetAddress.getLoopbackAddress(), server.getPort());
+      client = new TestClient(s);
+      client.send(new Hello(secret, "1.4.0"));
+      handle.disconnect();
+      waitForError(client, secret);
+    } finally {
+      handle.kill();
+      close(client);
+      client.clientThread.join();
+    }
+  }
+
   private void close(Closeable c) {
     if (c != null) {
       try {
Lines changed: 29 additions & 0 deletions
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.test
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.SparkSession
+
+class TestSparkSessionSuite extends SparkFunSuite {
+  test("default session is set in constructor") {
+    val session = new TestSparkSession()
+    assert(SparkSession.getDefaultSession.contains(session))
+    session.stop()
+  }
+}
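
The suite pins down that TestSparkSession registers itself as the default session in its constructor, something a regular session only gets through the builder. A minimal sketch of the analogous public-API behavior (the object name is invented):

import org.apache.spark.sql.SparkSession

object DefaultSessionSketch {
  def main(args: Array[String]): Unit = {
    // getOrCreate registers the new session as the default one.
    val spark = SparkSession.builder().master("local[1]").appName("sketch").getOrCreate()
    assert(SparkSession.getDefaultSession.contains(spark))
    spark.stop()
  }
}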
