-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-9312][ML] Add RawPrediction, numClasses, and numFeatures for OneVsRestModel #21044
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 3 commits
0cfc20a
2a47e2b
0c32fca
ebf4a6c
b3c7fec
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -32,7 +32,7 @@ import org.apache.spark.SparkContext | |
| import org.apache.spark.annotation.Since | ||
| import org.apache.spark.ml._ | ||
| import org.apache.spark.ml.attribute._ | ||
| import org.apache.spark.ml.linalg.Vector | ||
| import org.apache.spark.ml.linalg.{Vector, Vectors} | ||
| import org.apache.spark.ml.param.{Param, ParamMap, ParamPair, Params} | ||
| import org.apache.spark.ml.param.shared.{HasParallelism, HasWeightCol} | ||
| import org.apache.spark.ml.util._ | ||
|
|
@@ -55,7 +55,7 @@ private[ml] trait ClassifierTypeTrait { | |
| /** | ||
| * Params for [[OneVsRest]]. | ||
| */ | ||
| private[ml] trait OneVsRestParams extends PredictorParams | ||
| private[ml] trait OneVsRestParams extends ClassifierParams | ||
| with ClassifierTypeTrait with HasWeightCol { | ||
|
|
||
| /** | ||
|
|
@@ -138,6 +138,12 @@ final class OneVsRestModel private[ml] ( | |
| @Since("1.4.0") val models: Array[_ <: ClassificationModel[_, _]]) | ||
| extends Model[OneVsRestModel] with OneVsRestParams with MLWritable { | ||
|
|
||
| @Since("2.4.0") | ||
| val numClasses: Int = models.length | ||
|
|
||
| @Since("2.4.0") | ||
| val numFeatures: Int = models.head.numFeatures | ||
|
|
||
| /** @group setParam */ | ||
| @Since("2.1.0") | ||
| def setFeaturesCol(value: String): this.type = set(featuresCol, value) | ||
|
|
@@ -146,6 +152,10 @@ final class OneVsRestModel private[ml] ( | |
| @Since("2.1.0") | ||
| def setPredictionCol(value: String): this.type = set(predictionCol, value) | ||
|
|
||
| /** @group setParam */ | ||
| @Since("2.4.0") | ||
| def setRawPredictionCol(value: String): this.type = set(rawPredictionCol, value) | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You'll need to add this to the Estimator too. |
||
|
|
||
| @Since("1.4.0") | ||
| override def transformSchema(schema: StructType): StructType = { | ||
| validateAndTransformSchema(schema, fitting = false, getClassifier.featuresDataType) | ||
|
|
@@ -181,6 +191,7 @@ final class OneVsRestModel private[ml] ( | |
| val updateUDF = udf { (predictions: Map[Int, Double], prediction: Vector) => | ||
| predictions + ((index, prediction(1))) | ||
| } | ||
|
|
||
| model.setFeaturesCol($(featuresCol)) | ||
| val transformedDataset = model.transform(df).select(columns: _*) | ||
| val updatedDataset = transformedDataset | ||
|
|
@@ -195,15 +206,32 @@ final class OneVsRestModel private[ml] ( | |
| newDataset.unpersist() | ||
| } | ||
|
|
||
| // output the index of the classifier with highest confidence as prediction | ||
| val labelUDF = udf { (predictions: Map[Int, Double]) => | ||
| predictions.maxBy(_._2)._1.toDouble | ||
| } | ||
| // output the RawPrediction as vector | ||
| if (getRawPredictionCol != "") { | ||
| val rawPredictionUDF = udf { (predictions: Map[Int, Double]) => | ||
| val predArray = Array.fill[Double](numClasses)(0.0) | ||
|
||
| predictions.foreach { case (idx, value) => predArray(idx) = value } | ||
| Vectors.dense(predArray) | ||
| } | ||
|
|
||
| // output the index of the classifier with highest confidence as prediction | ||
| val labelUDF = udf { (predictions: Vector) => predictions.argmax.toDouble } | ||
|
||
|
|
||
| // output label and label metadata as prediction | ||
| aggregatedDataset | ||
| .withColumn($(predictionCol), labelUDF(col(accColName)), labelMetadata) | ||
| .drop(accColName) | ||
| aggregatedDataset | ||
| .withColumn(getRawPredictionCol, rawPredictionUDF(col(accColName))) | ||
| .withColumn(getPredictionCol, labelUDF(col(getRawPredictionCol)), labelMetadata) | ||
| .drop(accColName) | ||
| } | ||
| else { | ||
|
||
| // output the index of the classifier with highest confidence as prediction | ||
| val labelUDF = udf { (predictions: Map[Int, Double]) => | ||
| predictions.maxBy(_._2)._1.toDouble | ||
| } | ||
| // output confidence as rwa prediction, label and label metadata as prediction | ||
|
||
| aggregatedDataset | ||
| .withColumn(getPredictionCol, labelUDF(col(accColName)), labelMetadata) | ||
| .drop(accColName) | ||
| } | ||
| } | ||
|
|
||
| @Since("1.4.1") | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's add a require() statement here which checks that models.nonEmpty is true (to throw an exception upon construction, rather than when numFeatures calls models.head below). Just to be safe...