Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
0ef9fe6
Typo in comment
nahoj Jul 28, 2017
b56f79c
[SPARK-20090][PYTHON] Add StructType.fieldNames in PySpark
HyukjinKwon Jul 29, 2017
c143820
[SPARK-21508][DOC] Fix example code provided in Spark Streaming Docum…
Jul 29, 2017
60e9b2b
[SPARK-21357][DSTREAMS] FileInputDStream not remove out of date RDD
shaofei007 Jul 29, 2017
9c8109e
[SPARK-21555][SQL] RuntimeReplaceable should be compared semantically…
viirya Jul 29, 2017
92d8563
[SPARK-19451][SQL] rangeBetween method should accept Long value as bo…
jiangxb1987 Jul 29, 2017
6550086
[SPARK-20962][SQL] Support subquery column aliases in FROM clause
maropu Jul 29, 2017
51f99fb
[SQL] Fix typo in DataframeWriter doc
Jul 30, 2017
d79816d
[SPARK-21297][WEB-UI] Add count in 'JDBC/ODBC Server' page.
Jul 30, 2017
6830e90
[MINOR][DOC] Replace numTasks with numPartitions in programming guide
polarker Jul 30, 2017
f1a798b
[MINOR] Minor comment fixes in merge_spark_pr.py script
HyukjinKwon Jul 31, 2017
44e501a
[SPARK-19839][CORE] release longArray in BytesToBytesMap
Jul 31, 2017
106eaa9
[SPARK-21575][SPARKR] Eliminate needless synchronization in java-R se…
SereneAnt Jul 31, 2017
6b186c9
[SPARK-18950][SQL] Report conflicting fields when merging two StructT…
jiayue-zhang Aug 1, 2017
9570e81
[SPARK-21381][SPARKR] SparkR: pass on setHandleInvalid for classifica…
wangmiao1981 Aug 1, 2017
110695d
[SPARK-21589][SQL][DOC] Add documents about Hive UDF/UDTF/UDAF
maropu Aug 1, 2017
5fd0294
[SPARK-21475][CORE] Use NIO's Files API to replace FileInputStream/Fi…
jerryshao Aug 1, 2017
253a07e
[SPARK-21388][ML][PYSPARK] GBTs inherit from HasStepSize & LInearSVC …
zhengruifeng Aug 1, 2017
97ccc63
[SPARK-21585] Application Master marking application status as Failed…
Aug 1, 2017
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 40 additions & 9 deletions R/pkg/R/mllib_classification.R
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,11 @@ setClass("NaiveBayesModel", representation(jobj = "jobj"))
#' @param aggregationDepth The depth for treeAggregate (greater than or equal to 2). If the dimensions of features
#' or the number of partitions are large, this param could be adjusted to a larger size.
#' This is an expert parameter. Default value should be good for most cases.
#' @param handleInvalid How to handle invalid data (unseen labels or NULL values) in features and label
#' column of string type.
#' Supported options: "skip" (filter out rows with invalid data),
#' "error" (throw an error), "keep" (put invalid data in a special additional
#' bucket, at index numLabels). Default is "error".
#' @param ... additional arguments passed to the method.
#' @return \code{spark.svmLinear} returns a fitted linear SVM model.
#' @rdname spark.svmLinear
Expand Down Expand Up @@ -98,7 +103,8 @@ setClass("NaiveBayesModel", representation(jobj = "jobj"))
#' @note spark.svmLinear since 2.2.0
setMethod("spark.svmLinear", signature(data = "SparkDataFrame", formula = "formula"),
function(data, formula, regParam = 0.0, maxIter = 100, tol = 1E-6, standardization = TRUE,
threshold = 0.0, weightCol = NULL, aggregationDepth = 2) {
threshold = 0.0, weightCol = NULL, aggregationDepth = 2,
handleInvalid = c("error", "keep", "skip")) {
formula <- paste(deparse(formula), collapse = "")

if (!is.null(weightCol) && weightCol == "") {
Expand All @@ -107,10 +113,12 @@ setMethod("spark.svmLinear", signature(data = "SparkDataFrame", formula = "formu
weightCol <- as.character(weightCol)
}

handleInvalid <- match.arg(handleInvalid)

jobj <- callJStatic("org.apache.spark.ml.r.LinearSVCWrapper", "fit",
data@sdf, formula, as.numeric(regParam), as.integer(maxIter),
as.numeric(tol), as.logical(standardization), as.numeric(threshold),
weightCol, as.integer(aggregationDepth))
weightCol, as.integer(aggregationDepth), handleInvalid)
new("LinearSVCModel", jobj = jobj)
})

Expand Down Expand Up @@ -218,6 +226,11 @@ function(object, path, overwrite = FALSE) {
#' @param upperBoundsOnIntercepts The upper bounds on intercepts if fitting under bound constrained optimization.
#' The bound vector size must be equal to 1 for binomial regression, or the number
#' of classes for multinomial regression.
#' @param handleInvalid How to handle invalid data (unseen labels or NULL values) in features and label
#' column of string type.
#' Supported options: "skip" (filter out rows with invalid data),
#' "error" (throw an error), "keep" (put invalid data in a special additional
#' bucket, at index numLabels). Default is "error".
#' @param ... additional arguments passed to the method.
#' @return \code{spark.logit} returns a fitted logistic regression model.
#' @rdname spark.logit
Expand Down Expand Up @@ -257,7 +270,8 @@ setMethod("spark.logit", signature(data = "SparkDataFrame", formula = "formula")
tol = 1E-6, family = "auto", standardization = TRUE,
thresholds = 0.5, weightCol = NULL, aggregationDepth = 2,
lowerBoundsOnCoefficients = NULL, upperBoundsOnCoefficients = NULL,
lowerBoundsOnIntercepts = NULL, upperBoundsOnIntercepts = NULL) {
lowerBoundsOnIntercepts = NULL, upperBoundsOnIntercepts = NULL,
handleInvalid = c("error", "keep", "skip")) {
formula <- paste(deparse(formula), collapse = "")
row <- 0
col <- 0
Expand Down Expand Up @@ -304,6 +318,8 @@ setMethod("spark.logit", signature(data = "SparkDataFrame", formula = "formula")
upperBoundsOnCoefficients <- as.array(as.vector(upperBoundsOnCoefficients))
}

handleInvalid <- match.arg(handleInvalid)

jobj <- callJStatic("org.apache.spark.ml.r.LogisticRegressionWrapper", "fit",
data@sdf, formula, as.numeric(regParam),
as.numeric(elasticNetParam), as.integer(maxIter),
Expand All @@ -312,7 +328,8 @@ setMethod("spark.logit", signature(data = "SparkDataFrame", formula = "formula")
weightCol, as.integer(aggregationDepth),
as.integer(row), as.integer(col),
lowerBoundsOnCoefficients, upperBoundsOnCoefficients,
lowerBoundsOnIntercepts, upperBoundsOnIntercepts)
lowerBoundsOnIntercepts, upperBoundsOnIntercepts,
handleInvalid)
new("LogisticRegressionModel", jobj = jobj)
})

Expand Down Expand Up @@ -394,7 +411,12 @@ setMethod("write.ml", signature(object = "LogisticRegressionModel", path = "char
#' @param stepSize stepSize parameter.
#' @param seed seed parameter for weights initialization.
#' @param initialWeights initialWeights parameter for weights initialization, it should be a
#' numeric vector.
#' numeric vector.
#' @param handleInvalid How to handle invalid data (unseen labels or NULL values) in features and label
#' column of string type.
#' Supported options: "skip" (filter out rows with invalid data),
#' "error" (throw an error), "keep" (put invalid data in a special additional
#' bucket, at index numLabels). Default is "error".
#' @param ... additional arguments passed to the method.
#' @return \code{spark.mlp} returns a fitted Multilayer Perceptron Classification Model.
#' @rdname spark.mlp
Expand Down Expand Up @@ -426,7 +448,8 @@ setMethod("write.ml", signature(object = "LogisticRegressionModel", path = "char
#' @note spark.mlp since 2.1.0
setMethod("spark.mlp", signature(data = "SparkDataFrame", formula = "formula"),
function(data, formula, layers, blockSize = 128, solver = "l-bfgs", maxIter = 100,
tol = 1E-6, stepSize = 0.03, seed = NULL, initialWeights = NULL) {
tol = 1E-6, stepSize = 0.03, seed = NULL, initialWeights = NULL,
handleInvalid = c("error", "keep", "skip")) {
formula <- paste(deparse(formula), collapse = "")
if (is.null(layers)) {
stop ("layers must be a integer vector with length > 1.")
Expand All @@ -441,10 +464,11 @@ setMethod("spark.mlp", signature(data = "SparkDataFrame", formula = "formula"),
if (!is.null(initialWeights)) {
initialWeights <- as.array(as.numeric(na.omit(initialWeights)))
}
handleInvalid <- match.arg(handleInvalid)
jobj <- callJStatic("org.apache.spark.ml.r.MultilayerPerceptronClassifierWrapper",
"fit", data@sdf, formula, as.integer(blockSize), as.array(layers),
as.character(solver), as.integer(maxIter), as.numeric(tol),
as.numeric(stepSize), seed, initialWeights)
as.numeric(stepSize), seed, initialWeights, handleInvalid)
new("MultilayerPerceptronClassificationModel", jobj = jobj)
})

Expand Down Expand Up @@ -514,6 +538,11 @@ setMethod("write.ml", signature(object = "MultilayerPerceptronClassificationMode
#' @param formula a symbolic description of the model to be fitted. Currently only a few formula
#' operators are supported, including '~', '.', ':', '+', and '-'.
#' @param smoothing smoothing parameter.
#' @param handleInvalid How to handle invalid data (unseen labels or NULL values) in features and label
#' column of string type.
#' Supported options: "skip" (filter out rows with invalid data),
#' "error" (throw an error), "keep" (put invalid data in a special additional
#' bucket, at index numLabels). Default is "error".
#' @param ... additional argument(s) passed to the method. Currently only \code{smoothing}.
#' @return \code{spark.naiveBayes} returns a fitted naive Bayes model.
#' @rdname spark.naiveBayes
Expand Down Expand Up @@ -543,10 +572,12 @@ setMethod("write.ml", signature(object = "MultilayerPerceptronClassificationMode
#' }
#' @note spark.naiveBayes since 2.0.0
setMethod("spark.naiveBayes", signature(data = "SparkDataFrame", formula = "formula"),
function(data, formula, smoothing = 1.0) {
function(data, formula, smoothing = 1.0,
handleInvalid = c("error", "keep", "skip")) {
formula <- paste(deparse(formula), collapse = "")
handleInvalid <- match.arg(handleInvalid)
jobj <- callJStatic("org.apache.spark.ml.r.NaiveBayesWrapper", "fit",
formula, data@sdf, smoothing)
formula, data@sdf, smoothing, handleInvalid)
new("NaiveBayesModel", jobj = jobj)
})

Expand Down
33 changes: 25 additions & 8 deletions R/pkg/R/mllib_tree.R
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,11 @@ print.summary.decisionTree <- function(x) {
#' nodes. If TRUE, the algorithm will cache node IDs for each instance. Caching
#' can speed up training of deeper trees. Users can set how often should the
#' cache be checkpointed or disable it by setting checkpointInterval.
#' @param handleInvalid How to handle invalid data (unseen labels or NULL values) in features and label
#' column of string type in classification model.
#' Supported options: "skip" (filter out rows with invalid data),
#' "error" (throw an error), "keep" (put invalid data in a special additional
#' bucket, at index numLabels). Default is "error".
#' @param ... additional arguments passed to the method.
#' @aliases spark.gbt,SparkDataFrame,formula-method
#' @return \code{spark.gbt} returns a fitted Gradient Boosted Tree model.
Expand Down Expand Up @@ -205,7 +210,8 @@ setMethod("spark.gbt", signature(data = "SparkDataFrame", formula = "formula"),
function(data, formula, type = c("regression", "classification"),
maxDepth = 5, maxBins = 32, maxIter = 20, stepSize = 0.1, lossType = NULL,
seed = NULL, subsamplingRate = 1.0, minInstancesPerNode = 1, minInfoGain = 0.0,
checkpointInterval = 10, maxMemoryInMB = 256, cacheNodeIds = FALSE) {
checkpointInterval = 10, maxMemoryInMB = 256, cacheNodeIds = FALSE,
handleInvalid = c("error", "keep", "skip")) {
type <- match.arg(type)
formula <- paste(deparse(formula), collapse = "")
if (!is.null(seed)) {
Expand All @@ -225,6 +231,7 @@ setMethod("spark.gbt", signature(data = "SparkDataFrame", formula = "formula"),
new("GBTRegressionModel", jobj = jobj)
},
classification = {
handleInvalid <- match.arg(handleInvalid)
if (is.null(lossType)) lossType <- "logistic"
lossType <- match.arg(lossType, "logistic")
jobj <- callJStatic("org.apache.spark.ml.r.GBTClassifierWrapper",
Expand All @@ -233,7 +240,8 @@ setMethod("spark.gbt", signature(data = "SparkDataFrame", formula = "formula"),
as.numeric(stepSize), as.integer(minInstancesPerNode),
as.numeric(minInfoGain), as.integer(checkpointInterval),
lossType, seed, as.numeric(subsamplingRate),
as.integer(maxMemoryInMB), as.logical(cacheNodeIds))
as.integer(maxMemoryInMB), as.logical(cacheNodeIds),
handleInvalid)
new("GBTClassificationModel", jobj = jobj)
}
)
Expand Down Expand Up @@ -374,10 +382,11 @@ setMethod("write.ml", signature(object = "GBTClassificationModel", path = "chara
#' nodes. If TRUE, the algorithm will cache node IDs for each instance. Caching
#' can speed up training of deeper trees. Users can set how often should the
#' cache be checkpointed or disable it by setting checkpointInterval.
#' @param handleInvalid How to handle invalid data (unseen labels or NULL values) in classification model.
#' Supported options: "skip" (filter out rows with invalid data),
#' "error" (throw an error), "keep" (put invalid data in a special additional
#' bucket, at index numLabels). Default is "error".
#' @param handleInvalid How to handle invalid data (unseen labels or NULL values) in features and label
#' column of string type in classification model.
#' Supported options: "skip" (filter out rows with invalid data),
#' "error" (throw an error), "keep" (put invalid data in a special additional
#' bucket, at index numLabels). Default is "error".
#' @param ... additional arguments passed to the method.
#' @aliases spark.randomForest,SparkDataFrame,formula-method
#' @return \code{spark.randomForest} returns a fitted Random Forest model.
Expand Down Expand Up @@ -583,6 +592,11 @@ setMethod("write.ml", signature(object = "RandomForestClassificationModel", path
#' nodes. If TRUE, the algorithm will cache node IDs for each instance. Caching
#' can speed up training of deeper trees. Users can set how often should the
#' cache be checkpointed or disable it by setting checkpointInterval.
#' @param handleInvalid How to handle invalid data (unseen labels or NULL values) in features and label
#' column of string type in classification model.
#' Supported options: "skip" (filter out rows with invalid data),
#' "error" (throw an error), "keep" (put invalid data in a special additional
#' bucket, at index numLabels). Default is "error".
#' @param ... additional arguments passed to the method.
#' @aliases spark.decisionTree,SparkDataFrame,formula-method
#' @return \code{spark.decisionTree} returns a fitted Decision Tree model.
Expand Down Expand Up @@ -617,7 +631,8 @@ setMethod("spark.decisionTree", signature(data = "SparkDataFrame", formula = "fo
function(data, formula, type = c("regression", "classification"),
maxDepth = 5, maxBins = 32, impurity = NULL, seed = NULL,
minInstancesPerNode = 1, minInfoGain = 0.0, checkpointInterval = 10,
maxMemoryInMB = 256, cacheNodeIds = FALSE) {
maxMemoryInMB = 256, cacheNodeIds = FALSE,
handleInvalid = c("error", "keep", "skip")) {
type <- match.arg(type)
formula <- paste(deparse(formula), collapse = "")
if (!is.null(seed)) {
Expand All @@ -636,14 +651,16 @@ setMethod("spark.decisionTree", signature(data = "SparkDataFrame", formula = "fo
new("DecisionTreeRegressionModel", jobj = jobj)
},
classification = {
handleInvalid <- match.arg(handleInvalid)
if (is.null(impurity)) impurity <- "gini"
impurity <- match.arg(impurity, c("gini", "entropy"))
jobj <- callJStatic("org.apache.spark.ml.r.DecisionTreeClassifierWrapper",
"fit", data@sdf, formula, as.integer(maxDepth),
as.integer(maxBins), impurity,
as.integer(minInstancesPerNode), as.numeric(minInfoGain),
as.integer(checkpointInterval), seed,
as.integer(maxMemoryInMB), as.logical(cacheNodeIds))
as.integer(maxMemoryInMB), as.logical(cacheNodeIds),
handleInvalid)
new("DecisionTreeClassificationModel", jobj = jobj)
}
)
Expand Down
Loading