From aeb62ee102c0dd2f55450c010d3b375a84811469 Mon Sep 17 00:00:00 2001
From: Zheng RuiFeng
Date: Fri, 20 Jan 2017 13:12:29 +0800
Subject: [PATCH 1/4] create pr

---
 .../spark/ml/clustering/BisectingKMeans.scala | 19 ++++++-
 .../spark/ml/clustering/GaussianMixture.scala | 47 +++++++++++++++++
 .../apache/spark/ml/clustering/KMeans.scala   | 19 ++++++-
 .../ml/clustering/BisectingKMeansSuite.scala  |  8 +++
 .../ml/clustering/GaussianMixtureSuite.scala  |  8 +++
 .../spark/ml/clustering/KMeansSuite.scala     |  8 +++
 project/MimaExcludes.scala                    |  6 ++-
 python/pyspark/ml/clustering.py               | 50 ++++++++++++++++++-
 8 files changed, 158 insertions(+), 7 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/BisectingKMeans.scala b/mllib/src/main/scala/org/apache/spark/ml/clustering/BisectingKMeans.scala
index 4c20e6563bad..66a51e5a6e1f 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/clustering/BisectingKMeans.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/BisectingKMeans.scala
@@ -157,6 +157,17 @@ class BisectingKMeansModel private[ml] (
     throw new SparkException(
       s"No training summary available for the ${this.getClass.getSimpleName}")
   }
+
+  /**
+   * Evaluates the model on a test dataset.
+   *
+   * @param dataset Test dataset to evaluate model on.
+   */
+  @Since("2.2.0")
+  def evaluate(dataset: Dataset[_]): BisectingKMeansSummary = {
+    val wssse = computeCost(dataset)
+    new BisectingKMeansSummary(transform(dataset), $(predictionCol), $(featuresCol), $(k), wssse)
+  }
 }
 
 object BisectingKMeansModel extends MLReadable[BisectingKMeansModel] {
@@ -265,8 +276,9 @@ class BisectingKMeans @Since("2.0.0") (
       .setSeed($(seed))
     val parentModel = bkm.run(rdd)
     val model = copyValues(new BisectingKMeansModel(uid, parentModel).setParent(this))
+    val wssse = model.computeCost(dataset)
     val summary = new BisectingKMeansSummary(
-      model.transform(dataset), $(predictionCol), $(featuresCol), $(k))
+      model.transform(dataset), $(predictionCol), $(featuresCol), $(k), wssse)
     model.setSummary(Some(summary))
     instr.logSuccess(model)
     model
@@ -295,6 +307,7 @@ object BisectingKMeans extends DefaultParamsReadable[BisectingKMeans] {
  * @param predictionCol Name for column of predicted clusters in `predictions`.
  * @param featuresCol Name for column of features in `predictions`.
  * @param k Number of clusters.
+ * @param wssse Within Set Sum of Squared Errors.
  */
 @Since("2.1.0")
 @Experimental
@@ -302,4 +315,6 @@ class BisectingKMeansSummary private[clustering] (
     predictions: DataFrame,
     predictionCol: String,
     featuresCol: String,
-    k: Int) extends ClusteringSummary(predictions, predictionCol, featuresCol, k)
+    k: Int,
+    @Since("2.2.0") val wssse: Double)
+  extends ClusteringSummary(predictions, predictionCol, featuresCol, k)
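For orientation between the file diffs: a minimal PySpark sketch of the API this patch introduces (the matching Python wrapper is added later in this same patch). The SparkSession and toy data below are illustrative assumptions, not part of the change.

from pyspark.sql import SparkSession
from pyspark.ml.linalg import Vectors
from pyspark.ml.clustering import BisectingKMeans

spark = SparkSession.builder.getOrCreate()

# Two well-separated clusters; any VectorUDT-typed "features" column works.
data = [(Vectors.dense([0.0, 0.0]),), (Vectors.dense([1.0, 1.0]),),
        (Vectors.dense([9.0, 8.0]),), (Vectors.dense([8.0, 9.0]),)]
df = spark.createDataFrame(data, ["features"])

model = BisectingKMeans(k=2, seed=1).fit(df)

# New with this patch: evaluate() builds a summary over an arbitrary dataset,
# and the summary now carries the Within Set Sum of Squared Errors (wssse).
test_summary = model.evaluate(df)
print(test_summary.wssse)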
diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala b/mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala
index db5fff5af86e..6c234a16f963 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala
@@ -180,6 +180,53 @@ class GaussianMixtureModel private[ml] (
     throw new RuntimeException(
       s"No training summary available for the ${this.getClass.getSimpleName}")
   }
+
+  /**
+   * Return the total log-likelihood for this model on the given data.
+   */
+  private[clustering] def computeLogLikelihood(dataset: Dataset[_]): Double = {
+    SchemaUtils.checkColumnType(dataset.schema, $(featuresCol), new VectorUDT)
+    val spark = dataset.sparkSession
+    import spark.implicits._
+
+    val bcWeightAndDists = spark.sparkContext.broadcast(weights.zip(gaussians))
+    dataset.select(col($(featuresCol))).map {
+      case Row(feature: Vector) =>
+        val likelihood = bcWeightAndDists.value.map {
+          case (weight, dist) => EPSILON + weight * dist.pdf(feature)
+        }.sum
+        math.log(likelihood)
+    }.reduce(_ + _)
+  }
+
+  /**
+   * If the probability column is set returns the current model and probability column,
+   * otherwise generates a new column and sets it as the probability column on a new copy
+   * of the current model.
+   */
+  private[clustering] def findSummaryModelAndProbabilityCol():
+      (GaussianMixtureModel, String) = {
+    $(probabilityCol) match {
+      case "" =>
+        val probabilityColName = "probability_" + java.util.UUID.randomUUID.toString
+        (copy(ParamMap.empty).setProbabilityCol(probabilityColName), probabilityColName)
+      case p => (this, p)
+    }
+  }
+
+  /**
+   * Evaluates the model on a test dataset.
+   *
+   * @param dataset Test dataset to evaluate model on.
+   */
+  @Since("2.2.0")
+  def evaluate(dataset: Dataset[_]): GaussianMixtureSummary = {
+    // Handle a possibly unset probability column
+    val (summaryModel, probabilityColName) = findSummaryModelAndProbabilityCol()
+    val logLikelihood = computeLogLikelihood(dataset)
+    new GaussianMixtureSummary(summaryModel.transform(dataset), $(predictionCol),
+      probabilityColName, $(featuresCol), $(k), logLikelihood)
+  }
 }
 
 @Since("2.0.0")
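A sketch of the corresponding usage for Gaussian mixtures, again with assumed setup. evaluate recomputes the total log-likelihood on whatever dataset is supplied, and findSummaryModelAndProbabilityCol above means this works even when probabilityCol has been cleared to an empty string, because a temporary column name is generated internally.

from pyspark.sql import SparkSession
from pyspark.ml.linalg import Vectors
from pyspark.ml.clustering import GaussianMixture

spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame([(Vectors.dense([-0.1, -0.05]),),
                            (Vectors.dense([-0.01, -0.1]),),
                            (Vectors.dense([0.9, 0.8]),),
                            (Vectors.dense([0.75, 0.935]),)], ["features"])

model = GaussianMixture(k=2, tol=0.0001, maxIter=10, seed=10).fit(df)

# evaluate() recomputes the total log-likelihood on the supplied dataset;
# on the training data it should closely match the stored training summary.
test_summary = model.evaluate(df)
print(test_summary.logLikelihood)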
diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala b/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala
index e02b532ca8a9..05d349c9754c 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala
@@ -183,6 +183,17 @@ class KMeansModel private[ml] (
     throw new SparkException(
       s"No training summary available for the ${this.getClass.getSimpleName}")
   }
+
+  /**
+   * Evaluates the model on a test dataset.
+   *
+   * @param dataset Test dataset to evaluate model on.
+   */
+  @Since("2.2.0")
+  def evaluate(dataset: Dataset[_]): KMeansSummary = {
+    val wssse = computeCost(dataset)
+    new KMeansSummary(transform(dataset), $(predictionCol), $(featuresCol), $(k), wssse)
+  }
 }
 
 @Since("1.6.0")
@@ -324,8 +335,9 @@ class KMeans @Since("1.5.0") (
       .setEpsilon($(tol))
     val parentModel = algo.run(instances, Option(instr))
     val model = copyValues(new KMeansModel(uid, parentModel).setParent(this))
+    val wssse = model.computeCost(dataset)
     val summary = new KMeansSummary(
-      model.transform(dataset), $(predictionCol), $(featuresCol), $(k))
+      model.transform(dataset), $(predictionCol), $(featuresCol), $(k), wssse)
     model.setSummary(Some(summary))
     instr.logSuccess(model)
 
@@ -356,6 +368,7 @@ object KMeans extends DefaultParamsReadable[KMeans] {
  * @param predictionCol Name for column of predicted clusters in `predictions`.
  * @param featuresCol Name for column of features in `predictions`.
  * @param k Number of clusters.
+ * @param wssse Within Set Sum of Squared Errors.
  */
 @Since("2.0.0")
 @Experimental
@@ -363,4 +376,6 @@ class KMeansSummary private[clustering] (
     predictions: DataFrame,
     predictionCol: String,
     featuresCol: String,
-    k: Int) extends ClusteringSummary(predictions, predictionCol, featuresCol, k)
+    k: Int,
+    @Since("2.2.0") val wssse: Double)
+  extends ClusteringSummary(predictions, predictionCol, featuresCol, k)
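To make the division of labor concrete: summary is computed once at fit time on the training data, while the new evaluate builds a fresh summary for any dataset passed in. A sketch with assumed train/test DataFrames (again illustrative, not part of the patch):

from pyspark.sql import SparkSession
from pyspark.ml.linalg import Vectors
from pyspark.ml.clustering import KMeans

spark = SparkSession.builder.getOrCreate()

train = spark.createDataFrame([(Vectors.dense([0.0, 0.0]),), (Vectors.dense([1.0, 1.0]),),
                               (Vectors.dense([9.0, 8.0]),), (Vectors.dense([8.0, 9.0]),)],
                              ["features"])
test = spark.createDataFrame([(Vectors.dense([0.5, 0.5]),), (Vectors.dense([8.5, 8.5]),)],
                             ["features"])

model = KMeans(k=2, seed=1).fit(train)

print(model.summary.wssse)         # fixed at fit time, over the training data
print(model.evaluate(test).wssse)  # recomputed on whatever dataset is supplied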
diff --git a/mllib/src/test/scala/org/apache/spark/ml/clustering/BisectingKMeansSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/clustering/BisectingKMeansSuite.scala
index fc491cd6161f..f49b1ed049ce 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/clustering/BisectingKMeansSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/clustering/BisectingKMeansSuite.scala
@@ -122,6 +122,14 @@ class BisectingKMeansSuite
     testEstimatorAndModelReadWrite(
       bisectingKMeans, dataset, BisectingKMeansSuite.allParamSettings, checkModelData)
   }
+
+  test("evaluate on test set") {
+    val bkm = new BisectingKMeans().setK(k).setSeed(1)
+    val model = bkm.fit(dataset)
+    val summary = model.summary
+    val sameSummary = model.evaluate(dataset)
+    assert(summary.wssse === sameSummary.wssse)
+  }
 }
 
 object BisectingKMeansSuite {
diff --git a/mllib/src/test/scala/org/apache/spark/ml/clustering/GaussianMixtureSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/clustering/GaussianMixtureSuite.scala
index e54eb2750c38..9ac36c342dee 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/clustering/GaussianMixtureSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/clustering/GaussianMixtureSuite.scala
@@ -243,6 +243,14 @@ class GaussianMixtureSuite extends SparkFunSuite with MLlibTestSparkContext
     val expectedMatrix = GaussianMixture.unpackUpperTriangularMatrix(4, triangularValues)
     assert(symmetricMatrix === expectedMatrix)
   }
+
+  test("evaluate on test set") {
+    val gm = new GaussianMixture().setK(k).setMaxIter(2).setSeed(1)
+    val model = gm.fit(dataset)
+    val summary = model.summary
+    val sameSummary = model.evaluate(dataset)
+    assert(summary.logLikelihood === sameSummary.logLikelihood)
+  }
 }
 
 object GaussianMixtureSuite extends SparkFunSuite {
diff --git a/mllib/src/test/scala/org/apache/spark/ml/clustering/KMeansSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/clustering/KMeansSuite.scala
index c1b7242e11a8..26a1250c4e80 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/clustering/KMeansSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/clustering/KMeansSuite.scala
@@ -150,6 +150,14 @@ class KMeansSuite extends SparkFunSuite with MLlibTestSparkContext with DefaultR
     val kmeans = new KMeans()
     testEstimatorAndModelReadWrite(kmeans, dataset, KMeansSuite.allParamSettings, checkModelData)
   }
+
+  test("evaluate on test set") {
+    val kmeans = new KMeans().setK(k).setSeed(1)
+    val model = kmeans.fit(dataset)
+    val summary = model.summary
+    val sameSummary = model.evaluate(dataset)
+    assert(summary.wssse === sameSummary.wssse)
+  }
 }
 
 object KMeansSuite {
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 7e6e14352338..77b7af02f1b4 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -54,7 +54,11 @@ object MimaExcludes {
     // [SPARK-19069] [CORE] Expose task 'status' and 'duration' in spark history server REST API.
     ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.status.api.v1.TaskData.this"),
     ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.status.api.v1.TaskData.$default$10"),
-    ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.status.api.v1.TaskData.$default$11")
+    ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.status.api.v1.TaskData.$default$11"),
+
+    // [SPARK-19303][ML] Add evaluate method in clustering models
+    ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ml.clustering.KMeansSummary.this"),
+    ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ml.clustering.BisectingKMeansSummary.this")
   )
 
   // Exclude rules for 2.1.x
diff --git a/python/pyspark/ml/clustering.py b/python/pyspark/ml/clustering.py
index c6c1a0033190..6bc917c0ba63 100644
--- a/python/pyspark/ml/clustering.py
+++ b/python/pyspark/ml/clustering.py
@@ -134,6 +134,13 @@ def summary(self):
         raise RuntimeError("No training summary available for this %s" %
                            self.__class__.__name__)
 
+    @since("2.2.0")
+    def evaluate(self, dataset):
+        """
+        Evaluates the model on a test dataset.
+        """
+        return self._call_java("evaluate", dataset)
+
 
 @inherit_doc
 class GaussianMixture(JavaEstimator, HasFeaturesCol, HasPredictionCol, HasMaxIter, HasTol, HasSeed,
@@ -177,6 +184,9 @@ class GaussianMixture(JavaEstimator, HasFeaturesCol, HasPredictionCol, HasMaxIte
     [2, 2, 2]
     >>> summary.logLikelihood
     8.14636...
+    >>> same_summary = model.evaluate(df)
+    >>> summary.logLikelihood == same_summary.logLikelihood
+    True
     >>> weights = model.weights
     >>> len(weights)
     3
@@ -300,7 +310,13 @@ class KMeansSummary(ClusteringSummary):
 
     .. versionadded:: 2.1.0
     """
 
-    pass
+    @property
+    @since("2.2.0")
+    def wssse(self):
+        """
+        Within Set Sum of Squared Errors.
+        """
+        return self._call_java("wssse")
 
 
 class KMeansModel(JavaModel, JavaMLWritable, JavaMLReadable):
@@ -344,6 +360,13 @@ def summary(self):
         raise RuntimeError("No training summary available for this %s" %
                            self.__class__.__name__)
 
+    @since("2.2.0")
+    def evaluate(self, dataset):
+        """
+        Evaluates the model on a test dataset.
+        """
+        return self._call_java("evaluate", dataset)
+
 
 @inherit_doc
 class KMeans(JavaEstimator, HasFeaturesCol, HasPredictionCol, HasMaxIter, HasTol, HasSeed,
@@ -376,6 +399,11 @@ class KMeans(JavaEstimator, HasFeaturesCol, HasPredictionCol, HasMaxIter, HasTol
     2
     >>> summary.clusterSizes
     [2, 2]
+    >>> summary.wssse
+    8.14636...
+    >>> same_summary = model.evaluate(df)
+    >>> summary.wssse == same_summary.wssse
+    True
     >>> kmeans_path = temp_path + "/kmeans"
     >>> kmeans.save(kmeans_path)
     >>> kmeans2 = KMeans.load(kmeans_path)
@@ -517,6 +545,13 @@ def summary(self):
         raise RuntimeError("No training summary available for this %s" %
                            self.__class__.__name__)
 
+    @since("2.2.0")
+    def evaluate(self, dataset):
+        """
+        Evaluates the model on a test dataset.
+        """
+        return self._call_java("evaluate", dataset)
+
 
 @inherit_doc
 class BisectingKMeans(JavaEstimator, HasFeaturesCol, HasPredictionCol, HasMaxIter, HasSeed,
@@ -549,6 +584,11 @@ class BisectingKMeans(JavaEstimator, HasFeaturesCol, HasPredictionCol, HasMaxIte
     2
     >>> summary.clusterSizes
     [2, 2]
+    >>> summary.wssse
+    8.14636...
+    >>> same_summary = model.evaluate(df)
+    >>> summary.wssse == same_summary.wssse
+    True
     >>> transformed = model.transform(df).select("features", "prediction")
     >>> rows = transformed.collect()
     >>> rows[0].prediction == rows[1].prediction
@@ -646,7 +686,13 @@ class BisectingKMeansSummary(ClusteringSummary):
 
     .. versionadded:: 2.1.0
     """
 
-    pass
+    @property
+    @since("2.2.0")
+    def wssse(self):
+        """
+        Within Set Sum of Squared Errors.
+        """
+        return self._call_java("wssse")
 
 
 @inherit_doc
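The exact == comparisons in the doctests above are fragile for distributed floating-point aggregates, which is what the next two commits in the series address; a tiny illustration of the pattern, with made-up values standing in for the two summaries:

summary_ll = -1.0000000000000002   # made-up stand-in for summary.logLikelihood
evaluate_ll = -1.0                 # made-up stand-in for evaluate(...).logLikelihood
print(summary_ll == evaluate_ll)             # False: partition order can change rounding
print(abs(summary_ll - evaluate_ll) < 1e-3)  # True: tolerance-based check is robust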
From 7f9364a7360691a9403874f2a5fba5fbb8f70cab Mon Sep 17 00:00:00 2001
From: Zheng RuiFeng
Date: Fri, 20 Jan 2017 15:21:47 +0800
Subject: [PATCH 2/4] update tol

---
 .../org/apache/spark/ml/clustering/GaussianMixtureSuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/mllib/src/test/scala/org/apache/spark/ml/clustering/GaussianMixtureSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/clustering/GaussianMixtureSuite.scala
index 9ac36c342dee..ebb5c44b37a4 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/clustering/GaussianMixtureSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/clustering/GaussianMixtureSuite.scala
@@ -249,7 +249,7 @@ class GaussianMixtureSuite extends SparkFunSuite with MLlibTestSparkContext
     val model = gm.fit(dataset)
     val summary = model.summary
     val sameSummary = model.evaluate(dataset)
-    assert(summary.logLikelihood === sameSummary.logLikelihood)
+    assert(summary.logLikelihood ~== sameSummary.logLikelihood absTol 2)
   }
 }

From 01bffc132d5296755264d6905411fdcc3ae29504 Mon Sep 17 00:00:00 2001
From: Zheng RuiFeng
Date: Fri, 20 Jan 2017 18:39:43 +0800
Subject: [PATCH 3/4] update test

---
 python/pyspark/ml/clustering.py | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/python/pyspark/ml/clustering.py b/python/pyspark/ml/clustering.py
index 6bc917c0ba63..d4eefebef195 100644
--- a/python/pyspark/ml/clustering.py
+++ b/python/pyspark/ml/clustering.py
@@ -185,7 +185,7 @@ class GaussianMixture(JavaEstimator, HasFeaturesCol, HasPredictionCol, HasMaxIte
     >>> summary.logLikelihood
     8.14636...
     >>> same_summary = model.evaluate(df)
-    >>> summary.logLikelihood == same_summary.logLikelihood
+    >>> abs(summary.logLikelihood - same_summary.logLikelihood) < 1e-3
     True
     >>> weights = model.weights
     >>> len(weights)
     3
@@ -400,9 +400,9 @@ class KMeans(JavaEstimator, HasFeaturesCol, HasPredictionCol, HasMaxIter, HasTol
     >>> summary.clusterSizes
     [2, 2]
     >>> summary.wssse
-    8.14636...
+    2.000...
     >>> same_summary = model.evaluate(df)
-    >>> summary.wssse == same_summary.wssse
+    >>> abs(summary.wssse - same_summary.wssse) < 1e-3
     True
     >>> kmeans_path = temp_path + "/kmeans"
     >>> kmeans.save(kmeans_path)
     >>> kmeans2 = KMeans.load(kmeans_path)
@@ -585,9 +585,9 @@ class BisectingKMeans(JavaEstimator, HasFeaturesCol, HasPredictionCol, HasMaxIte
     >>> summary.clusterSizes
     [2, 2]
     >>> summary.wssse
-    8.14636...
+    2.000...
     >>> same_summary = model.evaluate(df)
-    >>> summary.wssse == same_summary.wssse
+    >>> abs(summary.wssse - same_summary.wssse) < 1e-3
     True
     >>> transformed = model.transform(df).select("features", "prediction")
     >>> rows = transformed.collect()
""" - return self._call_java("evaluate", dataset) + return GaussianMixtureSummary(self._call_java("evaluate", dataset)) @inherit_doc @@ -365,7 +365,7 @@ def evaluate(self, dataset): """ Evaluates the model on a test dataset. """ - return self._call_java("evaluate", dataset) + return KMeansSummary(self._call_java("evaluate", dataset)) @inherit_doc @@ -550,7 +550,7 @@ def evaluate(self, dataset): """ Evaluates the model on a test dataset. """ - return self._call_java("evaluate", dataset) + return BisectingKMeansSummary(self._call_java("evaluate", dataset)) @inherit_doc