Skip to content

Commit a44e63d

Browse files
committed
Merge pull request apache#84 from sun-rui/SPARKR-94
[SPARKR-94] Add a method to get an element of a pair RDD object by key.
2 parents 95beb4e + fbb5663 commit a44e63d

File tree

4 files changed

+75
-0
lines changed

4 files changed

+75
-0
lines changed

pkg/NAMESPACE

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ exportMethods(
1515
"lapply",
1616
"lapplyPartition",
1717
"lapplyPartitionsWithIndex",
18+
"lookup",
1819
"map",
1920
"mapPartitions",
2021
"mapPartitionsWithIndex",

pkg/R/RDD.R

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -291,6 +291,38 @@ setMethod("collectPartition",
291291
})
292292

293293

294+
#' Look up elements of a key in an RDD
295+
#'
296+
#' @description
297+
#' \code{lookup} returns a list of values in this RDD for key key.
298+
#'
299+
#' @param rdd The RDD to collect
300+
#' @param key The key to look up for
301+
#' @return a list of values in this RDD for key key
302+
#' @rdname lookup
303+
#' @export
304+
#' @examples
305+
#'\dontrun{
306+
#' sc <- sparkR.init()
307+
#' pairs <- list(c(1, 1), c(2, 2), c(1, 3))
308+
#' rdd <- parallelize(sc, pairs)
309+
#' lookup(rdd, 1) # list(1, 3)
310+
#'}
311+
setGeneric("lookup", function(rdd, key) { standardGeneric("lookup") })
312+
313+
#' @rdname lookup
314+
#' @aliases lookup,RDD-method
315+
setMethod("lookup",
316+
signature(rdd = "RDD", key = "ANY"),
317+
function(rdd, key) {
318+
partitionFunc <- function(part) {
319+
filtered <- part[unlist(lapply(part, function(x) { identical(key, x[[1]]) }))]
320+
lapply(filtered, function(x) { x[[2]] })
321+
}
322+
valsRDD <- lapplyPartition(rdd, partitionFunc)
323+
collect(valsRDD)
324+
})
325+
294326
#' Return the number of elements in the RDD.
295327
#'
296328
#' @param rdd The RDD to count

pkg/inst/tests/test_rdd.R

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,9 @@ sc <- sparkR.init()
77
nums <- 1:10
88
rdd <- parallelize(sc, nums, 2L)
99

10+
intPairs <- list(list(1L, -1), list(2L, 100), list(2L, 1), list(1L, 200))
11+
intRdd <- parallelize(sc, intPairs, 2L)
12+
1013
test_that("count and length on RDD", {
1114
expect_equal(count(rdd), 10)
1215
expect_equal(length(rdd), 10)
@@ -30,6 +33,14 @@ test_that("mapPartitions on RDD", {
3033
expect_equal(actual, list(15, 40))
3134
})
3235

36+
test_that("lookup on RDD", {
37+
vals <- lookup(intRdd, 1L)
38+
expect_equal(vals, list(-1, 200))
39+
40+
vals <- lookup(intRdd, 3L)
41+
expect_equal(vals, list())
42+
})
43+
3344
test_that("several transformations on RDD (a benchmark on PipelinedRDD)", {
3445
rdd2 <- rdd
3546
for (i in 1:12)

pkg/man/lookup.Rd

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
% Generated by roxygen2 (4.0.2): do not edit by hand
2+
\docType{methods}
3+
\name{lookup}
4+
\alias{lookup}
5+
\alias{lookup,RDD-method}
6+
\title{Look up elements of a key in an RDD}
7+
\usage{
8+
lookup(rdd, key)
9+
10+
\S4method{lookup}{RDD}(rdd, key)
11+
}
12+
\arguments{
13+
\item{rdd}{The RDD to collect}
14+
15+
\item{key}{The key to look up for}
16+
}
17+
\value{
18+
a list of values in this RDD for key key
19+
}
20+
\description{
21+
\code{lookup} returns a list of values in this RDD for key key.
22+
}
23+
\examples{
24+
\dontrun{
25+
sc <- sparkR.init()
26+
pairs <- list(c(1, 1), c(2, 2), c(1, 3))
27+
rdd <- parallelize(sc, pairs)
28+
lookup(rdd, 1) # list(1, 3)
29+
}
30+
}
31+

0 commit comments

Comments
 (0)