8 changes: 6 additions & 2 deletions mllib/src/main/scala/org/apache/spark/ml/util/ReadWrite.scala
@@ -411,7 +411,10 @@ private[ml] object DefaultParamsWriter {
       paramMap: Option[JValue] = None): Unit = {
     val metadataPath = new Path(path, "metadata").toString
     val metadataJson = getMetadataToSave(instance, sc, extraMetadata, paramMap)
-    sc.parallelize(Seq(metadataJson), 1).saveAsTextFile(metadataPath)
+    val spark = SparkSession.builder().sparkContext(sc).getOrCreate()
+    // Note that we should write a single file. If there is more than one row,
+    // it produces more partitions.
+    spark.createDataFrame(Seq(Tuple1(metadataJson))).write.text(metadataPath)
   }
 
   /**
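For context, the new write path can be exercised on its own. A minimal spark-shell sketch follows; the JSON literal and the /tmp path are illustrative, not from this PR:

    val metadataJson = """{"class":"com.example.Model","version":"1.0"}"""
    // A one-element local Seq becomes a single-partition DataFrame, so
    // write.text emits exactly one part file, as saveAsTextFile(..., 1) did.
    spark.createDataFrame(Seq(Tuple1(metadataJson))).write.text("/tmp/example-model/metadata")

Tuple1 is needed because this createDataFrame overload expects a Product (a tuple or case class) per row; a bare Seq[String] has no schema.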
@@ -585,7 +588,8 @@ private[ml] object DefaultParamsReader {
    */
   def loadMetadata(path: String, sc: SparkContext, expectedClassName: String = ""): Metadata = {
     val metadataPath = new Path(path, "metadata").toString
-    val metadataStr = sc.textFile(metadataPath, 1).first()
+    val spark = SparkSession.getActiveSession.get
+    val metadataStr = spark.read.text(metadataPath).first().getString(0)
     parseMetadata(metadataStr, expectedClassName)
   }

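The matching read side, again in spark-shell with the same illustrative path. spark.read.text yields a DataFrame with a single string column named "value", which is why first().getString(0) replaces the old RDD first():

    // Reads back the single-line metadata file written above.
    val metadataStr = spark.read.text("/tmp/example-model/metadata").first().getString(0)

Note the reader now assumes a session already exists (SparkSession.getActiveSession.get) rather than building one from the SparkContext, so loadMetadata must be called where a session is active.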
@@ -198,7 +198,7 @@ object NaiveBayesModel extends Loader[NaiveBayesModel] {
       val metadata = compact(render(
         ("class" -> thisClassName) ~ ("version" -> thisFormatVersion) ~
           ("numFeatures" -> data.theta(0).length) ~ ("numClasses" -> data.pi.length)))
-      sc.parallelize(Seq(metadata), 1).saveAsTextFile(metadataPath(path))
+      spark.createDataFrame(Seq(Tuple1(metadata))).write.text(metadataPath(path))
 
       // Create Parquet data.
       spark.createDataFrame(Seq(data)).repartition(1).write.parquet(dataPath(path))
@@ -243,7 +243,7 @@ object NaiveBayesModel extends Loader[NaiveBayesModel] {
       val metadata = compact(render(
         ("class" -> thisClassName) ~ ("version" -> thisFormatVersion) ~
           ("numFeatures" -> data.theta(0).length) ~ ("numClasses" -> data.pi.length)))
-      sc.parallelize(Seq(metadata), 1).saveAsTextFile(metadataPath(path))
+      spark.createDataFrame(Seq(Tuple1(metadata))).write.text(metadataPath(path))
 
       // Create Parquet data.
       spark.createDataFrame(Seq(data)).repartition(1).write.parquet(dataPath(path))
@@ -57,7 +57,7 @@ private[classification] object GLMClassificationModel {
       val metadata = compact(render(
         ("class" -> modelClass) ~ ("version" -> thisFormatVersion) ~
           ("numFeatures" -> numFeatures) ~ ("numClasses" -> numClasses)))
-      sc.parallelize(Seq(metadata), 1).saveAsTextFile(Loader.metadataPath(path))
+      spark.createDataFrame(Seq(Tuple1(metadata))).write.text(Loader.metadataPath(path))
 
       // Create Parquet data.
       val data = Data(weights, intercept, threshold)
@@ -179,7 +179,7 @@ object BisectingKMeansModel extends Loader[BisectingKMeansModel] {
       val metadata = compact(render(
         ("class" -> thisClassName) ~ ("version" -> thisFormatVersion)
           ~ ("rootId" -> model.root.index)))
-      sc.parallelize(Seq(metadata), 1).saveAsTextFile(Loader.metadataPath(path))
+      spark.createDataFrame(Seq(Tuple1(metadata))).write.text(Loader.metadataPath(path))
 
       val data = getNodes(model.root).map(node => Data(node.index, node.size,
         node.centerWithNorm.vector, node.centerWithNorm.norm, node.cost, node.height,
@@ -215,7 +215,7 @@ object BisectingKMeansModel extends Loader[BisectingKMeansModel] {
       val metadata = compact(render(
         ("class" -> thisClassName) ~ ("version" -> thisFormatVersion)
           ~ ("rootId" -> model.root.index) ~ ("distanceMeasure" -> model.distanceMeasure)))
-      sc.parallelize(Seq(metadata), 1).saveAsTextFile(Loader.metadataPath(path))
+      spark.createDataFrame(Seq(Tuple1(metadata))).write.text(Loader.metadataPath(path))
 
       val data = getNodes(model.root).map(node => Data(node.index, node.size,
         node.centerWithNorm.vector, node.centerWithNorm.norm, node.cost, node.height,
@@ -253,7 +253,7 @@ object BisectingKMeansModel extends Loader[BisectingKMeansModel] {
         ("class" -> thisClassName) ~ ("version" -> thisFormatVersion)
           ~ ("rootId" -> model.root.index) ~ ("distanceMeasure" -> model.distanceMeasure)
           ~ ("trainingCost" -> model.trainingCost)))
-      sc.parallelize(Seq(metadata), 1).saveAsTextFile(Loader.metadataPath(path))
+      spark.createDataFrame(Seq(Tuple1(metadata))).write.text(Loader.metadataPath(path))
 
       val data = getNodes(model.root).map(node => Data(node.index, node.size,
         node.centerWithNorm.vector, node.centerWithNorm.norm, node.cost, node.height,
@@ -147,7 +147,7 @@ object GaussianMixtureModel extends Loader[GaussianMixtureModel] {
       // Create JSON metadata.
       val metadata = compact(render
         (("class" -> classNameV1_0) ~ ("version" -> formatVersionV1_0) ~ ("k" -> weights.length)))
-      sc.parallelize(Seq(metadata), 1).saveAsTextFile(Loader.metadataPath(path))
+      spark.createDataFrame(Seq(Tuple1(metadata))).write.text(Loader.metadataPath(path))
 
       // Create Parquet data.
       val dataArray = Array.tabulate(weights.length) { i =>
@@ -172,7 +172,7 @@ object KMeansModel extends Loader[KMeansModel] {
       val spark = SparkSession.builder().sparkContext(sc).getOrCreate()
       val metadata = compact(render(
         ("class" -> thisClassName) ~ ("version" -> thisFormatVersion) ~ ("k" -> model.k)))
-      sc.parallelize(Seq(metadata), 1).saveAsTextFile(Loader.metadataPath(path))
+      spark.createDataFrame(Seq(Tuple1(metadata))).write.text(Loader.metadataPath(path))
       val dataRDD = sc.parallelize(model.clusterCentersWithNorm.zipWithIndex.toImmutableArraySeq)
         .map { case (p, id) =>
           Cluster(id, p.vector)
@@ -207,7 +207,7 @@ object KMeansModel extends Loader[KMeansModel] {
         ("class" -> thisClassName) ~ ("version" -> thisFormatVersion)
           ~ ("k" -> model.k) ~ ("distanceMeasure" -> model.distanceMeasure)
           ~ ("trainingCost" -> model.trainingCost)))
-      sc.parallelize(Seq(metadata), 1).saveAsTextFile(Loader.metadataPath(path))
+      spark.createDataFrame(Seq(Tuple1(metadata))).write.text(Loader.metadataPath(path))
       val dataRDD = sc.parallelize(model.clusterCentersWithNorm.zipWithIndex.toImmutableArraySeq)
         .map { case (p, id) =>
           Cluster(id, p.vector)
@@ -460,7 +460,7 @@ object LocalLDAModel extends Loader[LocalLDAModel] {
         ("docConcentration" -> docConcentration.toArray.toImmutableArraySeq) ~
           ("topicConcentration" -> topicConcentration) ~
           ("gammaShape" -> gammaShape)))
-      sc.parallelize(Seq(metadata), 1).saveAsTextFile(Loader.metadataPath(path))
+      spark.createDataFrame(Seq(Tuple1(metadata))).write.text(Loader.metadataPath(path))
 
       val topicsDenseMatrix = topicsMatrix.asBreeze.toDenseMatrix
       val topics = Range(0, k).map { topicInd =>
@@ -869,7 +869,7 @@ object DistributedLDAModel extends Loader[DistributedLDAModel] {
           ("topicConcentration" -> topicConcentration) ~
           ("iterationTimes" -> iterationTimes.toImmutableArraySeq) ~
           ("gammaShape" -> gammaShape)))
-      sc.parallelize(Seq(metadata), 1).saveAsTextFile(Loader.metadataPath(path))
+      spark.createDataFrame(Seq(Tuple1(metadata))).write.text(Loader.metadataPath(path))
 
       val newPath = new Path(Loader.dataPath(path), "globalTopicTotals").toUri.toString
       spark.createDataFrame(Seq(Data(Vectors.fromBreeze(globalTopicTotals)))).write.parquet(newPath)
@@ -73,7 +73,7 @@ object PowerIterationClusteringModel extends Loader[PowerIterationClusteringModel] {
 
       val metadata = compact(render(
         ("class" -> thisClassName) ~ ("version" -> thisFormatVersion) ~ ("k" -> model.k)))
-      sc.parallelize(Seq(metadata), 1).saveAsTextFile(Loader.metadataPath(path))
+      spark.createDataFrame(Seq(Tuple1(metadata))).write.text(Loader.metadataPath(path))
 
       spark.createDataFrame(model.assignments).write.parquet(Loader.dataPath(path))
     }
@@ -136,7 +136,7 @@ object ChiSqSelectorModel extends Loader[ChiSqSelectorModel] {
 
       val metadata = compact(render(
         ("class" -> thisClassName) ~ ("version" -> thisFormatVersion)))
-      sc.parallelize(Seq(metadata), 1).saveAsTextFile(Loader.metadataPath(path))
+      spark.createDataFrame(Seq(Tuple1(metadata))).write.text(Loader.metadataPath(path))
 
       // Create Parquet data.
       val dataArray = Array.tabulate(model.selectedFeatures.length) { i =>
@@ -686,7 +686,7 @@ object Word2VecModel extends Loader[Word2VecModel] {
       val metadata = compact(render(
         ("class" -> classNameV1_0) ~ ("version" -> formatVersionV1_0) ~
           ("vectorSize" -> vectorSize) ~ ("numWords" -> numWords)))
-      sc.parallelize(Seq(metadata), 1).saveAsTextFile(Loader.metadataPath(path))
+      spark.createDataFrame(Seq(Tuple1(metadata))).write.text(Loader.metadataPath(path))
 
       // We want to partition the model in partitions smaller than
       // spark.kryoserializer.buffer.max
@@ -108,7 +108,7 @@ object FPGrowthModel extends Loader[FPGrowthModel[_]] {
 
       val metadata = compact(render(
         ("class" -> thisClassName) ~ ("version" -> thisFormatVersion)))
-      sc.parallelize(Seq(metadata), 1).saveAsTextFile(Loader.metadataPath(path))
+      spark.createDataFrame(Seq(Tuple1(metadata))).write.text(Loader.metadataPath(path))
 
       // Get the type of item class
       val sample = model.freqItemsets.first().items(0)
@@ -655,7 +655,7 @@ object PrefixSpanModel extends Loader[PrefixSpanModel[_]] {
 
       val metadata = compact(render(
         ("class" -> thisClassName) ~ ("version" -> thisFormatVersion)))
-      sc.parallelize(Seq(metadata), 1).saveAsTextFile(Loader.metadataPath(path))
+      spark.createDataFrame(Seq(Tuple1(metadata))).write.text(Loader.metadataPath(path))
 
       // Get the type of item class
       val sample = model.freqSequences.first().sequence(0)(0)
@@ -387,7 +387,7 @@ object MatrixFactorizationModel extends Loader[MatrixFactorizationModel] {
       import spark.implicits._
       val metadata = compact(render(
         ("class" -> thisClassName) ~ ("version" -> thisFormatVersion) ~ ("rank" -> model.rank)))
-      sc.parallelize(Seq(metadata), 1).saveAsTextFile(metadataPath(path))
+      spark.createDataFrame(Seq(Tuple1(metadata))).write.text(metadataPath(path))
       model.userFeatures.toDF("id", "features").write.parquet(userPath(path))
       model.productFeatures.toDF("id", "features").write.parquet(productPath(path))
     }
@@ -188,7 +188,7 @@ object IsotonicRegressionModel extends Loader[IsotonicRegressionModel] {
       val metadata = compact(render(
         ("class" -> thisClassName) ~ ("version" -> thisFormatVersion) ~
           ("isotonic" -> isotonic)))
-      sc.parallelize(Seq(metadata), 1).saveAsTextFile(metadataPath(path))
+      spark.createDataFrame(Seq(Tuple1(metadata))).write.text(metadataPath(path))
 
       spark.createDataFrame(
         boundaries.toImmutableArraySeq.zip(predictions).map { case (b, p) => Data(b, p) }
@@ -53,7 +53,7 @@ private[regression] object GLMRegressionModel {
       val metadata = compact(render(
         ("class" -> modelClass) ~ ("version" -> thisFormatVersion) ~
           ("numFeatures" -> weights.size)))
-      sc.parallelize(Seq(metadata), 1).saveAsTextFile(Loader.metadataPath(path))
+      spark.createDataFrame(Seq(Tuple1(metadata))).write.text(Loader.metadataPath(path))
 
       // Create Parquet data.
       val data = Data(weights, intercept)
@@ -226,15 +226,15 @@ object DecisionTreeModel extends Loader[DecisionTreeModel] with Logging {
       }
 
       // Create JSON metadata.
+      val spark = SparkSession.builder().sparkContext(sc).getOrCreate()
       val metadata = compact(render(
         ("class" -> thisClassName) ~ ("version" -> thisFormatVersion) ~
           ("algo" -> model.algo.toString) ~ ("numNodes" -> model.numNodes)))
-      sc.parallelize(Seq(metadata), 1).saveAsTextFile(Loader.metadataPath(path))
+      spark.createDataFrame(Seq(Tuple1(metadata))).write.text(Loader.metadataPath(path))
 
       // Create Parquet data.
       val nodes = model.topNode.subtreeIterator.toSeq
       val dataRDD = sc.parallelize(nodes).map(NodeData.apply(0, _))
-      val spark = SparkSession.builder().sparkContext(sc).getOrCreate()
       spark.createDataFrame(dataRDD).write.parquet(Loader.dataPath(path))
     }

@@ -430,7 +430,7 @@ private[tree] object TreeEnsembleModel extends Logging {
       val metadata = compact(render(
         ("class" -> className) ~ ("version" -> thisFormatVersion) ~
           ("metadata" -> Extraction.decompose(ensembleMetadata))))
-      sc.parallelize(Seq(metadata), 1).saveAsTextFile(Loader.metadataPath(path))
+      spark.createDataFrame(Seq(Tuple1(metadata))).write.text(Loader.metadataPath(path))
 
       // Create Parquet data.
       val dataRDD = sc.parallelize(model.trees.zipWithIndex.toImmutableArraySeq)