diff --git a/mllib/src/main/scala/org/apache/spark/ml/util/ReadWrite.scala b/mllib/src/main/scala/org/apache/spark/ml/util/ReadWrite.scala
index 9b26d0a911ac..021595f76c24 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/util/ReadWrite.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/util/ReadWrite.scala
@@ -411,7 +411,10 @@ private[ml] object DefaultParamsWriter {
       paramMap: Option[JValue] = None): Unit = {
     val metadataPath = new Path(path, "metadata").toString
     val metadataJson = getMetadataToSave(instance, sc, extraMetadata, paramMap)
-    sc.parallelize(Seq(metadataJson), 1).saveAsTextFile(metadataPath)
+    val spark = SparkSession.builder().sparkContext(sc).getOrCreate()
+    // Note that we should write a single file. If there is more than one row,
+    // the write produces more partitions, and thus more files.
+    spark.createDataFrame(Seq(Tuple1(metadataJson))).write.text(metadataPath)
   }
 
   /**
@@ -585,7 +588,8 @@ private[ml] object DefaultParamsReader {
    */
   def loadMetadata(path: String, sc: SparkContext, expectedClassName: String = ""): Metadata = {
     val metadataPath = new Path(path, "metadata").toString
-    val metadataStr = sc.textFile(metadataPath, 1).first()
+    val spark = SparkSession.getActiveSession.get
+    val metadataStr = spark.read.text(metadataPath).first().getString(0)
     parseMetadata(metadataStr, expectedClassName)
   }
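The ReadWrite.scala hunks above are the template for every change below: the metadata JSON is wrapped in a one-row local DataFrame and written with the DataFrame writer, and, per the new comment, the single row is what guarantees a single part file, since more rows would be scanned as more partitions. A minimal, self-contained sketch of the round trip, assuming a local session; the path and JSON literal are illustrative, not from this patch:

```scala
import org.apache.spark.sql.SparkSession

object MetadataRoundTrip {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("metadata-io").getOrCreate()

    // Illustrative values; the real code builds metadataPath from the model's save path.
    val metadataPath = "/tmp/model/metadata"
    val metadataJson = """{"class":"org.example.Model","version":"1.0"}"""

    // Write side: Seq(Tuple1(json)) becomes a one-column, one-row DataFrame,
    // which text() writes out as a single part file.
    spark.createDataFrame(Seq(Tuple1(metadataJson))).write.text(metadataPath)

    // Read side: read.text yields rows with a single string column "value",
    // so getString(0) recovers what sc.textFile(...).first() used to return.
    val metadataStr = spark.read.text(metadataPath).first().getString(0)
    assert(metadataStr == metadataJson)

    spark.stop()
  }
}
```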
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala b/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala
index 3d36b8270861..98e784bab01c 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala
@@ -198,7 +198,7 @@ object NaiveBayesModel extends Loader[NaiveBayesModel] {
       val metadata = compact(render(
         ("class" -> thisClassName) ~ ("version" -> thisFormatVersion) ~
           ("numFeatures" -> data.theta(0).length) ~ ("numClasses" -> data.pi.length)))
-      sc.parallelize(Seq(metadata), 1).saveAsTextFile(metadataPath(path))
+      spark.createDataFrame(Seq(Tuple1(metadata))).write.text(metadataPath(path))
 
       // Create Parquet data.
       spark.createDataFrame(Seq(data)).repartition(1).write.parquet(dataPath(path))
@@ -243,7 +243,7 @@ object NaiveBayesModel extends Loader[NaiveBayesModel] {
       val metadata = compact(render(
         ("class" -> thisClassName) ~ ("version" -> thisFormatVersion) ~
           ("numFeatures" -> data.theta(0).length) ~ ("numClasses" -> data.pi.length)))
-      sc.parallelize(Seq(metadata), 1).saveAsTextFile(metadataPath(path))
+      spark.createDataFrame(Seq(Tuple1(metadata))).write.text(metadataPath(path))
 
       // Create Parquet data.
       spark.createDataFrame(Seq(data)).repartition(1).write.parquet(dataPath(path))
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/classification/impl/GLMClassificationModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/classification/impl/GLMClassificationModel.scala
index 84491181d077..79c065a0a5db 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/classification/impl/GLMClassificationModel.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/classification/impl/GLMClassificationModel.scala
@@ -57,7 +57,7 @@ private[classification] object GLMClassificationModel {
       val metadata = compact(render(
         ("class" -> modelClass) ~ ("version" -> thisFormatVersion) ~
           ("numFeatures" -> numFeatures) ~ ("numClasses" -> numClasses)))
-      sc.parallelize(Seq(metadata), 1).saveAsTextFile(Loader.metadataPath(path))
+      spark.createDataFrame(Seq(Tuple1(metadata))).write.text(Loader.metadataPath(path))
 
       // Create Parquet data.
       val data = Data(weights, intercept, threshold)
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/BisectingKMeansModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/BisectingKMeansModel.scala
index 9f3aad923897..083c3e3e77a9 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/BisectingKMeansModel.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/BisectingKMeansModel.scala
@@ -179,7 +179,7 @@ object BisectingKMeansModel extends Loader[BisectingKMeansModel] {
       val metadata = compact(render(
         ("class" -> thisClassName) ~ ("version" -> thisFormatVersion) ~
           ("rootId" -> model.root.index)))
-      sc.parallelize(Seq(metadata), 1).saveAsTextFile(Loader.metadataPath(path))
+      spark.createDataFrame(Seq(Tuple1(metadata))).write.text(Loader.metadataPath(path))
 
       val data = getNodes(model.root).map(node => Data(node.index, node.size,
         node.centerWithNorm.vector, node.centerWithNorm.norm, node.cost, node.height,
@@ -215,7 +215,7 @@ object BisectingKMeansModel extends Loader[BisectingKMeansModel] {
       val metadata = compact(render(
         ("class" -> thisClassName) ~ ("version" -> thisFormatVersion) ~
           ("rootId" -> model.root.index) ~ ("distanceMeasure" -> model.distanceMeasure)))
-      sc.parallelize(Seq(metadata), 1).saveAsTextFile(Loader.metadataPath(path))
+      spark.createDataFrame(Seq(Tuple1(metadata))).write.text(Loader.metadataPath(path))
 
       val data = getNodes(model.root).map(node => Data(node.index, node.size,
         node.centerWithNorm.vector, node.centerWithNorm.norm, node.cost, node.height,
@@ -253,7 +253,7 @@ object BisectingKMeansModel extends Loader[BisectingKMeansModel] {
         ("class" -> thisClassName) ~ ("version" -> thisFormatVersion) ~
           ("rootId" -> model.root.index) ~ ("distanceMeasure" -> model.distanceMeasure) ~
           ("trainingCost" -> model.trainingCost)))
-      sc.parallelize(Seq(metadata), 1).saveAsTextFile(Loader.metadataPath(path))
+      spark.createDataFrame(Seq(Tuple1(metadata))).write.text(Loader.metadataPath(path))
 
       val data = getNodes(model.root).map(node => Data(node.index, node.size,
         node.centerWithNorm.vector, node.centerWithNorm.norm, node.cost, node.height,
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala
index 8982a8ca7c6c..40a810a699ac 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala
@@ -147,7 +147,7 @@ object GaussianMixtureModel extends Loader[GaussianMixtureModel] {
       // Create JSON metadata.
       val metadata = compact(render
         (("class" -> classNameV1_0) ~ ("version" -> formatVersionV1_0) ~ ("k" -> weights.length)))
-      sc.parallelize(Seq(metadata), 1).saveAsTextFile(Loader.metadataPath(path))
+      spark.createDataFrame(Seq(Tuple1(metadata))).write.text(Loader.metadataPath(path))
 
       // Create Parquet data.
       val dataArray = Array.tabulate(weights.length) { i =>
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala
index 476df64581f7..e5c0b27072d0 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala
@@ -172,7 +172,7 @@ object KMeansModel extends Loader[KMeansModel] {
       val spark = SparkSession.builder().sparkContext(sc).getOrCreate()
       val metadata = compact(render(
         ("class" -> thisClassName) ~ ("version" -> thisFormatVersion) ~ ("k" -> model.k)))
-      sc.parallelize(Seq(metadata), 1).saveAsTextFile(Loader.metadataPath(path))
+      spark.createDataFrame(Seq(Tuple1(metadata))).write.text(Loader.metadataPath(path))
       val dataRDD = sc.parallelize(model.clusterCentersWithNorm.zipWithIndex.toImmutableArraySeq)
         .map { case (p, id) =>
           Cluster(id, p.vector)
@@ -207,7 +207,7 @@ object KMeansModel extends Loader[KMeansModel] {
         ("class" -> thisClassName) ~ ("version" -> thisFormatVersion) ~ ("k" -> model.k)
           ~ ("distanceMeasure" -> model.distanceMeasure) ~ ("trainingCost" -> model.trainingCost)))
-      sc.parallelize(Seq(metadata), 1).saveAsTextFile(Loader.metadataPath(path))
+      spark.createDataFrame(Seq(Tuple1(metadata))).write.text(Loader.metadataPath(path))
       val dataRDD = sc.parallelize(model.clusterCentersWithNorm.zipWithIndex.toImmutableArraySeq)
         .map { case (p, id) =>
           Cluster(id, p.vector)
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala
index 831c7a9316fd..10a81acede0c 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala
@@ -460,7 +460,7 @@ object LocalLDAModel extends Loader[LocalLDAModel] {
           ("docConcentration" -> docConcentration.toArray.toImmutableArraySeq) ~
           ("topicConcentration" -> topicConcentration) ~
           ("gammaShape" -> gammaShape)))
-      sc.parallelize(Seq(metadata), 1).saveAsTextFile(Loader.metadataPath(path))
+      spark.createDataFrame(Seq(Tuple1(metadata))).write.text(Loader.metadataPath(path))
 
       val topicsDenseMatrix = topicsMatrix.asBreeze.toDenseMatrix
       val topics = Range(0, k).map { topicInd =>
@@ -869,7 +869,7 @@ object DistributedLDAModel extends Loader[DistributedLDAModel] {
           ("topicConcentration" -> topicConcentration) ~
           ("iterationTimes" -> iterationTimes.toImmutableArraySeq) ~
           ("gammaShape" -> gammaShape)))
-      sc.parallelize(Seq(metadata), 1).saveAsTextFile(Loader.metadataPath(path))
+      spark.createDataFrame(Seq(Tuple1(metadata))).write.text(Loader.metadataPath(path))
 
       val newPath = new Path(Loader.dataPath(path), "globalTopicTotals").toUri.toString
       spark.createDataFrame(Seq(Data(Vectors.fromBreeze(globalTopicTotals)))).write.parquet(newPath)
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala
index 9150bb305876..639e762ef3c8 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala
@@ -73,7 +73,7 @@ object PowerIterationClusteringModel extends Loader[PowerIterationClusteringMode
       val metadata = compact(render(
         ("class" -> thisClassName) ~ ("version" -> thisFormatVersion) ~ ("k" -> model.k)))
-      sc.parallelize(Seq(metadata), 1).saveAsTextFile(Loader.metadataPath(path))
+      spark.createDataFrame(Seq(Tuple1(metadata))).write.text(Loader.metadataPath(path))
 
       spark.createDataFrame(model.assignments).write.parquet(Loader.dataPath(path))
     }
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/feature/ChiSqSelector.scala b/mllib/src/main/scala/org/apache/spark/mllib/feature/ChiSqSelector.scala
index 41eb6567b845..ed23df70c577 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/feature/ChiSqSelector.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/feature/ChiSqSelector.scala
@@ -136,7 +136,7 @@ object ChiSqSelectorModel extends Loader[ChiSqSelectorModel] {
       val metadata = compact(render(
         ("class" -> thisClassName) ~ ("version" -> thisFormatVersion)))
-      sc.parallelize(Seq(metadata), 1).saveAsTextFile(Loader.metadataPath(path))
+      spark.createDataFrame(Seq(Tuple1(metadata))).write.text(Loader.metadataPath(path))
 
       // Create Parquet data.
       val dataArray = Array.tabulate(model.selectedFeatures.length) { i =>
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala b/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala
index 499dc09b8621..b5b2233ecb75 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala
@@ -686,7 +686,7 @@ object Word2VecModel extends Loader[Word2VecModel] {
       val metadata = compact(render(
         ("class" -> classNameV1_0) ~ ("version" -> formatVersionV1_0) ~
           ("vectorSize" -> vectorSize) ~ ("numWords" -> numWords)))
-      sc.parallelize(Seq(metadata), 1).saveAsTextFile(Loader.metadataPath(path))
+      spark.createDataFrame(Seq(Tuple1(metadata))).write.text(Loader.metadataPath(path))
 
       // We want to partition the model in partitions smaller than
       // spark.kryoserializer.buffer.max
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/fpm/FPGrowth.scala b/mllib/src/main/scala/org/apache/spark/mllib/fpm/FPGrowth.scala
index 246146a831f8..fd21e1998ce7 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/fpm/FPGrowth.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/fpm/FPGrowth.scala
@@ -108,7 +108,7 @@ object FPGrowthModel extends Loader[FPGrowthModel[_]] {
       val metadata = compact(render(
         ("class" -> thisClassName) ~ ("version" -> thisFormatVersion)))
-      sc.parallelize(Seq(metadata), 1).saveAsTextFile(Loader.metadataPath(path))
+      spark.createDataFrame(Seq(Tuple1(metadata))).write.text(Loader.metadataPath(path))
 
       // Get the type of item class
       val sample = model.freqItemsets.first().items(0)
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala b/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala
index 3c648f34c610..9c16ac2ecd52 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala
@@ -655,7 +655,7 @@ object PrefixSpanModel extends Loader[PrefixSpanModel[_]] {
       val metadata = compact(render(
         ("class" -> thisClassName) ~ ("version" -> thisFormatVersion)))
-      sc.parallelize(Seq(metadata), 1).saveAsTextFile(Loader.metadataPath(path))
+      spark.createDataFrame(Seq(Tuple1(metadata))).write.text(Loader.metadataPath(path))
 
       // Get the type of item class
       val sample = model.freqSequences.first().sequence(0)(0)
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
index bc888aecec0a..fb9e8ac7c892 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
@@ -387,7 +387,7 @@ object MatrixFactorizationModel extends Loader[MatrixFactorizationModel] {
       import spark.implicits._
       val metadata = compact(render(
         ("class" -> thisClassName) ~ ("version" -> thisFormatVersion) ~ ("rank" -> model.rank)))
-      sc.parallelize(Seq(metadata), 1).saveAsTextFile(metadataPath(path))
+      spark.createDataFrame(Seq(Tuple1(metadata))).write.text(metadataPath(path))
       model.userFeatures.toDF("id", "features").write.parquet(userPath(path))
       model.productFeatures.toDF("id", "features").write.parquet(productPath(path))
     }
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala
index e957d8ebd74a..456580ffa531 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala
@@ -188,7 +188,7 @@ object IsotonicRegressionModel extends Loader[IsotonicRegressionModel] {
       val metadata = compact(render(
         ("class" -> thisClassName) ~ ("version" -> thisFormatVersion) ~
           ("isotonic" -> isotonic)))
-      sc.parallelize(Seq(metadata), 1).saveAsTextFile(metadataPath(path))
+      spark.createDataFrame(Seq(Tuple1(metadata))).write.text(metadataPath(path))
 
       spark.createDataFrame(
         boundaries.toImmutableArraySeq.zip(predictions).map { case (b, p) => Data(b, p) }
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/impl/GLMRegressionModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/impl/GLMRegressionModel.scala
index cd90e97cc538..6364b7a942d5 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/regression/impl/GLMRegressionModel.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/impl/GLMRegressionModel.scala
@@ -53,7 +53,7 @@ private[regression] object GLMRegressionModel {
      val metadata = compact(render(
         ("class" -> modelClass) ~ ("version" -> thisFormatVersion) ~
           ("numFeatures" -> weights.size)))
-      sc.parallelize(Seq(metadata), 1).saveAsTextFile(Loader.metadataPath(path))
+      spark.createDataFrame(Seq(Tuple1(metadata))).write.text(Loader.metadataPath(path))
 
       // Create Parquet data.
       val data = Data(weights, intercept)
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/tree/model/DecisionTreeModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/tree/model/DecisionTreeModel.scala
index 2f65dea0c4a8..b45211c1689c 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/tree/model/DecisionTreeModel.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/tree/model/DecisionTreeModel.scala
@@ -226,15 +226,15 @@ object DecisionTreeModel extends Loader[DecisionTreeModel] with Logging {
     }
 
     // Create JSON metadata.
+    val spark = SparkSession.builder().sparkContext(sc).getOrCreate()
     val metadata = compact(render(
       ("class" -> thisClassName) ~ ("version" -> thisFormatVersion) ~
         ("algo" -> model.algo.toString) ~ ("numNodes" -> model.numNodes)))
-    sc.parallelize(Seq(metadata), 1).saveAsTextFile(Loader.metadataPath(path))
+    spark.createDataFrame(Seq(Tuple1(metadata))).write.text(Loader.metadataPath(path))
 
     // Create Parquet data.
     val nodes = model.topNode.subtreeIterator.toSeq
     val dataRDD = sc.parallelize(nodes).map(NodeData.apply(0, _))
-    val spark = SparkSession.builder().sparkContext(sc).getOrCreate()
     spark.createDataFrame(dataRDD).write.parquet(Loader.dataPath(path))
   }
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/tree/model/treeEnsembleModels.scala b/mllib/src/main/scala/org/apache/spark/mllib/tree/model/treeEnsembleModels.scala
index aa2287f3af89..7251dfd07a1f 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/tree/model/treeEnsembleModels.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/tree/model/treeEnsembleModels.scala
@@ -430,7 +430,7 @@ private[tree] object TreeEnsembleModel extends Logging {
       val metadata = compact(render(
         ("class" -> className) ~ ("version" -> thisFormatVersion) ~
           ("metadata" -> Extraction.decompose(ensembleMetadata))))
-      sc.parallelize(Seq(metadata), 1).saveAsTextFile(Loader.metadataPath(path))
+      spark.createDataFrame(Seq(Tuple1(metadata))).write.text(Loader.metadataPath(path))
 
       // Create Parquet data.
       val dataRDD = sc.parallelize(model.trees.zipWithIndex.toImmutableArraySeq)
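One asymmetry worth noting across the hunks: every save path still holds a legacy SparkContext, so each file obtains the session with SparkSession.builder().sparkContext(sc).getOrCreate(), while the reader in DefaultParamsReader.loadMetadata switches to SparkSession.getActiveSession.get and therefore assumes a session is already active on the calling thread. A sketch of the two lookups; the object and method names are hypothetical, and note that Builder.sparkContext is private[spark], so the writer-side pattern only compiles inside Spark's own source tree, as in the hunks above:

```scala
package org.apache.spark.mllib.util

import org.apache.spark.SparkContext
import org.apache.spark.sql.SparkSession

object SessionLookup {
  // Writer side: reuse (or create) the session backed by the existing context.
  def sessionFor(sc: SparkContext): SparkSession =
    SparkSession.builder().sparkContext(sc).getOrCreate()

  // Reader side, as in loadMetadata: take the thread's active session.
  // getActiveSession returns an Option, so .get throws NoSuchElementException
  // when no session is active.
  def activeSession(): SparkSession =
    SparkSession.getActiveSession.get
}
```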