Skip to content

Commit 63e62ed

Browse files
author
Sun Rui
committed
[SPARKR-150] phase 2: implement takeOrdered() and top().
1 parent bd6705b commit 63e62ed

5 files changed

Lines changed: 161 additions & 0 deletions

File tree

pkg/NAMESPACE

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,9 @@ exportMethods(
4949
"sortBy",
5050
"sortByKey",
5151
"take",
52+
"takeOrdered",
5253
"takeSample",
54+
"top",
5355
"unionRDD",
5456
"unpersist",
5557
"value",

pkg/R/RDD.R

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1294,6 +1294,79 @@ setMethod("sortBy",
12941294
values(sortByKey(keyBy(rdd, func), ascending, numPartitions))
12951295
})
12961296

1297+
# Helper function to get first N elements from an RDD in the specified order.
1298+
# Param:
1299+
# rdd An RDD.
1300+
# num Number of elements to return.
1301+
# ascending A flag to indicate whether the sorting is ascending or descending.
1302+
# Return:
1303+
# A list of the first N elements from the RDD in the specified order.
1304+
#
1305+
takeOrderedElem <- function(rdd, num, ascending = TRUE) {
1306+
if (num <= 0L) {
1307+
return(list())
1308+
}
1309+
1310+
partitionFunc <- function(part) {
1311+
if (num < length(part)) {
1312+
# R limitation: order works only on primitive types!
1313+
ord <- order(unlist(part, recursive = FALSE), decreasing = !ascending)
1314+
part[ord[1:num]]
1315+
} else {
1316+
part
1317+
}
1318+
}
1319+
1320+
newRdd <- mapPartitions(rdd, partitionFunc)
1321+
take(sortBy(newRdd, function(x) { x }, ascending = ascending), num)
1322+
}
1323+
1324+
#' Returns the first N elements from an RDD in ascending order.
1325+
#'
1326+
#' @param rdd An RDD.
1327+
#' @param num Number of elements to return.
1328+
#' @return The first N elements from the RDD in ascending order.
1329+
#' @rdname takeOrdered
1330+
#' @export
1331+
#' @examples
1332+
#'\dontrun{
1333+
#' sc <- sparkR.init()
1334+
#' rdd <- parallelize(sc, list(10, 1, 2, 9, 3, 4, 5, 6, 7))
1335+
#' takeOrdered(rdd, 6L) # list(1, 2, 3, 4, 5, 6)
1336+
#'}
1337+
setGeneric("takeOrdered", function(rdd, num) { standardGeneric("takeOrdered") })
1338+
1339+
#' @rdname takeOrdered
1340+
#' @aliases takeOrdered,RDD,RDD-method
1341+
setMethod("takeOrdered",
1342+
signature(rdd = "RDD", num = "integer"),
1343+
function(rdd, num) {
1344+
takeOrderedElem(rdd, num)
1345+
})
1346+
1347+
#' Returns the top N elements from an RDD.
1348+
#'
1349+
#' @param rdd An RDD.
1350+
#' @param num Number of elements to return.
1351+
#' @return The top N elements from the RDD.
1352+
#' @rdname top
1353+
#' @export
1354+
#' @examples
1355+
#'\dontrun{
1356+
#' sc <- sparkR.init()
1357+
#' rdd <- parallelize(sc, list(10, 1, 2, 9, 3, 4, 5, 6, 7))
1358+
#' top(rdd, 6L) # list(10, 9, 7, 6, 5, 4)
1359+
#'}
1360+
setGeneric("top", function(rdd, num) { standardGeneric("top") })
1361+
1362+
#' @rdname top
1363+
#' @aliases top,RDD,RDD-method
1364+
setMethod("top",
1365+
signature(rdd = "RDD", num = "integer"),
1366+
function(rdd, num) {
1367+
takeOrderedElem(rdd, num, FALSE)
1368+
})
1369+
12971370
############ Shuffle Functions ############
12981371

12991372
#' Partition an RDD by key

pkg/inst/tests/test_rdd.R

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -278,6 +278,30 @@ test_that("sortBy() on RDDs", {
278278
expect_equal(actual, as.list(nums))
279279
})
280280

281+
test_that("takeOrdered() on RDDs", {
282+
l <- list(10, 1, 2, 9, 3, 4, 5, 6, 7)
283+
rdd <- parallelize(sc, l)
284+
actual <- takeOrdered(rdd, 6L)
285+
expect_equal(actual, as.list(sort(unlist(l)))[1:6])
286+
287+
l <- list("e", "d", "c", "d", "a")
288+
rdd <- parallelize(sc, l)
289+
actual <- takeOrdered(rdd, 3L)
290+
expect_equal(actual, as.list(sort(unlist(l)))[1:3])
291+
})
292+
293+
test_that("top() on RDDs", {
294+
l <- list(10, 1, 2, 9, 3, 4, 5, 6, 7)
295+
rdd <- parallelize(sc, l)
296+
actual <- top(rdd, 6L)
297+
expect_equal(actual, as.list(sort(unlist(l), decreasing = TRUE))[1:6])
298+
299+
l <- list("e", "d", "c", "d", "a")
300+
rdd <- parallelize(sc, l)
301+
actual <- top(rdd, 3L)
302+
expect_equal(actual, as.list(sort(unlist(l), decreasing = TRUE))[1:3])
303+
})
304+
281305
test_that("keys() on RDDs", {
282306
keys <- keys(intRdd)
283307
actual <- collect(keys)

pkg/man/takeOrdered.Rd

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
% Generated by roxygen2 (4.0.2): do not edit by hand
2+
\docType{methods}
3+
\name{takeOrdered}
4+
\alias{takeOrdered}
5+
\alias{takeOrdered,RDD,RDD-method}
6+
\alias{takeOrdered,RDD,integer-method}
7+
\title{Returns the first N elements from an RDD in ascending order.}
8+
\usage{
9+
takeOrdered(rdd, num)
10+
11+
\S4method{takeOrdered}{RDD,integer}(rdd, num)
12+
}
13+
\arguments{
14+
\item{rdd}{An RDD.}
15+
16+
\item{num}{Number of elements to return.}
17+
}
18+
\value{
19+
The first N elements from the RDD in ascending order.
20+
}
21+
\description{
22+
Returns the first N elements from an RDD in ascending order.
23+
}
24+
\examples{
25+
\dontrun{
26+
sc <- sparkR.init()
27+
rdd <- parallelize(sc, list(10, 1, 2, 9, 3, 4, 5, 6, 7))
28+
takeOrdered(rdd, 6L) # list(1, 2, 3, 4, 5, 6)
29+
}
30+
}
31+

pkg/man/top.Rd

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
% Generated by roxygen2 (4.0.2): do not edit by hand
2+
\docType{methods}
3+
\name{top}
4+
\alias{top}
5+
\alias{top,RDD,RDD-method}
6+
\alias{top,RDD,integer-method}
7+
\title{Returns the top N elements from an RDD.}
8+
\usage{
9+
top(rdd, num)
10+
11+
\S4method{top}{RDD,integer}(rdd, num)
12+
}
13+
\arguments{
14+
\item{rdd}{An RDD.}
15+
16+
\item{num}{Number of elements to return.}
17+
}
18+
\value{
19+
The top N elements from the RDD.
20+
}
21+
\description{
22+
Returns the top N elements from an RDD.
23+
}
24+
\examples{
25+
\dontrun{
26+
sc <- sparkR.init()
27+
rdd <- parallelize(sc, list(10, 1, 2, 9, 3, 4, 5, 6, 7))
28+
top(rdd, 6L) # list(10, 9, 7, 6, 5, 4)
29+
}
30+
}
31+

0 commit comments

Comments
 (0)