From cdc97c103ea3f3da60e5e93482a7994f94e790c8 Mon Sep 17 00:00:00 2001
From: Huaxin Gao
Date: Fri, 8 Jun 2018 12:51:43 -0700
Subject: [PATCH 1/5] [SPARK-19826][ML][PYTHON] add spark.ml Python API for PIC

---
 python/pyspark/ml/clustering.py | 223 +++++++++++++++++++++++++++++++-
 1 file changed, 218 insertions(+), 5 deletions(-)

diff --git a/python/pyspark/ml/clustering.py b/python/pyspark/ml/clustering.py
index b3d5fb17f6b81..532fb100ccdd7 100644
--- a/python/pyspark/ml/clustering.py
+++ b/python/pyspark/ml/clustering.py
@@ -19,14 +19,17 @@
 
 from pyspark import since, keyword_only
 from pyspark.ml.util import *
-from pyspark.ml.wrapper import JavaEstimator, JavaModel, JavaWrapper
+from pyspark.ml.wrapper import JavaEstimator, JavaModel, JavaParams, JavaWrapper
 from pyspark.ml.param.shared import *
 from pyspark.ml.common import inherit_doc
+from pyspark.sql.functions import col, lit
+from pyspark.sql.types import DoubleType, LongType
+from pyspark.mllib.clustering import PowerIterationClustering as MLlibPowerIterationClustering
 
 __all__ = ['BisectingKMeans', 'BisectingKMeansModel', 'BisectingKMeansSummary',
            'KMeans', 'KMeansModel',
            'GaussianMixture', 'GaussianMixtureModel', 'GaussianMixtureSummary',
-           'LDA', 'LDAModel', 'LocalLDAModel', 'DistributedLDAModel']
+           'LDA', 'LDAModel', 'LocalLDAModel', 'DistributedLDAModel', 'PowerIterationClustering']
 
 
 class ClusteringSummary(JavaWrapper):
@@ -836,7 +839,7 @@ class LDA(JavaEstimator, HasFeaturesCol, HasMaxIter, HasSeed, HasCheckpointInter
 
     Terminology:
 
-    - "term" = "word": an el
+    - "term" = "word": an element of the vocabulary
     - "token": instance of a term appearing in a document
     - "topic": multinomial distribution over terms representing some concept
     - "document": one piece of text, corresponding to one row in the input data
@@ -938,7 +941,7 @@ def __init__(self, featuresCol="features", maxIter=20, seed=None, checkpointInte
                  k=10, optimizer="online", learningOffset=1024.0, learningDecay=0.51,\
                  subsamplingRate=0.05, optimizeDocConcentration=True,\
                  docConcentration=None, topicConcentration=None,\
-                 topicDistributionCol="topicDistribution", keepLastCheckpoint=True):
+                 topicDistributionCol="topicDistribution", keepLastCheckpoint=True)
         """
         super(LDA, self).__init__()
         self._java_obj = self._new_java_obj("org.apache.spark.ml.clustering.LDA", self.uid)
@@ -967,7 +970,7 @@ def setParams(self, featuresCol="features", maxIter=20, seed=None, checkpointInt
                   k=10, optimizer="online", learningOffset=1024.0, learningDecay=0.51,\
                   subsamplingRate=0.05, optimizeDocConcentration=True,\
                   docConcentration=None, topicConcentration=None,\
-                  topicDistributionCol="topicDistribution", keepLastCheckpoint=True):
+                  topicDistributionCol="topicDistribution", keepLastCheckpoint=True)
 
         Sets params for LDA.
         """
@@ -1156,6 +1159,216 @@ def getKeepLastCheckpoint(self):
         return self.getOrDefault(self.keepLastCheckpoint)
 
 
+@inherit_doc
+class PowerIterationClustering(HasMaxIter, HasWeightCol, JavaParams, JavaMLReadable,
+                               JavaMLWritable):
+    """
+    .. note:: Experimental
+
+    Power Iteration Clustering (PIC), a scalable graph clustering algorithm developed by
+    Lin and Cohen. From the abstract:
+    PIC finds a very low-dimensional embedding of a dataset using truncated power
+    iteration on a normalized pair-wise similarity matrix of the data.
+
+    This class is not yet an Estimator/Transformer, use `assignClusters` method to run the
+    PowerIterationClustering algorithm.
+
+    .. seealso:: `Wikipedia on Spectral clustering \
+        <http://en.wikipedia.org/wiki/Spectral_clustering>`_
+
+    >>> from pyspark.sql.types import StructField, StructType
+    >>> import math
+    >>> def genCircle(r, n):
+    ...     points = []
+    ...     for i in range(0, n):
+    ...         theta = 2.0 * math.pi * i / n
+    ...         points.append((r * math.cos(theta), r * math.sin(theta)))
+    ...     return points
+    >>> def sim(x, y):
+    ...     dist = (x[0] - y[0]) * (x[0] - y[0]) + (x[1] - y[1]) * (x[1] - y[1])
+    ...     return math.exp(-dist / 2.0)
+    >>> r1 = 1.0
+    >>> n1 = 10
+    >>> r2 = 4.0
+    >>> n2 = 40
+    >>> n = n1 + n2
+    >>> points = genCircle(r1, n1) + genCircle(r2, n2)
+    >>> data = [(i, j, sim(points[i], points[j])) for i in range(1, n) for j in range(0, i)]
+    >>> rdd = sc.parallelize(data, 2)
+    >>> schema = StructType([StructField("src", LongType(), False), \
+                             StructField("dst", LongType(), True), \
+                             StructField("weight", DoubleType(), True)])
+    >>> df = spark.createDataFrame(rdd, schema)
+    >>> pic = PowerIterationClustering()
+    >>> assignments = pic.setK(2).setMaxIter(40).setWeightCol("weight").assignClusters(df)
+    >>> result = sorted(assignments.collect(), key=lambda x: x.id)
+    >>> result[0].cluster == result[1].cluster == result[2].cluster == result[3].cluster
+    True
+    >>> result[4].cluster == result[5].cluster == result[6].cluster == result[7].cluster
+    True
+    >>> pic_path = temp_path + "/pic"
+    >>> pic.save(pic_path)
+    >>> pic2 = PowerIterationClustering.load(pic_path)
+    >>> pic2.getK()
+    2
+    >>> pic2.getMaxIter()
+    40
+    >>> assignments2 = pic2.assignClusters(df)
+    >>> result2 = sorted(assignments2.collect(), key=lambda x: x.id)
+    >>> result2[0].cluster == result2[1].cluster == result2[2].cluster == result2[3].cluster
+    True
+    >>> result2[4].cluster == result2[5].cluster == result2[6].cluster == result2[7].cluster
+    True
+    >>> pic3 = PowerIterationClustering(k=4, initMode="degree", srcCol="source", dstCol="dest")
+    >>> pic3.getSrcCol()
+    'source'
+    >>> pic3.getDstCol()
+    'dest'
+    >>> pic3.getK()
+    4
+    >>> pic3.getMaxIter()
+    20
+    >>> pic3.getInitMode()
+    'degree'
+
+    .. versionadded:: 2.4.0
+    """
+
+    k = Param(Params._dummy(), "k",
+              "The number of clusters to create. Must be > 1.",
+              typeConverter=TypeConverters.toInt)
+    initMode = Param(Params._dummy(), "initMode",
+                     "The initialization algorithm. This can be either " +
+                     "'random' to use a random vector as vertex properties, or 'degree' to use " +
+                     "a normalized sum of similarities with other vertices. Supported options: " +
+                     "'random' and 'degree'.",
+                     typeConverter=TypeConverters.toString)
+    srcCol = Param(Params._dummy(), "srcCol",
+                   "Name of the input column for source vertex IDs.",
+                   typeConverter=TypeConverters.toString)
+    dstCol = Param(Params._dummy(), "dstCol",
+                   "Name of the input column for destination vertex IDs.",
+                   typeConverter=TypeConverters.toString)
+
+    @keyword_only
+    def __init__(self, k=2, maxIter=20, initMode="random", srcCol="src", dstCol="dst",
+                 weightCol=None):
+        """
+        __init__(self, k=2, maxIter=20, initMode="random", srcCol="src", dstCol="dst",\
+                 weightCol=None)
+        """
+        super(PowerIterationClustering, self).__init__()
+        self._java_obj = self._new_java_obj(
+            "org.apache.spark.ml.clustering.PowerIterationClustering", self.uid)
+        self._setDefault(k=2, maxIter=20, initMode="random", srcCol="src", dstCol="dst")
+        kwargs = self._input_kwargs
+        self.setParams(**kwargs)
+
+    @keyword_only
+    @since("2.4.0")
+    def setParams(self, k=2, maxIter=20, initMode="random", srcCol="src", dstCol="dst",
+                  weightCol=None):
+        """
+        setParams(self, k=2, maxIter=20, initMode="random", srcCol="src", dstCol="dst",\
+                  weightCol=None)
+        Sets params for PowerIterationClustering.
+        """
+        kwargs = self._input_kwargs
+        return self._set(**kwargs)
+
+    @since("2.4.0")
+    def setK(self, value):
+        """
+        Sets the value of :py:attr:`k`.
+        """
+        return self._set(k=value)
+
+    @since("2.4.0")
+    def getK(self):
+        """
+        Gets the value of :py:attr:`k`.
+        """
+        return self.getOrDefault(self.k)
+
+    @since("2.4.0")
+    def setInitMode(self, value):
+        """
+        Sets the value of :py:attr:`initMode`.
+        """
+        return self._set(initMode=value)
+
+    @since("2.4.0")
+    def getInitMode(self):
+        """
+        Gets the value of `initMode`
+        """
+        return self.getOrDefault(self.initMode)
+
+    @since("2.4.0")
+    def setSrcCol(self, value):
+        """
+        Sets the value of :py:attr:`srcCol`.
+        """
+        return self._set(srcCol=value)
+
+    @since("2.4.0")
+    def getSrcCol(self):
+        """
+        Gets the value of :py:attr:`srcCol`.
+        """
+        return self.getOrDefault(self.srcCol)
+
+    @since("2.4.0")
+    def setDstCol(self, value):
+        """
+        Sets the value of :py:attr:`dstCol`.
+        """
+        return self._set(dstCol=value)
+
+    @since("2.4.0")
+    def getDstCol(self):
+        """
+        Gets the value of :py:attr:`dstCol`.
+        """
+        return self.getOrDefault(self.dstCol)
+
+    @since("2.4.0")
+    def assignClusters(self, dataset):
+        """
+        Run the PIC algorithm and returns a cluster assignment for each input vertex.
+
+        :param dataset:
+          A dataset with columns src, dst, weight representing the affinity matrix,
+          which is the matrix A in the PIC paper. Suppose the src column value is i,
+          the dst column value is j, the weight column value is similarity s,,ij,,
+          which must be nonnegative. This is a symmetric matrix and hence
+          s,,ij,, = s,,ji,,. For any (i, j) with nonzero similarity, there should be
+          either (i, j, s,,ij,,) or (j, i, s,,ji,,) in the input. Rows with i = j are
+          ignored, because we assume s,,ij,, = 0.0.
+
+        :return: A dataset that contains columns of vertex id and the corresponding cluster for
+                 the id. The schema of it will be:
+                 - id: Long
+                 - cluster: Int
+        """
+        weightCol = None
+        w = None
+        if (self.isDefined(self.weightCol)):
+            weightCol = self.getWeightCol()
+        if (weightCol is None or len(weightCol) == 0):
+            w = lit(1.0)
+        else:
+            w = col(weightCol).cast(DoubleType())
+        srcCol = self.getSrcCol()
+        dstCol = self.getDstCol()
+        df = dataset.select(col(srcCol).cast(LongType()), col(dstCol).cast(LongType()), w)
+        data = df.rdd.map(lambda x: (x[0], x[1], x[2]))
+        algorithm = MLlibPowerIterationClustering()
+        model = MLlibPowerIterationClustering.train(data, self.getK(), self.getMaxIter(),
+                                                    self.getInitMode())
+        return model.assignments().toDF()
+
+
 if __name__ == "__main__":
     import doctest
     import pyspark.ml.clustering

From 2d3c760607388facacf11974d3c3615bb6db0c41 Mon Sep 17 00:00:00 2001
From: Huaxin Gao
Date: Fri, 8 Jun 2018 14:52:58 -0700
Subject: [PATCH 2/5] call ml.clustering.PowerIterationClustering.assignClusters

---
 python/pyspark/ml/clustering.py | 31 ++++++++-----------------------
 1 file changed, 8 insertions(+), 23 deletions(-)

diff --git a/python/pyspark/ml/clustering.py b/python/pyspark/ml/clustering.py
index 532fb100ccdd7..41e1ed1025ad6 100644
--- a/python/pyspark/ml/clustering.py
+++ b/python/pyspark/ml/clustering.py
@@ -22,9 +22,7 @@
 from pyspark.ml.wrapper import JavaEstimator, JavaModel, JavaParams, JavaWrapper
 from pyspark.ml.param.shared import *
 from pyspark.ml.common import inherit_doc
-from pyspark.sql.functions import col, lit
-from pyspark.sql.types import DoubleType, LongType
-from pyspark.mllib.clustering import PowerIterationClustering as MLlibPowerIterationClustering
+from pyspark.sql import DataFrame
 
 __all__ = ['BisectingKMeans', 'BisectingKMeansModel', 'BisectingKMeansSummary',
            'KMeans', 'KMeansModel',
@@ -1176,7 +1174,7 @@ class PowerIterationClustering(HasMaxIter, HasWeightCol, JavaParams, JavaMLReada
     .. seealso:: `Wikipedia on Spectral clustering \
         <http://en.wikipedia.org/wiki/Spectral_clustering>`_
 
-    >>> from pyspark.sql.types import StructField, StructType
+    >>> from pyspark.sql.types import DoubleType, LongType, StructField, StructType
     >>> import math
     >>> def genCircle(r, n):
     ...     points = []
@@ -1348,25 +1346,12 @@ def assignClusters(self, dataset):
         :return: A dataset that contains columns of vertex id and the corresponding cluster for
                  the id. The schema of it will be:
-                 - id: Long
-                 - cluster: Int
-        """
-        weightCol = None
-        w = None
-        if (self.isDefined(self.weightCol)):
-            weightCol = self.getWeightCol()
-        if (weightCol is None or len(weightCol) == 0):
-            w = lit(1.0)
-        else:
-            w = col(weightCol).cast(DoubleType())
-        srcCol = self.getSrcCol()
-        dstCol = self.getDstCol()
-        df = dataset.select(col(srcCol).cast(LongType()), col(dstCol).cast(LongType()), w)
-        data = df.rdd.map(lambda x: (x[0], x[1], x[2]))
-        algorithm = MLlibPowerIterationClustering()
-        model = MLlibPowerIterationClustering.train(data, self.getK(), self.getMaxIter(),
-                                                    self.getInitMode())
-        return model.assignments().toDF()
+                 - id: Long
+                 - cluster: Int
+        """
+        self._transfer_params_to_java()
+        jdf = self._java_obj.assignClusters(dataset._jdf)
+        return DataFrame(jdf, dataset.sql_ctx)
 
 
 if __name__ == "__main__":

From 6a35dde8592a476aa7c5334ee0cf1494fc5bc60a Mon Sep 17 00:00:00 2001
From: Huaxin Gao
Date: Fri, 8 Jun 2018 15:39:05 -0700
Subject: [PATCH 3/5] fix python style issue

---
 python/pyspark/ml/clustering.py | 9 +++++----
 1 file changed, 5 insertions(+), 4 deletions(-)

diff --git a/python/pyspark/ml/clustering.py b/python/pyspark/ml/clustering.py
index 41e1ed1025ad6..0565a8addad6b 100644
--- a/python/pyspark/ml/clustering.py
+++ b/python/pyspark/ml/clustering.py
@@ -1344,10 +1344,11 @@ def assignClusters(self, dataset):
             either (i, j, s,,ij,,) or (j, i, s,,ji,,) in the input. Rows with i = j are
             ignored, because we assume s,,ij,, = 0.0.
 
-        :return: A dataset that contains columns of vertex id and the corresponding cluster for
-                 the id. The schema of it will be:
-                 - id: Long
-                 - cluster: Int
+        :return:
+          A dataset that contains columns of vertex id and the corresponding cluster for
+          the id. The schema of it will be:
+          - id: Long
+          - cluster: Int
         """
         self._transfer_params_to_java()
         jdf = self._java_obj.assignClusters(dataset._jdf)

From fcb9a51e0cb07d0c77c98d93797b21a1b670aa41 Mon Sep 17 00:00:00 2001
From: Huaxin Gao
Date: Sat, 9 Jun 2018 12:46:24 -0700
Subject: [PATCH 4/5] address comments

---
 python/pyspark/ml/clustering.py | 87 ++++++++++++++++++---------------
 1 file changed, 48 insertions(+), 39 deletions(-)

diff --git a/python/pyspark/ml/clustering.py b/python/pyspark/ml/clustering.py
index 0565a8addad6b..412e503dbccf9 100644
--- a/python/pyspark/ml/clustering.py
+++ b/python/pyspark/ml/clustering.py
@@ -1168,42 +1168,42 @@ class PowerIterationClustering(HasMaxIter, HasWeightCol, JavaParams, JavaMLReada
     PIC finds a very low-dimensional embedding of a dataset using truncated power
     iteration on a normalized pair-wise similarity matrix of the data.
 
-    This class is not yet an Estimator/Transformer, use `assignClusters` method to run the
-    PowerIterationClustering algorithm.
+    This class is not yet an Estimator/Transformer, use :py:func:`assignClusters` method
+    to run the PowerIterationClustering algorithm.
 
     .. seealso:: `Wikipedia on Spectral clustering \
        <http://en.wikipedia.org/wiki/Spectral_clustering>`_
 
-    >>> from pyspark.sql.types import DoubleType, LongType, StructField, StructType
-    >>> import math
-    >>> def genCircle(r, n):
-    ...     points = []
-    ...     for i in range(0, n):
-    ...         theta = 2.0 * math.pi * i / n
-    ...         points.append((r * math.cos(theta), r * math.sin(theta)))
-    ...     return points
-    >>> def sim(x, y):
-    ...     dist = (x[0] - y[0]) * (x[0] - y[0]) + (x[1] - y[1]) * (x[1] - y[1])
-    ...     return math.exp(-dist / 2.0)
-    >>> r1 = 1.0
-    >>> n1 = 10
-    >>> r2 = 4.0
-    >>> n2 = 40
-    >>> n = n1 + n2
-    >>> points = genCircle(r1, n1) + genCircle(r2, n2)
-    >>> data = [(i, j, sim(points[i], points[j])) for i in range(1, n) for j in range(0, i)]
-    >>> rdd = sc.parallelize(data, 2)
-    >>> schema = StructType([StructField("src", LongType(), False), \
-                             StructField("dst", LongType(), True), \
-                             StructField("weight", DoubleType(), True)])
-    >>> df = spark.createDataFrame(rdd, schema)
+    >>> data = [((long)(1), (long)(0), 0.5), \
+                ((long)(2), (long)(0), 0.5), \
+                ((long)(2), (long)(1), 0.7), \
+                ((long)(3), (long)(0), 0.5), \
+                ((long)(3), (long)(1), 0.7), \
+                ((long)(3), (long)(2), 0.9), \
+                ((long)(4), (long)(0), 0.5), \
+                ((long)(4), (long)(1), 0.7), \
+                ((long)(4), (long)(2), 0.9), \
+                ((long)(4), (long)(3), 1.1), \
+                ((long)(5), (long)(0), 0.5), \
+                ((long)(5), (long)(1), 0.7), \
+                ((long)(5), (long)(2), 0.9), \
+                ((long)(5), (long)(3), 1.1), \
+                ((long)(5), (long)(4), 1.3)]
+    >>> df = spark.createDataFrame(data).toDF("src", "dst", "weight")
     >>> pic = PowerIterationClustering()
     >>> assignments = pic.setK(2).setMaxIter(40).setWeightCol("weight").assignClusters(df)
-    >>> result = sorted(assignments.collect(), key=lambda x: x.id)
-    >>> result[0].cluster == result[1].cluster == result[2].cluster == result[3].cluster
-    True
-    >>> result[4].cluster == result[5].cluster == result[6].cluster == result[7].cluster
-    True
+    >>> assignments.sort(assignments.id).show(truncate=False)
+    +---+-------+
+    |id |cluster|
+    +---+-------+
+    |0  |1      |
+    |1  |1      |
+    |2  |1      |
+    |3  |1      |
+    |4  |1      |
+    |5  |0      |
+    +---+-------+
+    ...
     >>> pic_path = temp_path + "/pic"
     >>> pic.save(pic_path)
     >>> pic2 = PowerIterationClustering.load(pic_path)
     >>> pic2.getK()
     2
     >>> pic2.getMaxIter()
     40
     >>> assignments2 = pic2.assignClusters(df)
-    >>> result2 = sorted(assignments2.collect(), key=lambda x: x.id)
-    >>> result2[0].cluster == result2[1].cluster == result2[2].cluster == result2[3].cluster
-    True
-    >>> result2[4].cluster == result2[5].cluster == result2[6].cluster == result2[7].cluster
-    True
+    >>> assignments2.sort(assignments2.id).show(truncate=False)
+    +---+-------+
+    |id |cluster|
+    +---+-------+
+    |0  |1      |
+    |1  |1      |
+    |2  |1      |
+    |3  |1      |
+    |4  |1      |
+    |5  |0      |
+    +---+-------+
+    ...
     >>> pic3 = PowerIterationClustering(k=4, initMode="degree", srcCol="source", dstCol="dest")
     >>> pic3.getSrcCol()
     'source'
     >>> pic3.getDstCol()
     'dest'
     >>> pic3.getK()
     4
     >>> pic3.getMaxIter()
     20
     >>> pic3.getInitMode()
     'degree'
 
     .. versionadded:: 2.4.0
     """
 
@@ -1284,7 +1291,7 @@ def setK(self, value):
     @since("2.4.0")
     def getK(self):
         """
-        Gets the value of :py:attr:`k`.
+        Gets the value of :py:attr:`k` or its default value.
         """
         return self.getOrDefault(self.k)
 
@@ -1298,7 +1305,7 @@ def setInitMode(self, value):
     @since("2.4.0")
     def getInitMode(self):
         """
-        Gets the value of `initMode`
+        Gets the value of :py:attr:`initMode` or its default value.
         """
         return self.getOrDefault(self.initMode)
 
@@ -1312,7 +1319,7 @@ def setSrcCol(self, value):
     @since("2.4.0")
     def getSrcCol(self):
         """
-        Gets the value of :py:attr:`srcCol`.
+        Gets the value of :py:attr:`srcCol` or its default value.
         """
         return self.getOrDefault(self.srcCol)
 
@@ -1326,7 +1333,7 @@ def setDstCol(self, value):
     @since("2.4.0")
     def getDstCol(self):
         """
-        Gets the value of :py:attr:`dstCol`.
+        Gets the value of :py:attr:`dstCol` or its default value.
         """
         return self.getOrDefault(self.dstCol)
 
@@ -1349,6 +1356,8 @@ def assignClusters(self, dataset):
           the id. The schema of it will be:
          - id: Long
          - cluster: Int
+
+        .. versionadded:: 2.4.0
         """
         self._transfer_params_to_java()
         jdf = self._java_obj.assignClusters(dataset._jdf)

From 7c077252570b2c2458bacff06194a3943611fc7c Mon Sep 17 00:00:00 2001
From: Huaxin Gao
Date: Sat, 9 Jun 2018 23:37:12 -0700
Subject: [PATCH 5/5] address comments(2)

---
 python/pyspark/ml/clustering.py | 48 +++++----------
 1 file changed, 7 insertions(+), 41 deletions(-)

diff --git a/python/pyspark/ml/clustering.py b/python/pyspark/ml/clustering.py
index 412e503dbccf9..4aa1cf84b5824 100644
--- a/python/pyspark/ml/clustering.py
+++ b/python/pyspark/ml/clustering.py
@@ -1174,24 +1174,14 @@ class PowerIterationClustering(HasMaxIter, HasWeightCol, JavaParams, JavaMLReada
     .. seealso:: `Wikipedia on Spectral clustering \
         <http://en.wikipedia.org/wiki/Spectral_clustering>`_
 
-    >>> data = [((long)(1), (long)(0), 0.5), \
-                ((long)(2), (long)(0), 0.5), \
-                ((long)(2), (long)(1), 0.7), \
-                ((long)(3), (long)(0), 0.5), \
-                ((long)(3), (long)(1), 0.7), \
-                ((long)(3), (long)(2), 0.9), \
-                ((long)(4), (long)(0), 0.5), \
-                ((long)(4), (long)(1), 0.7), \
-                ((long)(4), (long)(2), 0.9), \
-                ((long)(4), (long)(3), 1.1), \
-                ((long)(5), (long)(0), 0.5), \
-                ((long)(5), (long)(1), 0.7), \
-                ((long)(5), (long)(2), 0.9), \
-                ((long)(5), (long)(3), 1.1), \
-                ((long)(5), (long)(4), 1.3)]
+    >>> data = [(1, 0, 0.5), \
+                (2, 0, 0.5), (2, 1, 0.7), \
+                (3, 0, 0.5), (3, 1, 0.7), (3, 2, 0.9), \
+                (4, 0, 0.5), (4, 1, 0.7), (4, 2, 0.9), (4, 3, 1.1), \
+                (5, 0, 0.5), (5, 1, 0.7), (5, 2, 0.9), (5, 3, 1.1), (5, 4, 1.3)]
     >>> df = spark.createDataFrame(data).toDF("src", "dst", "weight")
-    >>> pic = PowerIterationClustering()
-    >>> assignments = pic.setK(2).setMaxIter(40).setWeightCol("weight").assignClusters(df)
+    >>> pic = PowerIterationClustering(k=2, maxIter=40, weightCol="weight")
+    >>> assignments = pic.assignClusters(df)
     >>> assignments.sort(assignments.id).show(truncate=False)
     +---+-------+
     |id |cluster|
     +---+-------+
     |0  |1      |
     |1  |1      |
     |2  |1      |
     |3  |1      |
     |4  |1      |
     |5  |0      |
     +---+-------+
     ...
     >>> pic_path = temp_path + "/pic"
     >>> pic.save(pic_path)
     >>> pic2 = PowerIterationClustering.load(pic_path)
     >>> pic2.getK()
     2
     >>> pic2.getMaxIter()
     40
 
     .. versionadded:: 2.4.0
     """
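
A quick way to sanity-check the series: the finished API from [PATCH 5/5] can be driven end to end with the short script below. This is a sketch, not part of the patches; it assumes a Spark build (2.4 or later) with this series applied, and the app name and session bootstrap are illustrative only. The affinity data is the five-vertex example from the final doctest.

    from pyspark.sql import SparkSession
    from pyspark.ml.clustering import PowerIterationClustering

    spark = SparkSession.builder.appName("pic-sketch").getOrCreate()

    # Affinity matrix entries (src, dst, weight). The matrix is symmetric, so each
    # nonzero pair (i, j) needs only one of (i, j, s_ij) or (j, i, s_ji).
    data = [(1, 0, 0.5),
            (2, 0, 0.5), (2, 1, 0.7),
            (3, 0, 0.5), (3, 1, 0.7), (3, 2, 0.9),
            (4, 0, 0.5), (4, 1, 0.7), (4, 2, 0.9), (4, 3, 1.1),
            (5, 0, 0.5), (5, 1, 0.7), (5, 2, 0.9), (5, 3, 1.1), (5, 4, 1.3)]
    df = spark.createDataFrame(data, ["src", "dst", "weight"])

    pic = PowerIterationClustering(k=2, maxIter=40, weightCol="weight")

    # Not an Estimator/Transformer yet: assignClusters() runs the algorithm and
    # returns a DataFrame with columns id (Long) and cluster (Int).
    assignments = pic.assignClusters(df)
    assignments.sort("id").show(truncate=False)

    spark.stop()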
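For contrast with the final design: [PATCH 1/5] first implemented assignClusters() by converting the input DataFrame to an RDD and delegating to the RDD-based pyspark.mllib implementation, and [PATCH 2/5] replaced that with a direct call into the Scala ml class. The mllib route the first draft wrapped looks roughly like this (again a sketch, reusing `spark` and `data` from the snippet above):

    from pyspark.mllib.clustering import PowerIterationClustering as MLlibPIC

    # train() takes an RDD of (src, dst, similarity) tuples rather than a DataFrame.
    rdd = spark.sparkContext.parallelize(data)
    model = MLlibPIC.train(rdd, k=2, maxIterations=40, initMode="random")

    # assignments() returns an RDD of Assignment(id=..., cluster=...) rows;
    # patch 1 converted this back to a DataFrame with toDF().
    for a in sorted(model.assignments().collect(), key=lambda x: x.id):
        print(a.id, a.cluster)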