@@ -33,6 +33,7 @@ import org.apache.spark.mllib.util.{Loader, Saveable}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{Row, SparkSession}
 import org.apache.spark.util.BoundedPriorityQueue
+import org.apache.spark.util.Utils
 
 /**
  * Latent Dirichlet Allocation (LDA) model.
@@ -468,7 +469,16 @@ object LocalLDAModel extends Loader[LocalLDAModel] {
       val topics = Range(0, k).map { topicInd =>
         Data(Vectors.dense(topicsDenseMatrix(::, topicInd).toArray), topicInd)
       }
-      spark.createDataFrame(topics).repartition(1).write.parquet(Loader.dataPath(path))
+
+      val bufferSize = Utils.byteStringAsBytes(
+        spark.conf.get("spark.kryoserializer.buffer.max", "64m"))
+      // We calculate the approximate size of the model.
+      // We only count the array size; assuming an
+      // average string size of 15 bytes, the formula is:
+      // (floatSize * vectorSize + 15) * numWords
+      val approxSize = (4L * k + 15) * topicsMatrix.numRows
+      val nPartitions = ((approxSize / bufferSize) + 1).toInt
+      spark.createDataFrame(topics).repartition(nPartitions).write.parquet(Loader.dataPath(path))
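
For a sense of scale, here is the arithmetic this formula produces for a 500000-word, 300-topic model (the same dimensions as the reproduction snippet further down). The numbers are illustrative only and assume the default 64m buffer:

// Illustrative only: dimensions match the simulation snippet below.
val k = 300
val vocabSize = 500000L                    // topicsMatrix.numRows
val bufferSize = 64L * 1024 * 1024         // default spark.kryoserializer.buffer.max
val approxSize = (4L * k + 15) * vocabSize // 607,500,000 bytes, ~580 MB
val nPartitions = ((approxSize / bufferSize) + 1).toInt // = 10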
Member:
The problem is that this writes multiple files. I don't think you can do that.

Author:
Why not? I think it does work. The multiple parquet files may be written in random order, but each row saves its topic index. When you call load, parquet restores the DataFrame; you can check LocalLDAModel's load method, which scans all of the DataFrame's rows and uses the topic indices to rebuild the (topic x vocab) breeze matrix.
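
For reference, the load-side reconstruction being described looks roughly like this (a paraphrase of LocalLDAModel's private load path; exact names and signatures vary across Spark versions):

import breeze.linalg.{DenseMatrix => BDM, DenseVector => BDV}
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.sql.Row

// Each row carries its topic index, so the order of the parquet files
// (and of the rows within them) does not matter.
val rows = spark.read.parquet(Loader.dataPath(path)).collect()
val vocabSize = rows.head.getAs[Vector](0).size
val brzTopics = BDM.zeros[Double](vocabSize, k)
rows.foreach { case Row(vec: Vector, ind: Int) =>
  brzTopics(::, ind) := new BDV(vec.toArray) // column placed by its saved index
}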

Member:
I believe the point of repartition(1) is to export this to a single file for external consumption. Of course writing it succeeds, but I don't think this is what it is intended to do.

Author (d0evi1, May 20, 2017):

I've read mllib's online LDA implementation and the paper.

I found a simple way to test it. You can try this code snippet, which simulates a matrix to save:

import scala.util.Random
import breeze.linalg.DenseMatrix

val vocabSize = 500000
val k = 300
val random = new Random()
val topicsDenseMatrix = DenseMatrix.fill[Double](vocabSize, k)(random.nextDouble())

Member:
@d0evi1 I'm not sure how that's related to my comment.
@hhbyyh what do you think -- am I wrong about the issue?

Contributor:
I think the PR is trying to do something similar to https://github.com/apache/spark/pull/9989/files.

Contributor:
@srowen what's your concern with saving multiple files instead of one?

We've encountered crippling errors saving large LDA models in the ml API as well. The problem there is even worse, since the entire matrix is treated as a single datum.

Contributor:
Can you add a test case that saves a model into multiple files, loads it back, and checks correctness?
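
A rough sketch of such a round-trip test (a sketch only, not the final test: the LocalLDAModel constructor is private[spark], so this assumes placement in Spark's own LDASuite with an existing sc, and the constructor argument order is approximate; the tiny dimensions here would need to grow, or spark.kryoserializer.buffer.max would need to shrink, before more than one file is actually written):

test("LocalLDAModel save/load across multiple partition files") {
  val k = 3
  val vocabSize = 10
  // Hypothetical model; values are random since only the round trip matters.
  val topics = Matrices.dense(vocabSize, k,
    Array.fill(vocabSize * k)(scala.util.Random.nextDouble()))
  val model = new LocalLDAModel(topics,
    Vectors.dense(Array.fill(k)(1.0 / k)), 1.0, 100.0)

  val tempDir = Utils.createTempDir()
  try {
    val path = tempDir.toURI.toString
    model.save(sc, path)
    val sameModel = LocalLDAModel.load(sc, path)
    // Parquet row order across files is arbitrary; the saved topic
    // indices must still reassemble an identical matrix.
    assert(model.topicsMatrix.toArray === sameModel.topicsMatrix.toArray)
    assert(model.k === sameModel.k)
    assert(model.vocabSize === sameModel.vocabSize)
  } finally {
    Utils.deleteRecursively(tempDir)
  }
}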

}

def load(