From 6076700fb99733447c19f5887ecf43d0f422c7d4 Mon Sep 17 00:00:00 2001
From: Dilip Biswal
Date: Tue, 14 Aug 2018 11:06:47 -0700
Subject: [PATCH 1/6] [SPARK-25117] Add EXCEPT ALL and INTERSECT ALL support in R

---
 R/pkg/NAMESPACE                       |  2 +
 R/pkg/R/DataFrame.R                   | 60 +++++++++++++++++++++++++++
 R/pkg/R/generics.R                    |  6 +++
 R/pkg/tests/fulltests/test_sparkSQL.R | 26 ++++++
 4 files changed, 94 insertions(+)

diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE
index adfd3871f342..0fd08482c441 100644
--- a/R/pkg/NAMESPACE
+++ b/R/pkg/NAMESPACE
@@ -117,6 +117,7 @@ exportMethods("arrange",
               "dropna",
               "dtypes",
               "except",
+              "exceptAll",
               "explain",
               "fillna",
               "filter",
@@ -131,6 +132,7 @@ exportMethods("arrange",
               "hint",
               "insertInto",
               "intersect",
+              "intersectAll",
               "isLocal",
               "isStreaming",
               "join",
diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R
index 471ada15d655..a5e018c96b49 100644
--- a/R/pkg/R/DataFrame.R
+++ b/R/pkg/R/DataFrame.R
@@ -2848,6 +2848,35 @@ setMethod("intersect",
             dataFrame(intersected)
           })
 
+#' intersectAll
+#'
+#' Return a new SparkDataFrame containing rows only in both this SparkDataFrame
+#' and another SparkDataFrame while preserving the duplicates.
+#' This is equivalent to \code{INTERSECT ALL} in SQL. Also as standard in
+#' SQL, this function resolves columns by position (not by name).
+#'
+#' @param x a SparkDataFrame.
+#' @param y a SparkDataFrame.
+#' @return A SparkDataFrame containing the result of the intersect all operation.
+#' @family SparkDataFrame functions
+#' @aliases intersectAll,SparkDataFrame,SparkDataFrame-method
+#' @rdname intersectAll
+#' @name intersectAll
+#' @examples
+#'\dontrun{
+#' sparkR.session()
+#' df1 <- read.json(path)
+#' df2 <- read.json(path2)
+#' intersectAllDF <- intersectAll(df1, df2)
+#' }
+#' @rdname intersectAll
+#' @note intersectAll since 2.4
+setMethod("intersectAll",
+          signature(x = "SparkDataFrame", y = "SparkDataFrame"),
+          function(x, y) {
+            intersected <- callJMethod(x@sdf, "intersectAll", y@sdf)
+            dataFrame(intersected)
+          })
 #' except
 #'
 #' Return a new SparkDataFrame containing rows in this SparkDataFrame
@@ -2876,6 +2905,37 @@ setMethod("except",
             dataFrame(excepted)
           })
 
+#' exceptAll
+#'
+#' Return a new SparkDataFrame containing rows in this SparkDataFrame
+#' but not in another SparkDataFrame while preserving the duplicates.
+#' This is equivalent to \code{EXCEPT ALL} in SQL. Also as standard in
+#' SQL, this function resolves columns by position (not by name).
+#'
+#' @param x a SparkDataFrame.
+#' @param y a SparkDataFrame.
+#' @return A SparkDataFrame containing the result of the except all operation.
+#' @family SparkDataFrame functions
+#' @aliases exceptAll,SparkDataFrame,SparkDataFrame-method
+#' @rdname exceptAll
+#' @name exceptAll
+#' @examples
+#'\dontrun{
+#' sparkR.session()
+#' df1 <- read.json(path)
+#' df2 <- read.json(path2)
+#' exceptAllDF <- exceptAll(df1, df2)
+#' }
+#' @rdname exceptAll
+#' @note exceptAll since 2.4
+setMethod("exceptAll",
+          signature(x = "SparkDataFrame", y = "SparkDataFrame"),
+          function(x, y) {
+            excepted <- callJMethod(x@sdf, "exceptAll", y@sdf)
+            dataFrame(excepted)
+          })
+
+
 #' Save the contents of SparkDataFrame to a data source.
 #'
 #' The data source is specified by the \code{source} and a set of options (...).
diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R
index 4a7210bf1b90..f6f1849787a2 100644
--- a/R/pkg/R/generics.R
+++ b/R/pkg/R/generics.R
@@ -471,6 +471,9 @@ setGeneric("explain", function(x, ...) { standardGeneric("explain") })
 #' @rdname except
 setGeneric("except", function(x, y) { standardGeneric("except") })
 
+#' @rdname exceptAll
+setGeneric("exceptAll", function(x, y) { standardGeneric("exceptAll") })
+
 #' @rdname nafunctions
 setGeneric("fillna", function(x, value, cols = NULL) { standardGeneric("fillna") })
 
@@ -495,6 +498,9 @@ setGeneric("insertInto", function(x, tableName, ...) { standardGeneric("insertIn
 #' @rdname intersect
 setGeneric("intersect", function(x, y) { standardGeneric("intersect") })
 
+#' @rdname intersectAll
+setGeneric("intersectAll", function(x, y) { standardGeneric("intersectAll") })
+
 #' @rdname isLocal
 setGeneric("isLocal", function(x) { standardGeneric("isLocal") })
 
diff --git a/R/pkg/tests/fulltests/test_sparkSQL.R b/R/pkg/tests/fulltests/test_sparkSQL.R
index adcbbff823a2..3e341e592ded 100644
--- a/R/pkg/tests/fulltests/test_sparkSQL.R
+++ b/R/pkg/tests/fulltests/test_sparkSQL.R
@@ -2482,6 +2482,32 @@ test_that("union(), unionByName(), rbind(), except(), and intersect() on a DataF
   unlink(jsonPath2)
 })
 
+test_that("intersectAll() and exceptAll()", {
+  df1 <- createDataFrame(
+    list(list("a", 1),
+         list("a", 1),
+         list("a", 1),
+         list("a", 1),
+         list("b", 3),
+         list("c", 4)),
+    schema = c("a", "b"))
+  df2 <- createDataFrame(
+    list(list("a", 1), list("a", 1), list("b", 3)),
+    schema = c("a", "b"))
+  intersect_all_expected = data.frame("a" = c("a", "a", "b"), "b" = c(1, 1, 3),
+                                      stringsAsFactors = FALSE)
+  except_all_expected = data.frame("a" = c("a", "a", "c"), "b" = c(1, 1, 4),
+                                   stringsAsFactors = FALSE)
+  intersect_all_df <- arrange(intersectAll(df1, df2), df1$a)
+  expect_is(intersect_all_df, "SparkDataFrame")
+  except_all_df <- arrange(exceptAll(df1, df2), df1$a)
+  expect_is(except_all_df, "SparkDataFrame")
+  intersect_all_actual <- collect(intersect_all_df)
+  expect_identical(intersect_all_actual, intersect_all_expected)
+  except_all_actual <- collect(except_all_df)
+  expect_identical(except_all_actual, except_all_expected)
+})
+
 test_that("withColumn() and withColumnRenamed()", {
   df <- read.json(jsonPath)
   newDF <- withColumn(df, "newAge", df$age + 2)

From 426ffeda2e31ebe60f777ca4c172fa79e5c45f2f Mon Sep 17 00:00:00 2001
From: Dilip Biswal
Date: Tue, 14 Aug 2018 11:11:37 -0700
Subject: [PATCH 2/6] minor fix

---
 R/pkg/R/DataFrame.R | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R
index a5e018c96b49..d3a0726cab24 100644
--- a/R/pkg/R/DataFrame.R
+++ b/R/pkg/R/DataFrame.R
@@ -2850,7 +2850,7 @@ setMethod("intersect",
 
 #' intersectAll
 #'
-#' Return a new SparkDataFrame containing rows only in both this SparkDataFrame
+#' Return a new SparkDataFrame containing rows in both this SparkDataFrame
 #' and another SparkDataFrame while preserving the duplicates.
 #' This is equivalent to \code{INTERSECT ALL} in SQL. Also as standard in
 #' SQL, this function resolves columns by position (not by name).
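A note on the semantics documented above: unlike intersect() and except(), which return distinct rows, the new *All variants follow SQL bag semantics and keep duplicates. A minimal sketch of the behavior, assuming a running SparkR session; the data below mirrors the values used in the new test rather than any real dataset:

    library(SparkR)
    sparkR.session()

    # df1 carries ("a", 1) four times; df2 carries it twice.
    df1 <- createDataFrame(list(list("a", 1), list("a", 1), list("a", 1),
                                list("a", 1), list("b", 3), list("c", 4)),
                           schema = c("a", "b"))
    df2 <- createDataFrame(list(list("a", 1), list("a", 1), list("b", 3)),
                           schema = c("a", "b"))

    # intersectAll() keeps min(4, 2) = 2 copies of ("a", 1).
    collect(intersectAll(df1, df2))  # ("a", 1), ("a", 1), ("b", 3), in some order

    # exceptAll() keeps 4 - 2 = 2 copies of ("a", 1); ("b", 3) cancels out entirely.
    collect(exceptAll(df1, df2))     # ("a", 1), ("a", 1), ("c", 4), in some order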
From 7e88c9dd23f0e889f61b6392980363f2b63c0117 Mon Sep 17 00:00:00 2001
From: Dilip Biswal
Date: Tue, 14 Aug 2018 11:58:39 -0700
Subject: [PATCH 3/6] lint

---
 R/pkg/tests/fulltests/test_sparkSQL.R | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/R/pkg/tests/fulltests/test_sparkSQL.R b/R/pkg/tests/fulltests/test_sparkSQL.R
index 3e341e592ded..fb748d73e3cd 100644
--- a/R/pkg/tests/fulltests/test_sparkSQL.R
+++ b/R/pkg/tests/fulltests/test_sparkSQL.R
@@ -2494,10 +2494,10 @@ test_that("intersectAll() and exceptAll()", {
   df2 <- createDataFrame(
     list(list("a", 1), list("a", 1), list("b", 3)),
     schema = c("a", "b"))
-  intersect_all_expected = data.frame("a" = c("a", "a", "b"), "b" = c(1, 1, 3),
-                                      stringsAsFactors = FALSE)
-  except_all_expected = data.frame("a" = c("a", "a", "c"), "b" = c(1, 1, 4),
-                                   stringsAsFactors = FALSE)
+  intersect_all_expected <- data.frame("a" = c("a", "a", "b"), "b" = c(1, 1, 3),
+                                       stringsAsFactors = FALSE)
+  except_all_expected <- data.frame("a" = c("a", "a", "c"), "b" = c(1, 1, 4),
+                                    stringsAsFactors = FALSE)
   intersect_all_df <- arrange(intersectAll(df1, df2), df1$a)
   expect_is(intersect_all_df, "SparkDataFrame")
   except_all_df <- arrange(exceptAll(df1, df2), df1$a)

From 5247ab5ef79c7d28db5298aea45b7dcad5ec8ab8 Mon Sep 17 00:00:00 2001
From: Dilip Biswal
Date: Tue, 14 Aug 2018 18:05:22 -0700
Subject: [PATCH 4/6] code review

---
 R/pkg/R/DataFrame.R | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R
index d3a0726cab24..336b495e12d0 100644
--- a/R/pkg/R/DataFrame.R
+++ b/R/pkg/R/DataFrame.R
@@ -2870,7 +2870,7 @@ setMethod("intersect",
 #' intersectAllDF <- intersectAll(df1, df2)
 #' }
 #' @rdname intersectAll
-#' @note intersectAll since 2.4
+#' @note intersectAll since 2.4.0
 setMethod("intersectAll",
           signature(x = "SparkDataFrame", y = "SparkDataFrame"),
           function(x, y) {
@@ -2927,7 +2927,7 @@ setMethod("except",
 #' exceptAllDF <- exceptAll(df1, df2)
 #' }
 #' @rdname exceptAll
-#' @note exceptAll since 2.4
+#' @note exceptAll since 2.4.0
 setMethod("exceptAll",
           signature(x = "SparkDataFrame", y = "SparkDataFrame"),
           function(x, y) {

From 1d93304290909617c1ddb794f3599907d09cad3d Mon Sep 17 00:00:00 2001
From: Dilip Biswal
Date: Tue, 14 Aug 2018 23:43:03 -0700
Subject: [PATCH 5/6] code review

---
 R/pkg/tests/fulltests/test_sparkSQL.R | 35 +++++++++++----------------
 1 file changed, 14 insertions(+), 21 deletions(-)

diff --git a/R/pkg/tests/fulltests/test_sparkSQL.R b/R/pkg/tests/fulltests/test_sparkSQL.R
index fb748d73e3cd..bff6e3512ee2 100644
--- a/R/pkg/tests/fulltests/test_sparkSQL.R
+++ b/R/pkg/tests/fulltests/test_sparkSQL.R
@@ -2483,29 +2483,22 @@ test_that("union(), unionByName(), rbind(), except(), and intersect() on a DataF
 })
 
 test_that("intersectAll() and exceptAll()", {
-  df1 <- createDataFrame(
-    list(list("a", 1),
-         list("a", 1),
-         list("a", 1),
-         list("a", 1),
-         list("b", 3),
-         list("c", 4)),
-    schema = c("a", "b"))
-  df2 <- createDataFrame(
-    list(list("a", 1), list("a", 1), list("b", 3)),
-    schema = c("a", "b"))
-  intersect_all_expected <- data.frame("a" = c("a", "a", "b"), "b" = c(1, 1, 3),
+  df1 <- createDataFrame(list(list("a", 1), list("a", 1), list("a", 1),
+                              list("a", 1), list("b", 3), list("c", 4)),
+                         schema = c("a", "b"))
+  df2 <- createDataFrame(list(list("a", 1), list("a", 1), list("b", 3)), schema = c("a", "b"))
+  intersectAllExpected <- data.frame("a" = c("a", "a", "b"), "b" = c(1, 1, 3),
                                        stringsAsFactors = FALSE)
-  except_all_expected <- data.frame("a" = c("a", "a", "c"), "b" = c(1, 1, 4),
+  exceptAllExpected <- data.frame("a" = c("a", "a", "c"), "b" = c(1, 1, 4),
                                     stringsAsFactors = FALSE)
-  intersect_all_df <- arrange(intersectAll(df1, df2), df1$a)
-  expect_is(intersect_all_df, "SparkDataFrame")
-  except_all_df <- arrange(exceptAll(df1, df2), df1$a)
-  expect_is(except_all_df, "SparkDataFrame")
-  intersect_all_actual <- collect(intersect_all_df)
-  expect_identical(intersect_all_actual, intersect_all_expected)
-  except_all_actual <- collect(except_all_df)
-  expect_identical(except_all_actual, except_all_expected)
+  intersectAllDf <- arrange(intersectAll(df1, df2), df1$a)
+  expect_is(intersectAllDf, "SparkDataFrame")
+  exceptAllDf <- arrange(exceptAll(df1, df2), df1$a)
+  expect_is(exceptAllDf, "SparkDataFrame")
+  intersectAllActual <- collect(intersectAllDf)
+  expect_identical(intersectAllActual, intersectAllExpected)
+  exceptAllActual <- collect(exceptAllDf)
+  expect_identical(exceptAllActual, exceptAllExpected)
 })
 
 test_that("withColumn() and withColumnRenamed()", {
   df <- read.json(jsonPath)
   newDF <- withColumn(df, "newAge", df$age + 2)

From 528050d14f16a5a44b552c44d49aa733584c21d6 Mon Sep 17 00:00:00 2001
From: Dilip Biswal
Date: Thu, 16 Aug 2018 00:24:03 -0700
Subject: [PATCH 6/6] code review

---
 R/pkg/R/DataFrame.R | 5 +----
 1 file changed, 1 insertion(+), 4 deletions(-)

diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R
index 336b495e12d0..4f2d4c7c002d 100644
--- a/R/pkg/R/DataFrame.R
+++ b/R/pkg/R/DataFrame.R
@@ -2869,7 +2869,6 @@ setMethod("intersect",
 #' df2 <- read.json(path2)
 #' intersectAllDF <- intersectAll(df1, df2)
 #' }
-#' @rdname intersectAll
 #' @note intersectAll since 2.4.0
 setMethod("intersectAll",
           signature(x = "SparkDataFrame", y = "SparkDataFrame"),
@@ -2877,6 +2876,7 @@ setMethod("intersectAll",
             intersected <- callJMethod(x@sdf, "intersectAll", y@sdf)
             dataFrame(intersected)
           })
+
 #' except
 #'
 #' Return a new SparkDataFrame containing rows in this SparkDataFrame
@@ -2896,7 +2896,6 @@ setMethod("intersectAll",
 #' df2 <- read.json(path2)
 #' exceptDF <- except(df, df2)
 #' }
-#' @rdname except
 #' @note except since 1.4.0
 setMethod("except",
           signature(x = "SparkDataFrame", y = "SparkDataFrame"),
@@ -2926,7 +2925,6 @@ setMethod("except",
 #' df2 <- read.json(path2)
 #' exceptAllDF <- exceptAll(df1, df2)
 #' }
-#' @rdname exceptAll
 #' @note exceptAll since 2.4.0
 setMethod("exceptAll",
           signature(x = "SparkDataFrame", y = "SparkDataFrame"),
@@ -2935,7 +2933,6 @@ setMethod("exceptAll",
             excepted <- callJMethod(x@sdf, "exceptAll", y@sdf)
             dataFrame(excepted)
           })
 
-
 #' Save the contents of SparkDataFrame to a data source.
 #'
 #' The data source is specified by the \code{source} and a set of options (...).