From d939a926203fa443305078cb3caf573111b75359 Mon Sep 17 00:00:00 2001 From: "Joseph K. Bradley" Date: Tue, 19 Aug 2014 21:02:25 -0700 Subject: [PATCH 1/6] Updated DecisionTree documentation. Added Java, Python examples. --- docs/mllib-decision-tree.md | 262 ++++++++++++++++++++++++++++-------- 1 file changed, 207 insertions(+), 55 deletions(-) diff --git a/docs/mllib-decision-tree.md b/docs/mllib-decision-tree.md index c01a92a9a1b2..b6f87cb3ddb8 100644 --- a/docs/mllib-decision-tree.md +++ b/docs/mllib-decision-tree.md @@ -7,20 +7,26 @@ displayTitle: MLlib - Decision Tree * Table of contents {:toc} -Decision trees and their ensembles are popular methods for the machine learning tasks of +[Decision trees](http://en.wikipedia.org/wiki/Decision_tree_learning) +and their ensembles are popular methods for the machine learning tasks of classification and regression. Decision trees are widely used since they are easy to interpret, -handle categorical variables, extend to the multiclass classification setting, do not require +handle categorical features, extend to the multiclass classification setting, do not require feature scaling and are able to capture nonlinearities and feature interactions. Tree ensemble -algorithms such as decision forest and boosting are among the top performers for classification and +algorithms such as decision forests and boosting are among the top performers for classification and regression tasks. +MLlib supports decision trees for binary and multiclass classification and for regression, +using both continuous and categorical features. The implementation partitions data by rows, +allowing distributed training with millions of instances. + ## Basic algorithm The decision tree is a greedy algorithm that performs a recursive binary partitioning of the feature -space by choosing a single element from the *best split set* where each element of the set maximizes -the information gain at a tree node. In other words, the split chosen at each tree node is chosen -from the set `$\underset{s}{\operatorname{argmax}} IG(D,s)$` where `$IG(D,s)$` is the information -gain when a split `$s$` is applied to a dataset `$D$`. +space. The tree predicts the same label for each bottommost (leaf) partition. +Each partition is chosen greedily by selecting the *best split* from a set of possible splits, +in order to maximize the information gain at a tree node. In other words, the split chosen at each +tree node is chosen from the set `$\underset{s}{\operatorname{argmax}} IG(D,s)$` where `$IG(D,s)$` +is the information gain when a split `$s$` is applied to a dataset `$D$`. ### Node impurity and information gain @@ -52,9 +58,10 @@ impurity measure for regression (variance). -The *information gain* is the difference in the parent node impurity and the weighted sum of the two -child node impurities. Assuming that a split $s$ partitions the dataset `$D$` of size `$N$` into two -datasets `$D_{left}$` and `$D_{right}$` of sizes `$N_{left}$` and `$N_{right}$`, respectively: +The *information gain* is the difference between the parent node impurity and the weighted sum of +the two child node impurities. Assuming that a split $s$ partitions the dataset `$D$` of size `$N$` +into two datasets `$D_{left}$` and `$D_{right}$` of sizes `$N_{left}$` and `$N_{right}$`, +respectively, the information gain is: `$IG(D,s) = Impurity(D) - \frac{N_{left}}{N} Impurity(D_{left}) - \frac{N_{right}}{N} Impurity(D_{right})$` @@ -62,14 +69,15 @@ datasets `$D_{left}$` and `$D_{right}$` of sizes `$N_{left}$` and `$N_{right}$`, **Continuous features** -For small datasets in single machine implementations, the split candidates for each continuous +For small datasets in single-machine implementations, the split candidates for each continuous feature are typically the unique values for the feature. Some implementations sort the feature values and then use the ordered unique values as split candidates for faster tree calculations. -Finding ordered unique feature values is computationally intensive for large distributed -datasets. One can get an approximate set of split candidates by performing a quantile calculation -over a sampled fraction of the data. The ordered splits create "bins" and the maximum number of such -bins can be specified using the `maxBins` parameters. +Sorting feature values is expensive for large distributed datasets. +This implementation computes an approximate set of split candidates by performing a quantile +calculation over a sampled fraction of the data. +The ordered splits create "bins" and the maximum number of such +bins can be specified using the `maxBins` parameter. Note that the number of bins cannot be greater than the number of instances `$N$` (a rare scenario since the default `maxBins` value is 100). The tree algorithm automatically reduces the number of @@ -77,33 +85,46 @@ bins if the condition is not satisfied. **Categorical features** -For `$M$` categorical feature values, one could come up with `$2^(M-1)-1$` split candidates. For -binary classification, we can reduce the number of split candidates to `$M-1$` by ordering the +For a categorical feature with `$M$` possible values (categories), one could come up with +`$2^{M-1}-1$` split candidates. For binary classification and regression, +we can reduce the number of split candidates to `$M-1$` by ordering the categorical feature values by the proportion of labels falling in one of the two classes (see Section 9.2.4 in [Elements of Statistical Machine Learning](http://statweb.stanford.edu/~tibs/ElemStatLearn/) for details). For example, for a binary classification problem with one categorical feature with three -categories A, B and C with corresponding proportion of label 1 as 0.2, 0.6 and 0.4, the categorical -features are ordered as A followed by C followed B or A, C, B. The two split candidates are A \| C, B +categories A, B and C whose corresponding proportions of label 1 are 0.2, 0.6 and 0.4, the categorical +features are ordered as A, C, B. The two split candidates are A \| C, B and A , C \| B where \| denotes the split. A similar heuristic is used for multiclass classification -when `$2^(M-1)-1$` is greater than the number of bins -- the impurity for each categorical feature value -is used for ordering. +when `$2^{M-1}-1$` is greater than the `maxBins` parameter: the impurity for each categorical feature value +is used for ordering. In multiclass classification, all `$2^{M-1}-1$` possible splits are used +whenever possible. + +Note that the `maxBins` parameter must be at least `$M_{max}$`, the maximum number of categories for +any categorical feature. ### Stopping rule The recursive tree construction is stopped at a node when one of the two conditions is met: -1. The node depth is equal to the `maxDepth` training parameter +1. The node depth is equal to the `maxDepth` training parameter. 2. No split candidate leads to an information gain at the node. ### Max memory requirements -For faster processing, the decision tree algorithm performs simultaneous histogram computations for all nodes at each level of the tree. This could lead to high memory requirements at deeper levels of the tree leading to memory overflow errors. To alleviate this problem, a 'maxMemoryInMB' training parameter is provided which specifies the maximum amount of memory at the workers (twice as much at the master) to be allocated to the histogram computation. The default value is conservatively chosen to be 128 MB to allow the decision algorithm to work in most scenarios. Once the memory requirements for a level-wise computation crosses the `maxMemoryInMB` threshold, the node training tasks at each subsequent level is split into smaller tasks. +For faster processing, the decision tree algorithm performs simultaneous histogram computations for +all nodes at each level of the tree. This could lead to high memory requirements at deeper levels +of the tree, leading to memory overflow errors. To alleviate this problem, a `maxMemoryInMB` +training parameter specifies the maximum amount of memory at the workers (twice as much at the +master) to be allocated to the histogram computation. The default value is conservatively chosen to +be 128 MB to allow the decision algorithm to work in most scenarios. Once the memory requirements +for a level-wise computation cross the `maxMemoryInMB` threshold, the node training tasks at each +subsequent level are split into smaller tasks. ### Practical limitations 1. The implemented algorithm reads both sparse and dense data. However, it is not optimized for sparse input. -2. Python is not supported in this release. +2. Computation scales approximately linearly in the number of training instances, + in the number of features, and in the `maxBins` parameter. ## Examples @@ -114,35 +135,101 @@ perform classification using a decision tree using Gini impurity as an impurity maximum tree depth of 5. The training error is calculated to measure the algorithm accuracy.
+
{% highlight scala %} -import org.apache.spark.SparkContext import org.apache.spark.mllib.tree.DecisionTree -import org.apache.spark.mllib.regression.LabeledPoint -import org.apache.spark.mllib.linalg.Vectors -import org.apache.spark.mllib.tree.configuration.Algo._ -import org.apache.spark.mllib.tree.impurity.Gini +import org.apache.spark.mllib.util.MLUtils // Load and parse the data file -val data = sc.textFile("data/mllib/sample_tree_data.csv") -val parsedData = data.map { line => - val parts = line.split(',').map(_.toDouble) - LabeledPoint(parts(0), Vectors.dense(parts.tail)) -} +val data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt") -// Run training algorithm to build the model +// Train a DecisionTree model. +// Empty categoricalFeaturesInfo indicates all features are continuous. +val numClasses = 2 +val categoricalFeaturesInfo = Map[Int, Int]() +val impurity = "gini" val maxDepth = 5 -val model = DecisionTree.train(parsedData, Classification, Gini, maxDepth) +val maxBins = 100 + +val model = DecisionTree.trainClassifier(data, numClasses, categoricalFeaturesInfo, impurity, + maxDepth, maxBins) -// Evaluate model on training examples and compute training error -val labelAndPreds = parsedData.map { point => +// Evaluate model on training instances and compute training error +val labelAndPreds = data.map { point => val prediction = model.predict(point.features) (point.label, prediction) } -val trainErr = labelAndPreds.filter(r => r._1 != r._2).count.toDouble / parsedData.count +val trainErr = labelAndPreds.filter(r => r._1 != r._2).count.toDouble / data.count println("Training Error = " + trainErr) {% endhighlight %}
+ +
+{% highlight java %} +import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.function.Function; +import org.apache.spark.api.java.function.PairFunction; +import org.apache.spark.mllib.regression.LabeledPoint; +import org.apache.spark.mllib.tree.DecisionTree; +import org.apache.spark.mllib.tree.model.DecisionTreeModel; +import scala.Tuple2; + +JavaRDD data = ... // data set + +// Train a DecisionTree model. +// Empty categoricalFeaturesInfo indicates all features are continuous. +Integer numClasses = ... // number of classes +HashMap categoricalFeaturesInfo = new HashMap(); +String impurity = "gini"; +Integer maxDepth = 5; +Integer maxBins = 100; + +final DecisionTreeModel model = DecisionTree.trainClassifier(data.rdd(), numClasses, + categoricalFeaturesInfo, impurity, maxDepth, maxBins); + +// Evaluate model on training instances and compute training error +JavaPairRDD predictionAndLabel = + data.mapToPair(new PairFunction() { + @Override public Tuple2 call(LabeledPoint p) { + return new Tuple2(model.predict(p.features()), p.label()); + } + }); +Double trainErr = 1.0 * predictionAndLabel.filter(new Function, Boolean>() { + @Override public Boolean call(Tuple2 pl) { + return pl._1() != pl._2(); + } + }).count() / data.count(); +{% endhighlight %} +
+ +
+{% highlight python %} +from pyspark.mllib.regression import LabeledPoint +from pyspark.mllib.tree import DecisionTree +from pyspark.mllib.util import MLUtils + +# an RDD of LabeledPoint +data = MLUtils.loadLibSVMFile(sc, 'data/mllib/sample_libsvm_data.txt') + +# Train a DecisionTree model. +# Empty categoricalFeaturesInfo indicates all features are continuous. +model = DecisionTree.trainClassifier(data, numClasses=2, categoricalFeaturesInfo={}, + impurity='gini', maxDepth=5, maxBins=100) + +# Evaluate model on training instances and compute training error +predictions = model.predict(data.map(lambda x: x.features)) +labelsAndPredictions = data.map(lambda lp: lp.label).zip(predictions) +trainErr = labelsAndPredictions.filter(lambda (v, p): v != p).count() / float(data.count()) +print('Training Error = ' + str(trainErr)) +{% endhighlight %} + +Note: When making predictions for a dataset, it is more efficient to do batch prediction rather +than separately calling `predict` on each data point. This is because the Python code makes calls +to an underlying `DecisionTree` model in Scala. +
+
### Regression @@ -153,33 +240,98 @@ depth of 5. The Mean Squared Error (MSE) is computed at the end to evaluate [goodness of fit](http://en.wikipedia.org/wiki/Goodness_of_fit).
+
{% highlight scala %} -import org.apache.spark.SparkContext import org.apache.spark.mllib.tree.DecisionTree -import org.apache.spark.mllib.regression.LabeledPoint -import org.apache.spark.mllib.linalg.Vectors -import org.apache.spark.mllib.tree.configuration.Algo._ -import org.apache.spark.mllib.tree.impurity.Variance +import org.apache.spark.mllib.util.MLUtils // Load and parse the data file -val data = sc.textFile("data/mllib/sample_tree_data.csv") -val parsedData = data.map { line => - val parts = line.split(',').map(_.toDouble) - LabeledPoint(parts(0), Vectors.dense(parts.tail)) -} +val data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt") -// Run training algorithm to build the model +// Train a DecisionTree model. +// Empty categoricalFeaturesInfo indicates all features are continuous. +val categoricalFeaturesInfo = Map[Int, Int]() +val impurity = "variance" val maxDepth = 5 -val model = DecisionTree.train(parsedData, Regression, Variance, maxDepth) +val maxBins = 100 -// Evaluate model on training examples and compute training error -val valuesAndPreds = parsedData.map { point => +val model = DecisionTree.trainRegressor(data, categoricalFeaturesInfo, impurity, + maxDepth, maxBins) + +// Evaluate model on training instances and compute training error +val labelsAndPredictions = data.map { point => val prediction = model.predict(point.features) (point.label, prediction) } -val MSE = valuesAndPreds.map{ case(v, p) => math.pow((v - p), 2)}.mean() -println("training Mean Squared Error = " + MSE) +val trainMSE = labelsAndPredictions.map{ case(v, p) => math.pow((v - p), 2)}.mean() +println("Training Mean Squared Error = " + trainMSE) +{% endhighlight %} +
+ +
+{% highlight java %} +import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.function.Function; +import org.apache.spark.api.java.function.PairFunction; +import org.apache.spark.mllib.regression.LabeledPoint; +import org.apache.spark.mllib.tree.DecisionTree; +import org.apache.spark.mllib.tree.model.DecisionTreeModel; +import scala.Tuple2; + +JavaRDD data = ... // data set + +// Train a DecisionTree model. +// Empty categoricalFeaturesInfo indicates all features are continuous. +HashMap categoricalFeaturesInfo = new HashMap(); +String impurity = "variance"; +Integer maxDepth = 5; +Integer maxBins = 100; + +final DecisionTreeModel model = DecisionTree.trainRegressor(data.rdd(), + categoricalFeaturesInfo, impurity, maxDepth, maxBins); + +// Evaluate model on training instances and compute training error +JavaPairRDD predictionAndLabel = + data.mapToPair(new PairFunction() { + @Override public Tuple2 call(LabeledPoint p) { + return new Tuple2(model.predict(p.features()), p.label()); + } + }); +Double trainMSE = predictionAndLabel.map(new Function, Double>() { + @Override public Double call(Tuple2 pl) { + Double diff = pl._1() - pl._2(); + return diff * diff; + } + }).sum() / data.count(); +{% endhighlight %} +
+ +
+{% highlight python %} +from pyspark.mllib.regression import LabeledPoint +from pyspark.mllib.tree import DecisionTree +from pyspark.mllib.util import MLUtils + +# an RDD of LabeledPoint +data = MLUtils.loadLibSVMFile(sc, 'data/mllib/sample_libsvm_data.txt') + +# Train a DecisionTree model. +# Empty categoricalFeaturesInfo indicates all features are continuous. +model = DecisionTree.trainRegressor(data, categoricalFeaturesInfo={}, + impurity='variance', maxDepth=5, maxBins=100) + +# Evaluate model on training instances and compute training error +predictions = model.predict(data.map(lambda x: x.features)) +labelsAndPredictions = data.map(lambda lp: lp.label).zip(predictions) +trainMSE = labelsAndPredictions.map(lambda (v, p): (v - p) * (v - p)).sum() / float(data.count()) +print('Training Mean Squared Error = ' + str(trainMSE)) {% endhighlight %} + +Note: When making predictions for a dataset, it is more efficient to do batch prediction rather +than separately calling `predict` on each data point. This is because the Python code makes calls +to an underlying `DecisionTree` model in Scala.
+
From 57eee9fa174fa3435f69c38785d9c757f3744fd9 Mon Sep 17 00:00:00 2001 From: "Joseph K. Bradley" Date: Wed, 20 Aug 2014 10:34:57 -0700 Subject: [PATCH 2/6] Created JavaDecisionTree example from example in docs, and corrected doc example as needed. --- docs/mllib-decision-tree.md | 12 +- .../examples/mllib/JavaDecisionTree.java | 115 ++++++++++++++++++ 2 files changed, 123 insertions(+), 4 deletions(-) create mode 100644 examples/src/main/java/org/apache/spark/examples/mllib/JavaDecisionTree.java diff --git a/docs/mllib-decision-tree.md b/docs/mllib-decision-tree.md index b6f87cb3ddb8..e55db0ab0dea 100644 --- a/docs/mllib-decision-tree.md +++ b/docs/mllib-decision-tree.md @@ -167,6 +167,7 @@ println("Training Error = " + trainErr)
{% highlight java %} +import scala.Tuple2; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.function.Function; @@ -174,7 +175,6 @@ import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.mllib.regression.LabeledPoint; import org.apache.spark.mllib.tree.DecisionTree; import org.apache.spark.mllib.tree.model.DecisionTreeModel; -import scala.Tuple2; JavaRDD data = ... // data set @@ -186,7 +186,7 @@ String impurity = "gini"; Integer maxDepth = 5; Integer maxBins = 100; -final DecisionTreeModel model = DecisionTree.trainClassifier(data.rdd(), numClasses, +final DecisionTreeModel model = DecisionTree.trainClassifier(data, numClasses, categoricalFeaturesInfo, impurity, maxDepth, maxBins); // Evaluate model on training instances and compute training error @@ -198,9 +198,11 @@ JavaPairRDD predictionAndLabel = }); Double trainErr = 1.0 * predictionAndLabel.filter(new Function, Boolean>() { @Override public Boolean call(Tuple2 pl) { - return pl._1() != pl._2(); + return !pl._1().equals(pl._2()); } }).count() / data.count(); +System.out.print("Training error: " + trainErr); +System.out.print("Learned model:\n" + model); {% endhighlight %}
@@ -289,7 +291,7 @@ String impurity = "variance"; Integer maxDepth = 5; Integer maxBins = 100; -final DecisionTreeModel model = DecisionTree.trainRegressor(data.rdd(), +final DecisionTreeModel model = DecisionTree.trainRegressor(data, categoricalFeaturesInfo, impurity, maxDepth, maxBins); // Evaluate model on training instances and compute training error @@ -305,6 +307,8 @@ Double trainMSE = predictionAndLabel.map(new Function, Do return diff * diff; } }).sum() / data.count(); +System.out.print("Training Mean Squared Error: " + trainMSE); +System.out.print("Learned model:\n" + model); {% endhighlight %} diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaDecisionTree.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaDecisionTree.java new file mode 100644 index 000000000000..a70d5f40b525 --- /dev/null +++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaDecisionTree.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.examples.mllib; + +import java.util.HashMap; + +import scala.reflect.ClassTag; +import scala.Tuple2; + +import org.apache.spark.api.java.function.Function2; + import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.function.Function; +import org.apache.spark.api.java.function.PairFunction; +import org.apache.spark.mllib.regression.LabeledPoint; +import org.apache.spark.mllib.tree.DecisionTree; +import org.apache.spark.mllib.tree.model.DecisionTreeModel; +import org.apache.spark.mllib.util.MLUtils; +import org.apache.spark.SparkConf; + + +/** + * Classification and regression using decision trees. + */ +public final class JavaDecisionTree { + + public static void main(String[] args) { + if (args.length != 1) { + System.err.println("Usage: JavaDecisionTree "); + System.exit(1); + } + SparkConf sparkConf = new SparkConf().setAppName("JavaDecisionTree"); + JavaSparkContext sc = new JavaSparkContext(sparkConf); + String datapath = args[0]; + + JavaRDD data = JavaRDD.fromRDD(MLUtils.loadLibSVMFile(sc.sc(), datapath)); + + // Compute the number of classes from the data. + Integer numClasses = data.map(new Function() { + @Override public Double call(LabeledPoint p) { + return p.label(); + } + }).countByValue().size(); + // Empty categoricalFeaturesInfo indicates all features are continuous. + HashMap categoricalFeaturesInfo = new HashMap(); + String impurity = "gini"; + Integer maxDepth = 5; + Integer maxBins = 100; + + // Train a DecisionTree model for classification. + final DecisionTreeModel model = DecisionTree.trainClassifier(data, numClasses, + categoricalFeaturesInfo, impurity, maxDepth, maxBins); + + // Evaluate model on training instances and compute training error + JavaPairRDD predictionAndLabel = + data.mapToPair(new PairFunction() { + @Override public Tuple2 call(LabeledPoint p) { + return new Tuple2(model.predict(p.features()), p.label()); + } + }); + Double trainErr = + 1.0 * predictionAndLabel.filter(new Function, Boolean>() { + @Override public Boolean call(Tuple2 pl) { + return !pl._1().equals(pl._2()); + } + }).count() / data.count(); + System.out.print("Training error: " + trainErr); + System.out.print("Learned classification tree model:\n" + model); + + // Train a DecisionTree model for regression. + impurity = "variance"; + + final DecisionTreeModel regressionModel = DecisionTree.trainRegressor(data, + categoricalFeaturesInfo, impurity, maxDepth, maxBins); + + // Evaluate model on training instances and compute training error + JavaPairRDD regressorPredictionAndLabel = + data.mapToPair(new PairFunction() { + @Override public Tuple2 call(LabeledPoint p) { + return new Tuple2(regressionModel.predict(p.features()), p.label()); + } + }); + Double trainMSE = + regressorPredictionAndLabel.map(new Function, Double>() { + @Override public Double call(Tuple2 pl) { + Double diff = pl._1() - pl._2(); + return diff * diff; + } + }).reduce(new Function2() { + @Override public Double call(Double a, Double b) { + return a + b; + } + }) / data.count(); + System.out.print("Training Mean Squared Error: " + trainMSE); + System.out.print("Learned regression tree model:\n" + regressionModel); + + sc.stop(); + } +} From b9bee04d8ef538912b54a739f47ceeb11e4582b5 Mon Sep 17 00:00:00 2001 From: "Joseph K. Bradley" Date: Wed, 20 Aug 2014 11:34:06 -0700 Subject: [PATCH 3/6] Updated DT examples --- docs/mllib-decision-tree.md | 73 ++++++++++++++----- .../examples/mllib/JavaDecisionTree.java | 25 ++++--- 2 files changed, 68 insertions(+), 30 deletions(-) diff --git a/docs/mllib-decision-tree.md b/docs/mllib-decision-tree.md index e55db0ab0dea..e4f77e890b67 100644 --- a/docs/mllib-decision-tree.md +++ b/docs/mllib-decision-tree.md @@ -162,47 +162,64 @@ val labelAndPreds = data.map { point => } val trainErr = labelAndPreds.filter(r => r._1 != r._2).count.toDouble / data.count println("Training Error = " + trainErr) +println("Learned classification tree model:\n" + model) {% endhighlight %}
{% highlight java %} +import java.util.HashMap; import scala.Tuple2; +import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function; import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.mllib.regression.LabeledPoint; import org.apache.spark.mllib.tree.DecisionTree; import org.apache.spark.mllib.tree.model.DecisionTreeModel; - -JavaRDD data = ... // data set - -// Train a DecisionTree model. +import org.apache.spark.mllib.util.MLUtils; +import org.apache.spark.SparkConf; + +SparkConf sparkConf = new SparkConf().setAppName("JavaDecisionTree"); +JavaSparkContext sc = new JavaSparkContext(sparkConf); + +String datapath = "data/mllib/sample_libsvm_data.txt"; +JavaRDD data = MLUtils.loadLibSVMFile(sc.sc(), datapath).toJavaRDD(); +// Compute the number of classes from the data. +Integer numClasses = data.map(new Function() { + @Override public Double call(LabeledPoint p) { + return p.label(); + } +}).countByValue().size(); + +// Set parameters. // Empty categoricalFeaturesInfo indicates all features are continuous. -Integer numClasses = ... // number of classes HashMap categoricalFeaturesInfo = new HashMap(); String impurity = "gini"; Integer maxDepth = 5; Integer maxBins = 100; +// Train a DecisionTree model for classification. final DecisionTreeModel model = DecisionTree.trainClassifier(data, numClasses, categoricalFeaturesInfo, impurity, maxDepth, maxBins); // Evaluate model on training instances and compute training error -JavaPairRDD predictionAndLabel = +JavaPairRDD predictionAndLabel = data.mapToPair(new PairFunction() { @Override public Tuple2 call(LabeledPoint p) { return new Tuple2(model.predict(p.features()), p.label()); } }); -Double trainErr = 1.0 * predictionAndLabel.filter(new Function, Boolean>() { +Double trainErr = + 1.0 * predictionAndLabel.filter(new Function, Boolean>() { @Override public Boolean call(Tuple2 pl) { return !pl._1().equals(pl._2()); } }).count() / data.count(); -System.out.print("Training error: " + trainErr); -System.out.print("Learned model:\n" + model); +System.out.println("Training error: " + trainErr); +System.out.println("Learned classification tree model:\n" + model); {% endhighlight %}
@@ -225,6 +242,8 @@ predictions = model.predict(data.map(lambda x: x.features)) labelsAndPredictions = data.map(lambda lp: lp.label).zip(predictions) trainErr = labelsAndPredictions.filter(lambda (v, p): v != p).count() / float(data.count()) print('Training Error = ' + str(trainErr)) +print('Learned classification tree model:') +print(model) {% endhighlight %} Note: When making predictions for a dataset, it is more efficient to do batch prediction rather @@ -268,47 +287,63 @@ val labelsAndPredictions = data.map { point => } val trainMSE = labelsAndPredictions.map{ case(v, p) => math.pow((v - p), 2)}.mean() println("Training Mean Squared Error = " + trainMSE) +println("Learned regression tree model:\n" + model) {% endhighlight %}
{% highlight java %} +import java.util.HashMap; +import scala.Tuple2; +import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function; import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.mllib.regression.LabeledPoint; import org.apache.spark.mllib.tree.DecisionTree; import org.apache.spark.mllib.tree.model.DecisionTreeModel; -import scala.Tuple2; +import org.apache.spark.mllib.util.MLUtils; +import org.apache.spark.SparkConf; -JavaRDD data = ... // data set +String datapath = "data/mllib/sample_libsvm_data.txt"; +JavaRDD data = MLUtils.loadLibSVMFile(sc.sc(), datapath).toJavaRDD(); -// Train a DecisionTree model. +SparkConf sparkConf = new SparkConf().setAppName("JavaDecisionTree"); +JavaSparkContext sc = new JavaSparkContext(sparkConf); + +// Set parameters. // Empty categoricalFeaturesInfo indicates all features are continuous. HashMap categoricalFeaturesInfo = new HashMap(); String impurity = "variance"; Integer maxDepth = 5; Integer maxBins = 100; +// Train a DecisionTree model. final DecisionTreeModel model = DecisionTree.trainRegressor(data, categoricalFeaturesInfo, impurity, maxDepth, maxBins); // Evaluate model on training instances and compute training error -JavaPairRDD predictionAndLabel = +JavaPairRDD predictionAndLabel = data.mapToPair(new PairFunction() { @Override public Tuple2 call(LabeledPoint p) { return new Tuple2(model.predict(p.features()), p.label()); } }); -Double trainMSE = predictionAndLabel.map(new Function, Double>() { +Double trainMSE = + predictionAndLabel.map(new Function, Double>() { @Override public Double call(Tuple2 pl) { - Double diff = pl._1() - pl._2(); + Double diff = pl._1() - pl._2(); return diff * diff; } - }).sum() / data.count(); -System.out.print("Training Mean Squared Error: " + trainMSE); -System.out.print("Learned model:\n" + model); + }).reduce(new Function2() { + @Override public Double call(Double a, Double b) { + return a + b; + } + }) / data.count(); +System.out.println("Training Mean Squared Error: " + trainMSE); +System.out.println("Learned regression tree model:\n" + model); {% endhighlight %}
@@ -331,6 +366,8 @@ predictions = model.predict(data.map(lambda x: x.features)) labelsAndPredictions = data.map(lambda lp: lp.label).zip(predictions) trainMSE = labelsAndPredictions.map(lambda (v, p): (v - p) * (v - p)).sum() / float(data.count()) print('Training Mean Squared Error = ' + str(trainMSE)) +print('Learned regression tree model:') +print(model) {% endhighlight %} Note: When making predictions for a dataset, it is more efficient to do batch prediction rather diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaDecisionTree.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaDecisionTree.java index a70d5f40b525..ee79946ec6f3 100644 --- a/examples/src/main/java/org/apache/spark/examples/mllib/JavaDecisionTree.java +++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaDecisionTree.java @@ -19,11 +19,10 @@ import java.util.HashMap; -import scala.reflect.ClassTag; import scala.Tuple2; import org.apache.spark.api.java.function.Function2; - import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function; @@ -34,22 +33,23 @@ import org.apache.spark.mllib.util.MLUtils; import org.apache.spark.SparkConf; - /** * Classification and regression using decision trees. */ public final class JavaDecisionTree { public static void main(String[] args) { - if (args.length != 1) { + String datapath = "data/mllib/sample_libsvm_data.txt"; + if (args.length == 1) { + datapath = args[0]; + } else if (args.length > 1) { System.err.println("Usage: JavaDecisionTree "); System.exit(1); } SparkConf sparkConf = new SparkConf().setAppName("JavaDecisionTree"); JavaSparkContext sc = new JavaSparkContext(sparkConf); - String datapath = args[0]; - JavaRDD data = JavaRDD.fromRDD(MLUtils.loadLibSVMFile(sc.sc(), datapath)); + JavaRDD data = MLUtils.loadLibSVMFile(sc.sc(), datapath).toJavaRDD(); // Compute the number of classes from the data. Integer numClasses = data.map(new Function() { @@ -57,7 +57,9 @@ public static void main(String[] args) { return p.label(); } }).countByValue().size(); - // Empty categoricalFeaturesInfo indicates all features are continuous. + + // Set parameters. + // Empty categoricalFeaturesInfo indicates all features are continuous. HashMap categoricalFeaturesInfo = new HashMap(); String impurity = "gini"; Integer maxDepth = 5; @@ -80,12 +82,11 @@ public static void main(String[] args) { return !pl._1().equals(pl._2()); } }).count() / data.count(); - System.out.print("Training error: " + trainErr); - System.out.print("Learned classification tree model:\n" + model); + System.out.println("Training error: " + trainErr); + System.out.println("Learned classification tree model:\n" + model); // Train a DecisionTree model for regression. impurity = "variance"; - final DecisionTreeModel regressionModel = DecisionTree.trainRegressor(data, categoricalFeaturesInfo, impurity, maxDepth, maxBins); @@ -107,8 +108,8 @@ public static void main(String[] args) { return a + b; } }) / data.count(); - System.out.print("Training Mean Squared Error: " + trainMSE); - System.out.print("Learned regression tree model:\n" + regressionModel); + System.out.println("Training Mean Squared Error: " + trainMSE); + System.out.println("Learned regression tree model:\n" + regressionModel); sc.stop(); } From d80236916413f5102654280fec131530674774b6 Mon Sep 17 00:00:00 2001 From: "Joseph K. Bradley" Date: Wed, 20 Aug 2014 14:17:17 -0700 Subject: [PATCH 4/6] Updates based on comments: cache data, corrected doc text. --- docs/mllib-decision-tree.md | 12 ++++++++---- .../spark/examples/mllib/JavaDecisionTree.java | 2 +- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/docs/mllib-decision-tree.md b/docs/mllib-decision-tree.md index e4f77e890b67..699790bb2f57 100644 --- a/docs/mllib-decision-tree.md +++ b/docs/mllib-decision-tree.md @@ -130,7 +130,9 @@ subsequent level are split into smaller tasks. ### Classification -The example below demonstrates how to load a CSV file, parse it as an RDD of `LabeledPoint` and then +The example below demonstrates how to load a +[LIBSVM data file](http://www.csie.ntu.edu.tw/~cjlin/libsvmtools/datasets/), +parse it as an RDD of `LabeledPoint` and then perform classification using a decision tree using Gini impurity as an impurity measure and a maximum tree depth of 5. The training error is calculated to measure the algorithm accuracy. @@ -186,7 +188,7 @@ SparkConf sparkConf = new SparkConf().setAppName("JavaDecisionTree"); JavaSparkContext sc = new JavaSparkContext(sparkConf); String datapath = "data/mllib/sample_libsvm_data.txt"; -JavaRDD data = MLUtils.loadLibSVMFile(sc.sc(), datapath).toJavaRDD(); +JavaRDD data = MLUtils.loadLibSVMFile(sc.sc(), datapath).toJavaRDD().cache(); // Compute the number of classes from the data. Integer numClasses = data.map(new Function() { @Override public Double call(LabeledPoint p) { @@ -255,7 +257,9 @@ to an underlying `DecisionTree` model in Scala. ### Regression -The example below demonstrates how to load a CSV file, parse it as an RDD of `LabeledPoint` and then +The example below demonstrates how to load a +[LIBSVM data file](http://www.csie.ntu.edu.tw/~cjlin/libsvmtools/datasets/), +parse it as an RDD of `LabeledPoint` and then perform regression using a decision tree using variance as an impurity measure and a maximum tree depth of 5. The Mean Squared Error (MSE) is computed at the end to evaluate [goodness of fit](http://en.wikipedia.org/wiki/Goodness_of_fit). @@ -308,7 +312,7 @@ import org.apache.spark.mllib.util.MLUtils; import org.apache.spark.SparkConf; String datapath = "data/mllib/sample_libsvm_data.txt"; -JavaRDD data = MLUtils.loadLibSVMFile(sc.sc(), datapath).toJavaRDD(); +JavaRDD data = MLUtils.loadLibSVMFile(sc.sc(), datapath).toJavaRDD().cache(); SparkConf sparkConf = new SparkConf().setAppName("JavaDecisionTree"); JavaSparkContext sc = new JavaSparkContext(sparkConf); diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaDecisionTree.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaDecisionTree.java index ee79946ec6f3..e4468e8bf174 100644 --- a/examples/src/main/java/org/apache/spark/examples/mllib/JavaDecisionTree.java +++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaDecisionTree.java @@ -49,7 +49,7 @@ public static void main(String[] args) { SparkConf sparkConf = new SparkConf().setAppName("JavaDecisionTree"); JavaSparkContext sc = new JavaSparkContext(sparkConf); - JavaRDD data = MLUtils.loadLibSVMFile(sc.sc(), datapath).toJavaRDD(); + JavaRDD data = MLUtils.loadLibSVMFile(sc.sc(), datapath).toJavaRDD().cache(); // Compute the number of classes from the data. Integer numClasses = data.map(new Function() { From 9dd1b6b6edd11035d081c425b2cc1af06a2d8442 Mon Sep 17 00:00:00 2001 From: "Joseph K. Bradley" Date: Wed, 20 Aug 2014 14:54:55 -0700 Subject: [PATCH 5/6] Updated decision tree doc. --- docs/mllib-decision-tree.md | 68 +++++++++++++++++++++++-------------- 1 file changed, 43 insertions(+), 25 deletions(-) diff --git a/docs/mllib-decision-tree.md b/docs/mllib-decision-tree.md index 699790bb2f57..125a7653f1c6 100644 --- a/docs/mllib-decision-tree.md +++ b/docs/mllib-decision-tree.md @@ -12,7 +12,7 @@ and their ensembles are popular methods for the machine learning tasks of classification and regression. Decision trees are widely used since they are easy to interpret, handle categorical features, extend to the multiclass classification setting, do not require feature scaling and are able to capture nonlinearities and feature interactions. Tree ensemble -algorithms such as decision forests and boosting are among the top performers for classification and +algorithms such as random forests and boosting are among the top performers for classification and regression tasks. MLlib supports decision trees for binary and multiclass classification and for regression, @@ -94,13 +94,13 @@ Section 9.2.4 in details). For example, for a binary classification problem with one categorical feature with three categories A, B and C whose corresponding proportions of label 1 are 0.2, 0.6 and 0.4, the categorical features are ordered as A, C, B. The two split candidates are A \| C, B -and A , C \| B where \| denotes the split. A similar heuristic is used for multiclass classification -when `$2^{M-1}-1$` is greater than the `maxBins` parameter: the impurity for each categorical feature value -is used for ordering. In multiclass classification, all `$2^{M-1}-1$` possible splits are used -whenever possible. +and A , C \| B where \| denotes the split. -Note that the `maxBins` parameter must be at least `$M_{max}$`, the maximum number of categories for -any categorical feature. +In multiclass classification, all `$2^{M-1}-1$` possible splits are used whenever possible. +When `$2^{M-1}-1$` is greater than the `maxBins` parameter, we use a (heuristic) method +similar to the method used for binary classification and regression. +The `$M$` categorical feature values are ordered by impurity, +and the resulting `$M-1$` split candidates are considered. ### Stopping rule @@ -109,6 +109,8 @@ The recursive tree construction is stopped at a node when one of the two conditi 1. The node depth is equal to the `maxDepth` training parameter. 2. No split candidate leads to an information gain at the node. +## Implementation details + ### Max memory requirements For faster processing, the decision tree algorithm performs simultaneous histogram computations for @@ -120,11 +122,24 @@ be 128 MB to allow the decision algorithm to work in most scenarios. Once the me for a level-wise computation cross the `maxMemoryInMB` threshold, the node training tasks at each subsequent level are split into smaller tasks. -### Practical limitations +Note that, if you have a large amount of memory, increasing `maxMemoryInMB` can lead to faster +training by requiring fewer passes over the data. + +### Binning feature values + +Increasing `maxBins` allows the algorithm to consider more split candidates and make fine-grained +split decisions. However, it also increases computation and communication. + +Note that the `maxBins` parameter must be at least the maximum number of categories `$M$` for +any categorical feature. + +### Scaling + +Computation scales approximately linearly in the number of training instances, +in the number of features, and in the `maxBins` parameter. +Communication scales approximately linearly in the number of features and in `maxBins`. -1. The implemented algorithm reads both sparse and dense data. However, it is not optimized for sparse input. -2. Computation scales approximately linearly in the number of training instances, - in the number of features, and in the `maxBins` parameter. +The implemented algorithm reads both sparse and dense data. However, it is not optimized for sparse input. ## Examples @@ -143,8 +158,9 @@ maximum tree depth of 5. The training error is calculated to measure the algorit import org.apache.spark.mllib.tree.DecisionTree import org.apache.spark.mllib.util.MLUtils -// Load and parse the data file -val data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt") +// Load and parse the data file. +// Cache the data since we will use it again to compute training error. +val data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt").cache() // Train a DecisionTree model. // Empty categoricalFeaturesInfo indicates all features are continuous. @@ -187,17 +203,14 @@ import org.apache.spark.SparkConf; SparkConf sparkConf = new SparkConf().setAppName("JavaDecisionTree"); JavaSparkContext sc = new JavaSparkContext(sparkConf); +// Load and parse the data file. +// Cache the data since we will use it again to compute training error. String datapath = "data/mllib/sample_libsvm_data.txt"; JavaRDD data = MLUtils.loadLibSVMFile(sc.sc(), datapath).toJavaRDD().cache(); -// Compute the number of classes from the data. -Integer numClasses = data.map(new Function() { - @Override public Double call(LabeledPoint p) { - return p.label(); - } -}).countByValue().size(); // Set parameters. // Empty categoricalFeaturesInfo indicates all features are continuous. +Integer numClasses = 2; HashMap categoricalFeaturesInfo = new HashMap(); String impurity = "gini"; Integer maxDepth = 5; @@ -231,8 +244,9 @@ from pyspark.mllib.regression import LabeledPoint from pyspark.mllib.tree import DecisionTree from pyspark.mllib.util import MLUtils -# an RDD of LabeledPoint -data = MLUtils.loadLibSVMFile(sc, 'data/mllib/sample_libsvm_data.txt') +# Load and parse the data file into an RDD of LabeledPoint. +# Cache the data since we will use it again to compute training error. +data = MLUtils.loadLibSVMFile(sc, 'data/mllib/sample_libsvm_data.txt').cache() # Train a DecisionTree model. # Empty categoricalFeaturesInfo indicates all features are continuous. @@ -271,8 +285,9 @@ depth of 5. The Mean Squared Error (MSE) is computed at the end to evaluate import org.apache.spark.mllib.tree.DecisionTree import org.apache.spark.mllib.util.MLUtils -// Load and parse the data file -val data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt") +// Load and parse the data file. +// Cache the data since we will use it again to compute training error. +val data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt").cache() // Train a DecisionTree model. // Empty categoricalFeaturesInfo indicates all features are continuous. @@ -311,6 +326,8 @@ import org.apache.spark.mllib.tree.model.DecisionTreeModel; import org.apache.spark.mllib.util.MLUtils; import org.apache.spark.SparkConf; +// Load and parse the data file. +// Cache the data since we will use it again to compute training error. String datapath = "data/mllib/sample_libsvm_data.txt"; JavaRDD data = MLUtils.loadLibSVMFile(sc.sc(), datapath).toJavaRDD().cache(); @@ -357,8 +374,9 @@ from pyspark.mllib.regression import LabeledPoint from pyspark.mllib.tree import DecisionTree from pyspark.mllib.util import MLUtils -# an RDD of LabeledPoint -data = MLUtils.loadLibSVMFile(sc, 'data/mllib/sample_libsvm_data.txt') +# Load and parse the data file into an RDD of LabeledPoint. +# Cache the data since we will use it again to compute training error. +data = MLUtils.loadLibSVMFile(sc, 'data/mllib/sample_libsvm_data.txt').cache() # Train a DecisionTree model. # Empty categoricalFeaturesInfo indicates all features are continuous. From 2dd2c191233be76b445683fa8aa65fa1cc426b37 Mon Sep 17 00:00:00 2001 From: "Joseph K. Bradley" Date: Wed, 20 Aug 2014 17:06:31 -0700 Subject: [PATCH 6/6] Last updates based on github review. --- docs/mllib-decision-tree.md | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/docs/mllib-decision-tree.md b/docs/mllib-decision-tree.md index 125a7653f1c6..1166d9cd150c 100644 --- a/docs/mllib-decision-tree.md +++ b/docs/mllib-decision-tree.md @@ -86,12 +86,11 @@ bins if the condition is not satisfied. **Categorical features** For a categorical feature with `$M$` possible values (categories), one could come up with -`$2^{M-1}-1$` split candidates. For binary classification and regression, +`$2^{M-1}-1$` split candidates. For binary (0/1) classification and regression, we can reduce the number of split candidates to `$M-1$` by ordering the -categorical feature values by the proportion of labels falling in one of the two classes (see -Section 9.2.4 in +categorical feature values by the average label. (See Section 9.2.4 in [Elements of Statistical Machine Learning](http://statweb.stanford.edu/~tibs/ElemStatLearn/) for -details). For example, for a binary classification problem with one categorical feature with three +details.) For example, for a binary classification problem with one categorical feature with three categories A, B and C whose corresponding proportions of label 1 are 0.2, 0.6 and 0.4, the categorical features are ordered as A, C, B. The two split candidates are A \| C, B and A , C \| B where \| denotes the split. @@ -115,7 +114,7 @@ The recursive tree construction is stopped at a node when one of the two conditi For faster processing, the decision tree algorithm performs simultaneous histogram computations for all nodes at each level of the tree. This could lead to high memory requirements at deeper levels -of the tree, leading to memory overflow errors. To alleviate this problem, a `maxMemoryInMB` +of the tree, potentially leading to memory overflow errors. To alleviate this problem, a `maxMemoryInMB` training parameter specifies the maximum amount of memory at the workers (twice as much at the master) to be allocated to the histogram computation. The default value is conservatively chosen to be 128 MB to allow the decision algorithm to work in most scenarios. Once the memory requirements @@ -148,7 +147,7 @@ The implemented algorithm reads both sparse and dense data. However, it is not o The example below demonstrates how to load a [LIBSVM data file](http://www.csie.ntu.edu.tw/~cjlin/libsvmtools/datasets/), parse it as an RDD of `LabeledPoint` and then -perform classification using a decision tree using Gini impurity as an impurity measure and a +perform classification using a decision tree with Gini impurity as an impurity measure and a maximum tree depth of 5. The training error is calculated to measure the algorithm accuracy.
@@ -274,7 +273,7 @@ to an underlying `DecisionTree` model in Scala. The example below demonstrates how to load a [LIBSVM data file](http://www.csie.ntu.edu.tw/~cjlin/libsvmtools/datasets/), parse it as an RDD of `LabeledPoint` and then -perform regression using a decision tree using variance as an impurity measure and a maximum tree +perform regression using a decision tree with variance as an impurity measure and a maximum tree depth of 5. The Mean Squared Error (MSE) is computed at the end to evaluate [goodness of fit](http://en.wikipedia.org/wiki/Goodness_of_fit).