2 changes: 2 additions & 0 deletions docs/_data/menu-ml.yaml
@@ -8,6 +8,8 @@
url: ml-clustering.html
- text: Collaborative filtering
url: ml-collaborative-filtering.html
- text: Frequent Pattern Mining
url: ml-frequent-pattern-mining.html
- text: Model selection and tuning
url: ml-tuning.html
- text: Advanced topics
68 changes: 68 additions & 0 deletions docs/ml-frequent-pattern-mining.md
@@ -0,0 +1,68 @@
---
layout: global
title: Frequent Pattern Mining
displayTitle: Frequent Pattern Mining
---

Mining frequent items, itemsets, subsequences, or other substructures is usually among the
first steps in analyzing a large-scale dataset, and it has been an active research topic in
data mining for years.
We refer users to Wikipedia's [association rule learning](http://en.wikipedia.org/wiki/Association_rule_learning)
for more information.

**Table of Contents**

* This will become a table of contents (this text will be scraped).
{:toc}

## FP-Growth

The FP-growth algorithm is described in the paper
[Han et al., Mining frequent patterns without candidate generation](http://dx.doi.org/10.1145/335191.335372),
where "FP" stands for frequent pattern.
Given a dataset of transactions, the first step of FP-growth is to calculate item frequencies and identify frequent items.
Unlike [Apriori-like](http://en.wikipedia.org/wiki/Apriori_algorithm) algorithms designed for the same purpose,
the second step of FP-growth uses a suffix tree (FP-tree) structure to encode transactions without explicitly
generating candidate sets, which are usually expensive to generate.
After the second step, the frequent itemsets can be extracted from the FP-tree.
In `spark.mllib`, we implemented a parallel version of FP-growth called PFP,
as described in [Li et al., PFP: Parallel FP-growth for query recommendation](http://dx.doi.org/10.1145/1454008.1454027).
PFP distributes the work of growing FP-trees based on the suffixes of transactions,
and hence is more scalable than a single-machine implementation.
We refer users to the papers for more details.
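As a plain-Python illustration of the first step described above (the transactions and threshold are invented for the example; this is not the Spark implementation), counting item frequencies and keeping the items that meet the minimum support might look like:

```python
# Step 1 of FP-growth, sketched: count item frequencies over all
# transactions and keep items whose support meets the threshold.
from collections import Counter

transactions = [["1", "2", "5"], ["1", "2", "3", "5"], ["1", "2"]]
min_support = 0.5  # an item must appear in at least half of the transactions

counts = Counter(item for t in transactions for item in t)
n = len(transactions)
frequent_items = {item for item, c in counts.items() if c / n >= min_support}
print(sorted(frequent_items))  # "3" appears in only 1 of 3 transactions, so it is dropped
```

The real algorithm then builds the FP-tree only over these frequent items, which is what keeps the candidate-free mining cheap.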

`spark.ml`'s FP-growth implementation takes the following (hyper-)parameters:

* `minSupport`: the minimum support for an itemset to be identified as frequent.
  For example, if an item appears in 3 out of 5 transactions, it has a support of 3/5 = 0.6.
* `minConfidence`: the minimum confidence for generating association rules. This parameter has no effect
  during `fit`; it only specifies the minimum confidence used when generating association rules from frequent itemsets.
* `numPartitions`: the number of partitions used to distribute the work.
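To make the support and confidence arithmetic above concrete, here is a small standalone sketch (plain Python with invented transactions, not the Spark API):

```python
# Support of an itemset: the fraction of transactions containing it.
# Confidence of a rule X => Y: support(X ∪ Y) / support(X).
transactions = [
    {"bread", "milk"},
    {"bread", "butter"},
    {"bread", "milk", "butter"},
    {"milk"},
    {"bread"},
]

def support(itemset):
    """Fraction of transactions that contain every item in `itemset`."""
    return sum(itemset <= t for t in transactions) / len(transactions)

# "milk" appears in 3 out of 5 transactions -> support 3/5 = 0.6,
# matching the arithmetic in the text above.
print(support({"milk"}))

# Confidence of {milk} => {bread}: (2/5) / (3/5)
confidence = support({"milk", "bread"}) / support({"milk"})
print(confidence)
```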

The `FPGrowthModel` provides:

* `freqItemsets`: frequent itemsets in the format of DataFrame("items"[Seq], "freq"[Long])
* `associationRules`: association rules generated with confidence above `minConfidence`, in the format of
DataFrame("antecedent"[Seq], "consequent"[Seq], "confidence"[Double]).
* `transform`: For each record in `itemsCol`, the transform method compares its items against the antecedents
of each association rule. If the record contains all the antecedents of a specific association rule, the rule
is considered applicable and its consequents are added to the prediction result. The transform method
summarizes the consequents from all the applicable rules as prediction. The prediction column has the same
data type as the input column and does not contain existing items in the input column.
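The rule-matching behavior described above can be sketched in a few lines of standalone Python (the rules and records are invented examples; this is not the Spark implementation):

```python
# Each rule is (antecedent, consequent). A rule applies to a record when
# the record's items contain the whole antecedent; applicable consequents
# are collected, minus items the record already has.
rules = [
    ({"1", "2"}, {"5"}),
    ({"5"}, {"1"}),
    ({"5"}, {"2"}),
]

def predict(items):
    itemset = set(items)
    consequents = set()
    for antecedent, consequent in rules:
        if antecedent <= itemset:            # rule is applicable
            consequents |= consequent - itemset  # keep only new items
    return consequents

print(predict(["1", "2"]))       # only the first rule applies
print(predict(["1", "2", "5"]))  # all rules apply, but nothing new to add
```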


**Examples**

<div class="codetabs">

<div data-lang="scala" markdown="1">
Refer to the [Scala API docs](api/scala/index.html#org.apache.spark.ml.fpm.FPGrowth) for more details.

{% include_example scala/org/apache/spark/examples/ml/FPGrowthExample.scala %}
</div>

<div data-lang="java" markdown="1">
Refer to the [Java API docs](api/java/org/apache/spark/ml/fpm/FPGrowth.html) for more details.

{% include_example java/org/apache/spark/examples/ml/JavaFPGrowthExample.java %}
</div>

</div>
@@ -0,0 +1,73 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.examples.ml;

// $example on$
import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.fpm.FPGrowth;
import org.apache.spark.ml.fpm.FPGrowthModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.*;
// $example off$

public class JavaFPGrowthExample {
  public static void main(String[] args) {
    SparkSession spark = SparkSession
      .builder()
      .appName("JavaFPGrowthExample")
      .getOrCreate();

    // $example on$
    List<Row> data = Arrays.asList(
      RowFactory.create(Arrays.asList("1 2 5".split(" "))),
      RowFactory.create(Arrays.asList("1 2 3 5".split(" "))),
      RowFactory.create(Arrays.asList("1 2".split(" ")))
    );
    StructType schema = new StructType(new StructField[]{ new StructField(
      "features", new ArrayType(DataTypes.StringType, true), false, Metadata.empty())
    });
    Dataset<Row> itemsDF = spark.createDataFrame(data, schema);

    FPGrowth fpgrowth = new FPGrowth()
      .setMinSupport(0.5)
      .setMinConfidence(0.6);

    FPGrowthModel model = fpgrowth.fit(itemsDF);

    // Display frequent itemsets.
    model.freqItemsets().show();

    // Display generated association rules.
    model.associationRules().show();

    // transform examines the input items against all the association rules and summarizes the
    // consequents as prediction.
    Dataset<Row> result = model.transform(itemsDF);

    result.show();
    // $example off$

    spark.stop();
  }
}
@@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.examples.ml

// scalastyle:off println

// $example on$
import org.apache.spark.ml.fpm.FPGrowth
// $example off$
import org.apache.spark.sql.SparkSession

/**
 * An example demonstrating FP-Growth.
 * Run with
 * {{{
 * bin/run-example ml.FPGrowthExample
 * }}}
 */
object FPGrowthExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder
      .appName(s"${this.getClass.getSimpleName}")
      .getOrCreate()
    import spark.implicits._

    // $example on$
    val dataset = spark.createDataset(Seq(
      "1 2 5",
      "1 2 3 5",
      "1 2")
    ).map(t => t.split(" ")).toDF("features")

    val fpgrowth = new FPGrowth().setMinSupport(0.5).setMinConfidence(0.6)
    val model = fpgrowth.fit(dataset)

    // Display frequent itemsets.
    model.freqItemsets.show()

    // Display generated association rules.
    model.associationRules.show()

    // transform examines the input items against all the association rules and summarizes the
    // consequents as prediction.
    model.transform(dataset).show()
    // $example off$

    spark.stop()
  }
}
// scalastyle:on println
11 changes: 6 additions & 5 deletions mllib/src/main/scala/org/apache/spark/ml/fpm/FPGrowth.scala
@@ -56,8 +56,8 @@ private[fpm] trait FPGrowthParams extends Params with HasFeaturesCol with HasPre
def getMinSupport: Double = $(minSupport)

/**
* Number of partitions (>=1) used by parallel FP-growth. By default the param is not set, and
* partition number of the input dataset is used.
* Number of partitions (positive) used by parallel FP-growth. By default the param is not set,
* and partition number of the input dataset is used.
* @group expertParam
*/
@Since("2.2.0")
@@ -240,12 +240,13 @@ class FPGrowthModel private[ml] (
val predictUDF = udf((items: Seq[_]) => {
if (items != null) {
val itemset = items.toSet
brRules.value.flatMap(rule =>
if (items != null && rule._1.forall(item => itemset.contains(item))) {
brRules.value.flatMap { rule =>
if (rule._1.forall(item => itemset.contains(item))) {
rule._2.filter(item => !itemset.contains(item))
} else {
Seq.empty
})
}
}
} else {
Seq.empty
}.distinct }, dt)