From 430ed9975f56dd043fdc700a1051d138667294b2 Mon Sep 17 00:00:00 2001 From: Hyukjin Kwon Date: Mon, 15 Jul 2024 08:42:10 +0900 Subject: [PATCH 1/4] Avoid repartition when writing out the metadata --- .../org/apache/spark/ml/classification/FMClassifier.scala | 2 +- .../org/apache/spark/ml/classification/LinearSVC.scala | 2 +- .../spark/ml/classification/LogisticRegression.scala | 2 +- .../classification/MultilayerPerceptronClassifier.scala | 2 +- .../org/apache/spark/ml/classification/NaiveBayes.scala | 2 +- .../org/apache/spark/ml/clustering/GaussianMixture.scala | 2 +- .../main/scala/org/apache/spark/ml/clustering/LDA.scala | 2 +- .../spark/ml/feature/BucketedRandomProjectionLSH.scala | 2 +- .../scala/org/apache/spark/ml/feature/ChiSqSelector.scala | 2 +- .../org/apache/spark/ml/feature/CountVectorizer.scala | 2 +- .../src/main/scala/org/apache/spark/ml/feature/IDF.scala | 2 +- .../scala/org/apache/spark/ml/feature/MaxAbsScaler.scala | 2 +- .../scala/org/apache/spark/ml/feature/MinHashLSH.scala | 2 +- .../scala/org/apache/spark/ml/feature/MinMaxScaler.scala | 2 +- .../scala/org/apache/spark/ml/feature/OneHotEncoder.scala | 2 +- .../src/main/scala/org/apache/spark/ml/feature/PCA.scala | 2 +- .../main/scala/org/apache/spark/ml/feature/RFormula.scala | 4 ++-- .../scala/org/apache/spark/ml/feature/RobustScaler.scala | 2 +- .../org/apache/spark/ml/feature/StandardScaler.scala | 2 +- .../scala/org/apache/spark/ml/feature/StringIndexer.scala | 2 +- .../spark/ml/feature/UnivariateFeatureSelector.scala | 2 +- .../spark/ml/feature/VarianceThresholdSelector.scala | 2 +- .../scala/org/apache/spark/ml/feature/VectorIndexer.scala | 2 +- .../spark/ml/regression/AFTSurvivalRegression.scala | 2 +- .../org/apache/spark/ml/regression/FMRegressor.scala | 2 +- .../spark/ml/regression/GeneralizedLinearRegression.scala | 2 +- .../apache/spark/ml/regression/IsotonicRegression.scala | 2 +- .../org/apache/spark/ml/regression/LinearRegression.scala | 2 +- .../main/scala/org/apache/spark/ml/util/ReadWrite.scala | 8 ++++++-- .../apache/spark/mllib/classification/NaiveBayes.scala | 4 ++-- .../classification/impl/GLMClassificationModel.scala | 2 +- .../spark/mllib/regression/impl/GLMRegressionModel.scala | 2 +- 32 files changed, 39 insertions(+), 35 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/FMClassifier.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/FMClassifier.scala index 4a4a4fffe5de..aec740a932ac 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/classification/FMClassifier.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/FMClassifier.scala @@ -342,7 +342,7 @@ object FMClassificationModel extends MLReadable[FMClassificationModel] { DefaultParamsWriter.saveMetadata(instance, path, sc) val data = Data(instance.intercept, instance.linear, instance.factors) val dataPath = new Path(path, "data").toString - sparkSession.createDataFrame(Seq(data)).repartition(1).write.parquet(dataPath) + sparkSession.createDataFrame(Seq(data)).write.parquet(dataPath) } } diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala index 4bcc7877658d..3e27f781d561 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala @@ -449,7 +449,7 @@ object LinearSVCModel extends MLReadable[LinearSVCModel] { DefaultParamsWriter.saveMetadata(instance, path, sc) val data = 
Data(instance.coefficients, instance.intercept) val dataPath = new Path(path, "data").toString - sparkSession.createDataFrame(Seq(data)).repartition(1).write.parquet(dataPath) + sparkSession.createDataFrame(Seq(data)).write.parquet(dataPath) } } diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala index b3c48f13591f..ac0682f1df5b 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala @@ -1315,7 +1315,7 @@ object LogisticRegressionModel extends MLReadable[LogisticRegressionModel] { val data = Data(instance.numClasses, instance.numFeatures, instance.interceptVector, instance.coefficientMatrix, instance.isMultinomial) val dataPath = new Path(path, "data").toString - sparkSession.createDataFrame(Seq(data)).repartition(1).write.parquet(dataPath) + sparkSession.createDataFrame(Seq(data)).write.parquet(dataPath) } } diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/MultilayerPerceptronClassifier.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/MultilayerPerceptronClassifier.scala index 0ae1f0e277ad..16984bf9aed8 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/classification/MultilayerPerceptronClassifier.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/MultilayerPerceptronClassifier.scala @@ -369,7 +369,7 @@ object MultilayerPerceptronClassificationModel // Save model data: weights val data = Data(instance.weights) val dataPath = new Path(path, "data").toString - sparkSession.createDataFrame(Seq(data)).repartition(1).write.parquet(dataPath) + sparkSession.createDataFrame(Seq(data)).write.parquet(dataPath) } } diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/NaiveBayes.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/NaiveBayes.scala index b7f9f97585fc..52486cb8aa24 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/classification/NaiveBayes.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/NaiveBayes.scala @@ -591,7 +591,7 @@ object NaiveBayesModel extends MLReadable[NaiveBayesModel] { } val data = Data(instance.pi, instance.theta, instance.sigma) - sparkSession.createDataFrame(Seq(data)).repartition(1).write.parquet(dataPath) + sparkSession.createDataFrame(Seq(data)).write.parquet(dataPath) } } diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala b/mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala index a68b2fc0dec8..0f6648bb4cda 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala @@ -243,7 +243,7 @@ object GaussianMixtureModel extends MLReadable[GaussianMixtureModel] { val sigmas = gaussians.map(c => OldMatrices.fromML(c.cov)) val data = Data(weights, mus, sigmas) val dataPath = new Path(path, "data").toString - sparkSession.createDataFrame(Seq(data)).repartition(1).write.parquet(dataPath) + sparkSession.createDataFrame(Seq(data)).write.parquet(dataPath) } } diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala b/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala index 3f6bdda9e050..7cbfc732a19c 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala 
@@ -659,7 +659,7 @@ object LocalLDAModel extends MLReadable[LocalLDAModel] { val data = Data(instance.vocabSize, oldModel.topicsMatrix, oldModel.docConcentration, oldModel.topicConcentration, oldModel.gammaShape) val dataPath = new Path(path, "data").toString - sparkSession.createDataFrame(Seq(data)).repartition(1).write.parquet(dataPath) + sparkSession.createDataFrame(Seq(data)).write.parquet(dataPath) } } diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/BucketedRandomProjectionLSH.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/BucketedRandomProjectionLSH.scala index 16f72e18b977..d30962088cb8 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/BucketedRandomProjectionLSH.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/BucketedRandomProjectionLSH.scala @@ -230,7 +230,7 @@ object BucketedRandomProjectionLSHModel extends MLReadable[BucketedRandomProject DefaultParamsWriter.saveMetadata(instance, path, sc) val data = Data(instance.randMatrix) val dataPath = new Path(path, "data").toString - sparkSession.createDataFrame(Seq(data)).repartition(1).write.parquet(dataPath) + sparkSession.createDataFrame(Seq(data)).write.parquet(dataPath) } } diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/ChiSqSelector.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/ChiSqSelector.scala index 10149a65a954..3062f643e950 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/ChiSqSelector.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/ChiSqSelector.scala @@ -176,7 +176,7 @@ object ChiSqSelectorModel extends MLReadable[ChiSqSelectorModel] { DefaultParamsWriter.saveMetadata(instance, path, sc) val data = Data(instance.selectedFeatures.toImmutableArraySeq) val dataPath = new Path(path, "data").toString - sparkSession.createDataFrame(Seq(data)).repartition(1).write.parquet(dataPath) + sparkSession.createDataFrame(Seq(data)).write.parquet(dataPath) } } diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/CountVectorizer.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/CountVectorizer.scala index 890266ed7a72..b81914f86fbb 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/CountVectorizer.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/CountVectorizer.scala @@ -375,7 +375,7 @@ object CountVectorizerModel extends MLReadable[CountVectorizerModel] { DefaultParamsWriter.saveMetadata(instance, path, sc) val data = Data(instance.vocabulary.toImmutableArraySeq) val dataPath = new Path(path, "data").toString - sparkSession.createDataFrame(Seq(data)).repartition(1).write.parquet(dataPath) + sparkSession.createDataFrame(Seq(data)).write.parquet(dataPath) } } diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/IDF.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/IDF.scala index e451d4daffbc..696e1516582d 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/IDF.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/IDF.scala @@ -201,7 +201,7 @@ object IDFModel extends MLReadable[IDFModel] { DefaultParamsWriter.saveMetadata(instance, path, sc) val data = Data(instance.idf, instance.docFreq, instance.numDocs) val dataPath = new Path(path, "data").toString - sparkSession.createDataFrame(Seq(data)).repartition(1).write.parquet(dataPath) + sparkSession.createDataFrame(Seq(data)).write.parquet(dataPath) } } diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/MaxAbsScaler.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/MaxAbsScaler.scala index 
2d48a5f9f491..05ee59d1627d 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/MaxAbsScaler.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/MaxAbsScaler.scala @@ -165,7 +165,7 @@ object MaxAbsScalerModel extends MLReadable[MaxAbsScalerModel] { DefaultParamsWriter.saveMetadata(instance, path, sc) val data = new Data(instance.maxAbs) val dataPath = new Path(path, "data").toString - sparkSession.createDataFrame(Seq(data)).repartition(1).write.parquet(dataPath) + sparkSession.createDataFrame(Seq(data)).write.parquet(dataPath) } } diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/MinHashLSH.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/MinHashLSH.scala index cdedcc2de956..d94aadd1ce1f 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/MinHashLSH.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/MinHashLSH.scala @@ -223,7 +223,7 @@ object MinHashLSHModel extends MLReadable[MinHashLSHModel] { DefaultParamsWriter.saveMetadata(instance, path, sc) val data = Data(instance.randCoefficients.flatMap(tuple => Array(tuple._1, tuple._2))) val dataPath = new Path(path, "data").toString - sparkSession.createDataFrame(Seq(data)).repartition(1).write.parquet(dataPath) + sparkSession.createDataFrame(Seq(data)).write.parquet(dataPath) } } diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/MinMaxScaler.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/MinMaxScaler.scala index 22c4ca9cddf4..4111e559a5c2 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/MinMaxScaler.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/MinMaxScaler.scala @@ -250,7 +250,7 @@ object MinMaxScalerModel extends MLReadable[MinMaxScalerModel] { DefaultParamsWriter.saveMetadata(instance, path, sc) val data = new Data(instance.originalMin, instance.originalMax) val dataPath = new Path(path, "data").toString - sparkSession.createDataFrame(Seq(data)).repartition(1).write.parquet(dataPath) + sparkSession.createDataFrame(Seq(data)).write.parquet(dataPath) } } diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/OneHotEncoder.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/OneHotEncoder.scala index e32addc7ee19..e7cf0105754a 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/OneHotEncoder.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/OneHotEncoder.scala @@ -406,7 +406,7 @@ object OneHotEncoderModel extends MLReadable[OneHotEncoderModel] { DefaultParamsWriter.saveMetadata(instance, path, sc) val data = Data(instance.categorySizes) val dataPath = new Path(path, "data").toString - sparkSession.createDataFrame(Seq(data)).repartition(1).write.parquet(dataPath) + sparkSession.createDataFrame(Seq(data)).write.parquet(dataPath) } } diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/PCA.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/PCA.scala index 16373a4c4af1..f7ec18b38a0e 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/PCA.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/PCA.scala @@ -187,7 +187,7 @@ object PCAModel extends MLReadable[PCAModel] { DefaultParamsWriter.saveMetadata(instance, path, sc) val data = Data(instance.pc, instance.explainedVariance) val dataPath = new Path(path, "data").toString - sparkSession.createDataFrame(Seq(data)).repartition(1).write.parquet(dataPath) + sparkSession.createDataFrame(Seq(data)).write.parquet(dataPath) } } diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/RFormula.scala 
b/mllib/src/main/scala/org/apache/spark/ml/feature/RFormula.scala index f3f85b409867..7a47e73e5ef4 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/RFormula.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/RFormula.scala @@ -506,7 +506,7 @@ private object ColumnPruner extends MLReadable[ColumnPruner] { // Save model data: columnsToPrune val data = Data(instance.columnsToPrune.toSeq) val dataPath = new Path(path, "data").toString - sparkSession.createDataFrame(Seq(data)).repartition(1).write.parquet(dataPath) + sparkSession.createDataFrame(Seq(data)).write.parquet(dataPath) } } @@ -598,7 +598,7 @@ private object VectorAttributeRewriter extends MLReadable[VectorAttributeRewrite // Save model data: vectorCol, prefixesToRewrite val data = Data(instance.vectorCol, instance.prefixesToRewrite) val dataPath = new Path(path, "data").toString - sparkSession.createDataFrame(Seq(data)).repartition(1).write.parquet(dataPath) + sparkSession.createDataFrame(Seq(data)).write.parquet(dataPath) } } diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/RobustScaler.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/RobustScaler.scala index df6e54ce12d9..0950dc55dccb 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/RobustScaler.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/RobustScaler.scala @@ -287,7 +287,7 @@ object RobustScalerModel extends MLReadable[RobustScalerModel] { DefaultParamsWriter.saveMetadata(instance, path, sc) val data = Data(instance.range, instance.median) val dataPath = new Path(path, "data").toString - sparkSession.createDataFrame(Seq(data)).repartition(1).write.parquet(dataPath) + sparkSession.createDataFrame(Seq(data)).write.parquet(dataPath) } } diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/StandardScaler.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/StandardScaler.scala index 92dee46ad005..c0a6392c29c3 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/StandardScaler.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/StandardScaler.scala @@ -208,7 +208,7 @@ object StandardScalerModel extends MLReadable[StandardScalerModel] { DefaultParamsWriter.saveMetadata(instance, path, sc) val data = Data(instance.std, instance.mean) val dataPath = new Path(path, "data").toString - sparkSession.createDataFrame(Seq(data)).repartition(1).write.parquet(dataPath) + sparkSession.createDataFrame(Seq(data)).write.parquet(dataPath) } } diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala index 60dc4d024071..dd0bc6f22b4b 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala @@ -499,7 +499,7 @@ object StringIndexerModel extends MLReadable[StringIndexerModel] { DefaultParamsWriter.saveMetadata(instance, path, sc) val data = Data(instance.labelsArray) val dataPath = new Path(path, "data").toString - sparkSession.createDataFrame(Seq(data)).repartition(1).write.parquet(dataPath) + sparkSession.createDataFrame(Seq(data)).write.parquet(dataPath) } } diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/UnivariateFeatureSelector.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/UnivariateFeatureSelector.scala index 35e5b27183ad..29a091012495 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/UnivariateFeatureSelector.scala +++ 
b/mllib/src/main/scala/org/apache/spark/ml/feature/UnivariateFeatureSelector.scala @@ -352,7 +352,7 @@ object UnivariateFeatureSelectorModel extends MLReadable[UnivariateFeatureSelect DefaultParamsWriter.saveMetadata(instance, path, sc) val data = Data(instance.selectedFeatures.toImmutableArraySeq) val dataPath = new Path(path, "data").toString - sparkSession.createDataFrame(Seq(data)).repartition(1).write.parquet(dataPath) + sparkSession.createDataFrame(Seq(data)).write.parquet(dataPath) } } diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/VarianceThresholdSelector.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/VarianceThresholdSelector.scala index 82b49bd80067..df57e19f1a72 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/VarianceThresholdSelector.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/VarianceThresholdSelector.scala @@ -190,7 +190,7 @@ object VarianceThresholdSelectorModel extends MLReadable[VarianceThresholdSelect DefaultParamsWriter.saveMetadata(instance, path, sc) val data = Data(instance.selectedFeatures.toImmutableArraySeq) val dataPath = new Path(path, "data").toString - sparkSession.createDataFrame(Seq(data)).repartition(1).write.parquet(dataPath) + sparkSession.createDataFrame(Seq(data)).write.parquet(dataPath) } } diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/VectorIndexer.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/VectorIndexer.scala index 8eb8f81227ca..4fed325e19e9 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/VectorIndexer.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/VectorIndexer.scala @@ -522,7 +522,7 @@ object VectorIndexerModel extends MLReadable[VectorIndexerModel] { DefaultParamsWriter.saveMetadata(instance, path, sc) val data = Data(instance.numFeatures, instance.categoryMaps) val dataPath = new Path(path, "data").toString - sparkSession.createDataFrame(Seq(data)).repartition(1).write.parquet(dataPath) + sparkSession.createDataFrame(Seq(data)).write.parquet(dataPath) } } diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala index 788ad65497df..d77d79dae4b8 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala @@ -498,7 +498,7 @@ object AFTSurvivalRegressionModel extends MLReadable[AFTSurvivalRegressionModel] // Save model data: coefficients, intercept, scale val data = Data(instance.coefficients, instance.intercept, instance.scale) val dataPath = new Path(path, "data").toString - sparkSession.createDataFrame(Seq(data)).repartition(1).write.parquet(dataPath) + sparkSession.createDataFrame(Seq(data)).write.parquet(dataPath) } } diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/FMRegressor.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/FMRegressor.scala index 6e09143e9ee7..8c797295e671 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/regression/FMRegressor.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/regression/FMRegressor.scala @@ -507,7 +507,7 @@ object FMRegressionModel extends MLReadable[FMRegressionModel] { DefaultParamsWriter.saveMetadata(instance, path, sc) val data = Data(instance.intercept, instance.linear, instance.factors) val dataPath = new Path(path, "data").toString - sparkSession.createDataFrame(Seq(data)).repartition(1).write.parquet(dataPath) + 
sparkSession.createDataFrame(Seq(data)).write.parquet(dataPath) } } diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala index 4ded2f8d7bf5..181a1a03e6f3 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala @@ -1145,7 +1145,7 @@ object GeneralizedLinearRegressionModel extends MLReadable[GeneralizedLinearRegr // Save model data: intercept, coefficients val data = Data(instance.intercept, instance.coefficients) val dataPath = new Path(path, "data").toString - sparkSession.createDataFrame(Seq(data)).repartition(1).write.parquet(dataPath) + sparkSession.createDataFrame(Seq(data)).write.parquet(dataPath) } } diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/IsotonicRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/IsotonicRegression.scala index f1f2179ac4b3..29d8a00a4384 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/regression/IsotonicRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/regression/IsotonicRegression.scala @@ -306,7 +306,7 @@ object IsotonicRegressionModel extends MLReadable[IsotonicRegressionModel] { val data = Data( instance.oldModel.boundaries, instance.oldModel.predictions, instance.oldModel.isotonic) val dataPath = new Path(path, "data").toString - sparkSession.createDataFrame(Seq(data)).repartition(1).write.parquet(dataPath) + sparkSession.createDataFrame(Seq(data)).write.parquet(dataPath) } } diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala index 23e536ce45eb..d5dce782770b 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala @@ -784,7 +784,7 @@ private class InternalLinearRegressionModelWriter // Save model data: intercept, coefficients, scale val data = Data(instance.intercept, instance.coefficients, instance.scale) val dataPath = new Path(path, "data").toString - sparkSession.createDataFrame(Seq(data)).repartition(1).write.parquet(dataPath) + sparkSession.createDataFrame(Seq(data)).write.parquet(dataPath) } } diff --git a/mllib/src/main/scala/org/apache/spark/ml/util/ReadWrite.scala b/mllib/src/main/scala/org/apache/spark/ml/util/ReadWrite.scala index 9b26d0a911ac..f869c3d596ca 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/util/ReadWrite.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/util/ReadWrite.scala @@ -411,7 +411,10 @@ private[ml] object DefaultParamsWriter { paramMap: Option[JValue] = None): Unit = { val metadataPath = new Path(path, "metadata").toString val metadataJson = getMetadataToSave(instance, sc, extraMetadata, paramMap) - sc.parallelize(Seq(metadataJson), 1).saveAsTextFile(metadataPath) + val spark = SparkSession.getActiveSession.get + // Note that we should write a single file. If there is more than one row, + // it produces multiple partitions.
+ spark.createDataFrame(Seq(Tuple1(metadataJson))).write.text(metadataPath) } /** @@ -585,7 +588,8 @@ private[ml] object DefaultParamsReader { */ def loadMetadata(path: String, sc: SparkContext, expectedClassName: String = ""): Metadata = { val metadataPath = new Path(path, "metadata").toString - val metadataStr = sc.textFile(metadataPath, 1).first() + val spark = SparkSession.getActiveSession.get + val metadataStr = spark.read.text(metadataPath).first().getString(0) parseMetadata(metadataStr, expectedClassName) } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala b/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala index 3d36b8270861..3bc1d592f989 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala @@ -201,7 +201,7 @@ object NaiveBayesModel extends Loader[NaiveBayesModel] { sc.parallelize(Seq(metadata), 1).saveAsTextFile(metadataPath(path)) // Create Parquet data. - spark.createDataFrame(Seq(data)).repartition(1).write.parquet(dataPath(path)) + spark.createDataFrame(Seq(data)).write.parquet(dataPath(path)) } @Since("1.3.0") @@ -246,7 +246,7 @@ object NaiveBayesModel extends Loader[NaiveBayesModel] { sc.parallelize(Seq(metadata), 1).saveAsTextFile(metadataPath(path)) // Create Parquet data. - spark.createDataFrame(Seq(data)).repartition(1).write.parquet(dataPath(path)) + spark.createDataFrame(Seq(data)).write.parquet(dataPath(path)) } def load(sc: SparkContext, path: String): NaiveBayesModel = { diff --git a/mllib/src/main/scala/org/apache/spark/mllib/classification/impl/GLMClassificationModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/classification/impl/GLMClassificationModel.scala index 84491181d077..cb18a6003f7f 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/classification/impl/GLMClassificationModel.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/classification/impl/GLMClassificationModel.scala @@ -61,7 +61,7 @@ private[classification] object GLMClassificationModel { // Create Parquet data. val data = Data(weights, intercept, threshold) - spark.createDataFrame(Seq(data)).repartition(1).write.parquet(Loader.dataPath(path)) + spark.createDataFrame(Seq(data)).write.parquet(Loader.dataPath(path)) } /** diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/impl/GLMRegressionModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/impl/GLMRegressionModel.scala index cd90e97cc538..bbc513f93b38 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/regression/impl/GLMRegressionModel.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/impl/GLMRegressionModel.scala @@ -57,7 +57,7 @@ private[regression] object GLMRegressionModel { // Create Parquet data. 
val data = Data(weights, intercept) - spark.createDataFrame(Seq(data)).repartition(1).write.parquet(Loader.dataPath(path)) + spark.createDataFrame(Seq(data)).write.parquet(Loader.dataPath(path)) } /** From 4fe591bfbd91986c4796264ad52ef18f29fc8047 Mon Sep 17 00:00:00 2001 From: Hyukjin Kwon Date: Mon, 15 Jul 2024 08:54:07 +0900 Subject: [PATCH 2/4] fixup --- .../org/apache/spark/mllib/classification/NaiveBayes.scala | 4 ++-- .../mllib/classification/impl/GLMClassificationModel.scala | 2 +- .../spark/mllib/clustering/BisectingKMeansModel.scala | 6 +++--- .../spark/mllib/clustering/GaussianMixtureModel.scala | 2 +- .../org/apache/spark/mllib/clustering/KMeansModel.scala | 4 ++-- .../scala/org/apache/spark/mllib/clustering/LDAModel.scala | 4 ++-- .../spark/mllib/clustering/PowerIterationClustering.scala | 2 +- .../org/apache/spark/mllib/feature/ChiSqSelector.scala | 2 +- .../scala/org/apache/spark/mllib/feature/Word2Vec.scala | 2 +- .../main/scala/org/apache/spark/mllib/fpm/FPGrowth.scala | 2 +- .../main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala | 2 +- .../mllib/recommendation/MatrixFactorizationModel.scala | 2 +- .../apache/spark/mllib/regression/IsotonicRegression.scala | 2 +- .../spark/mllib/regression/impl/GLMRegressionModel.scala | 2 +- .../apache/spark/mllib/tree/model/DecisionTreeModel.scala | 2 +- .../apache/spark/mllib/tree/model/treeEnsembleModels.scala | 2 +- 16 files changed, 21 insertions(+), 21 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala b/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala index 3bc1d592f989..e5b162e83c77 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala @@ -198,7 +198,7 @@ object NaiveBayesModel extends Loader[NaiveBayesModel] { val metadata = compact(render( ("class" -> thisClassName) ~ ("version" -> thisFormatVersion) ~ ("numFeatures" -> data.theta(0).length) ~ ("numClasses" -> data.pi.length))) - sc.parallelize(Seq(metadata), 1).saveAsTextFile(metadataPath(path)) + spark.createDataFrame(Seq(Tuple1(metadata))).write.text(metadataPath(path)) // Create Parquet data. spark.createDataFrame(Seq(data)).write.parquet(dataPath(path)) @@ -243,7 +243,7 @@ object NaiveBayesModel extends Loader[NaiveBayesModel] { val metadata = compact(render( ("class" -> thisClassName) ~ ("version" -> thisFormatVersion) ~ ("numFeatures" -> data.theta(0).length) ~ ("numClasses" -> data.pi.length))) - sc.parallelize(Seq(metadata), 1).saveAsTextFile(metadataPath(path)) + spark.createDataFrame(Seq(Tuple1(metadata))).write.text(metadataPath(path)) // Create Parquet data. 
spark.createDataFrame(Seq(data)).write.parquet(dataPath(path)) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/classification/impl/GLMClassificationModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/classification/impl/GLMClassificationModel.scala index cb18a6003f7f..439682797d03 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/classification/impl/GLMClassificationModel.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/classification/impl/GLMClassificationModel.scala @@ -57,7 +57,7 @@ private[classification] object GLMClassificationModel { val metadata = compact(render( ("class" -> modelClass) ~ ("version" -> thisFormatVersion) ~ ("numFeatures" -> numFeatures) ~ ("numClasses" -> numClasses))) - sc.parallelize(Seq(metadata), 1).saveAsTextFile(Loader.metadataPath(path)) + spark.createDataFrame(Seq(Tuple1(metadata))).write.text(Loader.metadataPath(path)) // Create Parquet data. val data = Data(weights, intercept, threshold) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/BisectingKMeansModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/BisectingKMeansModel.scala index 9f3aad923897..083c3e3e77a9 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/BisectingKMeansModel.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/BisectingKMeansModel.scala @@ -179,7 +179,7 @@ object BisectingKMeansModel extends Loader[BisectingKMeansModel] { val metadata = compact(render( ("class" -> thisClassName) ~ ("version" -> thisFormatVersion) ~ ("rootId" -> model.root.index))) - sc.parallelize(Seq(metadata), 1).saveAsTextFile(Loader.metadataPath(path)) + spark.createDataFrame(Seq(Tuple1(metadata))).write.text(Loader.metadataPath(path)) val data = getNodes(model.root).map(node => Data(node.index, node.size, node.centerWithNorm.vector, node.centerWithNorm.norm, node.cost, node.height, @@ -215,7 +215,7 @@ object BisectingKMeansModel extends Loader[BisectingKMeansModel] { val metadata = compact(render( ("class" -> thisClassName) ~ ("version" -> thisFormatVersion) ~ ("rootId" -> model.root.index) ~ ("distanceMeasure" -> model.distanceMeasure))) - sc.parallelize(Seq(metadata), 1).saveAsTextFile(Loader.metadataPath(path)) + spark.createDataFrame(Seq(Tuple1(metadata))).write.text(Loader.metadataPath(path)) val data = getNodes(model.root).map(node => Data(node.index, node.size, node.centerWithNorm.vector, node.centerWithNorm.norm, node.cost, node.height, @@ -253,7 +253,7 @@ object BisectingKMeansModel extends Loader[BisectingKMeansModel] { ("class" -> thisClassName) ~ ("version" -> thisFormatVersion) ~ ("rootId" -> model.root.index) ~ ("distanceMeasure" -> model.distanceMeasure) ~ ("trainingCost" -> model.trainingCost))) - sc.parallelize(Seq(metadata), 1).saveAsTextFile(Loader.metadataPath(path)) + spark.createDataFrame(Seq(Tuple1(metadata))).write.text(Loader.metadataPath(path)) val data = getNodes(model.root).map(node => Data(node.index, node.size, node.centerWithNorm.vector, node.centerWithNorm.norm, node.cost, node.height, diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala index 8982a8ca7c6c..40a810a699ac 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala @@ -147,7 +147,7 @@ object GaussianMixtureModel extends Loader[GaussianMixtureModel] { // Create JSON metadata. 
val metadata = compact(render (("class" -> classNameV1_0) ~ ("version" -> formatVersionV1_0) ~ ("k" -> weights.length))) - sc.parallelize(Seq(metadata), 1).saveAsTextFile(Loader.metadataPath(path)) + spark.createDataFrame(Seq(Tuple1(metadata))).write.text(Loader.metadataPath(path)) // Create Parquet data. val dataArray = Array.tabulate(weights.length) { i => diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala index 476df64581f7..e5c0b27072d0 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala @@ -172,7 +172,7 @@ object KMeansModel extends Loader[KMeansModel] { val spark = SparkSession.builder().sparkContext(sc).getOrCreate() val metadata = compact(render( ("class" -> thisClassName) ~ ("version" -> thisFormatVersion) ~ ("k" -> model.k))) - sc.parallelize(Seq(metadata), 1).saveAsTextFile(Loader.metadataPath(path)) + spark.createDataFrame(Seq(Tuple1(metadata))).write.text(Loader.metadataPath(path)) val dataRDD = sc.parallelize(model.clusterCentersWithNorm.zipWithIndex.toImmutableArraySeq) .map { case (p, id) => Cluster(id, p.vector) @@ -207,7 +207,7 @@ object KMeansModel extends Loader[KMeansModel] { ("class" -> thisClassName) ~ ("version" -> thisFormatVersion) ~ ("k" -> model.k) ~ ("distanceMeasure" -> model.distanceMeasure) ~ ("trainingCost" -> model.trainingCost))) - sc.parallelize(Seq(metadata), 1).saveAsTextFile(Loader.metadataPath(path)) + spark.createDataFrame(Seq(Tuple1(metadata))).write.text(Loader.metadataPath(path)) val dataRDD = sc.parallelize(model.clusterCentersWithNorm.zipWithIndex.toImmutableArraySeq) .map { case (p, id) => Cluster(id, p.vector) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala index 831c7a9316fd..10a81acede0c 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala @@ -460,7 +460,7 @@ object LocalLDAModel extends Loader[LocalLDAModel] { ("docConcentration" -> docConcentration.toArray.toImmutableArraySeq) ~ ("topicConcentration" -> topicConcentration) ~ ("gammaShape" -> gammaShape))) - sc.parallelize(Seq(metadata), 1).saveAsTextFile(Loader.metadataPath(path)) + spark.createDataFrame(Seq(Tuple1(metadata))).write.text(Loader.metadataPath(path)) val topicsDenseMatrix = topicsMatrix.asBreeze.toDenseMatrix val topics = Range(0, k).map { topicInd => @@ -869,7 +869,7 @@ object DistributedLDAModel extends Loader[DistributedLDAModel] { ("topicConcentration" -> topicConcentration) ~ ("iterationTimes" -> iterationTimes.toImmutableArraySeq) ~ ("gammaShape" -> gammaShape))) - sc.parallelize(Seq(metadata), 1).saveAsTextFile(Loader.metadataPath(path)) + spark.createDataFrame(Seq(Tuple1(metadata))).write.text(Loader.metadataPath(path)) val newPath = new Path(Loader.dataPath(path), "globalTopicTotals").toUri.toString spark.createDataFrame(Seq(Data(Vectors.fromBreeze(globalTopicTotals)))).write.parquet(newPath) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala index 9150bb305876..639e762ef3c8 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala +++ 
b/mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala @@ -73,7 +73,7 @@ object PowerIterationClusteringModel extends Loader[PowerIterationClusteringMode val metadata = compact(render( ("class" -> thisClassName) ~ ("version" -> thisFormatVersion) ~ ("k" -> model.k))) - sc.parallelize(Seq(metadata), 1).saveAsTextFile(Loader.metadataPath(path)) + spark.createDataFrame(Seq(Tuple1(metadata))).write.text(Loader.metadataPath(path)) spark.createDataFrame(model.assignments).write.parquet(Loader.dataPath(path)) } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/feature/ChiSqSelector.scala b/mllib/src/main/scala/org/apache/spark/mllib/feature/ChiSqSelector.scala index 41eb6567b845..ed23df70c577 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/feature/ChiSqSelector.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/feature/ChiSqSelector.scala @@ -136,7 +136,7 @@ object ChiSqSelectorModel extends Loader[ChiSqSelectorModel] { val metadata = compact(render( ("class" -> thisClassName) ~ ("version" -> thisFormatVersion))) - sc.parallelize(Seq(metadata), 1).saveAsTextFile(Loader.metadataPath(path)) + spark.createDataFrame(Seq(Tuple1(metadata))).write.text(Loader.metadataPath(path)) // Create Parquet data. val dataArray = Array.tabulate(model.selectedFeatures.length) { i => diff --git a/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala b/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala index 499dc09b8621..b5b2233ecb75 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala @@ -686,7 +686,7 @@ object Word2VecModel extends Loader[Word2VecModel] { val metadata = compact(render( ("class" -> classNameV1_0) ~ ("version" -> formatVersionV1_0) ~ ("vectorSize" -> vectorSize) ~ ("numWords" -> numWords))) - sc.parallelize(Seq(metadata), 1).saveAsTextFile(Loader.metadataPath(path)) + spark.createDataFrame(Seq(Tuple1(metadata))).write.text(Loader.metadataPath(path)) // We want to partition the model in partitions smaller than // spark.kryoserializer.buffer.max diff --git a/mllib/src/main/scala/org/apache/spark/mllib/fpm/FPGrowth.scala b/mllib/src/main/scala/org/apache/spark/mllib/fpm/FPGrowth.scala index 246146a831f8..fd21e1998ce7 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/fpm/FPGrowth.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/fpm/FPGrowth.scala @@ -108,7 +108,7 @@ object FPGrowthModel extends Loader[FPGrowthModel[_]] { val metadata = compact(render( ("class" -> thisClassName) ~ ("version" -> thisFormatVersion))) - sc.parallelize(Seq(metadata), 1).saveAsTextFile(Loader.metadataPath(path)) + spark.createDataFrame(Seq(Tuple1(metadata))).write.text(Loader.metadataPath(path)) // Get the type of item class val sample = model.freqItemsets.first().items(0) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala b/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala index 3c648f34c610..9c16ac2ecd52 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala @@ -655,7 +655,7 @@ object PrefixSpanModel extends Loader[PrefixSpanModel[_]] { val metadata = compact(render( ("class" -> thisClassName) ~ ("version" -> thisFormatVersion))) - sc.parallelize(Seq(metadata), 1).saveAsTextFile(Loader.metadataPath(path)) + spark.createDataFrame(Seq(Tuple1(metadata))).write.text(Loader.metadataPath(path)) // Get 
the type of item class val sample = model.freqSequences.first().sequence(0)(0) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala index bc888aecec0a..fb9e8ac7c892 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala @@ -387,7 +387,7 @@ object MatrixFactorizationModel extends Loader[MatrixFactorizationModel] { import spark.implicits._ val metadata = compact(render( ("class" -> thisClassName) ~ ("version" -> thisFormatVersion) ~ ("rank" -> model.rank))) - sc.parallelize(Seq(metadata), 1).saveAsTextFile(metadataPath(path)) + spark.createDataFrame(Seq(Tuple1(metadata))).write.text(metadataPath(path)) model.userFeatures.toDF("id", "features").write.parquet(userPath(path)) model.productFeatures.toDF("id", "features").write.parquet(productPath(path)) } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala index e957d8ebd74a..456580ffa531 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala @@ -188,7 +188,7 @@ object IsotonicRegressionModel extends Loader[IsotonicRegressionModel] { val metadata = compact(render( ("class" -> thisClassName) ~ ("version" -> thisFormatVersion) ~ ("isotonic" -> isotonic))) - sc.parallelize(Seq(metadata), 1).saveAsTextFile(metadataPath(path)) + spark.createDataFrame(Seq(Tuple1(metadata))).write.text(metadataPath(path)) spark.createDataFrame( boundaries.toImmutableArraySeq.zip(predictions).map { case (b, p) => Data(b, p) } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/impl/GLMRegressionModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/impl/GLMRegressionModel.scala index bbc513f93b38..b527797a2f2f 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/regression/impl/GLMRegressionModel.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/impl/GLMRegressionModel.scala @@ -53,7 +53,7 @@ private[regression] object GLMRegressionModel { val metadata = compact(render( ("class" -> modelClass) ~ ("version" -> thisFormatVersion) ~ ("numFeatures" -> weights.size))) - sc.parallelize(Seq(metadata), 1).saveAsTextFile(Loader.metadataPath(path)) + spark.createDataFrame(Seq(Tuple1(metadata))).write.text(Loader.metadataPath(path)) // Create Parquet data. val data = Data(weights, intercept) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/tree/model/DecisionTreeModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/tree/model/DecisionTreeModel.scala index 2f65dea0c4a8..416c8ee30153 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/tree/model/DecisionTreeModel.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/tree/model/DecisionTreeModel.scala @@ -229,7 +229,7 @@ object DecisionTreeModel extends Loader[DecisionTreeModel] with Logging { val metadata = compact(render( ("class" -> thisClassName) ~ ("version" -> thisFormatVersion) ~ ("algo" -> model.algo.toString) ~ ("numNodes" -> model.numNodes))) - sc.parallelize(Seq(metadata), 1).saveAsTextFile(Loader.metadataPath(path)) + spark.createDataFrame(Seq(Tuple1(metadata))).write.text(Loader.metadataPath(path)) // Create Parquet data. 
val nodes = model.topNode.subtreeIterator.toSeq diff --git a/mllib/src/main/scala/org/apache/spark/mllib/tree/model/treeEnsembleModels.scala b/mllib/src/main/scala/org/apache/spark/mllib/tree/model/treeEnsembleModels.scala index aa2287f3af89..7251dfd07a1f 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/tree/model/treeEnsembleModels.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/tree/model/treeEnsembleModels.scala @@ -430,7 +430,7 @@ private[tree] object TreeEnsembleModel extends Logging { val metadata = compact(render( ("class" -> className) ~ ("version" -> thisFormatVersion) ~ ("metadata" -> Extraction.decompose(ensembleMetadata)))) - sc.parallelize(Seq(metadata), 1).saveAsTextFile(Loader.metadataPath(path)) + spark.createDataFrame(Seq(Tuple1(metadata))).write.text(Loader.metadataPath(path)) // Create Parquet data. val dataRDD = sc.parallelize(model.trees.zipWithIndex.toImmutableArraySeq) From da168c3697a60ce97dec92ece7dbe305d097a0b4 Mon Sep 17 00:00:00 2001 From: Hyukjin Kwon Date: Mon, 15 Jul 2024 09:11:27 +0900 Subject: [PATCH 3/4] consistency --- mllib/src/main/scala/org/apache/spark/ml/util/ReadWrite.scala | 2 +- .../org/apache/spark/mllib/tree/model/DecisionTreeModel.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/util/ReadWrite.scala b/mllib/src/main/scala/org/apache/spark/ml/util/ReadWrite.scala index f869c3d596ca..021595f76c24 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/util/ReadWrite.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/util/ReadWrite.scala @@ -411,7 +411,7 @@ private[ml] object DefaultParamsWriter { paramMap: Option[JValue] = None): Unit = { val metadataPath = new Path(path, "metadata").toString val metadataJson = getMetadataToSave(instance, sc, extraMetadata, paramMap) - val spark = SparkSession.getActiveSession.get + val spark = SparkSession.builder().sparkContext(sc).getOrCreate() // Note that we should write a single file. If there is more than one row, // it produces multiple partitions. spark.createDataFrame(Seq(Tuple1(metadataJson))).write.text(metadataPath) } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/tree/model/DecisionTreeModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/tree/model/DecisionTreeModel.scala index 416c8ee30153..b45211c1689c 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/tree/model/DecisionTreeModel.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/tree/model/DecisionTreeModel.scala @@ -226,6 +226,7 @@ object DecisionTreeModel extends Loader[DecisionTreeModel] with Logging { } // Create JSON metadata. + val spark = SparkSession.builder().sparkContext(sc).getOrCreate() val metadata = compact(render( ("class" -> thisClassName) ~ ("version" -> thisFormatVersion) ~ ("algo" -> model.algo.toString) ~ ("numNodes" -> model.numNodes))) @@ -234,7 +235,6 @@ object DecisionTreeModel extends Loader[DecisionTreeModel] with Logging { // Create Parquet data.
val nodes = model.topNode.subtreeIterator.toSeq val dataRDD = sc.parallelize(nodes).map(NodeData.apply(0, _)) - val spark = SparkSession.builder().sparkContext(sc).getOrCreate() spark.createDataFrame(dataRDD).write.parquet(Loader.dataPath(path)) } From 83494530787d48da5b4792926147b4425b011cdf Mon Sep 17 00:00:00 2001 From: Hyukjin Kwon Date: Tue, 16 Jul 2024 19:02:08 +0900 Subject: [PATCH 4/4] separate the PR --- .../main/scala/org/apache/spark/ml/util/ReadWrite.scala | 8 ++------ .../apache/spark/mllib/classification/NaiveBayes.scala | 4 ++-- .../classification/impl/GLMClassificationModel.scala | 2 +- .../spark/mllib/clustering/BisectingKMeansModel.scala | 6 +++--- .../spark/mllib/clustering/GaussianMixtureModel.scala | 2 +- .../org/apache/spark/mllib/clustering/KMeansModel.scala | 4 ++-- .../org/apache/spark/mllib/clustering/LDAModel.scala | 4 ++-- .../spark/mllib/clustering/PowerIterationClustering.scala | 2 +- .../org/apache/spark/mllib/feature/ChiSqSelector.scala | 2 +- .../scala/org/apache/spark/mllib/feature/Word2Vec.scala | 2 +- .../main/scala/org/apache/spark/mllib/fpm/FPGrowth.scala | 2 +- .../scala/org/apache/spark/mllib/fpm/PrefixSpan.scala | 2 +- .../mllib/recommendation/MatrixFactorizationModel.scala | 2 +- .../spark/mllib/regression/IsotonicRegression.scala | 2 +- .../spark/mllib/regression/impl/GLMRegressionModel.scala | 2 +- .../apache/spark/mllib/tree/model/DecisionTreeModel.scala | 4 ++-- .../spark/mllib/tree/model/treeEnsembleModels.scala | 2 +- 17 files changed, 24 insertions(+), 28 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/util/ReadWrite.scala b/mllib/src/main/scala/org/apache/spark/ml/util/ReadWrite.scala index 021595f76c24..9b26d0a911ac 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/util/ReadWrite.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/util/ReadWrite.scala @@ -411,10 +411,7 @@ private[ml] object DefaultParamsWriter { paramMap: Option[JValue] = None): Unit = { val metadataPath = new Path(path, "metadata").toString val metadataJson = getMetadataToSave(instance, sc, extraMetadata, paramMap) - val spark = SparkSession.builder().sparkContext(sc).getOrCreate() - // Note that we should write a single file. If there is more than one row, - // it produces multiple partitions.
- spark.createDataFrame(Seq(Tuple1(metadataJson))).write.text(metadataPath) + sc.parallelize(Seq(metadataJson), 1).saveAsTextFile(metadataPath) } /** @@ -588,8 +585,7 @@ private[ml] object DefaultParamsReader { */ def loadMetadata(path: String, sc: SparkContext, expectedClassName: String = ""): Metadata = { val metadataPath = new Path(path, "metadata").toString - val spark = SparkSession.getActiveSession.get - val metadataStr = spark.read.text(metadataPath).first().getString(0) + val metadataStr = sc.textFile(metadataPath, 1).first() parseMetadata(metadataStr, expectedClassName) } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala b/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala index e5b162e83c77..3bc1d592f989 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala @@ -198,7 +198,7 @@ object NaiveBayesModel extends Loader[NaiveBayesModel] { val metadata = compact(render( ("class" -> thisClassName) ~ ("version" -> thisFormatVersion) ~ ("numFeatures" -> data.theta(0).length) ~ ("numClasses" -> data.pi.length))) - spark.createDataFrame(Seq(Tuple1(metadata))).write.text(metadataPath(path)) + sc.parallelize(Seq(metadata), 1).saveAsTextFile(metadataPath(path)) // Create Parquet data. spark.createDataFrame(Seq(data)).write.parquet(dataPath(path)) @@ -243,7 +243,7 @@ object NaiveBayesModel extends Loader[NaiveBayesModel] { val metadata = compact(render( ("class" -> thisClassName) ~ ("version" -> thisFormatVersion) ~ ("numFeatures" -> data.theta(0).length) ~ ("numClasses" -> data.pi.length))) - spark.createDataFrame(Seq(Tuple1(metadata))).write.text(metadataPath(path)) + sc.parallelize(Seq(metadata), 1).saveAsTextFile(metadataPath(path)) // Create Parquet data. spark.createDataFrame(Seq(data)).write.parquet(dataPath(path)) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/classification/impl/GLMClassificationModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/classification/impl/GLMClassificationModel.scala index 439682797d03..cb18a6003f7f 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/classification/impl/GLMClassificationModel.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/classification/impl/GLMClassificationModel.scala @@ -57,7 +57,7 @@ private[classification] object GLMClassificationModel { val metadata = compact(render( ("class" -> modelClass) ~ ("version" -> thisFormatVersion) ~ ("numFeatures" -> numFeatures) ~ ("numClasses" -> numClasses))) - spark.createDataFrame(Seq(Tuple1(metadata))).write.text(Loader.metadataPath(path)) + sc.parallelize(Seq(metadata), 1).saveAsTextFile(Loader.metadataPath(path)) // Create Parquet data. 
val data = Data(weights, intercept, threshold) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/BisectingKMeansModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/BisectingKMeansModel.scala index 083c3e3e77a9..9f3aad923897 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/BisectingKMeansModel.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/BisectingKMeansModel.scala @@ -179,7 +179,7 @@ object BisectingKMeansModel extends Loader[BisectingKMeansModel] { val metadata = compact(render( ("class" -> thisClassName) ~ ("version" -> thisFormatVersion) ~ ("rootId" -> model.root.index))) - spark.createDataFrame(Seq(Tuple1(metadata))).write.text(Loader.metadataPath(path)) + sc.parallelize(Seq(metadata), 1).saveAsTextFile(Loader.metadataPath(path)) val data = getNodes(model.root).map(node => Data(node.index, node.size, node.centerWithNorm.vector, node.centerWithNorm.norm, node.cost, node.height, @@ -215,7 +215,7 @@ object BisectingKMeansModel extends Loader[BisectingKMeansModel] { val metadata = compact(render( ("class" -> thisClassName) ~ ("version" -> thisFormatVersion) ~ ("rootId" -> model.root.index) ~ ("distanceMeasure" -> model.distanceMeasure))) - spark.createDataFrame(Seq(Tuple1(metadata))).write.text(Loader.metadataPath(path)) + sc.parallelize(Seq(metadata), 1).saveAsTextFile(Loader.metadataPath(path)) val data = getNodes(model.root).map(node => Data(node.index, node.size, node.centerWithNorm.vector, node.centerWithNorm.norm, node.cost, node.height, @@ -253,7 +253,7 @@ object BisectingKMeansModel extends Loader[BisectingKMeansModel] { ("class" -> thisClassName) ~ ("version" -> thisFormatVersion) ~ ("rootId" -> model.root.index) ~ ("distanceMeasure" -> model.distanceMeasure) ~ ("trainingCost" -> model.trainingCost))) - spark.createDataFrame(Seq(Tuple1(metadata))).write.text(Loader.metadataPath(path)) + sc.parallelize(Seq(metadata), 1).saveAsTextFile(Loader.metadataPath(path)) val data = getNodes(model.root).map(node => Data(node.index, node.size, node.centerWithNorm.vector, node.centerWithNorm.norm, node.cost, node.height, diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala index 40a810a699ac..8982a8ca7c6c 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala @@ -147,7 +147,7 @@ object GaussianMixtureModel extends Loader[GaussianMixtureModel] { // Create JSON metadata. val metadata = compact(render (("class" -> classNameV1_0) ~ ("version" -> formatVersionV1_0) ~ ("k" -> weights.length))) - spark.createDataFrame(Seq(Tuple1(metadata))).write.text(Loader.metadataPath(path)) + sc.parallelize(Seq(metadata), 1).saveAsTextFile(Loader.metadataPath(path)) // Create Parquet data. 
       val dataArray = Array.tabulate(weights.length) { i =>
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala
index e5c0b27072d0..476df64581f7 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala
@@ -172,7 +172,7 @@ object KMeansModel extends Loader[KMeansModel] {
       val spark = SparkSession.builder().sparkContext(sc).getOrCreate()
       val metadata = compact(render(
         ("class" -> thisClassName) ~ ("version" -> thisFormatVersion) ~ ("k" -> model.k)))
-      spark.createDataFrame(Seq(Tuple1(metadata))).write.text(Loader.metadataPath(path))
+      sc.parallelize(Seq(metadata), 1).saveAsTextFile(Loader.metadataPath(path))
       val dataRDD = sc.parallelize(model.clusterCentersWithNorm.zipWithIndex.toImmutableArraySeq)
         .map { case (p, id) =>
           Cluster(id, p.vector)
@@ -207,7 +207,7 @@
         ("class" -> thisClassName) ~ ("version" -> thisFormatVersion) ~ ("k" -> model.k)
           ~ ("distanceMeasure" -> model.distanceMeasure)
           ~ ("trainingCost" -> model.trainingCost)))
-      spark.createDataFrame(Seq(Tuple1(metadata))).write.text(Loader.metadataPath(path))
+      sc.parallelize(Seq(metadata), 1).saveAsTextFile(Loader.metadataPath(path))
       val dataRDD = sc.parallelize(model.clusterCentersWithNorm.zipWithIndex.toImmutableArraySeq)
         .map { case (p, id) =>
           Cluster(id, p.vector)
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala
index 10a81acede0c..831c7a9316fd 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala
@@ -460,7 +460,7 @@ object LocalLDAModel extends Loader[LocalLDAModel] {
         ("docConcentration" -> docConcentration.toArray.toImmutableArraySeq) ~
           ("topicConcentration" -> topicConcentration) ~
           ("gammaShape" -> gammaShape)))
-      spark.createDataFrame(Seq(Tuple1(metadata))).write.text(Loader.metadataPath(path))
+      sc.parallelize(Seq(metadata), 1).saveAsTextFile(Loader.metadataPath(path))
 
       val topicsDenseMatrix = topicsMatrix.asBreeze.toDenseMatrix
       val topics = Range(0, k).map { topicInd =>
@@ -869,7 +869,7 @@
         ("topicConcentration" -> topicConcentration) ~
           ("iterationTimes" -> iterationTimes.toImmutableArraySeq) ~
           ("gammaShape" -> gammaShape)))
-      spark.createDataFrame(Seq(Tuple1(metadata))).write.text(Loader.metadataPath(path))
+      sc.parallelize(Seq(metadata), 1).saveAsTextFile(Loader.metadataPath(path))
 
       val newPath = new Path(Loader.dataPath(path), "globalTopicTotals").toUri.toString
       spark.createDataFrame(Seq(Data(Vectors.fromBreeze(globalTopicTotals)))).write.parquet(newPath)
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala
index 639e762ef3c8..9150bb305876 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala
@@ -73,7 +73,7 @@ object PowerIterationClusteringModel extends Loader[PowerIterationClusteringModel] {
       val metadata = compact(render(
         ("class" -> thisClassName) ~ ("version" -> thisFormatVersion) ~ ("k" -> model.k)))
-      spark.createDataFrame(Seq(Tuple1(metadata))).write.text(Loader.metadataPath(path))
+      sc.parallelize(Seq(metadata), 1).saveAsTextFile(Loader.metadataPath(path))
 
       spark.createDataFrame(model.assignments).write.parquet(Loader.dataPath(path))
     }
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/feature/ChiSqSelector.scala b/mllib/src/main/scala/org/apache/spark/mllib/feature/ChiSqSelector.scala
index ed23df70c577..41eb6567b845 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/feature/ChiSqSelector.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/feature/ChiSqSelector.scala
@@ -136,7 +136,7 @@ object ChiSqSelectorModel extends Loader[ChiSqSelectorModel] {
 
       val metadata = compact(render(
         ("class" -> thisClassName) ~ ("version" -> thisFormatVersion)))
-      spark.createDataFrame(Seq(Tuple1(metadata))).write.text(Loader.metadataPath(path))
+      sc.parallelize(Seq(metadata), 1).saveAsTextFile(Loader.metadataPath(path))
 
       // Create Parquet data.
       val dataArray = Array.tabulate(model.selectedFeatures.length) { i =>
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala b/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala
index b5b2233ecb75..499dc09b8621 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala
@@ -686,7 +686,7 @@ object Word2VecModel extends Loader[Word2VecModel] {
       val metadata = compact(render(
         ("class" -> classNameV1_0) ~ ("version" -> formatVersionV1_0) ~
         ("vectorSize" -> vectorSize) ~ ("numWords" -> numWords)))
-      spark.createDataFrame(Seq(Tuple1(metadata))).write.text(Loader.metadataPath(path))
+      sc.parallelize(Seq(metadata), 1).saveAsTextFile(Loader.metadataPath(path))
 
       // We want to partition the model in partitions smaller than
       // spark.kryoserializer.buffer.max
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/fpm/FPGrowth.scala b/mllib/src/main/scala/org/apache/spark/mllib/fpm/FPGrowth.scala
index fd21e1998ce7..246146a831f8 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/fpm/FPGrowth.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/fpm/FPGrowth.scala
@@ -108,7 +108,7 @@ object FPGrowthModel extends Loader[FPGrowthModel[_]] {
 
       val metadata = compact(render(
         ("class" -> thisClassName) ~ ("version" -> thisFormatVersion)))
-      spark.createDataFrame(Seq(Tuple1(metadata))).write.text(Loader.metadataPath(path))
+      sc.parallelize(Seq(metadata), 1).saveAsTextFile(Loader.metadataPath(path))
 
       // Get the type of item class
       val sample = model.freqItemsets.first().items(0)
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala b/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala
index 9c16ac2ecd52..3c648f34c610 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala
@@ -655,7 +655,7 @@ object PrefixSpanModel extends Loader[PrefixSpanModel[_]] {
 
       val metadata = compact(render(
         ("class" -> thisClassName) ~ ("version" -> thisFormatVersion)))
-      spark.createDataFrame(Seq(Tuple1(metadata))).write.text(Loader.metadataPath(path))
+      sc.parallelize(Seq(metadata), 1).saveAsTextFile(Loader.metadataPath(path))
 
       // Get the type of item class
       val sample = model.freqSequences.first().sequence(0)(0)
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
index fb9e8ac7c892..bc888aecec0a 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
@@ -387,7 +387,7 @@ object MatrixFactorizationModel extends Loader[MatrixFactorizationModel] {
       import spark.implicits._
       val metadata = compact(render(
         ("class" -> thisClassName) ~ ("version" -> thisFormatVersion) ~ ("rank" -> model.rank)))
-      spark.createDataFrame(Seq(Tuple1(metadata))).write.text(metadataPath(path))
+      sc.parallelize(Seq(metadata), 1).saveAsTextFile(metadataPath(path))
       model.userFeatures.toDF("id", "features").write.parquet(userPath(path))
       model.productFeatures.toDF("id", "features").write.parquet(productPath(path))
     }
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala
index 456580ffa531..e957d8ebd74a 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala
@@ -188,7 +188,7 @@ object IsotonicRegressionModel extends Loader[IsotonicRegressionModel] {
       val metadata = compact(render(
         ("class" -> thisClassName) ~ ("version" -> thisFormatVersion) ~
           ("isotonic" -> isotonic)))
-      spark.createDataFrame(Seq(Tuple1(metadata))).write.text(metadataPath(path))
+      sc.parallelize(Seq(metadata), 1).saveAsTextFile(metadataPath(path))
 
       spark.createDataFrame(
         boundaries.toImmutableArraySeq.zip(predictions).map { case (b, p) => Data(b, p) }
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/impl/GLMRegressionModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/impl/GLMRegressionModel.scala
index b527797a2f2f..bbc513f93b38 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/regression/impl/GLMRegressionModel.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/impl/GLMRegressionModel.scala
@@ -53,7 +53,7 @@ private[regression] object GLMRegressionModel {
       val metadata = compact(render(
         ("class" -> modelClass) ~ ("version" -> thisFormatVersion) ~
           ("numFeatures" -> weights.size)))
-      spark.createDataFrame(Seq(Tuple1(metadata))).write.text(Loader.metadataPath(path))
+      sc.parallelize(Seq(metadata), 1).saveAsTextFile(Loader.metadataPath(path))
 
       // Create Parquet data.
       val data = Data(weights, intercept)
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/tree/model/DecisionTreeModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/tree/model/DecisionTreeModel.scala
index b45211c1689c..2f65dea0c4a8 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/tree/model/DecisionTreeModel.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/tree/model/DecisionTreeModel.scala
@@ -226,15 +226,15 @@ object DecisionTreeModel extends Loader[DecisionTreeModel] with Logging {
       }
 
       // Create JSON metadata.
-      val spark = SparkSession.builder().sparkContext(sc).getOrCreate()
       val metadata = compact(render(
         ("class" -> thisClassName) ~ ("version" -> thisFormatVersion) ~
           ("algo" -> model.algo.toString) ~ ("numNodes" -> model.numNodes)))
-      spark.createDataFrame(Seq(Tuple1(metadata))).write.text(Loader.metadataPath(path))
+      sc.parallelize(Seq(metadata), 1).saveAsTextFile(Loader.metadataPath(path))
 
       // Create Parquet data.
       val nodes = model.topNode.subtreeIterator.toSeq
       val dataRDD = sc.parallelize(nodes).map(NodeData.apply(0, _))
+      val spark = SparkSession.builder().sparkContext(sc).getOrCreate()
       spark.createDataFrame(dataRDD).write.parquet(Loader.dataPath(path))
     }
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/tree/model/treeEnsembleModels.scala b/mllib/src/main/scala/org/apache/spark/mllib/tree/model/treeEnsembleModels.scala
index 7251dfd07a1f..aa2287f3af89 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/tree/model/treeEnsembleModels.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/tree/model/treeEnsembleModels.scala
@@ -430,7 +430,7 @@ private[tree] object TreeEnsembleModel extends Logging {
       val metadata = compact(render(
         ("class" -> className) ~ ("version" -> thisFormatVersion) ~
           ("metadata" -> Extraction.decompose(ensembleMetadata))))
-      spark.createDataFrame(Seq(Tuple1(metadata))).write.text(Loader.metadataPath(path))
+      sc.parallelize(Seq(metadata), 1).saveAsTextFile(Loader.metadataPath(path))
 
       // Create Parquet data.
       val dataRDD = sc.parallelize(model.trees.zipWithIndex.toImmutableArraySeq)