@@ -342,7 +342,7 @@ object FMClassificationModel extends MLReadable[FMClassificationModel] {
       DefaultParamsWriter.saveMetadata(instance, path, sc)
       val data = Data(instance.intercept, instance.linear, instance.factors)
       val dataPath = new Path(path, "data").toString
-      sparkSession.createDataFrame(Seq(data)).repartition(1).write.parquet(dataPath)
+      sparkSession.createDataFrame(Seq(data)).write.parquet(dataPath)
     }
   }

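Every writer hunk in this diff makes the same one-line change: dropping the repartition(1) that preceded writing a single-row DataFrame. A minimal standalone sketch of why that call was redundant (assuming a local SparkSession; the object name and toy column are illustrative, not part of the patch):

import org.apache.spark.sql.SparkSession

// Hypothetical check: a DataFrame built from a one-element local Seq
// already consists of a single partition, so repartition(1) before the
// write only added a redundant shuffle.
object SinglePartitionCheck {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("single-partition-check")
      .getOrCreate()
    import spark.implicits._

    val df = Seq(Tuple1(0.5)).toDF("intercept")
    // Expected to print 1: the single row lands in one partition, so
    // write.parquet produces exactly one part file either way.
    println(df.rdd.getNumPartitions)

    spark.stop()
  }
}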
@@ -449,7 +449,7 @@ object LinearSVCModel extends MLReadable[LinearSVCModel] {
       DefaultParamsWriter.saveMetadata(instance, path, sc)
       val data = Data(instance.coefficients, instance.intercept)
       val dataPath = new Path(path, "data").toString
-      sparkSession.createDataFrame(Seq(data)).repartition(1).write.parquet(dataPath)
+      sparkSession.createDataFrame(Seq(data)).write.parquet(dataPath)
     }
   }

@@ -1315,7 +1315,7 @@ object LogisticRegressionModel extends MLReadable[LogisticRegressionModel] {
       val data = Data(instance.numClasses, instance.numFeatures, instance.interceptVector,
         instance.coefficientMatrix, instance.isMultinomial)
       val dataPath = new Path(path, "data").toString
-      sparkSession.createDataFrame(Seq(data)).repartition(1).write.parquet(dataPath)
+      sparkSession.createDataFrame(Seq(data)).write.parquet(dataPath)
     }
   }

@@ -369,7 +369,7 @@ object MultilayerPerceptronClassificationModel
       // Save model data: weights
       val data = Data(instance.weights)
       val dataPath = new Path(path, "data").toString
-      sparkSession.createDataFrame(Seq(data)).repartition(1).write.parquet(dataPath)
+      sparkSession.createDataFrame(Seq(data)).write.parquet(dataPath)
     }
   }

@@ -591,7 +591,7 @@ object NaiveBayesModel extends MLReadable[NaiveBayesModel] {
       }

       val data = Data(instance.pi, instance.theta, instance.sigma)
-      sparkSession.createDataFrame(Seq(data)).repartition(1).write.parquet(dataPath)
+      sparkSession.createDataFrame(Seq(data)).write.parquet(dataPath)
     }
   }

@@ -243,7 +243,7 @@ object GaussianMixtureModel extends MLReadable[GaussianMixtureModel] {
       val sigmas = gaussians.map(c => OldMatrices.fromML(c.cov))
       val data = Data(weights, mus, sigmas)
       val dataPath = new Path(path, "data").toString
-      sparkSession.createDataFrame(Seq(data)).repartition(1).write.parquet(dataPath)
+      sparkSession.createDataFrame(Seq(data)).write.parquet(dataPath)
     }
   }

@@ -659,7 +659,7 @@ object LocalLDAModel extends MLReadable[LocalLDAModel] {
       val data = Data(instance.vocabSize, oldModel.topicsMatrix, oldModel.docConcentration,
         oldModel.topicConcentration, oldModel.gammaShape)
       val dataPath = new Path(path, "data").toString
-      sparkSession.createDataFrame(Seq(data)).repartition(1).write.parquet(dataPath)
+      sparkSession.createDataFrame(Seq(data)).write.parquet(dataPath)
     }
   }

@@ -230,7 +230,7 @@ object BucketedRandomProjectionLSHModel extends MLReadable[BucketedRandomProjectionLSHModel] {
       DefaultParamsWriter.saveMetadata(instance, path, sc)
       val data = Data(instance.randMatrix)
       val dataPath = new Path(path, "data").toString
-      sparkSession.createDataFrame(Seq(data)).repartition(1).write.parquet(dataPath)
+      sparkSession.createDataFrame(Seq(data)).write.parquet(dataPath)
     }
   }

@@ -176,7 +176,7 @@ object ChiSqSelectorModel extends MLReadable[ChiSqSelectorModel] {
       DefaultParamsWriter.saveMetadata(instance, path, sc)
       val data = Data(instance.selectedFeatures.toImmutableArraySeq)
       val dataPath = new Path(path, "data").toString
-      sparkSession.createDataFrame(Seq(data)).repartition(1).write.parquet(dataPath)
+      sparkSession.createDataFrame(Seq(data)).write.parquet(dataPath)
     }
   }

@@ -375,7 +375,7 @@ object CountVectorizerModel extends MLReadable[CountVectorizerModel] {
       DefaultParamsWriter.saveMetadata(instance, path, sc)
       val data = Data(instance.vocabulary.toImmutableArraySeq)
       val dataPath = new Path(path, "data").toString
-      sparkSession.createDataFrame(Seq(data)).repartition(1).write.parquet(dataPath)
+      sparkSession.createDataFrame(Seq(data)).write.parquet(dataPath)
     }
   }

mllib/src/main/scala/org/apache/spark/ml/feature/IDF.scala (1 addition, 1 deletion)
@@ -201,7 +201,7 @@ object IDFModel extends MLReadable[IDFModel] {
       DefaultParamsWriter.saveMetadata(instance, path, sc)
       val data = Data(instance.idf, instance.docFreq, instance.numDocs)
       val dataPath = new Path(path, "data").toString
-      sparkSession.createDataFrame(Seq(data)).repartition(1).write.parquet(dataPath)
+      sparkSession.createDataFrame(Seq(data)).write.parquet(dataPath)
     }
   }

@@ -165,7 +165,7 @@ object MaxAbsScalerModel extends MLReadable[MaxAbsScalerModel] {
       DefaultParamsWriter.saveMetadata(instance, path, sc)
       val data = new Data(instance.maxAbs)
       val dataPath = new Path(path, "data").toString
-      sparkSession.createDataFrame(Seq(data)).repartition(1).write.parquet(dataPath)
+      sparkSession.createDataFrame(Seq(data)).write.parquet(dataPath)
     }
   }

@@ -223,7 +223,7 @@ object MinHashLSHModel extends MLReadable[MinHashLSHModel] {
       DefaultParamsWriter.saveMetadata(instance, path, sc)
       val data = Data(instance.randCoefficients.flatMap(tuple => Array(tuple._1, tuple._2)))
       val dataPath = new Path(path, "data").toString
-      sparkSession.createDataFrame(Seq(data)).repartition(1).write.parquet(dataPath)
+      sparkSession.createDataFrame(Seq(data)).write.parquet(dataPath)
     }
   }

@@ -250,7 +250,7 @@ object MinMaxScalerModel extends MLReadable[MinMaxScalerModel] {
       DefaultParamsWriter.saveMetadata(instance, path, sc)
       val data = new Data(instance.originalMin, instance.originalMax)
       val dataPath = new Path(path, "data").toString
-      sparkSession.createDataFrame(Seq(data)).repartition(1).write.parquet(dataPath)
+      sparkSession.createDataFrame(Seq(data)).write.parquet(dataPath)
     }
   }

@@ -406,7 +406,7 @@ object OneHotEncoderModel extends MLReadable[OneHotEncoderModel] {
       DefaultParamsWriter.saveMetadata(instance, path, sc)
       val data = Data(instance.categorySizes)
       val dataPath = new Path(path, "data").toString
-      sparkSession.createDataFrame(Seq(data)).repartition(1).write.parquet(dataPath)
+      sparkSession.createDataFrame(Seq(data)).write.parquet(dataPath)
     }
   }

mllib/src/main/scala/org/apache/spark/ml/feature/PCA.scala (1 addition, 1 deletion)
@@ -187,7 +187,7 @@ object PCAModel extends MLReadable[PCAModel] {
       DefaultParamsWriter.saveMetadata(instance, path, sc)
       val data = Data(instance.pc, instance.explainedVariance)
       val dataPath = new Path(path, "data").toString
-      sparkSession.createDataFrame(Seq(data)).repartition(1).write.parquet(dataPath)
+      sparkSession.createDataFrame(Seq(data)).write.parquet(dataPath)
     }
   }

@@ -506,7 +506,7 @@ private object ColumnPruner extends MLReadable[ColumnPruner] {
       // Save model data: columnsToPrune
       val data = Data(instance.columnsToPrune.toSeq)
       val dataPath = new Path(path, "data").toString
-      sparkSession.createDataFrame(Seq(data)).repartition(1).write.parquet(dataPath)
+      sparkSession.createDataFrame(Seq(data)).write.parquet(dataPath)
     }
   }

@@ -598,7 +598,7 @@ private object VectorAttributeRewriter extends MLReadable[VectorAttributeRewriter] {
       // Save model data: vectorCol, prefixesToRewrite
       val data = Data(instance.vectorCol, instance.prefixesToRewrite)
       val dataPath = new Path(path, "data").toString
-      sparkSession.createDataFrame(Seq(data)).repartition(1).write.parquet(dataPath)
+      sparkSession.createDataFrame(Seq(data)).write.parquet(dataPath)
     }
   }

@@ -287,7 +287,7 @@ object RobustScalerModel extends MLReadable[RobustScalerModel] {
       DefaultParamsWriter.saveMetadata(instance, path, sc)
       val data = Data(instance.range, instance.median)
       val dataPath = new Path(path, "data").toString
-      sparkSession.createDataFrame(Seq(data)).repartition(1).write.parquet(dataPath)
+      sparkSession.createDataFrame(Seq(data)).write.parquet(dataPath)
     }
   }

@@ -208,7 +208,7 @@ object StandardScalerModel extends MLReadable[StandardScalerModel] {
       DefaultParamsWriter.saveMetadata(instance, path, sc)
       val data = Data(instance.std, instance.mean)
       val dataPath = new Path(path, "data").toString
-      sparkSession.createDataFrame(Seq(data)).repartition(1).write.parquet(dataPath)
+      sparkSession.createDataFrame(Seq(data)).write.parquet(dataPath)
     }
   }

@@ -499,7 +499,7 @@ object StringIndexerModel extends MLReadable[StringIndexerModel] {
       DefaultParamsWriter.saveMetadata(instance, path, sc)
       val data = Data(instance.labelsArray)
       val dataPath = new Path(path, "data").toString
-      sparkSession.createDataFrame(Seq(data)).repartition(1).write.parquet(dataPath)
+      sparkSession.createDataFrame(Seq(data)).write.parquet(dataPath)
     }
   }

@@ -352,7 +352,7 @@ object UnivariateFeatureSelectorModel extends MLReadable[UnivariateFeatureSelectorModel] {
       DefaultParamsWriter.saveMetadata(instance, path, sc)
       val data = Data(instance.selectedFeatures.toImmutableArraySeq)
       val dataPath = new Path(path, "data").toString
-      sparkSession.createDataFrame(Seq(data)).repartition(1).write.parquet(dataPath)
+      sparkSession.createDataFrame(Seq(data)).write.parquet(dataPath)
     }
   }

@@ -190,7 +190,7 @@ object VarianceThresholdSelectorModel extends MLReadable[VarianceThresholdSelectorModel] {
       DefaultParamsWriter.saveMetadata(instance, path, sc)
       val data = Data(instance.selectedFeatures.toImmutableArraySeq)
       val dataPath = new Path(path, "data").toString
-      sparkSession.createDataFrame(Seq(data)).repartition(1).write.parquet(dataPath)
+      sparkSession.createDataFrame(Seq(data)).write.parquet(dataPath)
     }
   }

@@ -522,7 +522,7 @@ object VectorIndexerModel extends MLReadable[VectorIndexerModel] {
       DefaultParamsWriter.saveMetadata(instance, path, sc)
       val data = Data(instance.numFeatures, instance.categoryMaps)
       val dataPath = new Path(path, "data").toString
-      sparkSession.createDataFrame(Seq(data)).repartition(1).write.parquet(dataPath)
+      sparkSession.createDataFrame(Seq(data)).write.parquet(dataPath)
     }
   }

@@ -498,7 +498,7 @@ object AFTSurvivalRegressionModel extends MLReadable[AFTSurvivalRegressionModel] {
       // Save model data: coefficients, intercept, scale
       val data = Data(instance.coefficients, instance.intercept, instance.scale)
       val dataPath = new Path(path, "data").toString
-      sparkSession.createDataFrame(Seq(data)).repartition(1).write.parquet(dataPath)
+      sparkSession.createDataFrame(Seq(data)).write.parquet(dataPath)
     }
   }

@@ -507,7 +507,7 @@ object FMRegressionModel extends MLReadable[FMRegressionModel] {
       DefaultParamsWriter.saveMetadata(instance, path, sc)
       val data = Data(instance.intercept, instance.linear, instance.factors)
       val dataPath = new Path(path, "data").toString
-      sparkSession.createDataFrame(Seq(data)).repartition(1).write.parquet(dataPath)
+      sparkSession.createDataFrame(Seq(data)).write.parquet(dataPath)
     }
   }

@@ -1145,7 +1145,7 @@ object GeneralizedLinearRegressionModel extends MLReadable[GeneralizedLinearRegressionModel] {
       // Save model data: intercept, coefficients
       val data = Data(instance.intercept, instance.coefficients)
       val dataPath = new Path(path, "data").toString
-      sparkSession.createDataFrame(Seq(data)).repartition(1).write.parquet(dataPath)
+      sparkSession.createDataFrame(Seq(data)).write.parquet(dataPath)
     }
   }

@@ -306,7 +306,7 @@ object IsotonicRegressionModel extends MLReadable[IsotonicRegressionModel] {
       val data = Data(
         instance.oldModel.boundaries, instance.oldModel.predictions, instance.oldModel.isotonic)
       val dataPath = new Path(path, "data").toString
-      sparkSession.createDataFrame(Seq(data)).repartition(1).write.parquet(dataPath)
+      sparkSession.createDataFrame(Seq(data)).write.parquet(dataPath)
     }
   }

@@ -784,7 +784,7 @@ private class InternalLinearRegressionModelWriter
       // Save model data: intercept, coefficients, scale
       val data = Data(instance.intercept, instance.coefficients, instance.scale)
       val dataPath = new Path(path, "data").toString
-      sparkSession.createDataFrame(Seq(data)).repartition(1).write.parquet(dataPath)
+      sparkSession.createDataFrame(Seq(data)).write.parquet(dataPath)
     }
   }

mllib/src/main/scala/org/apache/spark/ml/util/ReadWrite.scala (6 additions, 2 deletions)
@@ -411,7 +411,10 @@ private[ml] object DefaultParamsWriter {
       paramMap: Option[JValue] = None): Unit = {
     val metadataPath = new Path(path, "metadata").toString
     val metadataJson = getMetadataToSave(instance, sc, extraMetadata, paramMap)
-    sc.parallelize(Seq(metadataJson), 1).saveAsTextFile(metadataPath)
+    val spark = SparkSession.builder().sparkContext(sc).getOrCreate()
+    // Note: the metadata must be written as a single file. A DataFrame with
+    // more than one row may span multiple partitions and produce multiple files.
+    spark.createDataFrame(Seq(Tuple1(metadataJson))).write.text(metadataPath)
   }

   /**
@@ -585,7 +588,8 @@ private[ml] object DefaultParamsReader {
    */
   def loadMetadata(path: String, sc: SparkContext, expectedClassName: String = ""): Metadata = {
     val metadataPath = new Path(path, "metadata").toString
-    val metadataStr = sc.textFile(metadataPath, 1).first()
+    val spark = SparkSession.getActiveSession.get
+    val metadataStr = spark.read.text(metadataPath).first().getString(0)
     parseMetadata(metadataStr, expectedClassName)
   }

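Taken together, the two ReadWrite.scala hunks above move both sides of the metadata round trip from the RDD text APIs onto the DataFrame reader and writer. A minimal sketch of that round trip (assuming a local SparkSession; the path and JSON payload are illustrative). Note the Tuple1 wrapper: createDataFrame needs a Product type, and wrapping the string yields a one-column DataFrame that write.text accepts.

import org.apache.spark.sql.SparkSession

// Sketch of the metadata round trip, with a hypothetical path and payload.
object MetadataRoundTrip {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").getOrCreate()

    val metadataJson = """{"class":"org.example.SomeModel","timestamp":0}"""
    val metadataPath = "/tmp/some-model/metadata"

    // Save: one row, one partition, one text file under metadataPath.
    spark.createDataFrame(Seq(Tuple1(metadataJson))).write.text(metadataPath)

    // Load: read the directory back and unwrap the single row.
    val restored = spark.read.text(metadataPath).first().getString(0)
    assert(restored == metadataJson)

    spark.stop()
  }
}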
@@ -198,10 +198,10 @@ object NaiveBayesModel extends Loader[NaiveBayesModel] {
       val metadata = compact(render(
         ("class" -> thisClassName) ~ ("version" -> thisFormatVersion) ~
           ("numFeatures" -> data.theta(0).length) ~ ("numClasses" -> data.pi.length)))
-      sc.parallelize(Seq(metadata), 1).saveAsTextFile(metadataPath(path))
+      spark.createDataFrame(Seq(Tuple1(metadata))).write.text(metadataPath(path))

       // Create Parquet data.
-      spark.createDataFrame(Seq(data)).repartition(1).write.parquet(dataPath(path))
+      spark.createDataFrame(Seq(data)).write.parquet(dataPath(path))
     }

     @Since("1.3.0")
@@ -243,10 +243,10 @@ object NaiveBayesModel extends Loader[NaiveBayesModel] {
       val metadata = compact(render(
         ("class" -> thisClassName) ~ ("version" -> thisFormatVersion) ~
           ("numFeatures" -> data.theta(0).length) ~ ("numClasses" -> data.pi.length)))
-      sc.parallelize(Seq(metadata), 1).saveAsTextFile(metadataPath(path))
+      spark.createDataFrame(Seq(Tuple1(metadata))).write.text(metadataPath(path))

       // Create Parquet data.
-      spark.createDataFrame(Seq(data)).repartition(1).write.parquet(dataPath(path))
+      spark.createDataFrame(Seq(data)).write.parquet(dataPath(path))
     }

     def load(sc: SparkContext, path: String): NaiveBayesModel = {
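The older spark.mllib savers in this hunk and those below build their metadata line with the json4s DSL before writing it as a single text file. A small sketch of that construction (illustrative class name, version, and counts, not a model's real values):

import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods.{compact, render}

// Hypothetical reproduction of the compact(render(...)) pattern above.
object MetadataJsonSketch {
  def main(args: Array[String]): Unit = {
    val thisClassName = "org.apache.spark.mllib.classification.NaiveBayesModel"
    val thisFormatVersion = "2.0"

    val metadata = compact(render(
      ("class" -> thisClassName) ~ ("version" -> thisFormatVersion) ~
        ("numFeatures" -> 10) ~ ("numClasses" -> 2)))

    // Prints one JSON object on a single line, ready to be wrapped in
    // Seq(Tuple1(metadata)) and written with write.text as in the diff.
    println(metadata)
  }
}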
@@ -57,11 +57,11 @@ private[classification] object GLMClassificationModel {
       val metadata = compact(render(
         ("class" -> modelClass) ~ ("version" -> thisFormatVersion) ~
           ("numFeatures" -> numFeatures) ~ ("numClasses" -> numClasses)))
-      sc.parallelize(Seq(metadata), 1).saveAsTextFile(Loader.metadataPath(path))
+      spark.createDataFrame(Seq(Tuple1(metadata))).write.text(Loader.metadataPath(path))

       // Create Parquet data.
       val data = Data(weights, intercept, threshold)
-      spark.createDataFrame(Seq(data)).repartition(1).write.parquet(Loader.dataPath(path))
+      spark.createDataFrame(Seq(data)).write.parquet(Loader.dataPath(path))
     }

     /**
@@ -179,7 +179,7 @@ object BisectingKMeansModel extends Loader[BisectingKMeansModel] {
       val metadata = compact(render(
         ("class" -> thisClassName) ~ ("version" -> thisFormatVersion)
           ~ ("rootId" -> model.root.index)))
-      sc.parallelize(Seq(metadata), 1).saveAsTextFile(Loader.metadataPath(path))
+      spark.createDataFrame(Seq(Tuple1(metadata))).write.text(Loader.metadataPath(path))

       val data = getNodes(model.root).map(node => Data(node.index, node.size,
         node.centerWithNorm.vector, node.centerWithNorm.norm, node.cost, node.height,
@@ -215,7 +215,7 @@ object BisectingKMeansModel extends Loader[BisectingKMeansModel] {
       val metadata = compact(render(
         ("class" -> thisClassName) ~ ("version" -> thisFormatVersion)
           ~ ("rootId" -> model.root.index) ~ ("distanceMeasure" -> model.distanceMeasure)))
-      sc.parallelize(Seq(metadata), 1).saveAsTextFile(Loader.metadataPath(path))
+      spark.createDataFrame(Seq(Tuple1(metadata))).write.text(Loader.metadataPath(path))

       val data = getNodes(model.root).map(node => Data(node.index, node.size,
         node.centerWithNorm.vector, node.centerWithNorm.norm, node.cost, node.height,
@@ -253,7 +253,7 @@ object BisectingKMeansModel extends Loader[BisectingKMeansModel] {
         ("class" -> thisClassName) ~ ("version" -> thisFormatVersion)
           ~ ("rootId" -> model.root.index) ~ ("distanceMeasure" -> model.distanceMeasure)
           ~ ("trainingCost" -> model.trainingCost)))
-      sc.parallelize(Seq(metadata), 1).saveAsTextFile(Loader.metadataPath(path))
+      spark.createDataFrame(Seq(Tuple1(metadata))).write.text(Loader.metadataPath(path))

       val data = getNodes(model.root).map(node => Data(node.index, node.size,
         node.centerWithNorm.vector, node.centerWithNorm.norm, node.cost, node.height,
@@ -147,7 +147,7 @@ object GaussianMixtureModel extends Loader[GaussianMixtureModel] {
       // Create JSON metadata.
       val metadata = compact(render
         (("class" -> classNameV1_0) ~ ("version" -> formatVersionV1_0) ~ ("k" -> weights.length)))
-      sc.parallelize(Seq(metadata), 1).saveAsTextFile(Loader.metadataPath(path))
+      spark.createDataFrame(Seq(Tuple1(metadata))).write.text(Loader.metadataPath(path))

       // Create Parquet data.
       val dataArray = Array.tabulate(weights.length) { i =>
@@ -172,7 +172,7 @@ object KMeansModel extends Loader[KMeansModel] {
       val spark = SparkSession.builder().sparkContext(sc).getOrCreate()
       val metadata = compact(render(
         ("class" -> thisClassName) ~ ("version" -> thisFormatVersion) ~ ("k" -> model.k)))
-      sc.parallelize(Seq(metadata), 1).saveAsTextFile(Loader.metadataPath(path))
+      spark.createDataFrame(Seq(Tuple1(metadata))).write.text(Loader.metadataPath(path))
       val dataRDD = sc.parallelize(model.clusterCentersWithNorm.zipWithIndex.toImmutableArraySeq)
         .map { case (p, id) =>
           Cluster(id, p.vector)
@@ -207,7 +207,7 @@ object KMeansModel extends Loader[KMeansModel] {
         ("class" -> thisClassName) ~ ("version" -> thisFormatVersion)
           ~ ("k" -> model.k) ~ ("distanceMeasure" -> model.distanceMeasure)
           ~ ("trainingCost" -> model.trainingCost)))
-      sc.parallelize(Seq(metadata), 1).saveAsTextFile(Loader.metadataPath(path))
+      spark.createDataFrame(Seq(Tuple1(metadata))).write.text(Loader.metadataPath(path))
       val dataRDD = sc.parallelize(model.clusterCentersWithNorm.zipWithIndex.toImmutableArraySeq)
         .map { case (p, id) =>
           Cluster(id, p.vector)
@@ -460,7 +460,7 @@ object LocalLDAModel extends Loader[LocalLDAModel] {
         ("docConcentration" -> docConcentration.toArray.toImmutableArraySeq) ~
         ("topicConcentration" -> topicConcentration) ~
         ("gammaShape" -> gammaShape)))
-      sc.parallelize(Seq(metadata), 1).saveAsTextFile(Loader.metadataPath(path))
+      spark.createDataFrame(Seq(Tuple1(metadata))).write.text(Loader.metadataPath(path))

       val topicsDenseMatrix = topicsMatrix.asBreeze.toDenseMatrix
       val topics = Range(0, k).map { topicInd =>
@@ -869,7 +869,7 @@ object DistributedLDAModel extends Loader[DistributedLDAModel] {
         ("topicConcentration" -> topicConcentration) ~
         ("iterationTimes" -> iterationTimes.toImmutableArraySeq) ~
         ("gammaShape" -> gammaShape)))
-      sc.parallelize(Seq(metadata), 1).saveAsTextFile(Loader.metadataPath(path))
+      spark.createDataFrame(Seq(Tuple1(metadata))).write.text(Loader.metadataPath(path))

       val newPath = new Path(Loader.dataPath(path), "globalTopicTotals").toUri.toString
       spark.createDataFrame(Seq(Data(Vectors.fromBreeze(globalTopicTotals)))).write.parquet(newPath)