
Commit 5a693b4

Felix Cheung authored and committed
[SPARK-20195][SPARKR][SQL] add createTable catalog API and deprecate createExternalTable
## What changes were proposed in this pull request?

Following up on apache#17483, add createTable (which is new in 2.2.0) and deprecate createExternalTable, plus a number of minor fixes.

## How was this patch tested?

manual, unit tests

Author: Felix Cheung <[email protected]>

Closes apache#17511 from felixcheung/rceatetable.
1 parent bccc330 commit 5a693b4
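
In short, SparkR gains a createTable() catalog function while createExternalTable() is kept as a deprecated wrapper around it. A minimal usage sketch of that API change (table names and file paths here are illustrative, not taken from the commit):

# New in SparkR 2.2.0: createTable() creates a managed table, or an external
# table when a path is supplied.
sparkR.session()
df <- createTable("people", path = "/tmp/people.json", source = "json")

# Deprecated but still working: createExternalTable() now warns via .Deprecated()
# and forwards its arguments to createTable().
df2 <- createExternalTable("people_legacy", path = "/tmp/people.json", source = "json")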

4 files changed

Lines changed: 68 additions & 16 deletions


R/pkg/NAMESPACE

Lines changed: 1 addition & 0 deletions
@@ -361,6 +361,7 @@ export("as.DataFrame",
        "clearCache",
        "createDataFrame",
        "createExternalTable",
+       "createTable",
        "currentDatabase",
        "dropTempTable",
        "dropTempView",

R/pkg/R/DataFrame.R

Lines changed: 2 additions & 2 deletions
@@ -557,7 +557,7 @@ setMethod("insertInto",
             jmode <- convertToJSaveMode(ifelse(overwrite, "overwrite", "append"))
             write <- callJMethod(x@sdf, "write")
             write <- callJMethod(write, "mode", jmode)
-            callJMethod(write, "insertInto", tableName)
+            invisible(callJMethod(write, "insertInto", tableName))
           })
 
 #' Cache
@@ -2894,7 +2894,7 @@ setMethod("saveAsTable",
             write <- callJMethod(write, "format", source)
             write <- callJMethod(write, "mode", jmode)
             write <- callJMethod(write, "options", options)
-            callJMethod(write, "saveAsTable", tableName)
+            invisible(callJMethod(write, "saveAsTable", tableName))
           })
 
 #' summary
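
One of the "minor fixes" is wrapping the final callJMethod() in invisible(), so insertInto() and saveAsTable() no longer auto-print the returned JVM object at the R prompt. A small sketch of what invisible() does (function names hypothetical, not from the commit):

# An ordinary function auto-prints its return value when called at the prompt:
double_it <- function(x) x * 2
# double_it(3) prints [1] 6

# Wrapping the value in invisible() suppresses the auto-print but keeps the value:
double_quietly <- function(x) invisible(x * 2)
# double_quietly(3) prints nothing
y <- double_quietly(3)   # y is still 6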

R/pkg/R/catalog.R

Lines changed: 50 additions & 9 deletions
@@ -17,7 +17,7 @@
 
 # catalog.R: SparkSession catalog functions
 
-#' Create an external table
+#' (Deprecated) Create an external table
 #'
 #' Creates an external table based on the dataset in a data source,
 #' Returns a SparkDataFrame associated with the external table.
@@ -29,10 +29,11 @@
 #' @param tableName a name of the table.
 #' @param path the path of files to load.
 #' @param source the name of external data source.
-#' @param schema the schema of the data for certain data source.
+#' @param schema the schema of the data required for some data sources.
 #' @param ... additional argument(s) passed to the method.
 #' @return A SparkDataFrame.
-#' @rdname createExternalTable
+#' @rdname createExternalTable-deprecated
+#' @seealso \link{createTable}
 #' @export
 #' @examples
 #'\dontrun{
@@ -43,24 +44,64 @@
 #' @method createExternalTable default
 #' @note createExternalTable since 1.4.0
 createExternalTable.default <- function(tableName, path = NULL, source = NULL, schema = NULL, ...) {
+  .Deprecated("createTable", old = "createExternalTable")
+  createTable(tableName, path, source, schema, ...)
+}
+
+createExternalTable <- function(x, ...) {
+  dispatchFunc("createExternalTable(tableName, path = NULL, source = NULL, ...)", x, ...)
+}
+
+#' Creates a table based on the dataset in a data source
+#'
+#' Creates a table based on the dataset in a data source. Returns a SparkDataFrame associated with
+#' the table.
+#'
+#' The data source is specified by the \code{source} and a set of options(...).
+#' If \code{source} is not specified, the default data source configured by
+#' "spark.sql.sources.default" will be used. When a \code{path} is specified, an external table is
+#' created from the data at the given path. Otherwise a managed table is created.
+#'
+#' @param tableName the qualified or unqualified name that designates a table. If no database
+#'                  identifier is provided, it refers to a table in the current database.
+#' @param path (optional) the path of files to load.
+#' @param source (optional) the name of the data source.
+#' @param schema (optional) the schema of the data required for some data sources.
+#' @param ... additional named parameters as options for the data source.
+#' @return A SparkDataFrame.
+#' @rdname createTable
+#' @seealso \link{createExternalTable}
+#' @export
+#' @examples
+#'\dontrun{
+#' sparkR.session()
+#' df <- createTable("myjson", path="path/to/json", source="json", schema)
+#'
+#' createTable("people", source = "json", schema = schema)
+#' insertInto(df, "people")
+#' }
+#' @name createTable
+#' @note createTable since 2.2.0
+createTable <- function(tableName, path = NULL, source = NULL, schema = NULL, ...) {
   sparkSession <- getSparkSession()
   options <- varargsToStrEnv(...)
   if (!is.null(path)) {
     options[["path"]] <- path
   }
+  if (is.null(source)) {
+    source <- getDefaultSqlSource()
+  }
   catalog <- callJMethod(sparkSession, "catalog")
   if (is.null(schema)) {
-    sdf <- callJMethod(catalog, "createExternalTable", tableName, source, options)
+    sdf <- callJMethod(catalog, "createTable", tableName, source, options)
+  } else if (class(schema) == "structType") {
+    sdf <- callJMethod(catalog, "createTable", tableName, source, schema$jobj, options)
   } else {
-    sdf <- callJMethod(catalog, "createExternalTable", tableName, source, schema$jobj, options)
+    stop("schema must be a structType.")
   }
   dataFrame(sdf)
 }
 
-createExternalTable <- function(x, ...) {
-  dispatchFunc("createExternalTable(tableName, path = NULL, source = NULL, ...)", x, ...)
-}
-
 #' Cache Table
 #'
 #' Caches the specified table in-memory.
R/pkg/inst/tests/testthat/test_sparkSQL.R

Lines changed: 15 additions & 5 deletions
@@ -281,7 +281,7 @@ test_that("create DataFrame from RDD", {
   setHiveContext(sc)
   sql("CREATE TABLE people (name string, age double, height float)")
   df <- read.df(jsonPathNa, "json", schema)
-  invisible(insertInto(df, "people"))
+  insertInto(df, "people")
   expect_equal(collect(sql("SELECT age from people WHERE name = 'Bob'"))$age,
                c(16))
   expect_equal(collect(sql("SELECT height from people WHERE name ='Bob'"))$height,
@@ -1268,33 +1268,43 @@ test_that("column calculation", {
 
 test_that("test HiveContext", {
   setHiveContext(sc)
-  df <- createExternalTable("json", jsonPath, "json")
+
+  schema <- structType(structField("name", "string"), structField("age", "integer"),
+                       structField("height", "float"))
+  createTable("people", source = "json", schema = schema)
+  df <- read.df(jsonPathNa, "json", schema)
+  insertInto(df, "people")
+  expect_equal(collect(sql("SELECT age from people WHERE name = 'Bob'"))$age, c(16))
+  sql("DROP TABLE people")
+
+  df <- createTable("json", jsonPath, "json")
   expect_is(df, "SparkDataFrame")
   expect_equal(count(df), 3)
   df2 <- sql("select * from json")
   expect_is(df2, "SparkDataFrame")
   expect_equal(count(df2), 3)
 
   jsonPath2 <- tempfile(pattern = "sparkr-test", fileext = ".tmp")
-  invisible(saveAsTable(df, "json2", "json", "append", path = jsonPath2))
+  saveAsTable(df, "json2", "json", "append", path = jsonPath2)
   df3 <- sql("select * from json2")
   expect_is(df3, "SparkDataFrame")
   expect_equal(count(df3), 3)
   unlink(jsonPath2)
 
   hivetestDataPath <- tempfile(pattern = "sparkr-test", fileext = ".tmp")
-  invisible(saveAsTable(df, "hivetestbl", path = hivetestDataPath))
+  saveAsTable(df, "hivetestbl", path = hivetestDataPath)
   df4 <- sql("select * from hivetestbl")
   expect_is(df4, "SparkDataFrame")
   expect_equal(count(df4), 3)
   unlink(hivetestDataPath)
 
   parquetDataPath <- tempfile(pattern = "sparkr-test", fileext = ".tmp")
-  invisible(saveAsTable(df, "parquetest", "parquet", mode = "overwrite", path = parquetDataPath))
+  saveAsTable(df, "parquetest", "parquet", mode = "overwrite", path = parquetDataPath)
   df5 <- sql("select * from parquetest")
   expect_is(df5, "SparkDataFrame")
   expect_equal(count(df5), 3)
   unlink(parquetDataPath)
+
   unsetHiveContext()
 })