Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions R/pkg/NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ exportMethods("arrange",
"dropna",
"dtypes",
"except",
"exceptAll",
"explain",
"fillna",
"filter",
Expand All @@ -131,6 +132,7 @@ exportMethods("arrange",
"hint",
"insertInto",
"intersect",
"intersectAll",
"isLocal",
"isStreaming",
"join",
Expand Down
59 changes: 58 additions & 1 deletion R/pkg/R/DataFrame.R
Original file line number Diff line number Diff line change
Expand Up @@ -2848,6 +2848,35 @@ setMethod("intersect",
dataFrame(intersected)
})

#' intersectAll
#'
#' Return a new SparkDataFrame containing rows in both this SparkDataFrame
#' and another SparkDataFrame while preserving the duplicates.
#' This is equivalent to \code{INTERSECT ALL} in SQL. Also as standard in
#' SQL, this function resolves columns by position (not by name).
#'
#' @param x a SparkDataFrame.
#' @param y a SparkDataFrame.
#' @return A SparkDataFrame containing the result of the intersect all operation.
#' @family SparkDataFrame functions
#' @aliases intersectAll,SparkDataFrame,SparkDataFrame-method
#' @rdname intersectAll
#' @name intersectAll
#' @examples
#'\dontrun{
#' sparkR.session()
#' df1 <- read.json(path)
#' df2 <- read.json(path2)
#' intersectAllDF <- intersectAll(df1, df2)
#' }
#' @note intersectAll since 2.4.0
setMethod("intersectAll",
signature(x = "SparkDataFrame", y = "SparkDataFrame"),
function(x, y) {
intersected <- callJMethod(x@sdf, "intersectAll", y@sdf)
dataFrame(intersected)
})
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add extra empty line after code

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


#' except
#'
#' Return a new SparkDataFrame containing rows in this SparkDataFrame
Expand All @@ -2867,7 +2896,6 @@ setMethod("intersect",
#' df2 <- read.json(path2)
#' exceptDF <- except(df, df2)
#' }
#' @rdname except
#' @note except since 1.4.0
setMethod("except",
signature(x = "SparkDataFrame", y = "SparkDataFrame"),
Expand All @@ -2876,6 +2904,35 @@ setMethod("except",
dataFrame(excepted)
})

#' exceptAll
#'
#' Return a new SparkDataFrame containing rows in this SparkDataFrame
#' but not in another SparkDataFrame while preserving the duplicates.
#' This is equivalent to \code{EXCEPT ALL} in SQL. Also as standard in
#' SQL, this function resolves columns by position (not by name).
#'
#' @param x a SparkDataFrame.
#' @param y a SparkDataFrame.
#' @return A SparkDataFrame containing the result of the except all operation.
#' @family SparkDataFrame functions
#' @aliases exceptAll,SparkDataFrame,SparkDataFrame-method
#' @rdname exceptAll
#' @name exceptAll
#' @examples
#'\dontrun{
#' sparkR.session()
#' df1 <- read.json(path)
#' df2 <- read.json(path2)
#' exceptAllDF <- exceptAll(df1, df2)
#' }
#' @note exceptAll since 2.4.0
setMethod("exceptAll",
signature(x = "SparkDataFrame", y = "SparkDataFrame"),
function(x, y) {
excepted <- callJMethod(x@sdf, "exceptAll", y@sdf)
dataFrame(excepted)
})

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: remove one of the two empty lines

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@felixcheung Sure.

#' Save the contents of SparkDataFrame to a data source.
#'
#' The data source is specified by the \code{source} and a set of options (...).
Expand Down
6 changes: 6 additions & 0 deletions R/pkg/R/generics.R
Original file line number Diff line number Diff line change
Expand Up @@ -471,6 +471,9 @@ setGeneric("explain", function(x, ...) { standardGeneric("explain") })
#' @rdname except
setGeneric("except", function(x, y) { standardGeneric("except") })

#' @rdname exceptAll
setGeneric("exceptAll", function(x, y) { standardGeneric("exceptAll") })

#' @rdname nafunctions
setGeneric("fillna", function(x, value, cols = NULL) { standardGeneric("fillna") })

Expand All @@ -495,6 +498,9 @@ setGeneric("insertInto", function(x, tableName, ...) { standardGeneric("insertIn
#' @rdname intersect
setGeneric("intersect", function(x, y) { standardGeneric("intersect") })

#' @rdname intersectAll
setGeneric("intersectAll", function(x, y) { standardGeneric("intersectAll") })

#' @rdname isLocal
setGeneric("isLocal", function(x) { standardGeneric("isLocal") })

Expand Down
19 changes: 19 additions & 0 deletions R/pkg/tests/fulltests/test_sparkSQL.R
Original file line number Diff line number Diff line change
Expand Up @@ -2482,6 +2482,25 @@ test_that("union(), unionByName(), rbind(), except(), and intersect() on a DataF
unlink(jsonPath2)
})

test_that("intersectAll() and exceptAll()", {
df1 <- createDataFrame(list(list("a", 1), list("a", 1), list("a", 1),
list("a", 1), list("b", 3), list("c", 4)),
schema = c("a", "b"))
df2 <- createDataFrame(list(list("a", 1), list("a", 1), list("b", 3)), schema = c("a", "b"))
intersectAllExpected <- data.frame("a" = c("a", "a", "b"), "b" = c(1, 1, 3),
stringsAsFactors = FALSE)
exceptAllExpected <- data.frame("a" = c("a", "a", "c"), "b" = c(1, 1, 4),
stringsAsFactors = FALSE)
intersectAllDf <- arrange(intersectAll(df1, df2), df1$a)
expect_is(intersectAllDf, "SparkDataFrame")
exceptAllDf <- arrange(exceptAll(df1, df2), df1$a)
expect_is(exceptAllDf, "SparkDataFrame")
intersectAllActual <- collect(intersectAllDf)
expect_identical(intersectAllActual, intersectAllExpected)
exceptAllActual <- collect(exceptAllDf)
expect_identical(exceptAllActual, exceptAllExpected)
})

test_that("withColumn() and withColumnRenamed()", {
df <- read.json(jsonPath)
newDF <- withColumn(df, "newAge", df$age + 2)
Expand Down