[SPARK-19826][ML][PYTHON] add spark.ml Python API for PIC #21119
Changes from 5 commits
@@ -19,14 +19,14 @@
 from pyspark import since, keyword_only
 from pyspark.ml.util import *
-from pyspark.ml.wrapper import JavaEstimator, JavaModel, JavaWrapper
+from pyspark.ml.wrapper import JavaEstimator, JavaModel, JavaParams, JavaTransformer, JavaWrapper
 from pyspark.ml.param.shared import *
 from pyspark.ml.common import inherit_doc
 
 __all__ = ['BisectingKMeans', 'BisectingKMeansModel', 'BisectingKMeansSummary',
            'KMeans', 'KMeansModel',
            'GaussianMixture', 'GaussianMixtureModel', 'GaussianMixtureSummary',
-           'LDA', 'LDAModel', 'LocalLDAModel', 'DistributedLDAModel']
+           'LDA', 'LDAModel', 'LocalLDAModel', 'DistributedLDAModel', 'PowerIterationClustering']
 
 
 class ClusteringSummary(JavaWrapper):

@@ -836,7 +836,7 @@ class LDA(JavaEstimator, HasFeaturesCol, HasMaxIter, HasSeed, HasCheckpointInter
     Terminology:
 
-    - "term" = "word": an el
+    - "term" = "word": an element of the vocabulary
     - "token": instance of a term appearing in a document
     - "topic": multinomial distribution over terms representing some concept
     - "document": one piece of text, corresponding to one row in the input data

@@ -938,7 +938,7 @@ def __init__(self, featuresCol="features", maxIter=20, seed=None, checkpointInte
                  k=10, optimizer="online", learningOffset=1024.0, learningDecay=0.51,\
                  subsamplingRate=0.05, optimizeDocConcentration=True,\
                  docConcentration=None, topicConcentration=None,\
-                 topicDistributionCol="topicDistribution", keepLastCheckpoint=True):
+                 topicDistributionCol="topicDistribution", keepLastCheckpoint=True)
         """
         super(LDA, self).__init__()
         self._java_obj = self._new_java_obj("org.apache.spark.ml.clustering.LDA", self.uid)

@@ -967,7 +967,7 @@ def setParams(self, featuresCol="features", maxIter=20, seed=None, checkpointInt
                   k=10, optimizer="online", learningOffset=1024.0, learningDecay=0.51,\
                   subsamplingRate=0.05, optimizeDocConcentration=True,\
                   docConcentration=None, topicConcentration=None,\
-                  topicDistributionCol="topicDistribution", keepLastCheckpoint=True):
+                  topicDistributionCol="topicDistribution", keepLastCheckpoint=True)
 
         Sets params for LDA.
         """

@@ -1156,6 +1156,204 @@ def getKeepLastCheckpoint(self):
         return self.getOrDefault(self.keepLastCheckpoint)
 
 
+@inherit_doc
+class PowerIterationClustering(HasMaxIter, HasPredictionCol, JavaTransformer, JavaParams,
+                               JavaMLReadable, JavaMLWritable):
| """ | ||
| .. note:: Experimental | ||
| Power Iteration Clustering (PIC), a scalable graph clustering algorithm developed by | ||
| <a href=http://www.icml2010.org/papers/387.pdf>Lin and Cohen</a>. From the abstract: | ||
| PIC finds a very low-dimensional embedding of a dataset using truncated power | ||
| iteration on a normalized pair-wise similarity matrix of the data. | ||
|
|
||
| PIC takes an affinity matrix between items (or vertices) as input. An affinity matrix | ||
| is a symmetric matrix whose entries are non-negative similarities between items. | ||
| PIC takes this matrix (or graph) as an adjacency matrix. Specifically, each input row | ||
| includes: | ||
|
|
||
| - :py:class:`idCol`: vertex ID | ||
|
||
| - :py:class:`neighborsCol`: neighbors of vertex in :py:class:`idCol` | ||
| - :py:class:`similaritiesCol`: non-negative weights (similarities) of edges between the | ||
| vertex in :py:class:`idCol` and each neighbor in :py:class:`neighborsCol` | ||
|
|
||
| PIC returns a cluster assignment for each input vertex. It appends a new column | ||
| :py:class:`predictionCol` containing the cluster assignment in :py:class:`[0,k)` for | ||
| each row (vertex). | ||
|
|
||
| Notes: | ||
|
||
|
|
||
| - [[PowerIterationClustering]] is a transformer with an expensive [[transform]] operation. | ||
| Transform runs the iterative PIC algorithm to cluster the whole input dataset. | ||
| - Input validation: This validates that similarities are non-negative but does NOT validate | ||
| that the input matrix is symmetric. | ||
|
|
||
| @see <a href=http://en.wikipedia.org/wiki/Spectral_clustering> | ||
|
||
| Spectral clustering (Wikipedia)</a> | ||
|
||
|
|
||
+    >>> from pyspark.sql.types import ArrayType, DoubleType, LongType, StructField, StructType
+    >>> similarities = [(1, [0], [0.5]), (2, [0, 1], [0.7, 0.5]),
+    ...                 (3, [0, 1, 2], [0.9, 0.7, 0.5]),
+    ...                 (4, [0, 1, 2, 3], [1.1, 0.9, 0.7, 0.5]),
+    ...                 (5, [0, 1, 2, 3, 4], [1.3, 1.1, 0.9, 0.7, 0.5])]
+    >>> rdd = sc.parallelize(similarities, 2)
+    >>> schema = StructType([StructField("id", LongType(), False),
+    ...                      StructField("neighbors", ArrayType(LongType(), False), True),
+    ...                      StructField("similarities", ArrayType(DoubleType(), False), True)])
+    >>> df = spark.createDataFrame(rdd, schema)
+    >>> pic = PowerIterationClustering()
+    >>> result = pic.setK(2).setMaxIter(10).transform(df)
+    >>> predictions = sorted(set([(i[0], i[1]) for i in result.select(result.id, result.prediction)
+    ...                           .collect()]), key=lambda x: x[0])
+    >>> predictions[0]
+    (1, 1)
+    >>> predictions[1]
+    (2, 1)
+    >>> predictions[2]
+    (3, 0)
+    >>> predictions[3]
+    (4, 0)
+    >>> predictions[4]
+    (5, 0)
+    >>> pic_path = temp_path + "/pic"
+    >>> pic.save(pic_path)
+    >>> pic2 = PowerIterationClustering.load(pic_path)
+    >>> pic2.getK()
+    2
+    >>> pic2.getMaxIter()
+    10
+    >>> pic3 = PowerIterationClustering(k=4, initMode="degree")
+    >>> pic3.getIdCol()
+    'id'
+    >>> pic3.getK()
+    4
+    >>> pic3.getMaxIter()
+    20
+    >>> pic3.getInitMode()
+    'degree'
+
+    .. versionadded:: 2.4.0
+    """
+
+    k = Param(Params._dummy(), "k",
+              "The number of clusters to create. Must be > 1.",
+              typeConverter=TypeConverters.toInt)
+    initMode = Param(Params._dummy(), "initMode",
+                     "The initialization algorithm. This can be either " +
+                     "'random' to use a random vector as vertex properties, or 'degree' to use " +
+                     "a normalized sum of similarities with other vertices. Supported options: " +
+                     "'random' and 'degree'.",
+                     typeConverter=TypeConverters.toString)
+    idCol = Param(Params._dummy(), "idCol",
+                  "Name of the input column for vertex IDs.",
+                  typeConverter=TypeConverters.toString)
+    neighborsCol = Param(Params._dummy(), "neighborsCol",
+                         "Name of the input column for neighbors in the adjacency list " +
+                         "representation.",
+                         typeConverter=TypeConverters.toString)
+    similaritiesCol = Param(Params._dummy(), "similaritiesCol",
+                            "Name of the input column for non-negative weights (similarities) " +
+                            "of edges between the vertex in `idCol` and each neighbor in " +
+                            "`neighborsCol`.",
+                            typeConverter=TypeConverters.toString)
+
+    @keyword_only
+    def __init__(self, predictionCol="prediction", k=2, maxIter=20, initMode="random",
+                 idCol="id", neighborsCol="neighbors", similaritiesCol="similarities"):
+        """
+        __init__(self, predictionCol="prediction", k=2, maxIter=20, initMode="random",\
+                 idCol="id", neighborsCol="neighbors", similaritiesCol="similarities")
+        """
+        super(PowerIterationClustering, self).__init__()
+        self._java_obj = self._new_java_obj(
+            "org.apache.spark.ml.clustering.PowerIterationClustering", self.uid)
+        self._setDefault(k=2, maxIter=20, initMode="random", idCol="id",
+                         neighborsCol="neighbors", similaritiesCol="similarities")
+        kwargs = self._input_kwargs
+        self.setParams(**kwargs)
+
+    @keyword_only
+    @since("2.4.0")
+    def setParams(self, predictionCol="prediction", k=2, maxIter=20, initMode="random",
+                  idCol="id", neighborsCol="neighbors", similaritiesCol="similarities"):
+        """
+        setParams(self, predictionCol="prediction", k=2, maxIter=20, initMode="random",\
+                  idCol="id", neighborsCol="neighbors", similaritiesCol="similarities")
+        Sets params for PowerIterationClustering.
+        """
+        kwargs = self._input_kwargs
+        return self._set(**kwargs)
| @since("2.4.0") | ||
| def setK(self, value): | ||
| """ | ||
| Sets the value of :py:attr:`k`. | ||
| """ | ||
| return self._set(k=value) | ||
|
|
||
| @since("2.4.0") | ||
| def getK(self): | ||
| """ | ||
| Gets the value of :py:attr:`k`. | ||
| """ | ||
| return self.getOrDefault(self.k) | ||
|
|
||
| @since("2.4.0") | ||
| def setInitMode(self, value): | ||
| """ | ||
| Sets the value of :py:attr:`initMode`. | ||
| """ | ||
| return self._set(initMode=value) | ||
|
|
||
| @since("2.4.0") | ||
| def getInitMode(self): | ||
| """ | ||
| Gets the value of `initMode` | ||
| """ | ||
| return self.getOrDefault(self.initMode) | ||
|
|
||
| @since("2.4.0") | ||
| def setIdCol(self, value): | ||
| """ | ||
| Sets the value of :py:attr:`idCol`. | ||
| """ | ||
| return self._set(idCol=value) | ||
|
|
||
| @since("2.4.0") | ||
| def getIdCol(self): | ||
| """ | ||
| Gets the value of :py:attr:`idCol`. | ||
| """ | ||
| return self.getOrDefault(self.idCol) | ||
|
|
||
| @since("2.4.0") | ||
| def setNeighborsCol(self, value): | ||
| """ | ||
| Sets the value of :py:attr:`neighborsCol. | ||
|
||
| """ | ||
| return self._set(neighborsCol=value) | ||
|
|
||
| @since("2.4.0") | ||
| def getNeighborsCol(self): | ||
| """ | ||
| Gets the value of :py:attr:`neighborsCol`. | ||
| """ | ||
| return self.getOrDefault(self.neighborsCol) | ||
|
|
||
| @since("2.4.0") | ||
| def setSimilaritiesCol(self, value): | ||
| """ | ||
| Sets the value of :py:attr:`similaritiesCol`. | ||
| """ | ||
| return self._set(similaritiesCol=value) | ||
|
|
||
| @since("2.4.0") | ||
| def getSimilaritiesCol(self): | ||
| """ | ||
| Gets the value of :py:attr:`similaritiesCol`. | ||
| """ | ||
| return self.getOrDefault(self.binary) | ||
|
||
|
|
||
|
|
||
+
+
 if __name__ == "__main__":
     import doctest
     import pyspark.ml.clustering
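
For anyone trying this out, here is a minimal end-to-end sketch against the transformer-style API in this snapshot. The class name, default column names, and the `transform` call come from the diff above; the `edges` toy data and the adjacency-list assembly are illustrative assumptions, not part of the PR:

# Minimal sketch, assuming this PR's API: build the (id, neighbors, similarities)
# adjacency-list rows that PowerIterationClustering expects from a plain weighted
# edge list, then cluster. Each edge is listed once, as in the doctest above.
from pyspark.sql import SparkSession
from pyspark.sql.types import ArrayType, DoubleType, LongType, StructField, StructType
from pyspark.ml.clustering import PowerIterationClustering  # added by this PR

spark = SparkSession.builder.master("local[2]").appName("pic-sketch").getOrCreate()

# Toy undirected weighted edges (src, dst, similarity); weights must be non-negative.
edges = [(1, 0, 0.5), (2, 0, 0.7), (2, 1, 0.5),
         (3, 0, 0.9), (3, 1, 0.7), (3, 2, 0.5)]

# Group each source vertex's neighbors and edge weights into one row.
adjacency = {}
for src, dst, w in edges:
    nbrs, sims = adjacency.setdefault(src, ([], []))
    nbrs.append(dst)
    sims.append(w)
rows = [(vid, nbrs, sims) for vid, (nbrs, sims) in sorted(adjacency.items())]

schema = StructType([
    StructField("id", LongType(), False),
    StructField("neighbors", ArrayType(LongType(), False), True),
    StructField("similarities", ArrayType(DoubleType(), False), True)])
df = spark.createDataFrame(rows, schema)

pic = PowerIterationClustering(k=2, maxIter=10)
pic.transform(df).show()  # appends the 'prediction' cluster-assignment column

Note that vertex 0 appears only in the `neighbors` arrays, never in the `id` column; the class doctest follows the same pattern, which suggests a row is required only for vertices that own an adjacency list.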
Good catch!
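
To unpack the abstract quoted in the class docstring ("a very low-dimensional embedding ... using truncated power iteration on a normalized pair-wise similarity matrix"), here is a toy single-machine sketch of the idea. This is illustrative NumPy only, with simplified initialization and no stopping criterion; it is not Spark's distributed implementation:

# Toy illustration of the PIC idea (Lin & Cohen, ICML 2010): power-iterate the
# row-normalized affinity matrix W = D^-1 * A and stop early ("truncated"); the
# resulting one-dimensional embedding is near piecewise-constant, one level per
# cluster, so a simple 1-D clustering recovers the groups.
import numpy as np

def pic_embedding(A, iters=15, seed=0):
    W = A / A.sum(axis=1, keepdims=True)      # row-normalize the affinity matrix
    v = np.random.RandomState(seed).rand(A.shape[0])
    for _ in range(iters):                    # truncated power iteration
        v = W @ v
        v /= np.abs(v).sum()                  # rescale so v neither explodes nor vanishes
    return v

# Two obvious blocks, vertices {0, 1} and {2, 3}, weakly linked across blocks.
A = np.array([[0.0, 1.0, 0.1, 0.0],
              [1.0, 0.0, 0.0, 0.1],
              [0.1, 0.0, 0.0, 1.0],
              [0.0, 0.1, 1.0, 0.0]])
print(np.round(pic_embedding(A), 4))
# A 1-D k-means (or even a threshold) on this embedding separates the two clusters.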