Skip to content

Commit 40338a4

Browse files
concretevitamin authored and Davies Liu committed
Merge pull request apache#244 from sun-rui/SPARKR-154_5
[SPARKR-154] Phase 4: implement subtract() and subtractByKey().
1 parent 20b97a6 commit 40338a4

7 files changed

Lines changed: 210 additions & 3 deletions

File tree

R/pkg/NAMESPACE

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,8 @@ exportMethods(
5959
"saveAsObjectFile",
6060
"sortBy",
6161
"sortByKey",
62+
"subtract",
63+
"subtractByKey",
6264
"sumRDD",
6365
"take",
6466
"takeOrdered",

R/pkg/R/RDD.R

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1561,6 +1561,34 @@ setMethod("zipRDD",
15611561
PipelinedRDD(zippedRDD, partitionFunc)
15621562
})
15631563

1564+
#' Subtract another RDD from an RDD.
1565+
#'
1566+
#' Return an RDD with the elements from x that are not in other.
1567+
#'
1568+
#' @param x An RDD.
1569+
#' @param other An RDD.
1570+
#' @param numPartitions Number of the partitions in the result RDD.
1571+
#' @return An RDD with the elements from x that are not in other.
1572+
#' @examples
1573+
#'\dontrun{
1574+
#' sc <- sparkR.init()
1575+
#' rdd1 <- parallelize(sc, list(1, 1, 2, 2, 3, 4))
1576+
#' rdd2 <- parallelize(sc, list(2, 4))
1577+
#' collect(subtract(rdd1, rdd2))
1578+
#' # list(1, 1, 3)
1579+
#'}
1580+
#' @rdname subtract
1581+
#' @aliases subtract,RDD
1582+
setMethod("subtract",
1583+
signature(x = "RDD", other = "RDD"),
1584+
function(x, other, numPartitions = SparkR::numPartitions(x)) {
1585+
mapFunction <- function(e) { list(e, NA) }
1586+
rdd1 <- map(x, mapFunction)
1587+
rdd2 <- map(other, mapFunction)
1588+
1589+
keys(subtractByKey(rdd1, rdd2, numPartitions))
1590+
})
1591+
15641592
#' Intersection of this RDD and another one.
15651593
#'
15661594
#' Return the intersection of this RDD and another one.

R/pkg/R/generics.R

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -344,10 +344,24 @@ setGeneric("rightOuterJoin", function(x, y, numPartitions) { standardGeneric("ri
344344

345345
#' @rdname sortByKey
346346
#' @export
347-
setGeneric("sortByKey", function(x, ascending = TRUE, numPartitions = 1L) {
348-
standardGeneric("sortByKey")
349-
})
347+
setGeneric("sortByKey",
348+
function(x, ascending = TRUE, numPartitions = 1L) {
349+
standardGeneric("sortByKey")
350+
})
351+
352+
#' @rdname subtractByKey
353+
#' @export
354+
setGeneric("subtractByKey",
355+
function(x, other, numPartitions = 1L) {
356+
standardGeneric("subtractByKey")
357+
})
350358

359+
#' @rdname subtract
360+
#' @export
361+
setGeneric("subtract",
362+
function(x, other, numPartitions = 1L) {
363+
standardGeneric("subtract")
364+
})
351365

352366
################### Broadcast Variable Methods #################
353367

R/pkg/R/pairRDD.R

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -786,6 +786,40 @@ setMethod("sortByKey",
786786
newRDD <- partitionBy(x, numPartitions, rangePartitionFunc)
787787
lapplyPartition(newRDD, partitionFunc)
788788
})
789+
790+
#' Subtract another pair RDD from a pair RDD by key.
791+
#'
792+
#' Return an RDD with the pairs from x whose keys are not in other.
793+
#'
794+
#' @param x An RDD.
795+
#' @param other An RDD.
796+
#' @param numPartitions Number of the partitions in the result RDD.
797+
#' @return An RDD with the pairs from x whose keys are not in other.
798+
#' @examples
799+
#'\dontrun{
800+
#' sc <- sparkR.init()
801+
#' rdd1 <- parallelize(sc, list(list("a", 1), list("b", 4),
802+
#' list("b", 5), list("a", 2)))
803+
#' rdd2 <- parallelize(sc, list(list("a", 3), list("c", 1)))
804+
#' collect(subtractByKey(rdd1, rdd2))
805+
#' # list(list("b", 4), list("b", 5))
806+
#'}
807+
#' @rdname subtractByKey
808+
#' @aliases subtractByKey,RDD
809+
setMethod("subtractByKey",
810+
signature(x = "RDD", other = "RDD"),
811+
function(x, other, numPartitions = SparkR::numPartitions(x)) {
812+
filterFunction <- function(elem) {
813+
iters <- elem[[2]]
814+
(length(iters[[1]]) > 0) && (length(iters[[2]]) == 0)
815+
}
816+
817+
flatMapValues(filterRDD(cogroup(x,
818+
other,
819+
numPartitions = numPartitions),
820+
filterFunction),
821+
function (v) { v[[1]] })
822+
})
789823

790824
#' @description
791825
#' \code{sampleByKey} return a subset RDD of the given RDD sampled by key

R/pkg/inst/tests/test_rdd.R

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -468,6 +468,62 @@ test_that("zipRDD() on RDDs", {
468468
unlink(fileName)
469469
})
470470

471+
test_that("subtract() on RDDs", {
472+
l <- list(1, 1, 2, 2, 3, 4)
473+
rdd1 <- parallelize(sc, l)
474+
475+
# subtract by itself
476+
actual <- collect(subtract(rdd1, rdd1))
477+
expect_equal(actual, list())
478+
479+
# subtract by an empty RDD
480+
rdd2 <- parallelize(sc, list())
481+
actual <- collect(subtract(rdd1, rdd2))
482+
expect_equal(as.list(sort(as.vector(actual, mode="integer"))),
483+
l)
484+
485+
rdd2 <- parallelize(sc, list(2, 4))
486+
actual <- collect(subtract(rdd1, rdd2))
487+
expect_equal(as.list(sort(as.vector(actual, mode="integer"))),
488+
list(1, 1, 3))
489+
490+
l <- list("a", "a", "b", "b", "c", "d")
491+
rdd1 <- parallelize(sc, l)
492+
rdd2 <- parallelize(sc, list("b", "d"))
493+
actual <- collect(subtract(rdd1, rdd2))
494+
expect_equal(as.list(sort(as.vector(actual, mode="character"))),
495+
list("a", "a", "c"))
496+
})
497+
498+
test_that("subtractByKey() on pairwise RDDs", {
499+
l <- list(list("a", 1), list("b", 4),
500+
list("b", 5), list("a", 2))
501+
rdd1 <- parallelize(sc, l)
502+
503+
# subtractByKey by itself
504+
actual <- collect(subtractByKey(rdd1, rdd1))
505+
expect_equal(actual, list())
506+
507+
# subtractByKey by an empty RDD
508+
rdd2 <- parallelize(sc, list())
509+
actual <- collect(subtractByKey(rdd1, rdd2))
510+
expect_equal(sortKeyValueList(actual),
511+
sortKeyValueList(l))
512+
513+
rdd2 <- parallelize(sc, list(list("a", 3), list("c", 1)))
514+
actual <- collect(subtractByKey(rdd1, rdd2))
515+
expect_equal(actual,
516+
list(list("b", 4), list("b", 5)))
517+
518+
l <- list(list(1, 1), list(2, 4),
519+
list(2, 5), list(1, 2))
520+
rdd1 <- parallelize(sc, l)
521+
rdd2 <- parallelize(sc, list(list(1, 3), list(3, 1)))
522+
actual <- collect(subtractByKey(rdd1, rdd2))
523+
expect_equal(actual,
524+
list(list(2, 4), list(2, 5)))
525+
})
526+
471527
test_that("intersection() on RDDs", {
472528
# intersection with self
473529
actual <- collect(intersection(rdd, rdd))

pkg/man/subtract.Rd

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
% Generated by roxygen2 (4.0.2): do not edit by hand
2+
\docType{methods}
3+
\name{subtract,RDD,RDD-method}
4+
\alias{subtract}
5+
\alias{subtract,RDD}
6+
\alias{subtract,RDD,RDD-method}
7+
\title{Subtract an RDD with another RDD.}
8+
\usage{
9+
\S4method{subtract}{RDD,RDD}(x, other,
10+
numPartitions = SparkR::numPartitions(x))
11+
12+
subtract(x, other, numPartitions = 1L)
13+
}
14+
\arguments{
15+
\item{x}{An RDD.}
16+
17+
\item{other}{An RDD.}
18+
19+
\item{numPartitions}{Number of the partitions in the result RDD.}
20+
}
21+
\value{
22+
An RDD with the elements from this that are not in other.
23+
}
24+
\description{
25+
Return an RDD with the elements from this that are not in other.
26+
}
27+
\examples{
28+
\dontrun{
29+
sc <- sparkR.init()
30+
rdd1 <- parallelize(sc, list(1, 1, 2, 2, 3, 4))
31+
rdd2 <- parallelize(sc, list(2, 4))
32+
collect(subtract(rdd1, rdd2))
33+
# list(1, 1, 3)
34+
}
35+
}
36+

pkg/man/subtractByKey.Rd

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
% Generated by roxygen2 (4.0.2): do not edit by hand
2+
\docType{methods}
3+
\name{subtractByKey}
4+
\alias{subtractByKey}
5+
\alias{subtractByKey,RDD}
6+
\alias{subtractByKey,RDD,RDD-method}
7+
\title{Subtract a pair RDD with another pair RDD.}
8+
\usage{
9+
subtractByKey(x, other, numPartitions = 1L)
10+
11+
\S4method{subtractByKey}{RDD,RDD}(x, other,
12+
numPartitions = SparkR::numPartitions(x))
13+
}
14+
\arguments{
15+
\item{x}{An RDD.}
16+
17+
\item{other}{An RDD.}
18+
19+
\item{numPartitions}{Number of the partitions in the result RDD.}
20+
}
21+
\value{
22+
An RDD with the pairs from x whose keys are not in other.
23+
}
24+
\description{
25+
Return an RDD with the pairs from x whose keys are not in other.
26+
}
27+
\examples{
28+
\dontrun{
29+
sc <- sparkR.init()
30+
rdd1 <- parallelize(sc, list(list("a", 1), list("b", 4),
31+
list("b", 5), list("a", 2)))
32+
rdd2 <- parallelize(sc, list(list("a", 3), list("c", 1)))
33+
collect(subtractByKey(rdd1, rdd2))
34+
# list(list("b", 4), list("b", 5))
35+
}
36+
}
37+

0 commit comments

Comments
 (0)