From 10feb9d2e9e4d027f874e6f508e7e7421c95cd6d Mon Sep 17 00:00:00 2001
From: hyukjinkwon
Date: Mon, 6 Mar 2017 20:15:45 +0900
Subject: [PATCH 1/5] Support array in from_json in R

---
 R/pkg/R/functions.R                       | 18 ++++--
 R/pkg/inst/tests/testthat/test_sparkSQL.R | 68 +++++++++++++++--------
 2 files changed, 60 insertions(+), 26 deletions(-)

diff --git a/R/pkg/R/functions.R b/R/pkg/R/functions.R
index edf2bcf8fdb3..2d3d7c5936e4 100644
--- a/R/pkg/R/functions.R
+++ b/R/pkg/R/functions.R
@@ -2430,20 +2430,23 @@ setMethod("date_format", signature(y = "Column", x = "character"),
             column(jc)
           })
 
+setClassUnion("characterOrstructType", c("character", "structType"))
+
 #' from_json
 #'
 #' Parses a column containing a JSON string into a Column of \code{structType} with the specified
 #' \code{schema}. If the string is unparseable, the Column will contain the value NA.
 #'
 #' @param x Column containing the JSON string.
-#' @param schema a structType object to use as the schema when parsing the JSON string.
+#' @param schema a structType object, or a data type string of the kind used in structField,
+#'               to use as the schema when parsing the JSON string.
 #' @param ... additional named properties to control how the json is parsed, accepts the same
 #' options as the JSON data source.
 #'
 #' @family normal_funcs
 #' @rdname from_json
 #' @name from_json
-#' @aliases from_json,Column,structType-method
+#' @aliases from_json,Column,characterOrstructType-method
 #' @export
 #' @examples
 #' \dontrun{
@@ -2451,12 +2454,19 @@ setMethod("date_format", signature(y = "Column", x = "character"),
 #' select(df, from_json(df$value, schema, dateFormat = "dd/MM/yyyy"))
 #'}
 #' @note from_json since 2.2.0
-setMethod("from_json", signature(x = "Column", schema = "structType"),
+setMethod("from_json", signature(x = "Column", schema = "characterOrstructType"),
           function(x, schema, ...) {
+            if (is.character(schema)) {
+              jschema <- callJStatic("org.apache.spark.sql.api.r.SQLUtils",
+                                     "getSQLDataType",
+                                     schema)
+            } else {
+              jschema <- schema$jobj
+            }
             options <- varargsToStrEnv(...)
             jc <- callJStatic("org.apache.spark.sql.functions", "from_json",
-                              x@jc, schema$jobj, options)
+                              x@jc, jschema, options)
             column(jc)
           })
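Before the accompanying test changes, a quick sketch of the string-schema form this hunk enables. This is not part of the patch; it assumes a running SparkR session (sparkR.session()) and uses a made-up column name, value:

    df <- as.DataFrame(list(list(value = "{\"age\":16,\"height\":176.5}")))
    # The schema may now be a plain type string; it is resolved on the JVM
    # side by SQLUtils.getSQLDataType instead of being built with structType().
    parsed <- select(df, from_json(df$value, "struct<age:integer,height:double>"))
    head(parsed)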
jc <- callJStatic("org.apache.spark.sql.functions", "from_json", - x@jc, schema$jobj, options) + x@jc, jschema, options) column(jc) }) diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R index 7c096597fea6..a4b7599cbb63 100644 --- a/R/pkg/inst/tests/testthat/test_sparkSQL.R +++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R @@ -1342,28 +1342,52 @@ test_that("column functions", { df <- read.json(mapTypeJsonPath) j <- collect(select(df, alias(to_json(df$info), "json"))) expect_equal(j[order(j$json), ][1], "{\"age\":16,\"height\":176.5}") - df <- as.DataFrame(j) - schema <- structType(structField("age", "integer"), - structField("height", "double")) - s <- collect(select(df, alias(from_json(df$json, schema), "structcol"))) - expect_equal(ncol(s), 1) - expect_equal(nrow(s), 3) - expect_is(s[[1]][[1]], "struct") - expect_true(any(apply(s, 1, function(x) { x[[1]]$age == 16 } ))) - - # passing option - df <- as.DataFrame(list(list("col" = "{\"date\":\"21/10/2014\"}"))) - schema2 <- structType(structField("date", "date")) - expect_error(tryCatch(collect(select(df, from_json(df$col, schema2))), - error = function(e) { stop(e) }), - paste0(".*(java.lang.NumberFormatException: For input string:).*")) - s <- collect(select(df, from_json(df$col, schema2, dateFormat = "dd/MM/yyyy"))) - expect_is(s[[1]][[1]]$date, "Date") - expect_equal(as.character(s[[1]][[1]]$date), "2014-10-21") - - # check for unparseable - df <- as.DataFrame(list(list("a" = ""))) - expect_equal(collect(select(df, from_json(df$a, schema)))[[1]][[1]], NA) + + schemas <- list(structType(structField("age", "integer"), structField("height", "double")), + "struct") + for (schema in schemas) { + df <- as.DataFrame(j) + s <- collect(select(df, alias(from_json(df$json, schema), "structcol"))) + expect_equal(ncol(s), 1) + expect_equal(nrow(s), 3) + expect_is(s[[1]][[1]], "struct") + expect_true(any(apply(s, 1, function(x) { x[[1]]$age == 16 } ))) + + # passing option + df <- as.DataFrame(list(list("col" = "{\"date\":\"21/10/2014\"}"))) + schema2 <- structType(structField("date", "date")) + expect_error(tryCatch(collect(select(df, from_json(df$col, schema2))), + error = function(e) { stop(e) }), + paste0(".*(java.lang.NumberFormatException: For input string:).*")) + s <- collect(select(df, from_json(df$col, schema2, dateFormat = "dd/MM/yyyy"))) + expect_is(s[[1]][[1]]$date, "Date") + expect_equal(as.character(s[[1]][[1]]$date), "2014-10-21") + + # check for unparseable + df <- as.DataFrame(list(list("a" = ""))) + expect_equal(collect(select(df, from_json(df$a, schema)))[[1]][[1]], NA) + } + + # check if array type in string is correctly supported. 
+  jsonArr <- "[{\"name\":\"Bob\"}, {\"name\":\"Alice\"}]"
+  df <- as.DataFrame(list(list("people" = jsonArr)))
+  arr <- collect(select(df, alias(from_json(df$people, "array<struct<name:string>>"), "arrcol")))
+  expect_equal(ncol(arr), 1)
+  expect_equal(nrow(arr), 1)
+  expect_is(arr[[1]][[1]], "list")
+  expect_equal(length(arr$arrcol[[1]]), 2)
+  expect_equal(arr$arrcol[[1]][[1]]$name, "Bob")
+  expect_equal(arr$arrcol[[1]][[2]]$name, "Alice")
+
+  # check for unparseable data type
+  expect_error(tryCatch(collect(select(df, from_json(df$people, "unknown"))),
+               error = function(e) { stop(e) }),
+               paste0(".*(Invalid type unknown).*"))
+
+  # check for incorrect data type
+  expect_error(tryCatch(collect(select(df, from_json(df$people, "integer"))),
+               error = function(e) { stop(e) }),
+               paste0(".*(data type mismatch: Input schema int must be a struct or an array of structs).*"))
 })
 
 test_that("column binary mathfunctions", {

From f13ad187562e3c5a2c05d2c2aa26849f236593ed Mon Sep 17 00:00:00 2001
From: hyukjinkwon
Date: Sat, 11 Mar 2017 02:07:29 +0900
Subject: [PATCH 2/5] Add asArray option for from_json

---
 R/pkg/R/functions.R                                | 18 +++---
 R/pkg/inst/tests/testthat/test_sparkSQL.R          | 59 ++++++++-----------
 .../org/apache/spark/sql/api/r/SQLUtils.scala      |  6 +-
 3 files changed, 37 insertions(+), 46 deletions(-)

diff --git a/R/pkg/R/functions.R b/R/pkg/R/functions.R
index 2d3d7c5936e4..147d6a8a8c77 100644
--- a/R/pkg/R/functions.R
+++ b/R/pkg/R/functions.R
@@ -2430,23 +2430,21 @@ setMethod("date_format", signature(y = "Column", x = "character"),
             column(jc)
           })
 
-setClassUnion("characterOrstructType", c("character", "structType"))
-
 #' from_json
 #'
 #' Parses a column containing a JSON string into a Column of \code{structType} with the specified
 #' \code{schema}. If the string is unparseable, the Column will contain the value NA.
 #'
 #' @param x Column containing the JSON string.
-#' @param schema a structType object, or a data type string of the kind used in structField,
-#'               to use as the schema when parsing the JSON string.
+#' @param schema a structType object to use as the schema when parsing the JSON string.
+#' @param asArray indicating if input string is JSON array or object.
 #' @param ... additional named properties to control how the json is parsed, accepts the same
 #' options as the JSON data source.
 #'
 #' @family normal_funcs
 #' @rdname from_json
 #' @name from_json
-#' @aliases from_json,Column,characterOrstructType-method
+#' @aliases from_json,Column,structType-method
 #' @export
 #' @examples
 #' \dontrun{
@@ -2454,12 +2452,12 @@ setMethod("date_format", signature(y = "Column", x = "character"),
 #' select(df, from_json(df$value, schema, dateFormat = "dd/MM/yyyy"))
 #'}
 #' @note from_json since 2.2.0
-setMethod("from_json", signature(x = "Column", schema = "characterOrstructType"),
-          function(x, schema, ...) {
-            if (is.character(schema)) {
+setMethod("from_json", signature(x = "Column", schema = "structType"),
+          function(x, schema, asArray = FALSE, ...) {
+            if (asArray) {
               jschema <- callJStatic("org.apache.spark.sql.api.r.SQLUtils",
-                                     "getSQLDataType",
-                                     schema)
+                                     "createArrayType",
+                                     schema$jobj)
             } else {
               jschema <- schema$jobj
             }
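Before the test and Scala changes below, a quick sketch of the asArray flag this commit introduces (not part of the patch; assumes a running SparkR session):

    df <- as.DataFrame(list(list(people = "[{\"name\":\"Bob\"}, {\"name\":\"Alice\"}]")))
    schema <- structType(structField("name", "string"))
    # asArray = TRUE wraps the struct schema in an ArrayType on the JVM side,
    # so each collected cell is a list of named lists.
    arr <- collect(select(df, alias(from_json(df$people, schema, asArray = TRUE), "arrcol")))
    arr$arrcol[[1]][[1]]$name  # "Bob"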
diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R
index a4b7599cbb63..ec726c823912 100644
--- a/R/pkg/inst/tests/testthat/test_sparkSQL.R
+++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -1343,51 +1343,40 @@ test_that("column functions", {
   j <- collect(select(df, alias(to_json(df$info), "json")))
   expect_equal(j[order(j$json), ][1], "{\"age\":16,\"height\":176.5}")
 
-  schemas <- list(structType(structField("age", "integer"), structField("height", "double")),
-                  "struct<age:integer,height:double>")
-  for (schema in schemas) {
-    df <- as.DataFrame(j)
-    s <- collect(select(df, alias(from_json(df$json, schema), "structcol")))
-    expect_equal(ncol(s), 1)
-    expect_equal(nrow(s), 3)
-    expect_is(s[[1]][[1]], "struct")
-    expect_true(any(apply(s, 1, function(x) { x[[1]]$age == 16 } )))
-
-    # passing option
-    df <- as.DataFrame(list(list("col" = "{\"date\":\"21/10/2014\"}")))
-    schema2 <- structType(structField("date", "date"))
-    expect_error(tryCatch(collect(select(df, from_json(df$col, schema2))),
-                 error = function(e) { stop(e) }),
-                 paste0(".*(java.lang.NumberFormatException: For input string:).*"))
-    s <- collect(select(df, from_json(df$col, schema2, dateFormat = "dd/MM/yyyy")))
-    expect_is(s[[1]][[1]]$date, "Date")
-    expect_equal(as.character(s[[1]][[1]]$date), "2014-10-21")
-
-    # check for unparseable
-    df <- as.DataFrame(list(list("a" = "")))
-    expect_equal(collect(select(df, from_json(df$a, schema)))[[1]][[1]], NA)
-  }
+  df <- as.DataFrame(j)
+  schema <- structType(structField("age", "integer"),
+                       structField("height", "double"))
+  s <- collect(select(df, alias(from_json(df$json, schema), "structcol")))
+  expect_equal(ncol(s), 1)
+  expect_equal(nrow(s), 3)
+  expect_is(s[[1]][[1]], "struct")
+  expect_true(any(apply(s, 1, function(x) { x[[1]]$age == 16 } )))
+
+  # passing option
+  df <- as.DataFrame(list(list("col" = "{\"date\":\"21/10/2014\"}")))
+  schema2 <- structType(structField("date", "date"))
+  expect_error(tryCatch(collect(select(df, from_json(df$col, schema2))),
+                error = function(e) { stop(e) }),
+                paste0(".*(java.lang.NumberFormatException: For input string:).*"))
+  s <- collect(select(df, from_json(df$col, schema2, dateFormat = "dd/MM/yyyy")))
+  expect_is(s[[1]][[1]]$date, "Date")
+  expect_equal(as.character(s[[1]][[1]]$date), "2014-10-21")
+
+  # check for unparseable
+  df <- as.DataFrame(list(list("a" = "")))
+  expect_equal(collect(select(df, from_json(df$a, schema)))[[1]][[1]], NA)
 
   # check if array type in string is correctly supported.
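For reference, the createArrayType helper added to SQLUtils below is what the R side reaches through callJStatic. Roughly equivalent to the internal call made by from_json(..., asArray = TRUE); a sketch only, since callJStatic is SparkR's private JVM bridge, not public API:

    schema <- structType(structField("name", "string"))
    # build an ArrayType(struct<name:string>) java object from R
    jschema <- SparkR:::callJStatic("org.apache.spark.sql.api.r.SQLUtils",
                                    "createArrayType",
                                    schema$jobj)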
jsonArr <- "[{\"name\":\"Bob\"}, {\"name\":\"Alice\"}]" df <- as.DataFrame(list(list("people" = jsonArr))) - arr <- collect(select(df, alias(from_json(df$people, "array>"), "arrcol"))) + schema <- structType(structField("name", "string")) + arr <- collect(select(df, alias(from_json(df$people, schema, asArray = TRUE), "arrcol"))) expect_equal(ncol(arr), 1) expect_equal(nrow(arr), 1) expect_is(arr[[1]][[1]], "list") expect_equal(length(arr$arrcol[[1]]), 2) expect_equal(arr$arrcol[[1]][[1]]$name, "Bob") expect_equal(arr$arrcol[[1]][[2]]$name, "Alice") - - # check for unparseable data type - expect_error(tryCatch(collect(select(df, from_json(df$people, "unknown"))), - error = function(e) { stop(e) }), - paste0(".*(Invalid type unknown).*")) - - # check for incorrect data type - expect_error(tryCatch(collect(select(df, from_json(df$people, "integer"))), - error = function(e) { stop(e) }), - paste0(".*(data type mismatch: Input schema int must be a struct or an array of structs).*")) }) test_that("column binary mathfunctions", { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala index a4c5bf756cd5..2fe8f169ccd8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala @@ -81,10 +81,14 @@ private[sql] object SQLUtils extends Logging { new JavaSparkContext(spark.sparkContext) } - def createStructType(fields : Seq[StructField]): StructType = { + def createStructType(fields: Seq[StructField]): StructType = { StructType(fields) } + def createArrayType(dataType: DataType): ArrayType = { + ArrayType(elementType = dataType) + } + // Support using regex in string interpolation private[this] implicit class RegexContext(sc: StringContext) { def r: Regex = new Regex(sc.parts.mkString, sc.parts.tail.map(_ => "x"): _*) From 5c2450d0fbe1345df0472714dd3155a12db303a1 Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Sat, 11 Mar 2017 02:10:30 +0900 Subject: [PATCH 3/5] Remove unrelated changes --- R/pkg/inst/tests/testthat/test_sparkSQL.R | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R index ec726c823912..2bb06f5e045b 100644 --- a/R/pkg/inst/tests/testthat/test_sparkSQL.R +++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R @@ -1342,7 +1342,6 @@ test_that("column functions", { df <- read.json(mapTypeJsonPath) j <- collect(select(df, alias(to_json(df$info), "json"))) expect_equal(j[order(j$json), ][1], "{\"age\":16,\"height\":176.5}") - df <- as.DataFrame(j) schema <- structType(structField("age", "integer"), structField("height", "double")) @@ -1356,8 +1355,8 @@ test_that("column functions", { df <- as.DataFrame(list(list("col" = "{\"date\":\"21/10/2014\"}"))) schema2 <- structType(structField("date", "date")) expect_error(tryCatch(collect(select(df, from_json(df$col, schema2))), - error = function(e) { stop(e) }), - paste0(".*(java.lang.NumberFormatException: For input string:).*")) + error = function(e) { stop(e) }), + paste0(".*(java.lang.NumberFormatException: For input string:).*")) s <- collect(select(df, from_json(df$col, schema2, dateFormat = "dd/MM/yyyy"))) expect_is(s[[1]][[1]]$date, "Date") expect_equal(as.character(s[[1]][[1]]$date), "2014-10-21") From d01f952fcfd564cbf0643f5f4592cd8b4d52f4d7 Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Tue, 14 Mar 2017 19:27:18 +0900 Subject: [PATCH 4/5] Clean up and name it 
From d01f952fcfd564cbf0643f5f4592cd8b4d52f4d7 Mon Sep 17 00:00:00 2001
From: hyukjinkwon
Date: Tue, 14 Mar 2017 19:27:18 +0900
Subject: [PATCH 4/5] Clean up and name it asJsonArray

---
 R/pkg/R/functions.R                                      | 8 ++++----
 R/pkg/inst/tests/testthat/test_sparkSQL.R                | 2 +-
 .../main/scala/org/apache/spark/sql/api/r/SQLUtils.scala | 4 ----
 3 files changed, 5 insertions(+), 9 deletions(-)

diff --git a/R/pkg/R/functions.R b/R/pkg/R/functions.R
index 147d6a8a8c77..328ed457f5ea 100644
--- a/R/pkg/R/functions.R
+++ b/R/pkg/R/functions.R
@@ -2437,7 +2437,7 @@ setMethod("date_format", signature(y = "Column", x = "character"),
 #'
 #' @param x Column containing the JSON string.
 #' @param schema a structType object to use as the schema when parsing the JSON string.
-#' @param asArray indicating if input string is JSON array or object.
+#' @param asJsonArray indicating if input string is JSON array or object.
 #' @param ... additional named properties to control how the json is parsed, accepts the same
 #' options as the JSON data source.
 #'
@@ -2453,9 +2453,9 @@ setMethod("date_format", signature(y = "Column", x = "character"),
 #'}
 #' @note from_json since 2.2.0
 setMethod("from_json", signature(x = "Column", schema = "structType"),
-          function(x, schema, asArray = FALSE, ...) {
-            if (asArray) {
-              jschema <- callJStatic("org.apache.spark.sql.api.r.SQLUtils",
+          function(x, schema, asJsonArray = FALSE, ...) {
+            if (asJsonArray) {
+              jschema <- callJStatic("org.apache.spark.sql.types.DataTypes",
                                      "createArrayType",
                                      schema$jobj)
             } else {
diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R
index 2bb06f5e045b..ff76afebdefa 100644
--- a/R/pkg/inst/tests/testthat/test_sparkSQL.R
+++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -1369,7 +1369,7 @@ test_that("column functions", {
   jsonArr <- "[{\"name\":\"Bob\"}, {\"name\":\"Alice\"}]"
   df <- as.DataFrame(list(list("people" = jsonArr)))
   schema <- structType(structField("name", "string"))
-  arr <- collect(select(df, alias(from_json(df$people, schema, asArray = TRUE), "arrcol")))
+  arr <- collect(select(df, alias(from_json(df$people, schema, asJsonArray = TRUE), "arrcol")))
   expect_equal(ncol(arr), 1)
   expect_equal(nrow(arr), 1)
   expect_is(arr[[1]][[1]], "list")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala
index 2fe8f169ccd8..c77328690dae 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala
@@ -85,10 +85,6 @@ private[sql] object SQLUtils extends Logging {
     StructType(fields)
   }
 
-  def createArrayType(dataType: DataType): ArrayType = {
-    ArrayType(elementType = dataType)
-  }
-
   // Support using regex in string interpolation
   private[this] implicit class RegexContext(sc: StringContext) {
     def r: Regex = new Regex(sc.parts.mkString, sc.parts.tail.map(_ => "x"): _*)
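Spark's public Java API already exposes DataTypes.createArrayType(DataType) as a stable factory, which is why the one-off SQLUtils helper can be dropped in this commit. The equivalent manual call from R would be roughly (a sketch; callJStatic is private API):

    jschema <- SparkR:::callJStatic("org.apache.spark.sql.types.DataTypes",
                                    "createArrayType",
                                    schema$jobj)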
From 8ec7cd07e80b200397c96b2e8dff98267d118f1b Mon Sep 17 00:00:00 2001
From: hyukjinkwon
Date: Wed, 15 Mar 2017 07:07:24 +0900
Subject: [PATCH 5/5] Minor comments

---
 R/pkg/R/functions.R | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/R/pkg/R/functions.R b/R/pkg/R/functions.R
index 328ed457f5ea..9867f2d5b7c5 100644
--- a/R/pkg/R/functions.R
+++ b/R/pkg/R/functions.R
@@ -2437,7 +2437,7 @@ setMethod("date_format", signature(y = "Column", x = "character"),
 #'
 #' @param x Column containing the JSON string.
 #' @param schema a structType object to use as the schema when parsing the JSON string.
-#' @param asJsonArray indicating if input string is JSON array or object.
+#' @param asJsonArray indicating if input string is a JSON array of objects or a single object.
 #' @param ... additional named properties to control how the json is parsed, accepts the same
 #' options as the JSON data source.
 #'
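Net effect of the series, as a minimal usage sketch (assumes a running SparkR session):

    df <- as.DataFrame(list(list(people = "[{\"name\":\"Bob\"}, {\"name\":\"Alice\"}]")))
    schema <- structType(structField("name", "string"))
    # asJsonArray = TRUE parses a JSON array of objects into array<struct<name:string>>
    arr <- collect(select(df, alias(from_json(df$people, schema, asJsonArray = TRUE), "arrcol")))
    arr$arrcol[[1]][[2]]$name  # "Alice"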