Skip to content
Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala
Original file line number Diff line number Diff line change
Expand Up @@ -457,14 +457,14 @@ abstract class LDAModel private[ml] (
*/
@Since("2.0.0")
override def transform(dataset: Dataset[_]): DataFrame = {
if ($(topicDistributionCol).nonEmpty) {
transformSchema(dataset.schema, logging = true)

// TODO: Make the transformer natively in ml framework to avoid extra conversion.
if ($(topicDistributionCol).nonEmpty) {
val transformer = oldLocalModel.getTopicDistributionMethod

val t = udf { (v: Vector) => transformer(OldVectors.fromML(v)).asML }
val t = udf { v: Vector => transformer(v) }
dataset.withColumn($(topicDistributionCol),
t(DatasetUtils.columnToVector(dataset, getFeaturesCol))).toDF()
t(DatasetUtils.columnToVector(dataset, getFeaturesCol)))
} else {
logWarning("LDAModel.transform was called without any output columns. Set an output column" +
" such as topicDistributionCol to produce results.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,6 @@ package org.apache.spark.ml.feature

import java.{util => ju}

import org.json4s.JsonDSL._
import org.json4s.JValue
import org.json4s.jackson.JsonMethods._

import org.apache.spark.SparkException
import org.apache.spark.annotation.Since
import org.apache.spark.ml.Model
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -264,14 +264,13 @@ final class ChiSqSelectorModel private[ml] (

@Since("2.0.0")
override def transform(dataset: Dataset[_]): DataFrame = {
val transformedSchema = transformSchema(dataset.schema, logging = true)
val newField = transformedSchema.last
val outputSchema = transformSchema(dataset.schema, logging = true)

// TODO: Make the transformer natively in ml framework to avoid extra conversion.
val transformer: Vector => Vector = v => chiSqSelector.transform(OldVectors.fromML(v)).asML
val transformer: Vector => Vector = v => chiSqSelector.compress(v)

val selector = udf(transformer)
dataset.withColumn($(outputCol), selector(col($(featuresCol))), newField.metadata)
dataset.withColumn($(outputCol), selector(col($(featuresCol))),
outputSchema($(outputCol)).metadata)
}

@Since("1.6.0")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import org.apache.spark.SparkContext
import org.apache.spark.annotation.Since
import org.apache.spark.api.java.{JavaPairRDD, JavaRDD}
import org.apache.spark.graphx.{Edge, EdgeContext, Graph, VertexId}
import org.apache.spark.ml.linalg.{Vector => NewVector, Vectors => NewVectors}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah OK, I think we don't want to import .ml vectors in .mllib here. But the method below is only used in .ml now. Just move it to .ml.clustering.LDAModel with your changes?

import org.apache.spark.mllib.linalg.{Matrices, Matrix, Vector, Vectors}
import org.apache.spark.mllib.util.{Loader, Saveable}
import org.apache.spark.rdd.RDD
Expand Down Expand Up @@ -389,16 +390,16 @@ class LocalLDAModel private[spark] (
/**
* Get a method usable as a UDF for `topicDistributions()`
*/
private[spark] def getTopicDistributionMethod: Vector => Vector = {
private[spark] def getTopicDistributionMethod: NewVector => NewVector = {
val expElogbeta = exp(LDAUtils.dirichletExpectation(topicsMatrix.asBreeze.toDenseMatrix.t).t)
val docConcentrationBrz = this.docConcentration.asBreeze
val gammaShape = this.gammaShape
val k = this.k
val gammaSeed = this.seed

(termCounts: Vector) =>
termCounts: NewVector =>
if (termCounts.numNonzeros == 0) {
Vectors.zeros(k)
NewVectors.zeros(k)
} else {
val (gamma, _, _) = OnlineLDAOptimizer.variationalTopicInference(
termCounts,
Expand All @@ -407,7 +408,7 @@ class LocalLDAModel private[spark] (
gammaShape,
k,
gammaSeed)
Vectors.dense(normalize(gamma, 1.0).toArray)
NewVectors.dense(normalize(gamma, 1.0).toArray)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ import org.apache.spark.annotation.{DeveloperApi, Since}
import org.apache.spark.graphx._
import org.apache.spark.graphx.util.PeriodicGraphCheckpointer
import org.apache.spark.internal.Logging
import org.apache.spark.ml.linalg.{DenseVector => NewDenseVector,
SparseVector => NewSparseVector, Vector => NewVector}
import org.apache.spark.mllib.linalg.{DenseVector, Matrices, SparseVector, Vector, Vectors}
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
Expand Down Expand Up @@ -609,26 +611,23 @@ private[clustering] object OnlineLDAOptimizer {
* statistics for updating lambda and `ids` - list of termCounts vector indices.
*/
private[clustering] def variationalTopicInference(
termCounts: Vector,
indices: List[Int],
values: Array[Double],
expElogbeta: BDM[Double],
alpha: breeze.linalg.Vector[Double],
gammaShape: Double,
k: Int,
seed: Long): (BDV[Double], BDM[Double], List[Int]) = {
val (ids: List[Int], cts: Array[Double]) = termCounts match {
case v: DenseVector => ((0 until v.size).toList, v.values)
case v: SparseVector => (v.indices.toList, v.values)
}
// Initialize the variational distribution q(theta|gamma) for the mini-batch
val randBasis = new RandBasis(new org.apache.commons.math3.random.MersenneTwister(seed))
val gammad: BDV[Double] =
new Gamma(gammaShape, 1.0 / gammaShape)(randBasis).samplesVector(k) // K
val expElogthetad: BDV[Double] = exp(LDAUtils.dirichletExpectation(gammad)) // K
val expElogbetad = expElogbeta(ids, ::).toDenseMatrix // ids * K
val expElogbetad = expElogbeta(indices, ::).toDenseMatrix // ids * K

val phiNorm: BDV[Double] = expElogbetad * expElogthetad +:+ 1e-100 // ids
var meanGammaChange = 1D
val ctsVector = new BDV[Double](cts) // ids
val ctsVector = new BDV[Double](values) // ids

// Iterate between gamma and phi until convergence
while (meanGammaChange > 1e-3) {
Expand All @@ -642,6 +641,34 @@ private[clustering] object OnlineLDAOptimizer {
}

val sstatsd = expElogthetad.asDenseMatrix.t * (ctsVector /:/ phiNorm).asDenseMatrix
(gammad, sstatsd, ids)
(gammad, sstatsd, indices)
}

private[clustering] def variationalTopicInference(
termCounts: Vector,
expElogbeta: BDM[Double],
alpha: breeze.linalg.Vector[Double],
gammaShape: Double,
k: Int,
seed: Long): (BDV[Double], BDM[Double], List[Int]) = {
val (ids: List[Int], cts: Array[Double]) = termCounts match {
case v: DenseVector => ((0 until v.size).toList, v.values)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here and elsewhere, as an optimization, can we avoid `(0 until v.size).toList`? Pass an empty list in this case or something, and then deduce that the indices are just 0 until the length of the values?

You're generally solving this with separate sparse/dense methods which could be fine too if it doesn't result in too much code duplication and improves performance in the dense case.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good then except we might be able to make one more optimization here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just looked into how the indices `ids` are used, and found that they serve as slicing indices, as in `val expElogbetad = expElogbeta(indices, ::).toDenseMatrix`.
I will give it a try.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am afraid that an empty list may not help to simplify the implementation,
since in places like `private[clustering] def submitMiniBatch(batch: RDD[(Long, Vector)]): OnlineLDAOptimizer` we still have to create a List for slicing.

case v: SparseVector => (v.indices.toList, v.values)
}
variationalTopicInference(ids, cts, expElogbeta, alpha, gammaShape, k, seed)
}

private[clustering] def variationalTopicInference(
termCounts: NewVector,
expElogbeta: BDM[Double],
alpha: breeze.linalg.Vector[Double],
gammaShape: Double,
k: Int,
seed: Long): (BDV[Double], BDM[Double], List[Int]) = {
val (ids: List[Int], cts: Array[Double]) = termCounts match {
case v: NewDenseVector => ((0 until v.size).toList, v.values)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we want to avoid materializing this list of indices. In the dense case it's redundant. If not passed, assume the dense case?

case v: NewSparseVector => (v.indices.toList, v.values)
}
variationalTopicInference(ids, cts, expElogbeta, alpha, gammaShape, k, seed)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import org.json4s.jackson.JsonMethods._

import org.apache.spark.SparkContext
import org.apache.spark.annotation.Since
import org.apache.spark.ml.linalg.{DenseVector => NewDenseVector,
SparseVector => NewSparseVector, Vector => NewVector, Vectors => NewVectors}
import org.apache.spark.mllib.linalg.{DenseVector, SparseVector, Vector, Vectors}
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.stat.Statistics
Expand Down Expand Up @@ -75,40 +77,61 @@ class ChiSqSelectorModel @Since("1.3.0") (
private def compress(features: Vector): Vector = {
features match {
case SparseVector(size, indices, values) =>
val newSize = filterIndices.length
val newValues = new ArrayBuilder.ofDouble
val newIndices = new ArrayBuilder.ofInt
var i = 0
var j = 0
var indicesIdx = 0
var filterIndicesIdx = 0
while (i < indices.length && j < filterIndices.length) {
indicesIdx = indices(i)
filterIndicesIdx = filterIndices(j)
if (indicesIdx == filterIndicesIdx) {
newIndices += j
newValues += values(i)
j += 1
i += 1
} else {
if (indicesIdx > filterIndicesIdx) {
j += 1
} else {
i += 1
}
}
}
// TODO: Sparse representation might be ineffective if (newSize ~= newValues.size)
Vectors.sparse(newSize, newIndices.result(), newValues.result())
val (newIndices, newValues) = compressSparse(indices, values)
Vectors.sparse(filterIndices.length, newIndices, newValues)
case DenseVector(values) =>
val values = features.toArray
Vectors.dense(filterIndices.map(i => values(i)))
Vectors.dense(compressDense(values))
case other =>
throw new UnsupportedOperationException(
s"Only sparse and dense vectors are supported but got ${other.getClass}.")
}
}

private[spark] def compress(features: NewVector): NewVector = {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These seem like general methods, not specific to chi-squared. Do we not already do some of this work in the Vector constructors or an existing utility method?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Likewise here, I don't think we want to handle .ml vectors in .mllib. I think the idea is to make this .mllib method more generic, perhaps just operating on indices and values?

features match {
case NewSparseVector(size, indices, values) =>
val (newIndices, newValues) = compressSparse(indices, values)
NewVectors.sparse(filterIndices.length, newIndices, newValues)
case NewDenseVector(values) =>
NewVectors.dense(compressDense(values))
case other =>
throw new UnsupportedOperationException(
s"Only sparse and dense vectors are supported but got ${other.getClass}.")
}
}

/**
 * Restricts the (indices, values) representation of a sparse vector to the entries whose
 * index also appears in `filterIndices`.
 *
 * Both sequences are walked in lockstep, advancing whichever cursor points at the smaller
 * index. NOTE(review): this presumes `indices` and `filterIndices` are both sorted in
 * ascending order -- confirm against where `filterIndices` is defined.
 *
 * @param indices indices of the active entries of the input vector
 * @param values  values of the active entries, parallel to `indices`
 * @return the kept entries as (new indices, new values); each new index is the position of
 *         the matched feature inside `filterIndices`, not its original index
 */
private def compressSparse(indices: Array[Int],
    values: Array[Double]): (Array[Int], Array[Double]) = {
  val keptIndices = new ArrayBuilder.ofInt
  val keptValues = new ArrayBuilder.ofDouble
  var src = 0 // cursor into indices / values
  var sel = 0 // cursor into filterIndices
  while (src < indices.length && sel < filterIndices.length) {
    val srcIdx = indices(src)
    val selIdx = filterIndices(sel)
    if (srcIdx < selIdx) {
      // This active entry is not among the selected features; drop it.
      src += 1
    } else if (srcIdx > selIdx) {
      // The selected feature has no active entry in the input; move to the next one.
      sel += 1
    } else {
      // Match: the output index is the feature's position within filterIndices.
      keptIndices += sel
      keptValues += values(src)
      src += 1
      sel += 1
    }
  }
  // TODO: Sparse representation might be ineffective if the number of kept values
  // is close to filterIndices.length
  (keptIndices.result(), keptValues.result())
}

/**
 * Picks out of a dense value array the entries at the positions listed in `filterIndices`.
 *
 * @param values the values of a dense vector, addressed by feature index
 * @return one value per element of `filterIndices`, in the same order as `filterIndices`
 */
private def compressDense(values: Array[Double]): Array[Double] = {
  for (featureIdx <- filterIndices) yield values(featureIdx)
}

@Since("1.6.0")
override def save(sc: SparkContext, path: String): Unit = {
ChiSqSelectorModel.SaveLoadV1_0.save(sc, this, path)
Expand Down