python/pyspark/ml/clustering.py (223 changes: 218 additions & 5 deletions)

@@ -19,14 +19,17 @@

from pyspark import since, keyword_only
from pyspark.ml.util import *
from pyspark.ml.wrapper import JavaEstimator, JavaModel, JavaParams, JavaWrapper
from pyspark.ml.param.shared import *
from pyspark.ml.common import inherit_doc
from pyspark.sql.functions import col, lit
from pyspark.sql.types import DoubleType, LongType
from pyspark.mllib.clustering import PowerIterationClustering as MLlibPowerIterationClustering

__all__ = ['BisectingKMeans', 'BisectingKMeansModel', 'BisectingKMeansSummary',
'KMeans', 'KMeansModel',
'GaussianMixture', 'GaussianMixtureModel', 'GaussianMixtureSummary',
'LDA', 'LDAModel', 'LocalLDAModel', 'DistributedLDAModel', 'PowerIterationClustering']


class ClusteringSummary(JavaWrapper):
@@ -836,7 +839,7 @@ class LDA(JavaEstimator, HasFeaturesCol, HasMaxIter, HasSeed, HasCheckpointInter

Terminology:

- "term" = "word": an el
- "term" = "word": an element of the vocabulary
- "token": instance of a term appearing in a document
- "topic": multinomial distribution over terms representing some concept
- "document": one piece of text, corresponding to one row in the input data
@@ -938,7 +941,7 @@ def __init__(self, featuresCol="features", maxIter=20, seed=None, checkpointInte
k=10, optimizer="online", learningOffset=1024.0, learningDecay=0.51,\
subsamplingRate=0.05, optimizeDocConcentration=True,\
docConcentration=None, topicConcentration=None,\
topicDistributionCol="topicDistribution", keepLastCheckpoint=True)
"""
super(LDA, self).__init__()
self._java_obj = self._new_java_obj("org.apache.spark.ml.clustering.LDA", self.uid)
@@ -967,7 +970,7 @@ def setParams(self, featuresCol="features", maxIter=20, seed=None, checkpointInt
k=10, optimizer="online", learningOffset=1024.0, learningDecay=0.51,\
subsamplingRate=0.05, optimizeDocConcentration=True,\
docConcentration=None, topicConcentration=None,\
topicDistributionCol="topicDistribution", keepLastCheckpoint=True)

Sets params for LDA.
"""
@@ -1156,6 +1159,216 @@ def getKeepLastCheckpoint(self):
return self.getOrDefault(self.keepLastCheckpoint)


@inherit_doc
class PowerIterationClustering(HasMaxIter, HasWeightCol, JavaParams, JavaMLReadable,
JavaMLWritable):
"""
.. note:: Experimental

Power Iteration Clustering (PIC), a scalable graph clustering algorithm developed by
`Lin and Cohen <http://www.icml2010.org/papers/387.pdf>`_. From the abstract:
PIC finds a very low-dimensional embedding of a dataset using truncated power
iteration on a normalized pair-wise similarity matrix of the data.

This class is not yet an Estimator/Transformer, use `assignClusters` method to run the
PowerIterationClustering algorithm.

Contributor: ... use :py:func:`assignClusters` method ...

.. seealso:: `Wikipedia on Spectral clustering \
<http://en.wikipedia.org/wiki/Spectral_clustering>`_
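
For intuition, a minimal single-machine sketch of the PIC idea using numpy (illustrative
only, not part of this patch; the real implementation runs distributed and then clusters
the resulting one-dimensional embedding, e.g. with k-means):

import numpy as np

def pic_embedding(A, num_iter=40):
    # Row-normalize the affinity matrix: W = D^-1 * A.
    W = A / A.sum(axis=1, keepdims=True)
    v = np.random.rand(A.shape[0])  # random starting vector
    for _ in range(num_iter):
        v = W.dot(v)
        v /= np.abs(v).sum()  # keep the iterate L1-normalized
    return v  # 1-D embedding; cluster these values to get assignments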

>>> from pyspark.sql.types import StructField, StructType
>>> import math
>>> def genCircle(r, n):
... points = []
... for i in range(0, n):
... theta = 2.0 * math.pi * i / n
... points.append((r * math.cos(theta), r * math.sin(theta)))
... return points
>>> def sim(x, y):
... dist = (x[0] - y[0]) * (x[0] - y[0]) + (x[1] - y[1]) * (x[1] - y[1])
... return math.exp(-dist / 2.0)
>>> r1 = 1.0
>>> n1 = 10
>>> r2 = 4.0
>>> n2 = 40
>>> n = n1 + n2
>>> points = genCircle(r1, n1) + genCircle(r2, n2)
>>> data = [(i, j, sim(points[i], points[j])) for i in range(1, n) for j in range(0, i)]
>>> rdd = sc.parallelize(data, 2)
>>> schema = StructType([StructField("src", LongType(), False), \
StructField("dst", LongType(), True), \
StructField("weight", DoubleType(), True)])
>>> df = spark.createDataFrame(rdd, schema)
Contributor: The test code here is too complex to be in a doctest. Could you change it to
code like `df = sc.parallelize(...).toDF(...)` and generate a small, hardcoded dataset?
>>> pic = PowerIterationClustering()
>>> pic = PowerIterationClustering()

Contributor: If we only keep one example, we should use keyword args:

pic = PowerIterationClustering(k=2, maxIter=40, weightCol="weight")
assignments = pic.assignClusters(df)
>>> assignments = pic.setK(2).setMaxIter(40).setWeightCol("weight").assignClusters(df)
>>> result = sorted(assignments.collect(), key=lambda x: x.id)
>>> result[0].cluster == result[1].cluster == result[2].cluster == result[3].cluster
True
>>> result[4].cluster == result[5].cluster == result[6].cluster == result[7].cluster
True
>>> pic_path = temp_path + "/pic"
>>> pic.save(pic_path)
>>> pic2 = PowerIterationClustering.load(pic_path)
>>> pic2.getK()
2
>>> pic2.getMaxIter()
40
>>> assignments2 = pic2.assignClusters(df)
Contributor: This seems unnecessary to me as a doctest, and the same goes for pic3.
Doctests are mainly to provide examples, not a full suite of unit tests.

Contributor Author: Will remove line 1214 and the following.
>>> result2 = sorted(assignments2.collect(), key=lambda x: x.id)
>>> result2[0].cluster == result2[1].cluster == result2[2].cluster == result2[3].cluster
True
>>> result2[4].cluster == result2[5].cluster == result2[6].cluster == result2[7].cluster
True
Contributor: Let's use a simpler way to check the result, like:

>>> assignments.sort(assignments.id).show(truncate=False)
...

Contributor Author: Thanks for the comments. Will make changes.

>>> pic3 = PowerIterationClustering(k=4, initMode="degree", srcCol="source", dstCol="dest")
>>> pic3.getSrcCol()
'source'
>>> pic3.getDstCol()
'dest'
>>> pic3.getK()
4
>>> pic3.getMaxIter()
20
>>> pic3.getInitMode()
'degree'

.. versionadded:: 2.4.0
"""

k = Param(Params._dummy(), "k",
"The number of clusters to create. Must be > 1.",
typeConverter=TypeConverters.toInt)
initMode = Param(Params._dummy(), "initMode",
"The initialization algorithm. This can be either " +
"'random' to use a random vector as vertex properties, or 'degree' to use " +
"a normalized sum of similarities with other vertices. Supported options: " +
"'random' and 'degree'.",
typeConverter=TypeConverters.toString)
srcCol = Param(Params._dummy(), "srcCol",
"Name of the input column for source vertex IDs.",
typeConverter=TypeConverters.toString)
dstCol = Param(Params._dummy(), "dstCol",
"Name of the input column for destination vertex IDs.",
typeConverter=TypeConverters.toString)

@keyword_only
def __init__(self, k=2, maxIter=20, initMode="random", srcCol="src", dstCol="dst",
weightCol=None):
"""
__init__(self, k=2, maxIter=20, initMode="random", srcCol="src", dstCol="dst",\
weightCol=None)
"""
super(PowerIterationClustering, self).__init__()
self._java_obj = self._new_java_obj(
"org.apache.spark.ml.clustering.PowerIterationClustering", self.uid)
self._setDefault(k=2, maxIter=20, initMode="random", srcCol="src", dstCol="dst")
kwargs = self._input_kwargs
self.setParams(**kwargs)

@keyword_only
@since("2.4.0")
def setParams(self, k=2, maxIter=20, initMode="random", srcCol="src", dstCol="dst",
weightCol=None):
"""
setParams(self, k=2, maxIter=20, initMode="random", srcCol="src", dstCol="dst",\
weightCol=None)
Sets params for PowerIterationClustering.
"""
kwargs = self._input_kwargs
return self._set(**kwargs)

@since("2.4.0")
def setK(self, value):
"""
Sets the value of :py:attr:`k`.
"""
return self._set(k=value)

@since("2.4.0")
def getK(self):
"""
Gets the value of :py:attr:`k`.
"""

Contributor: Change to "Gets the value of :py:attr:`k` or its default value." here and
in every other place.
return self.getOrDefault(self.k)

@since("2.4.0")
def setInitMode(self, value):
"""
Sets the value of :py:attr:`initMode`.
"""
return self._set(initMode=value)

@since("2.4.0")
def getInitMode(self):
"""
Gets the value of :py:attr:`initMode`.
"""
return self.getOrDefault(self.initMode)

@since("2.4.0")
def setSrcCol(self, value):
"""
Sets the value of :py:attr:`srcCol`.
"""
return self._set(srcCol=value)

@since("2.4.0")
def getSrcCol(self):
"""
Gets the value of :py:attr:`srcCol`.
"""
return self.getOrDefault(self.srcCol)

@since("2.4.0")
def setDstCol(self, value):
"""
Sets the value of :py:attr:`dstCol`.
"""
return self._set(dstCol=value)

@since("2.4.0")
def getDstCol(self):
"""
Gets the value of :py:attr:`dstCol`.
"""
return self.getOrDefault(self.dstCol)

@since("2.4.0")
def assignClusters(self, dataset):
"""
Runs the PIC algorithm and returns a cluster assignment for each input vertex.

:param dataset:
A dataset with columns src, dst, weight representing the affinity matrix,
which is the matrix A in the PIC paper. Suppose the src column value is i,
the dst column value is j, and the weight column value is the similarity s_ij,
which must be nonnegative. The affinity matrix is symmetric, and hence
s_ij = s_ji. For any (i, j) with nonzero similarity, there should be
either (i, j, s_ij) or (j, i, s_ji) in the input. Rows with i = j are
ignored, because we assume s_ij = 0.0.

:return: A dataset that contains the vertex id and the corresponding cluster for
the id. Its schema will be:
- id: Long
- cluster: Int
"""
weightCol = self.getWeightCol() if self.isDefined(self.weightCol) else None
if weightCol is None or len(weightCol) == 0:
    # A missing or empty weight column means uniform edge weights of 1.0.
    w = lit(1.0)
else:
    w = col(weightCol).cast(DoubleType())
srcCol = self.getSrcCol()
dstCol = self.getDstCol()
df = dataset.select(col(srcCol).cast(LongType()), col(dstCol).cast(LongType()), w)
data = df.rdd.map(lambda x: (x[0], x[1], x[2]))
Contributor (WeichenXu123, Jun 8, 2018): I think this should wrap
ml.clustering.PowerIterationClustering, not the one in pyspark.mllib, so that its
behavior stays consistent with the Scala API. You can reference this PR: #21265

Contributor Author: @WeichenXu123 Thank you very much for your comment. Will change now.
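
For reference, a sketch of what delegating to the ml-side JVM class could look like. This
assumes the Scala ml.clustering.PowerIterationClustering exposes an assignClusters(Dataset)
method, as the reviewer's pointer suggests; it is not the code in this commit:

from pyspark.sql import DataFrame

def assignClusters(self, dataset):
    # Sync Python-side Param values to the wrapped JVM object, then delegate.
    self._transfer_params_to_java()
    jdf = self._java_obj.assignClusters(dataset._jdf)
    return DataFrame(jdf, dataset.sql_ctx)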

model = MLlibPowerIterationClustering.train(data, self.getK(), self.getMaxIter(),
self.getInitMode())
return model.assignments().toDF()
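
To make the affinity-matrix contract above concrete, a small hand-built input (the weights
are hypothetical; only one of (i, j) and (j, i) is supplied per pair, and self-loops are
omitted):

# Two tight groups, {0, 1} and {2, 3}, joined by a single weak edge.
affinities = [(0, 1, 1.0),   # s_01; (1, 0) is implied by symmetry
              (2, 3, 1.0),   # s_23
              (1, 2, 0.1)]   # weak cross-group similarity
df = spark.createDataFrame(affinities, ["src", "dst", "weight"])
pic = PowerIterationClustering(k=2, weightCol="weight")
assignments = pic.assignClusters(df)  # columns: id (long), cluster (int)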


if __name__ == "__main__":
import doctest
import pyspark.ml.clustering