diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala b/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala index 89e159016daa5..aa81037014451 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala @@ -19,6 +19,8 @@ package org.apache.spark.ml.clustering import java.util.Locale +import breeze.linalg.normalize +import breeze.numerics.exp import org.apache.hadoop.fs.Path import org.json4s.DefaultFormats import org.json4s.JsonAST.JObject @@ -27,7 +29,7 @@ import org.json4s.jackson.JsonMethods._ import org.apache.spark.annotation.{DeveloperApi, Since} import org.apache.spark.internal.Logging import org.apache.spark.ml.{Estimator, Model} -import org.apache.spark.ml.linalg.{Matrix, Vector, Vectors, VectorUDT} +import org.apache.spark.ml.linalg._ import org.apache.spark.ml.param._ import org.apache.spark.ml.param.shared.{HasCheckpointInterval, HasFeaturesCol, HasMaxIter, HasSeed} import org.apache.spark.ml.util._ @@ -35,7 +37,7 @@ import org.apache.spark.ml.util.DefaultParamsReader.Metadata import org.apache.spark.ml.util.Instrumentation.instrumented import org.apache.spark.mllib.clustering.{DistributedLDAModel => OldDistributedLDAModel, EMLDAOptimizer => OldEMLDAOptimizer, LDA => OldLDA, LDAModel => OldLDAModel, - LDAOptimizer => OldLDAOptimizer, LocalLDAModel => OldLocalLDAModel, + LDAOptimizer => OldLDAOptimizer, LDAUtils => OldLDAUtils, LocalLDAModel => OldLocalLDAModel, OnlineLDAOptimizer => OldOnlineLDAOptimizer} import org.apache.spark.mllib.linalg.{Vector => OldVector, Vectors => OldVectors} import org.apache.spark.mllib.linalg.MatrixImplicits._ @@ -43,7 +45,7 @@ import org.apache.spark.mllib.linalg.VectorImplicits._ import org.apache.spark.mllib.util.MLUtils import org.apache.spark.rdd.RDD import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession} -import org.apache.spark.sql.functions.{col, monotonically_increasing_id, udf} +import org.apache.spark.sql.functions.{monotonically_increasing_id, udf} import org.apache.spark.sql.types.StructType import org.apache.spark.storage.StorageLevel import org.apache.spark.util.PeriodicCheckpointer @@ -457,14 +459,14 @@ abstract class LDAModel private[ml] ( */ @Since("2.0.0") override def transform(dataset: Dataset[_]): DataFrame = { - if ($(topicDistributionCol).nonEmpty) { + transformSchema(dataset.schema, logging = true) - // TODO: Make the transformer natively in ml framework to avoid extra conversion. - val transformer = oldLocalModel.getTopicDistributionMethod + if ($(topicDistributionCol).nonEmpty) { + val func = getTopicDistributionMethod + val transformer = udf(func) - val t = udf { (v: Vector) => transformer(OldVectors.fromML(v)).asML } dataset.withColumn($(topicDistributionCol), - t(DatasetUtils.columnToVector(dataset, getFeaturesCol))).toDF() + transformer(DatasetUtils.columnToVector(dataset, getFeaturesCol))) } else { logWarning("LDAModel.transform was called without any output columns. 
Set an output column" + " such as topicDistributionCol to produce results.") @@ -472,6 +474,41 @@ abstract class LDAModel private[ml] ( } } + /** + * Get a method usable as a UDF for `topicDistributions()` + */ + private def getTopicDistributionMethod: Vector => Vector = { + val expElogbeta = exp(OldLDAUtils.dirichletExpectation(topicsMatrix.asBreeze.toDenseMatrix.t).t) + val oldModel = oldLocalModel + val docConcentrationBrz = oldModel.docConcentration.asBreeze + val gammaShape = oldModel.gammaShape + val k = oldModel.k + val gammaSeed = oldModel.seed + + vector: Vector => + if (vector.numNonzeros == 0) { + Vectors.zeros(k) + } else { + val (ids: List[Int], cts: Array[Double]) = vector match { + case v: DenseVector => ((0 until v.size).toList, v.values) + case v: SparseVector => (v.indices.toList, v.values) + case other => + throw new UnsupportedOperationException( + s"Only sparse and dense vectors are supported but got ${other.getClass}.") + } + + val (gamma, _, _) = OldOnlineLDAOptimizer.variationalTopicInference( + ids, + cts, + expElogbeta, + docConcentrationBrz, + gammaShape, + k, + gammaSeed) + Vectors.dense(normalize(gamma, 1.0).toArray) + } + } + @Since("1.6.0") override def transformSchema(schema: StructType): StructType = { validateAndTransformSchema(schema) diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/Bucketizer.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/Bucketizer.scala index 0b989b0d7d253..16073d5fc1b6b 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/Bucketizer.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/Bucketizer.scala @@ -19,10 +19,6 @@ package org.apache.spark.ml.feature import java.{util => ju} -import org.json4s.JsonDSL._ -import org.json4s.JValue -import org.json4s.jackson.JsonMethods._ - import org.apache.spark.SparkException import org.apache.spark.annotation.Since import org.apache.spark.ml.Model diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/ChiSqSelector.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/ChiSqSelector.scala index dbfb199ccd58f..2a3656c49584e 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/ChiSqSelector.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/ChiSqSelector.scala @@ -22,7 +22,7 @@ import org.apache.hadoop.fs.Path import org.apache.spark.annotation.Since import org.apache.spark.ml._ import org.apache.spark.ml.attribute.{AttributeGroup, _} -import org.apache.spark.ml.linalg.{Vector, VectorUDT} +import org.apache.spark.ml.linalg._ import org.apache.spark.ml.param._ import org.apache.spark.ml.param.shared._ import org.apache.spark.ml.util._ @@ -264,14 +264,25 @@ final class ChiSqSelectorModel private[ml] ( @Since("2.0.0") override def transform(dataset: Dataset[_]): DataFrame = { - val transformedSchema = transformSchema(dataset.schema, logging = true) - val newField = transformedSchema.last - - // TODO: Make the transformer natively in ml framework to avoid extra conversion. 
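The recurring shape in these hunks is to build a native ml.linalg `Vector => Vector` closure once (capturing model state), wrap it with `udf`, and attach it via `withColumn`, rather than converting each row through `OldVectors.fromML(...).asML`. A minimal, self-contained sketch of that shape, assuming a local SparkSession; the scaling factor, object name, and column names are illustrative, not part of the patch:

import org.apache.spark.ml.linalg.{DenseVector, SparseVector, Vector, Vectors}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}

object NativeVectorUdfSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("sketch").getOrCreate()

    // Model state is captured once, outside the per-row closure.
    val weight = 2.0

    // Native ml.linalg transform: no OldVectors.fromML / .asML round trip per row.
    val func = (vector: Vector) => vector match {
      case DenseVector(values) =>
        Vectors.dense(values.map(_ * weight))
      case SparseVector(size, indices, values) =>
        Vectors.sparse(size, indices, values.map(_ * weight))
      case other =>
        throw new UnsupportedOperationException(
          s"Only sparse and dense vectors are supported but got ${other.getClass}.")
    }

    val transformer = udf(func)
    val df = spark.createDataFrame(Seq(Tuple1(Vectors.dense(1.0, 0.0, 3.0)))).toDF("features")
    df.withColumn("scaled", transformer(col("features"))).show(false)
    spark.stop()
  }
}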
- val transformer: Vector => Vector = v => chiSqSelector.transform(OldVectors.fromML(v)).asML + val outputSchema = transformSchema(dataset.schema, logging = true) + + val newSize = selectedFeatures.length + val func = { vector: Vector => + vector match { + case SparseVector(_, indices, values) => + val (newIndices, newValues) = chiSqSelector.compressSparse(indices, values) + Vectors.sparse(newSize, newIndices, newValues) + case DenseVector(values) => + Vectors.dense(chiSqSelector.compressDense(values)) + case other => + throw new UnsupportedOperationException( + s"Only sparse and dense vectors are supported but got ${other.getClass}.") + } + } - val selector = udf(transformer) - dataset.withColumn($(outputCol), selector(col($(featuresCol))), newField.metadata) + val transformer = udf(func) + dataset.withColumn($(outputCol), transformer(col($(featuresCol))), + outputSchema($(outputCol)).metadata) } @Since("1.6.0") diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/ElementwiseProduct.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/ElementwiseProduct.scala index f860b3a787b4d..ece125ba8ac70 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/ElementwiseProduct.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/ElementwiseProduct.scala @@ -19,11 +19,11 @@ package org.apache.spark.ml.feature import org.apache.spark.annotation.Since import org.apache.spark.ml.UnaryTransformer -import org.apache.spark.ml.linalg.{Vector, VectorUDT} +import org.apache.spark.ml.linalg._ import org.apache.spark.ml.param.Param import org.apache.spark.ml.util.{DefaultParamsReadable, DefaultParamsWritable, Identifiable} -import org.apache.spark.mllib.feature -import org.apache.spark.mllib.linalg.VectorImplicits._ +import org.apache.spark.mllib.feature.{ElementwiseProduct => OldElementwiseProduct} +import org.apache.spark.mllib.linalg.{Vectors => OldVectors} import org.apache.spark.sql.types.DataType /** @@ -55,8 +55,24 @@ class ElementwiseProduct @Since("1.4.0") (@Since("1.4.0") override val uid: Stri override protected def createTransformFunc: Vector => Vector = { require(params.contains(scalingVec), s"transformation requires a weight vector") - val elemScaler = new feature.ElementwiseProduct($(scalingVec)) - v => elemScaler.transform(v) + val elemScaler = new OldElementwiseProduct(OldVectors.fromML($(scalingVec))) + val vectorSize = $(scalingVec).size + + vector: Vector => { + require(vector.size == vectorSize, + s"vector sizes do not match: Expected $vectorSize but found ${vector.size}") + vector match { + case DenseVector(values) => + val newValues = elemScaler.transformDense(values) + Vectors.dense(newValues) + case SparseVector(size, indices, values) => + val (newIndices, newValues) = elemScaler.transformSparse(indices, values) + Vectors.sparse(size, newIndices, newValues) + case other => + throw new UnsupportedOperationException( + s"Only sparse and dense vectors are supported but got ${other.getClass}.") + } + } } override protected def outputDataType: DataType = new VectorUDT() diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/HashingTF.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/HashingTF.scala index dbda5b8d8fd4a..4e4a61d8bec65 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/HashingTF.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/HashingTF.scala @@ -20,10 +20,11 @@ package org.apache.spark.ml.feature import org.apache.spark.annotation.Since import org.apache.spark.ml.Transformer import 
org.apache.spark.ml.attribute.AttributeGroup +import org.apache.spark.ml.linalg.Vectors import org.apache.spark.ml.param._ import org.apache.spark.ml.param.shared.{HasInputCol, HasOutputCol} import org.apache.spark.ml.util._ -import org.apache.spark.mllib.feature +import org.apache.spark.mllib.feature.{HashingTF => OldHashingTF} import org.apache.spark.sql.{DataFrame, Dataset} import org.apache.spark.sql.functions.{col, udf} import org.apache.spark.sql.types.{ArrayType, StructType} @@ -93,11 +94,16 @@ class HashingTF @Since("1.4.0") (@Since("1.4.0") override val uid: String) @Since("2.0.0") override def transform(dataset: Dataset[_]): DataFrame = { val outputSchema = transformSchema(dataset.schema) - val hashingTF = new feature.HashingTF($(numFeatures)).setBinary($(binary)) - // TODO: Make the hashingTF.transform natively in ml framework to avoid extra conversion. - val t = udf { terms: Seq[_] => hashingTF.transform(terms).asML } - val metadata = outputSchema($(outputCol)).metadata - dataset.select(col("*"), t(col($(inputCol))).as($(outputCol), metadata)) + + val hashingTF = new OldHashingTF($(numFeatures)).setBinary($(binary)) + val func = (terms: Seq[_]) => { + val seq = hashingTF.transformImpl(terms) + Vectors.sparse(hashingTF.numFeatures, seq) + } + + val transformer = udf(func) + dataset.withColumn($(outputCol), transformer(col($(inputCol))), + outputSchema($(outputCol)).metadata) } @Since("1.4.0") diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/IDF.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/IDF.scala index 98a9674343b2a..4338421bf8bcf 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/IDF.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/IDF.scala @@ -21,7 +21,7 @@ import org.apache.hadoop.fs.Path import org.apache.spark.annotation.Since import org.apache.spark.ml._ -import org.apache.spark.ml.linalg.{Vector, VectorUDT} +import org.apache.spark.ml.linalg._ import org.apache.spark.ml.param._ import org.apache.spark.ml.param.shared._ import org.apache.spark.ml.util._ @@ -132,9 +132,24 @@ class IDFModel private[ml] ( @Since("2.0.0") override def transform(dataset: Dataset[_]): DataFrame = { transformSchema(dataset.schema, logging = true) - // TODO: Make the idfModel.transform natively in ml framework to avoid extra conversion. 
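For reference, the hashing-TF path that `transformImpl` now exposes boils down to: hash each term into one of `numFeatures` buckets, accumulate term frequencies (or 1.0 in binary mode), and build the ml SparseVector straight from the (index, value) pairs. A rough standalone sketch, using String.hashCode in place of Spark's hash function, so bucket ids will not match HashingTF's real output:

import scala.collection.mutable
import org.apache.spark.ml.linalg.{Vector, Vectors}

// Rough stand-in for HashingTF.transformImpl: hash terms into buckets, accumulate frequencies,
// and construct the sparse ml vector directly from the (index, value) pairs.
def hashTermFrequencies(terms: Seq[String], numFeatures: Int, binary: Boolean): Vector = {
  val termFrequencies = mutable.HashMap.empty[Int, Double]
  terms.foreach { term =>
    val raw = term.hashCode % numFeatures
    val bucket = if (raw < 0) raw + numFeatures else raw // non-negative modulo
    val tf = if (binary) 1.0 else termFrequencies.getOrElse(bucket, 0.0) + 1.0
    termFrequencies.put(bucket, tf)
  }
  Vectors.sparse(numFeatures, termFrequencies.toSeq) // (index, value) pairs -> SparseVector
}

// e.g. hashTermFrequencies(Seq("a", "b", "a"), 16, binary = false) yields counts 2.0 and 1.0.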
- val idf = udf { vec: Vector => idfModel.transform(OldVectors.fromML(vec)).asML } - dataset.withColumn($(outputCol), idf(col($(inputCol)))) + + val func = { vector: Vector => + vector match { + case SparseVector(size, indices, values) => + val (newIndices, newValues) = feature.IDFModel.transformSparse(idfModel.idf, + indices, values) + Vectors.sparse(size, newIndices, newValues) + case DenseVector(values) => + val newValues = feature.IDFModel.transformDense(idfModel.idf, values) + Vectors.dense(newValues) + case other => + throw new UnsupportedOperationException( + s"Only sparse and dense vectors are supported but got ${other.getClass}.") + } + } + + val transformer = udf(func) + dataset.withColumn($(outputCol), transformer(col($(inputCol)))) } @Since("1.4.0") diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/PCA.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/PCA.scala index 8172491a517d1..aa5a171d4fec5 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/PCA.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/PCA.scala @@ -26,11 +26,7 @@ import org.apache.spark.ml.param._ import org.apache.spark.ml.param.shared._ import org.apache.spark.ml.util._ import org.apache.spark.mllib.feature -import org.apache.spark.mllib.linalg.{DenseMatrix => OldDenseMatrix, DenseVector => OldDenseVector, - Matrices => OldMatrices, Vector => OldVector, Vectors => OldVectors} -import org.apache.spark.mllib.linalg.MatrixImplicits._ -import org.apache.spark.mllib.linalg.VectorImplicits._ -import org.apache.spark.rdd.RDD +import org.apache.spark.mllib.linalg.{DenseMatrix => OldDenseMatrix, Vectors => OldVectors} import org.apache.spark.sql._ import org.apache.spark.sql.functions._ import org.apache.spark.sql.types.{StructField, StructType} @@ -92,12 +88,13 @@ class PCA @Since("1.5.0") ( @Since("2.0.0") override def fit(dataset: Dataset[_]): PCAModel = { transformSchema(dataset.schema, logging = true) - val input: RDD[OldVector] = dataset.select($(inputCol)).rdd.map { + val input = dataset.select($(inputCol)).rdd.map { case Row(v: Vector) => OldVectors.fromML(v) } val pca = new feature.PCA(k = $(k)) val pcaModel = pca.fit(input) - copyValues(new PCAModel(uid, pcaModel.pc, pcaModel.explainedVariance).setParent(this)) + copyValues(new PCAModel(uid, pcaModel.pc.asML, pcaModel.explainedVariance.asML) + .setParent(this)) } @Since("1.5.0") @@ -149,15 +146,24 @@ class PCAModel private[ml] ( @Since("2.0.0") override def transform(dataset: Dataset[_]): DataFrame = { transformSchema(dataset.schema, logging = true) - val pcaModel = new feature.PCAModel($(k), - OldMatrices.fromML(pc).asInstanceOf[OldDenseMatrix], - OldVectors.fromML(explainedVariance).asInstanceOf[OldDenseVector]) - // TODO: Make the transformer natively in ml framework to avoid extra conversion. - val transformer: Vector => Vector = v => pcaModel.transform(OldVectors.fromML(v)).asML + val func = { vector: Vector => + vector match { + case dv: DenseVector => + pc.transpose.multiply(dv) + case SparseVector(size, indices, values) => + /* SparseVector -> single row SparseMatrix */ + val sm = Matrices.sparse(size, 1, Array(0, indices.length), indices, values).transpose + val projection = sm.multiply(pc) + Vectors.dense(projection.values) + case _ => + throw new IllegalArgumentException("Unsupported vector format. Expected " + + s"SparseVector or DenseVector. 
Instead got: ${vector.getClass}") + } + } - val pcaOp = udf(transformer) - dataset.withColumn($(outputCol), pcaOp(col($(inputCol)))) + val transformer = udf(func) + dataset.withColumn($(outputCol), transformer(col($(inputCol)))) } @Since("1.5.0") diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/StandardScaler.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/StandardScaler.scala index 91b0707dec3f3..17f2c17c9552e 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/StandardScaler.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/StandardScaler.scala @@ -21,7 +21,7 @@ import org.apache.hadoop.fs.Path import org.apache.spark.annotation.Since import org.apache.spark.ml._ -import org.apache.spark.ml.linalg.{Vector, VectorUDT} +import org.apache.spark.ml.linalg._ import org.apache.spark.ml.param._ import org.apache.spark.ml.param.shared._ import org.apache.spark.ml.util._ @@ -162,11 +162,34 @@ class StandardScalerModel private[ml] ( transformSchema(dataset.schema, logging = true) val scaler = new feature.StandardScalerModel(std, mean, $(withStd), $(withMean)) - // TODO: Make the transformer natively in ml framework to avoid extra conversion. - val transformer: Vector => Vector = v => scaler.transform(OldVectors.fromML(v)).asML + val func = if ($(withMean)) { + vector: Vector => + val values = vector match { + // specially handle DenseVector because its toArray does not clone already + case d: DenseVector => d.values.clone() + case v: Vector => v.toArray + } + val newValues = scaler.transfromWithMean(values) + Vectors.dense(newValues) + } else if ($(withStd)) { + vector: Vector => + vector match { + case DenseVector(values) => + val newValues = scaler.transformDenseWithStd(values) + Vectors.dense(newValues) + case SparseVector(size, indices, values) => + val (newIndices, newValues) = scaler.transformSparseWithStd(indices, values) + Vectors.sparse(size, newIndices, newValues) + case other => + throw new UnsupportedOperationException( + s"Only sparse and dense vectors are supported but got ${other.getClass}.") + } + } else { + vector: Vector => vector + } - val scale = udf(transformer) - dataset.withColumn($(outputCol), scale(col($(inputCol)))) + val transformer = udf(func) + dataset.withColumn($(outputCol), transformer(col($(inputCol)))) } @Since("1.4.0") diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala index 91bc01cc0b558..85444770fec6b 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala @@ -194,7 +194,7 @@ class LocalLDAModel private[spark] ( override protected[spark] val gammaShape: Double = 100) extends LDAModel with Serializable { - private var seed: Long = Utils.random.nextLong() + private[spark] var seed: Long = Utils.random.nextLong() @Since("1.3.0") override def k: Int = topics.numCols @@ -386,31 +386,6 @@ class LocalLDAModel private[spark] ( } } - /** - * Get a method usable as a UDF for `topicDistributions()` - */ - private[spark] def getTopicDistributionMethod: Vector => Vector = { - val expElogbeta = exp(LDAUtils.dirichletExpectation(topicsMatrix.asBreeze.toDenseMatrix.t).t) - val docConcentrationBrz = this.docConcentration.asBreeze - val gammaShape = this.gammaShape - val k = this.k - val gammaSeed = this.seed - - (termCounts: Vector) => - if (termCounts.numNonzeros == 0) { - Vectors.zeros(k) - } else { - val (gamma, _, _) = 
OnlineLDAOptimizer.variationalTopicInference( - termCounts, - expElogbeta, - docConcentrationBrz, - gammaShape, - k, - gammaSeed) - Vectors.dense(normalize(gamma, 1.0).toArray) - } - } - /** * Predicts the topic mixture distribution for a document (often called "theta" in the * literature). Returns a vector of zeros for an empty document. diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala index b54f2c0a6c2e4..5eea69022562b 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala @@ -595,7 +595,7 @@ final class OnlineLDAOptimizer extends LDAOptimizer with Logging { * Serializable companion object containing helper methods and shared code for * [[OnlineLDAOptimizer]] and [[LocalLDAModel]]. */ -private[clustering] object OnlineLDAOptimizer { +private[spark] object OnlineLDAOptimizer { /** * Uses variational inference to infer the topic distribution `gammad` given the term counts * for a document. `termCounts` must contain at least one non-zero entry, otherwise Breeze will @@ -608,27 +608,24 @@ private[clustering] object OnlineLDAOptimizer { * @return Returns a tuple of `gammad` - estimate of gamma, the topic distribution, `sstatsd` - * statistics for updating lambda and `ids` - list of termCounts vector indices. */ - private[clustering] def variationalTopicInference( - termCounts: Vector, + private[spark] def variationalTopicInference( + indices: List[Int], + values: Array[Double], expElogbeta: BDM[Double], alpha: breeze.linalg.Vector[Double], gammaShape: Double, k: Int, seed: Long): (BDV[Double], BDM[Double], List[Int]) = { - val (ids: List[Int], cts: Array[Double]) = termCounts match { - case v: DenseVector => ((0 until v.size).toList, v.values) - case v: SparseVector => (v.indices.toList, v.values) - } // Initialize the variational distribution q(theta|gamma) for the mini-batch val randBasis = new RandBasis(new org.apache.commons.math3.random.MersenneTwister(seed)) val gammad: BDV[Double] = new Gamma(gammaShape, 1.0 / gammaShape)(randBasis).samplesVector(k) // K val expElogthetad: BDV[Double] = exp(LDAUtils.dirichletExpectation(gammad)) // K - val expElogbetad = expElogbeta(ids, ::).toDenseMatrix // ids * K + val expElogbetad = expElogbeta(indices, ::).toDenseMatrix // ids * K val phiNorm: BDV[Double] = expElogbetad * expElogthetad +:+ 1e-100 // ids var meanGammaChange = 1D - val ctsVector = new BDV[Double](cts) // ids + val ctsVector = new BDV[Double](values) // ids // Iterate between gamma and phi until convergence while (meanGammaChange > 1e-3) { @@ -642,6 +639,20 @@ private[clustering] object OnlineLDAOptimizer { } val sstatsd = expElogthetad.asDenseMatrix.t * (ctsVector /:/ phiNorm).asDenseMatrix - (gammad, sstatsd, ids) + (gammad, sstatsd, indices) + } + + private[clustering] def variationalTopicInference( + termCounts: Vector, + expElogbeta: BDM[Double], + alpha: breeze.linalg.Vector[Double], + gammaShape: Double, + k: Int, + seed: Long): (BDV[Double], BDM[Double], List[Int]) = { + val (ids: List[Int], cts: Array[Double]) = termCounts match { + case v: DenseVector => (List.range(0, v.size), v.values) + case v: SparseVector => (v.indices.toList, v.values) + } + variationalTopicInference(ids, cts, expElogbeta, alpha, gammaShape, k, seed) } } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAUtils.scala 
b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAUtils.scala index c4bbe51a46c32..b6675907ce0e7 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAUtils.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAUtils.scala @@ -22,7 +22,7 @@ import breeze.numerics._ /** * Utility methods for LDA. */ -private[clustering] object LDAUtils { +private[spark] object LDAUtils { /** * Log Sum Exp with overflow protection using the identity: * For any a: $\log \sum_{n=1}^N \exp\{x_n\} = a + \log \sum_{n=1}^N \exp\{x_n - a\}$ @@ -44,7 +44,7 @@ private[clustering] object LDAUtils { * Computes [[dirichletExpectation()]] row-wise, assuming each row of alpha are * Dirichlet parameters. */ - private[clustering] def dirichletExpectation(alpha: BDM[Double]): BDM[Double] = { + private[spark] def dirichletExpectation(alpha: BDM[Double]): BDM[Double] = { val rowSum = sum(alpha(breeze.linalg.*, ::)) val digAlpha = digamma(alpha) val digRowSum = digamma(rowSum) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/feature/ChiSqSelector.scala b/mllib/src/main/scala/org/apache/spark/mllib/feature/ChiSqSelector.scala index fc0a45c6af53a..82f5b279846ba 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/feature/ChiSqSelector.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/feature/ChiSqSelector.scala @@ -75,40 +75,48 @@ class ChiSqSelectorModel @Since("1.3.0") ( private def compress(features: Vector): Vector = { features match { case SparseVector(size, indices, values) => - val newSize = filterIndices.length - val newValues = new ArrayBuilder.ofDouble - val newIndices = new ArrayBuilder.ofInt - var i = 0 - var j = 0 - var indicesIdx = 0 - var filterIndicesIdx = 0 - while (i < indices.length && j < filterIndices.length) { - indicesIdx = indices(i) - filterIndicesIdx = filterIndices(j) - if (indicesIdx == filterIndicesIdx) { - newIndices += j - newValues += values(i) - j += 1 - i += 1 - } else { - if (indicesIdx > filterIndicesIdx) { - j += 1 - } else { - i += 1 - } - } - } - // TODO: Sparse representation might be ineffective if (newSize ~= newValues.size) - Vectors.sparse(newSize, newIndices.result(), newValues.result()) + val (newIndices, newValues) = compressSparse(indices, values) + Vectors.sparse(filterIndices.length, newIndices, newValues) case DenseVector(values) => - val values = features.toArray - Vectors.dense(filterIndices.map(i => values(i))) + Vectors.dense(compressDense(values)) case other => throw new UnsupportedOperationException( s"Only sparse and dense vectors are supported but got ${other.getClass}.") } } + private[spark] def compressSparse(indices: Array[Int], + values: Array[Double]): (Array[Int], Array[Double]) = { + val newValues = new ArrayBuilder.ofDouble + val newIndices = new ArrayBuilder.ofInt + var i = 0 + var j = 0 + var indicesIdx = 0 + var filterIndicesIdx = 0 + while (i < indices.length && j < filterIndices.length) { + indicesIdx = indices(i) + filterIndicesIdx = filterIndices(j) + if (indicesIdx == filterIndicesIdx) { + newIndices += j + newValues += values(i) + j += 1 + i += 1 + } else { + if (indicesIdx > filterIndicesIdx) { + j += 1 + } else { + i += 1 + } + } + } + // TODO: Sparse representation might be ineffective if (newSize ~= newValues.size) + (newIndices.result(), newValues.result()) + } + + private[spark] def compressDense(values: Array[Double]): Array[Double] = { + filterIndices.map(i => values(i)) + } + @Since("1.6.0") override def save(sc: SparkContext, path: String): Unit = { 
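The sparse branch factored out into `compressSparse` above is a two-pointer merge over two sorted index arrays: a value is kept only when its original index appears in `filterIndices`, and it is remapped to that filter position. A standalone sketch under the same sortedness assumption (the helper name is mine):

import scala.collection.mutable.ArrayBuilder

// Two-pointer walk over two ascending index arrays: keep values(i) only when indices(i)
// is also present in filterIndices, and store it under its new position j in the filter.
def selectSparse(
    indices: Array[Int],
    values: Array[Double],
    filterIndices: Array[Int]): (Array[Int], Array[Double]) = {
  val newIndices = new ArrayBuilder.ofInt
  val newValues = new ArrayBuilder.ofDouble
  var i = 0
  var j = 0
  while (i < indices.length && j < filterIndices.length) {
    if (indices(i) == filterIndices(j)) {
      newIndices += j
      newValues += values(i)
      i += 1
      j += 1
    } else if (indices(i) > filterIndices(j)) {
      j += 1
    } else {
      i += 1
    }
  }
  (newIndices.result(), newValues.result())
}

// e.g. selectSparse(Array(0, 2, 5), Array(1.0, 2.0, 3.0), Array(2, 5)) == (Array(0, 1), Array(2.0, 3.0))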
ChiSqSelectorModel.SaveLoadV1_0.save(sc, this, path) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/feature/ElementwiseProduct.scala b/mllib/src/main/scala/org/apache/spark/mllib/feature/ElementwiseProduct.scala index c757fc7f06c58..058598c757843 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/feature/ElementwiseProduct.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/feature/ElementwiseProduct.scala @@ -41,25 +41,38 @@ class ElementwiseProduct @Since("1.4.0") ( require(vector.size == scalingVec.size, s"vector sizes do not match: Expected ${scalingVec.size} but found ${vector.size}") vector match { - case dv: DenseVector => - val values: Array[Double] = dv.values.clone() - val dim = scalingVec.size - var i = 0 - while (i < dim) { - values(i) *= scalingVec(i) - i += 1 - } - Vectors.dense(values) - case SparseVector(size, indices, vs) => - val values = vs.clone() - val dim = values.length - var i = 0 - while (i < dim) { - values(i) *= scalingVec(indices(i)) - i += 1 - } - Vectors.sparse(size, indices, values) - case v => throw new IllegalArgumentException("Does not support vector type " + v.getClass) + case DenseVector(values) => + val newValues = transformDense(values) + Vectors.dense(newValues) + case SparseVector(size, indices, values) => + val (newIndices, newValues) = transformSparse(indices, values) + Vectors.sparse(size, newIndices, newValues) + case other => + throw new UnsupportedOperationException( + s"Only sparse and dense vectors are supported but got ${other.getClass}.") } } + + private[spark] def transformDense(values: Array[Double]): Array[Double] = { + val newValues = values.clone() + val dim = scalingVec.size + var i = 0 + while (i < dim) { + newValues(i) *= scalingVec(i) + i += 1 + } + newValues + } + + private[spark] def transformSparse(indices: Array[Int], + values: Array[Double]): (Array[Int], Array[Double]) = { + val newValues = values.clone() + val dim = newValues.length + var i = 0 + while (i < dim) { + newValues(i) *= scalingVec(indices(i)) + i += 1 + } + (indices, newValues) + } } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/feature/HashingTF.scala b/mllib/src/main/scala/org/apache/spark/mllib/feature/HashingTF.scala index 8935c8496cdbb..d3b27e1808e15 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/feature/HashingTF.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/feature/HashingTF.scala @@ -94,6 +94,11 @@ class HashingTF(val numFeatures: Int) extends Serializable { */ @Since("1.1.0") def transform(document: Iterable[_]): Vector = { + val seq = transformImpl(document) + Vectors.sparse(numFeatures, seq) + } + + private[spark] def transformImpl(document: Iterable[_]): Seq[(Int, Double)] = { val termFrequencies = mutable.HashMap.empty[Int, Double] val setTF = if (binary) (i: Int) => 1.0 else (i: Int) => termFrequencies.getOrElse(i, 0.0) + 1.0 val hashFunc: Any => Int = getHashFunction @@ -101,7 +106,7 @@ class HashingTF(val numFeatures: Int) extends Serializable { val i = Utils.nonNegativeMod(hashFunc(term), numFeatures) termFrequencies.put(i, setTF(i)) } - Vectors.sparse(numFeatures, termFrequencies.toSeq) + termFrequencies.toSeq } /** diff --git a/mllib/src/main/scala/org/apache/spark/mllib/feature/IDF.scala b/mllib/src/main/scala/org/apache/spark/mllib/feature/IDF.scala index 6407be68c9da5..e868f0f92509a 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/feature/IDF.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/feature/IDF.scala @@ -203,7 +203,7 @@ class IDFModel private[spark](@Since("1.1.0") val 
idf: Vector, } } -private object IDFModel { +private[spark] object IDFModel { /** * Transforms a term frequency (TF) vector to a TF-IDF vector with a IDF vector @@ -213,28 +213,41 @@ private object IDFModel { * @return a TF-IDF vector */ def transform(idf: Vector, v: Vector): Vector = { - val n = v.size v match { case SparseVector(size, indices, values) => - val nnz = indices.length - val newValues = new Array[Double](nnz) - var k = 0 - while (k < nnz) { - newValues(k) = values(k) * idf(indices(k)) - k += 1 - } - Vectors.sparse(n, indices, newValues) + val (newIndices, newValues) = transformSparse(idf, indices, values) + Vectors.sparse(size, newIndices, newValues) case DenseVector(values) => - val newValues = new Array[Double](n) - var j = 0 - while (j < n) { - newValues(j) = values(j) * idf(j) - j += 1 - } + val newValues = transformDense(idf, values) Vectors.dense(newValues) case other => throw new UnsupportedOperationException( s"Only sparse and dense vectors are supported but got ${other.getClass}.") } } + + private[spark] def transformDense(idf: Vector, + values: Array[Double]): Array[Double] = { + val n = values.length + val newValues = new Array[Double](n) + var j = 0 + while (j < n) { + newValues(j) = values(j) * idf(j) + j += 1 + } + newValues + } + + private[spark] def transformSparse(idf: Vector, + indices: Array[Int], + values: Array[Double]): (Array[Int], Array[Double]) = { + val nnz = indices.length + val newValues = new Array[Double](nnz) + var k = 0 + while (k < nnz) { + newValues(k) = values(k) * idf(indices(k)) + k += 1 + } + (indices, newValues) + } } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/feature/StandardScaler.scala b/mllib/src/main/scala/org/apache/spark/mllib/feature/StandardScaler.scala index 7667936a3f85f..578b779cd52d2 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/feature/StandardScaler.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/feature/StandardScaler.scala @@ -135,58 +135,76 @@ class StandardScalerModel @Since("1.3.0") ( override def transform(vector: Vector): Vector = { require(mean.size == vector.size) if (withMean) { - // By default, Scala generates Java methods for member variables. So every time when - // the member variables are accessed, `invokespecial` will be called which is expensive. - // This can be avoid by having a local reference of `shift`. - val localShift = shift // Must have a copy of the values since it will be modified in place val values = vector match { // specially handle DenseVector because its toArray does not clone already case d: DenseVector => d.values.clone() case v: Vector => v.toArray } - val size = values.length - if (withStd) { - var i = 0 - while (i < size) { - values(i) = if (std(i) != 0.0) (values(i) - localShift(i)) * (1.0 / std(i)) else 0.0 - i += 1 - } - } else { - var i = 0 - while (i < size) { - values(i) -= localShift(i) - i += 1 - } - } - Vectors.dense(values) + val newValues = transfromWithMean(values) + Vectors.dense(newValues) } else if (withStd) { vector match { - case DenseVector(vs) => - val values = vs.clone() - val size = values.length - var i = 0 - while(i < size) { - values(i) *= (if (std(i) != 0.0) 1.0 / std(i) else 0.0) - i += 1 - } - Vectors.dense(values) - case SparseVector(size, indices, vs) => - // For sparse vector, the `index` array inside sparse vector object will not be changed, - // so we can re-use it to save memory. 
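The IDFModel helpers split out a few hunks above rely on TF-IDF being an element-wise product, so the sparse path rescales only the stored values and reuses the index array untouched. A minimal sketch with a plain array standing in for the model's idf vector (the helper name is illustrative):

// Sparse TF-IDF: multiply each stored value by the IDF weight of its column; the indices
// themselves never change, so the same array can be returned as-is.
def tfidfSparse(
    idf: Array[Double],
    indices: Array[Int],
    values: Array[Double]): (Array[Int], Array[Double]) = {
  val newValues = new Array[Double](values.length)
  var k = 0
  while (k < values.length) {
    newValues(k) = values(k) * idf(indices(k))
    k += 1
  }
  (indices, newValues)
}

// e.g. tfidfSparse(Array(0.0, 1.5, 2.0), Array(1, 2), Array(3.0, 1.0)) == (Array(1, 2), Array(4.5, 2.0))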
- val values = vs.clone() - val nnz = values.length - var i = 0 - while (i < nnz) { - values(i) *= (if (std(indices(i)) != 0.0) 1.0 / std(indices(i)) else 0.0) - i += 1 - } - Vectors.sparse(size, indices, values) - case v => throw new IllegalArgumentException("Do not support vector type " + v.getClass) + case DenseVector(values) => + val newValues = transformDenseWithStd(values) + Vectors.dense(newValues) + case SparseVector(size, indices, values) => + val (newIndices, newValues) = transformSparseWithStd(indices, values) + Vectors.sparse(size, newIndices, newValues) + case other => + throw new UnsupportedOperationException( + s"Only sparse and dense vectors are supported but got ${other.getClass}.") } } else { // Note that it's safe since we always assume that the data in RDD should be immutable. vector } } + + private[spark] def transfromWithMean(values: Array[Double]): Array[Double] = { + // By default, Scala generates Java methods for member variables. So every time when + // the member variables are accessed, `invokespecial` will be called which is expensive. + // This can be avoid by having a local reference of `shift`. + val localShift = shift + val size = values.length + if (withStd) { + var i = 0 + while (i < size) { + values(i) = if (std(i) != 0.0) (values(i) - localShift(i)) * (1.0 / std(i)) else 0.0 + i += 1 + } + } else { + var i = 0 + while (i < size) { + values(i) -= localShift(i) + i += 1 + } + } + values + } + + private[spark] def transformDenseWithStd(values: Array[Double]): Array[Double] = { + val size = values.length + val newValues = values.clone() + var i = 0 + while(i < size) { + newValues(i) *= (if (std(i) != 0.0) 1.0 / std(i) else 0.0) + i += 1 + } + newValues + } + + private[spark] def transformSparseWithStd(indices: Array[Int], + values: Array[Double]): (Array[Int], Array[Double]) = { + // For sparse vector, the `index` array inside sparse vector object will not be changed, + // so we can re-use it to save memory. + val nnz = values.length + val newValues = values.clone() + var i = 0 + while (i < nnz) { + newValues(i) *= (if (std(indices(i)) != 0.0) 1.0 / std(indices(i)) else 0.0) + i += 1 + } + (indices, newValues) + } }
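Tying the StandardScaler pieces together: the ml model now selects one closure from the (withMean, withStd) flags before building the UDF, densifying only when centering is requested and preserving sparsity when only scaling is applied. A condensed, self-contained sketch of that selection; plain arrays stand in for the model's mean/std vectors, and the helper name is mine:

import org.apache.spark.ml.linalg.{DenseVector, SparseVector, Vector, Vectors}

// Choose one Vector => Vector closure up front from the flags, instead of branching per row.
def makeScaler(
    mean: Array[Double],
    std: Array[Double],
    withMean: Boolean,
    withStd: Boolean): Vector => Vector = {
  if (withMean) {
    // Centering destroys sparsity, so every input becomes a dense copy.
    (v: Vector) => {
      val values = v match {
        case d: DenseVector => d.values.clone() // DenseVector.toArray returns the backing array
        case other => other.toArray             // SparseVector.toArray already allocates a copy
      }
      var i = 0
      while (i < values.length) {
        values(i) =
          if (!withStd) values(i) - mean(i)
          else if (std(i) != 0.0) (values(i) - mean(i)) * (1.0 / std(i))
          else 0.0
        i += 1
      }
      Vectors.dense(values)
    }
  } else if (withStd) {
    // Scaling alone preserves sparsity: only the stored values change.
    (v: Vector) => v match {
      case DenseVector(values) =>
        Vectors.dense(Array.tabulate(values.length) { i =>
          values(i) * (if (std(i) != 0.0) 1.0 / std(i) else 0.0)
        })
      case SparseVector(size, indices, values) =>
        Vectors.sparse(size, indices, Array.tabulate(values.length) { k =>
          values(k) * (if (std(indices(k)) != 0.0) 1.0 / std(indices(k)) else 0.0)
        })
      case other =>
        throw new UnsupportedOperationException(
          s"Only sparse and dense vectors are supported but got ${other.getClass}.")
    }
  } else {
    (v: Vector) => v // neither flag set: pass the vector through unchanged
  }
}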