@@ -0,0 +1,130 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.ml.classification

import breeze.linalg.{argmax => Bargmax}

import org.apache.spark.annotation.Experimental
import org.apache.spark.ml.{PredictionModel, Predictor}
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.ml.util.Identifiable
import org.apache.spark.ml.regression.MultilayerPerceptronParams
import org.apache.spark.mllib.ann.{FeedForwardTrainer, FeedForwardTopology}
import org.apache.spark.mllib.linalg.{Vectors, Vector}
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.sql.DataFrame

/**
* :: Experimental ::
* Label to vector converter.
*/
@Experimental
[Contributor]
Remove annotation on private classes and methods.

This could be replaced by OneHotEncoder. We don't need to do it in this PR, but please leave a TODO note.
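For reference, a minimal sketch of what the TODO and the eventual OneHotEncoder-based replacement could look like; the column names are illustrative, and setDropLast availability depends on the Spark version:

import org.apache.spark.ml.feature.OneHotEncoder

// TODO: replace the hand-rolled encoding below with OneHotEncoder.
// Hypothetical columns: "label" in, "labelVec" out.
val encoder = new OneHotEncoder()
  .setInputCol("label")
  .setOutputCol("labelVec")
  .setDropLast(false) // keep a slot for every class, as the MLP output expects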

private object LabelConverter {

/**
* Encodes a label as a vector.
* Returns a vector of the given length with zeroes at all positions
* and value 1.0 at the position that corresponds to the label.
*
* @param labeledPoint labeled point
* @param labelCount total number of labels
* @return vector encoding of a label
*/
def apply(labeledPoint: LabeledPoint, labelCount: Int): (Vector, Vector) = {
[Contributor]
Using apply makes the code harder to read, because this is not a factory method that creates LabelConverter instances. We should rename it to encodeLabel, for example.

val output = Array.fill(labelCount){0.0}
[Contributor]
{0.0} -> (0.0)

output(labeledPoint.label.toInt) = 1.0
(labeledPoint.features, Vectors.dense(output))
}

/**
* Converts a vector to a label.
* Returns the position of the maximal element of a vector.
*
* @param output label encoded with a vector
* @return label
*/
def apply(output: Vector): Double = {
[Contributor]
Similarly, this one could be renamed to decodeLabel.

Bargmax(output.toBreeze.toDenseVector).toDouble
[Contributor]
output.argmax should work. We recently merged Vector.argmax.

}
}
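Taken together, the rename and argmax suggestions would reshape LabelConverter roughly as follows (a sketch, not the merged code):

private object LabelConverter {

  /** Encodes a label as a one-hot vector (the suggested encodeLabel rename). */
  def encodeLabel(labeledPoint: LabeledPoint, labelCount: Int): (Vector, Vector) = {
    val output = Array.fill(labelCount)(0.0)
    output(labeledPoint.label.toInt) = 1.0
    (labeledPoint.features, Vectors.dense(output))
  }

  /** Decodes a vector back to a label via Vector.argmax (the suggested decodeLabel rename). */
  def decodeLabel(output: Vector): Double = output.argmax.toDouble
}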

/**
* :: Experimental ::
* Classifier trainer based on the Multilayer Perceptron.
* Each layer has a sigmoid activation function; the output layer uses softmax.
* Number of inputs has to be equal to the size of feature vectors.
* Number of outputs has to be equal to the total number of labels.
*
*/
@Experimental
class MultilayerPerceptronClassifier (override val uid: String)
[Contributor]
remove space before (

extends Predictor[Vector, MultilayerPerceptronClassifier, MultilayerPerceptronClassifierModel]
with MultilayerPerceptronParams {

override def copy(extra: ParamMap): MultilayerPerceptronClassifier = defaultCopy(extra)
[Contributor]
move copy down (at least after this()) because constructors should appear at the top


def this() = this(Identifiable.randomUID("mlpc"))

/**
* Train a model using the given dataset and parameters.
* Developers can implement this instead of [[fit()]] to avoid dealing with schema validation
* and copying parameters into the model.
*
* @param dataset Training dataset
* @return Fitted model
*/
override protected def train(dataset: DataFrame): MultilayerPerceptronClassifierModel = {
val labels = getLayers.last.toInt
[Contributor]
remove .toInt and rename labels to numLabels. We can use $ to get a param value, e.g., val numLabels = $(layers).last.

val lpData = extractLabeledPoints(dataset)
val data = lpData.map(lp => LabelConverter(lp, labels))
val myLayers = getLayers.map(_.toInt)
[Contributor]
remove .map(_.toInt). If we need to create a copy, use $(layers).clone().

val topology = FeedForwardTopology.multiLayerPerceptron(myLayers, true)
val trainer = new FeedForwardTrainer(topology, myLayers(0), myLayers.last)
trainer.LBFGSOptimizer.setConvergenceTol(getTol).setNumIterations(getMaxIter)
[Contributor]
If we use $(layers), let's also use $(tol) and $(maxIter). Please also update the other getter usages.

trainer.setStackSize(getBlockSize)
val mlpModel = trainer.train(data)
new MultilayerPerceptronClassifierModel(uid, myLayers, mlpModel.weights())
}
}
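Applying the $(...) and rename suggestions above, train would come out roughly like this (a sketch under those assumptions, not the merged code):

override protected def train(dataset: DataFrame): MultilayerPerceptronClassifierModel = {
  val myLayers = $(layers)
  val numLabels = myLayers.last // no .toInt needed: layers is an Array[Int]
  val data = extractLabeledPoints(dataset).map(lp => LabelConverter.encodeLabel(lp, numLabels))
  val topology = FeedForwardTopology.multiLayerPerceptron(myLayers, true)
  val trainer = new FeedForwardTrainer(topology, myLayers(0), myLayers.last)
  trainer.LBFGSOptimizer.setConvergenceTol($(tol)).setNumIterations($(maxIter))
  trainer.setStackSize($(blockSize))
  val mlpModel = trainer.train(data)
  new MultilayerPerceptronClassifierModel(uid, myLayers, mlpModel.weights())
}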

/**
* :: Experimental ::
* Classifier model based on the Multilayer Perceptron.
* Each layer has a sigmoid activation function; the output layer uses softmax.
*/
@Experimental
class MultilayerPerceptronClassifierModel private[ml] (override val uid: String,
[Contributor]
chop down the arguments and use 4-space indentation

need JavaDoc for layers and weights. Though they are private, it helps people understand the code.

layers: Array[Int],
weights: Vector)
extends PredictionModel[Vector, MultilayerPerceptronClassifierModel]
with Serializable {
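The chopped-down signature with parameter docs would look roughly like (a sketch; the doc text mirrors the regressor model's):

/**
 * @param layers array of layer sizes including input and output
 * @param weights weights (or parameters) of the model
 */
class MultilayerPerceptronClassifierModel private[ml] (
    override val uid: String,
    layers: Array[Int],
    weights: Vector)
  extends PredictionModel[Vector, MultilayerPerceptronClassifierModel]
  with Serializable {
  // ... body unchanged
}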
[Contributor]
Support model save/load?

[Contributor Author]
Do you know if there exists a generic model loader/saver in Spark ML? The only approach I can think of is sc.parallelize(Seq(model), 1).saveAsObjectFile("model"), which honestly does not look good.
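Spelled out, that ad-hoc approach would be (the path "model" is illustrative):

// Save: wrap the model in a one-element RDD and write it as an object file.
sc.parallelize(Seq(model), 1).saveAsObjectFile("model")
// Load: read the object file back and take its single element.
val loaded = sc.objectFile[MultilayerPerceptronClassifierModel]("model").first()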

[Contributor]
Let's do that in a follow-up PR to keep this PR minimal.

[Contributor]
OK, I see.


private val mlpModel =
FeedForwardTopology.multiLayerPerceptron(layers, true).getInstance(weights)

/**
* Predict label for the given features.
* This internal method is used to implement [[transform()]] and output [[predictionCol]].
*/
override protected def predict(features: Vector): Double = {
LabelConverter(mlpModel.predict(features))
[Contributor]
This is why I recommend renaming apply under LabelConverter: it is confusing to see LabelConverter(...) return a Double.

}
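With the decodeLabel rename, the call site reads naturally (sketch):

override protected def predict(features: Vector): Double = {
  LabelConverter.decodeLabel(mlpModel.predict(features))
}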

override def copy(extra: ParamMap): MultilayerPerceptronClassifierModel = {
copyValues(new MultilayerPerceptronClassifierModel(uid, layers, weights), extra)
}
}
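For orientation, typical usage of the classifier under review would look like the sketch below; train and test are illustrative DataFrames with "label" and "features" columns:

val mlp = new MultilayerPerceptronClassifier()
  .setLayers(Array(4, 5, 3)) // 4 inputs, one hidden layer of 5 neurons, 3 classes
  .setBlockSize(1)
  .setMaxIter(100)
  .setTol(1e-4)
val model = mlp.fit(train)              // train: DataFrame with label/features
val predictions = model.transform(test) // appends the prediction column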
@@ -0,0 +1,206 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.ml.regression

import org.apache.spark.Logging
import org.apache.spark.annotation.Experimental
import org.apache.spark.ml.{Model, Transformer, Estimator, PredictorParams}
import org.apache.spark.ml.param._
import org.apache.spark.ml.param.shared._
import org.apache.spark.ml.util.Identifiable
import org.apache.spark.mllib.ann.{FeedForwardTopology, FeedForwardTrainer}
import org.apache.spark.mllib.linalg.{VectorUDT, Vector}
import org.apache.spark.sql.{Row, DataFrame}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{StructField, StructType}

/**
* Params for Multilayer Perceptron.
*/
private[ml] trait MultilayerPerceptronParams extends PredictorParams
with HasSeed with HasMaxIter with HasTol {
/**
* Layer sizes including input size and output size.
* @group param
*/
final val layers: IntArrayParam =
// TODO: we need IntegerArrayParam!
[Contributor]
Is this TODO still valid? Move line 45 to 43.

new IntArrayParam(this, "layers",
"Sizes of layers including input and output from bottom to the top." +
[Contributor]
I'm not sure whether "bottom" and "top" are standard terms. "Sizes of layers from input layer to output layer" looks sufficient to me.

" E.g., Array(780, 100, 10) means 780 inputs, " +
"hidden layer with 100 neurons and output layer of 10 neurons."
[Contributor]
hidden layer -> one hidden layer

// TODO: how to check that array is not empty?
[Contributor]
You can add one to ParamValidators. It should contain at least two layers, right?

)
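A sketch of the requested check, assuming a ParamValidators.arrayLengthGt helper (exclusive lower bound on array length) is available; otherwise a custom Array[Int] => Boolean predicate would do:

new IntArrayParam(this, "layers",
  "Sizes of layers from input layer to output layer. " +
  "E.g., Array(780, 100, 10) means 780 inputs, one hidden layer with 100 neurons " +
  "and output layer of 10 neurons.",
  // at least two entries: the input size and the output size
  ParamValidators.arrayLengthGt(1))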

/**
* Block size for stacking input data in matrices. Speeds up the computations.
* Cannot be more than the size of the dataset.
* @group expertParam
*/
final val blockSize: IntParam = new IntParam(this, "blockSize",
"Block size for stacking input data in matrices.",
ParamValidators.gt(0))
[Contributor]
We don't chop down args in method calls, so we can merge this line into the one above.


/** @group setParam */
def setLayers(value: Array[Int]): this.type = set(layers, value)
[Contributor]
move setter and getter to val layers so they form a group


/** @group getParam */
final def getLayers: Array[Int] = $(layers)

/** @group setParam */
def setBlockSize(value: Int): this.type = set(blockSize, value)

/** @group getParam */
final def getBlockSize: Int = $(blockSize)

/**
* Set the maximum number of iterations.
* Default is 100.
* @group setParam
*/
def setMaxIter(value: Int): this.type = set(maxIter, value)

/**
* Set the convergence tolerance of iterations.
* A smaller value will lead to higher accuracy at the cost of more iterations.
* Default is 1E-4.
* @group setParam
*/
def setTol(value: Double): this.type = set(tol, value)

/**
* Set the seed for weights initialization.
* Default is 11L.
* @group setParam
*/
def setSeed(value: Long): this.type = set(seed, value)

setDefault(seed -> 11L, maxIter -> 100, tol -> 1e-4, layers -> Array(1, 1), blockSize -> 1)
[Contributor]
seed already has a default value inherited from HasSeed.
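With the inherited default dropped, the line would shrink to:

setDefault(maxIter -> 100, tol -> 1e-4, layers -> Array(1, 1), blockSize -> 1)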

}

/**
* :: Experimental ::
* Multi-layer perceptron regression, with a sigmoid activation function on every layer.
* See https://en.wikipedia.org/wiki/Multilayer_perceptron for details.
*
*/
@Experimental
class MultilayerPerceptronRegressor(override val uid: String)
extends Estimator[MultilayerPerceptronRegressorModel]
with MultilayerPerceptronParams with HasInputCol with HasOutputCol with HasRawPredictionCol
with Logging {

/** @group setParam */
def setInputCol(value: String): this.type = set(inputCol, value)
[Contributor]
Do we want to use inputCol and outputCol instead of featuresCol and labelCol?


/** @group setParam */
def setOutputCol(value: String): this.type = set(outputCol, value)

/**
* Fits a model to the input and output data.
* InputCol has to contain input vectors.
* OutputCol has to contain output vectors.
*/
override def fit(dataset: DataFrame): MultilayerPerceptronRegressorModel = {
val data = dataset.select($(inputCol), $(outputCol)).map {
case Row(x: Vector, y: Vector) => (x, y)
}
val myLayers = getLayers
val topology = FeedForwardTopology.multiLayerPerceptron(myLayers, false)
val trainer = new FeedForwardTrainer(topology, myLayers(0), myLayers.last)
trainer.LBFGSOptimizer.setConvergenceTol(getTol).setNumIterations(getMaxIter)
trainer.setStackSize(getBlockSize)
val mlpModel = trainer.train(data)
new MultilayerPerceptronRegressorModel(uid, myLayers, mlpModel.weights())
}

/**
* :: DeveloperApi ::
*
* Derives the output schema from the input schema.
*/
override def transformSchema(schema: StructType): StructType = {
val inputType = schema($(inputCol)).dataType
require(inputType.isInstanceOf[VectorUDT],
s"Input column ${$(inputCol)} must be a vector column")
val outputType = schema($(outputCol)).dataType
require(outputType.isInstanceOf[VectorUDT],
s"Input column ${$(outputCol)} must be a vector column")
require(!schema.fieldNames.contains($(rawPredictionCol)),
s"Output column ${$(rawPredictionCol)} already exists.")
val outputFields = schema.fields :+ StructField($(rawPredictionCol), new VectorUDT, false)
StructType(outputFields)
}

def this() = this(Identifiable.randomUID("mlpr"))

override def copy(extra: ParamMap): MultilayerPerceptronRegressor = defaultCopy(extra)
}

/**
* :: Experimental ::
* Multi-layer perceptron regression model.
*
* @param layers array of layer sizes including input and output
* @param weights weights (or parameters) of the model
*/
@Experimental
class MultilayerPerceptronRegressorModel private[ml] (override val uid: String,
layers: Array[Int],
weights: Vector)
extends Model[MultilayerPerceptronRegressorModel]
with HasInputCol with HasRawPredictionCol {

private val mlpModel =
FeedForwardTopology.multiLayerPerceptron(layers, false).getInstance(weights)

/** @group setParam */
def setInputCol(value: String): this.type = set(inputCol, value)

/**
* Transforms the input dataset.
* InputCol has to contain input vectors.
* RawPrediction column will contain predictions (outputs of the regressor).
*/
override def transform(dataset: DataFrame): DataFrame = {
transformSchema(dataset.schema, logging = true)
val predictOp = udf { mlpModel.predict _ }
dataset.withColumn($(rawPredictionCol), predictOp(col($(inputCol))))
}

/**
* :: DeveloperApi ::
*
* Derives the output schema from the input schema.
*/
override def transformSchema(schema: StructType): StructType = {
val inputType = schema($(inputCol)).dataType
require(inputType.isInstanceOf[VectorUDT],
s"Input column ${$(inputCol)} must be a vector column")
require(!schema.fieldNames.contains($(rawPredictionCol)),
s"Output column ${$(rawPredictionCol)} already exists.")
val outputFields = schema.fields :+ StructField($(rawPredictionCol), new VectorUDT, false)
StructType(outputFields)
}

override def copy(extra: ParamMap): MultilayerPerceptronRegressorModel = {
copyValues(new MultilayerPerceptronRegressorModel(uid, layers, weights), extra)
}
}
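Usage of the regressor mirrors the classifier, except both columns hold vectors; df and the column names below are illustrative:

val mlpr = new MultilayerPerceptronRegressor()
  .setLayers(Array(3, 8, 2))
  .setInputCol("inputVec")
  .setOutputCol("outputVec")
val model = mlpr.fit(df) // df: DataFrame with two vector columns
val result = model.setInputCol("inputVec").transform(df) // appends the raw prediction column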