Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions R/pkg/NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ export("print.jobj")

exportClasses("DataFrame")

exportMethods("cache",
exportMethods("arrange",
"cache",
"collect",
"columns",
"count",
Expand All @@ -20,6 +21,7 @@ exportMethods("cache",
"explain",
"filter",
"first",
"group_by",
"groupBy",
"head",
"insertInto",
Expand All @@ -29,12 +31,15 @@ exportMethods("cache",
"length",
"limit",
"orderBy",
"mutate",
"names",
"persist",
"printSchema",
"registerTempTable",
"rename",
"repartition",
"sampleDF",
"sample_frac",
"saveAsParquetFile",
"saveAsTable",
"saveDF",
Expand All @@ -43,7 +48,7 @@ exportMethods("cache",
"selectExpr",
"show",
"showDF",
"sortDF",
"summarize",
"take",
"unionAll",
"unpersist",
Expand Down Expand Up @@ -73,6 +78,8 @@ exportMethods("abs",
"max",
"mean",
"min",
"n",
"n_distinct",
"rlike",
"sqrt",
"startsWith",
Expand Down
128 changes: 116 additions & 12 deletions R/pkg/R/DataFrame.R
Original file line number Diff line number Diff line change
Expand Up @@ -477,6 +477,7 @@ setMethod("distinct",
#' @param withReplacement Sampling with replacement or not
#' @param fraction The (rough) sample target fraction
#' @rdname sampleDF
#' @aliases sample_frac
#' @export
#' @examples
#'\dontrun{
Expand All @@ -498,6 +499,15 @@ setMethod("sampleDF",
dataFrame(sdf)
})

#' @rdname sampleDF
#' @aliases sampleDF
setMethod("sample_frac",
          signature(x = "DataFrame", withReplacement = "logical",
                    fraction = "numeric"),
          function(x, withReplacement, fraction) {
            # dplyr-style alias; delegates directly to sampleDF.
            sampleDF(x, withReplacement, fraction)
          })

#' Count
#'
#' Returns the number of rows in a DataFrame
Expand Down Expand Up @@ -679,7 +689,8 @@ setMethod("toRDD",
#' @param x a DataFrame
#' @return a GroupedData
#' @seealso GroupedData
#' @rdname DataFrame
#' @aliases group_by
#' @rdname groupBy
#' @export
#' @examples
#' \dontrun{
Expand All @@ -702,18 +713,35 @@ setMethod("groupBy",
groupedData(sgd)
})

#' Agg
#' @rdname groupBy
#' @aliases group_by
setMethod("group_by",
          signature(x = "DataFrame"),
          function(x, ...) {
            # dplyr-style alias; delegates directly to groupBy.
            groupBy(x, ...)
          })

#' Summarize data across columns
#'
#' Compute aggregates by specifying a list of columns
#'
#' @param x a DataFrame
#' @param ... aggregate expressions (Column objects or name = value pairs)
#' @return a DataFrame with the aggregated results
#' @rdname DataFrame
#' @aliases summarize
#' @export
setMethod("agg",
          signature(x = "DataFrame"),
          function(x, ...) {
            # Aggregating an ungrouped DataFrame is equivalent to aggregating
            # over a single group containing every row.
            agg(groupBy(x), ...)
          })

#' @rdname DataFrame
#' @aliases agg
setMethod("summarize",
          signature(x = "DataFrame"),
          function(x, ...) {
            # dplyr-style alias; delegates directly to agg.
            agg(x, ...)
          })


############################## RDD Map Functions ##################################
# All of the following functions mirror the existing RDD map functions, #
Expand Down Expand Up @@ -881,7 +909,7 @@ setMethod("select",
signature(x = "DataFrame", col = "list"),
function(x, col) {
cols <- lapply(col, function(c) {
if (class(c)== "Column") {
if (class(c) == "Column") {
c@jc
} else {
col(c)@jc
Expand Down Expand Up @@ -941,6 +969,42 @@ setMethod("withColumn",
select(x, x$"*", alias(col, colName))
})

#' Mutate
#'
#' Return a new DataFrame with the specified columns added.
#'
#' @param x A DataFrame
#' @param col a named argument of the form name = col
#' @return A new DataFrame with the new columns added.
#' @rdname withColumn
#' @aliases withColumn
#' @export
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' sqlCtx <- sparkRSQL.init(sc)
#' path <- "path/to/file.json"
#' df <- jsonFile(sqlCtx, path)
#' newDF <- mutate(df, newCol = df$col1 * 5, newCol2 = df$col1 * 2)
#' names(newDF) # Will contain newCol, newCol2
#' }
setMethod("mutate",
          signature(x = "DataFrame"),
          function(x, ...) {
            cols <- list(...)
            stopifnot(length(cols) > 0)
            # New columns must be Column expressions; check the first as a
            # representative (inherits() is preferred over class() == "...").
            stopifnot(inherits(cols[[1]], "Column"))
            ns <- names(cols)
            if (!is.null(ns)) {
              for (n in ns) {
                if (n != "") {
                  # Apply the user-supplied argument name to the expression.
                  cols[[n]] <- alias(cols[[n]], n)
                }
              }
            }
            # Select all existing columns plus the new ones.
            do.call(select, c(x, x$"*", cols))
          })

#' WithColumnRenamed
#'
#' Rename an existing column in a DataFrame.
Expand Down Expand Up @@ -972,29 +1036,67 @@ setMethod("withColumnRenamed",
select(x, cols)
})

#' Rename
#'
#' Rename an existing column in a DataFrame.
#'
#' @param x A DataFrame
#' @param newCol A named pair of the form new_column_name = existing_column
#' @return A DataFrame with the column name changed.
#' @rdname withColumnRenamed
#' @aliases withColumnRenamed
#' @export
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' sqlCtx <- sparkRSQL.init(sc)
#' path <- "path/to/file.json"
#' df <- jsonFile(sqlCtx, path)
#' newDF <- rename(df, col1 = df$newCol1)
#' }
setMethod("rename",
          signature(x = "DataFrame"),
          function(x, ...) {
            renameCols <- list(...)
            stopifnot(length(renameCols) > 0)
            # Values must be Column expressions; check the first as a
            # representative (inherits() is preferred over class() == "...").
            stopifnot(inherits(renameCols[[1]], "Column"))
            # New names come from the argument names; old names from the
            # string form of each Column expression.
            newNames <- names(renameCols)
            oldNames <- lapply(renameCols, function(col) {
              callJMethod(col@jc, "toString")
            })
            cols <- lapply(columns(x), function(c) {
              if (c %in% oldNames) {
                # Alias the matched column to its new name.
                alias(col(c), newNames[[match(c, oldNames)]])
              } else {
                # All other columns pass through unchanged.
                col(c)
              }
            })
            select(x, cols)
          })

setClassUnion("characterOrColumn", c("character", "Column"))

#' SortDF
#' Arrange
#'
#' Sort a DataFrame by the specified column(s).
#'
#' @param x A DataFrame to be sorted.
#' @param col Either a Column object or character vector indicating the field to sort on
#' @param ... Additional sorting fields
#' @return A DataFrame where all elements are sorted.
#' @rdname sortDF
#' @rdname arrange
#' @export
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' sqlCtx <- sparkRSQL.init(sc)
#' path <- "path/to/file.json"
#' df <- jsonFile(sqlCtx, path)
#' sortDF(df, df$col1)
#' sortDF(df, "col1")
#' sortDF(df, asc(df$col1), desc(abs(df$col2)))
#' arrange(df, df$col1)
#' arrange(df, "col1")
#' arrange(df, asc(df$col1), desc(abs(df$col2)))
#' }
setMethod("sortDF",
setMethod("arrange",
signature(x = "DataFrame", col = "characterOrColumn"),
function(x, col, ...) {
if (class(col) == "character") {
Expand All @@ -1008,20 +1110,21 @@ setMethod("sortDF",
dataFrame(sdf)
})

#' @rdname sortDF
#' @rdname arrange
#' @aliases orderBy,DataFrame,function-method
#' @export
setMethod("orderBy",
signature(x = "DataFrame", col = "characterOrColumn"),
function(x, col) {
sortDF(x, col)
arrange(x, col)
})

#' Filter
#'
#' Filter the rows of a DataFrame according to a given condition.
#'
#' @param x A DataFrame to be filtered.
#' @param condition The condition to sort on. This may either be a Column expression
#' @param condition The condition to filter on. This may either be a Column expression
#' or a string containing a SQL statement
#' @return A DataFrame containing only the rows that meet the condition.
#' @rdname filter
Expand Down Expand Up @@ -1101,6 +1204,7 @@ setMethod("join",
#'
#' Return a new DataFrame containing the union of rows in this DataFrame
#' and another DataFrame. This is equivalent to `UNION ALL` in SQL.
#' Note that this does not remove duplicate rows across the two DataFrames.
#'
#' @param x A Spark DataFrame
#' @param y A Spark DataFrame
Expand Down
28 changes: 26 additions & 2 deletions R/pkg/R/column.R
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,8 @@ createMethods()

#' alias
#'
#' @rdname column
#'
#' Set a new name for a column
setMethod("alias",
signature(object = "Column"),
Expand All @@ -143,6 +145,8 @@ setMethod("alias",

#' An expression that returns a substring.
#'
#' @rdname column
#'
#' @param start starting position
#' @param stop ending position
setMethod("substr", signature(x = "Column"),
Expand All @@ -152,6 +156,9 @@ setMethod("substr", signature(x = "Column"),
})

#' Casts the column to a different data type.
#'
#' @rdname column
#'
#' @examples
#' \dontrun{
#' cast(df$age, "string")
Expand All @@ -173,8 +180,9 @@ setMethod("cast",

#' Approx Count Distinct
#'
#' Returns the approximate number of distinct items in a group.
#' @rdname column
#'
#' Returns the approximate number of distinct items in a group.
setMethod("approxCountDistinct",
signature(x = "Column"),
function(x, rsd = 0.95) {
Expand All @@ -184,8 +192,9 @@ setMethod("approxCountDistinct",

#' Count Distinct
#'
#' returns the number of distinct items in a group.
#' @rdname column
#'
#' returns the number of distinct items in a group.
setMethod("countDistinct",
signature(x = "Column"),
function(x, ...) {
Expand All @@ -197,3 +206,18 @@ setMethod("countDistinct",
column(jc)
})

#' @rdname column
#' @aliases countDistinct
setMethod("n_distinct",
          signature(x = "Column"),
          function(x, ...) {
            # dplyr-style alias; delegates directly to countDistinct.
            countDistinct(x, ...)
          })

#' @rdname column
#' @aliases count
setMethod("n",
          signature(x = "Column"),
          function(x) {
            # dplyr-style alias; delegates directly to count.
            count(x)
          })
Loading