Changes from 9 commits

RowMatrix.scala

@@ -28,6 +28,7 @@ import org.apache.spark.annotation.Experimental
import org.apache.spark.mllib.linalg._
import org.apache.spark.rdd.RDD
import org.apache.spark.Logging
+ import org.apache.spark.mllib.rdd.RDDFunctions._
import org.apache.spark.mllib.stat.{MultivariateOnlineSummarizer, MultivariateStatisticalSummary}

/**
@@ -104,13 +105,11 @@ class RowMatrix(
val nt: Int = n * (n + 1) / 2

// Compute the upper triangular part of the gram matrix.
- val GU = rows.aggregate(new BDV[Double](new Array[Double](nt)))(
+ val GU = rows.treeAggregate(new BDV[Double](new Array[Double](nt)))(
seqOp = (U, v) => {
RowMatrix.dspr(1.0, v, U.data)
U
- },
- combOp = (U1, U2) => U1 += U2
- )
+ }, combOp = (U1, U2) => U1 += U2, depth = 2)

Contributor: maybe you can set depth to be 2 by default so you don't need to repeat this ...

RowMatrix.triuToFull(n, GU.data)
}
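
An editorial aside on the default-depth suggestion above: a minimal, Spark-free sketch of what a defaulted depth parameter could look like. The TreeOps class and its sequential body are hypothetical illustrations, not part of this PR; the PR itself keeps depth as an explicit argument at every call site.

import scala.reflect.ClassTag

// Hypothetical illustration of "depth = 2 by default": the last parameter list
// carries a defaulted depth, so callers may simply omit it.
class TreeOps[T: ClassTag](data: Seq[T]) {
  def treeAggregate[U: ClassTag](zeroValue: U)(
      seqOp: (U, T) => U,
      combOp: (U, U) => U,
      depth: Int = 2): U = {
    require(depth >= 1, s"Depth must be greater than or equal to 1 but got $depth.")
    // Sequential stand-in for the distributed tree aggregation; combOp is unused here.
    data.foldLeft(zeroValue)(seqOp)
  }
}

object DefaultDepthExample extends App {
  val ops = new TreeOps((1 to 10).toSeq)
  println(ops.treeAggregate(0)(_ + _, _ + _))             // depth defaults to 2
  println(ops.treeAggregate(0)(_ + _, _ + _, depth = 4))  // override when needed
}
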
@@ -290,10 +289,11 @@ class RowMatrix(
s"We need at least $mem bytes of memory.")
}

- val (m, mean) = rows.aggregate[(Long, BDV[Double])]((0L, BDV.zeros[Double](n)))(
+ val (m, mean) = rows.treeAggregate[(Long, BDV[Double])]((0L, BDV.zeros[Double](n)))(
seqOp = (s: (Long, BDV[Double]), v: Vector) => (s._1 + 1L, s._2 += v.toBreeze),
- combOp = (s1: (Long, BDV[Double]), s2: (Long, BDV[Double])) => (s1._1 + s2._1, s1._2 += s2._2)
- )
+ combOp = (s1: (Long, BDV[Double]), s2: (Long, BDV[Double])) =>
+ (s1._1 + s2._1, s1._2 += s2._2),
+ depth = 2)

updateNumRows(m)

@@ -353,10 +353,10 @@
* Computes column-wise summary statistics.
*/
def computeColumnSummaryStatistics(): MultivariateStatisticalSummary = {
- val summary = rows.aggregate[MultivariateOnlineSummarizer](new MultivariateOnlineSummarizer)(
+ val summary = rows.treeAggregate(new MultivariateOnlineSummarizer)(
(aggregator, data) => aggregator.add(data),
- (aggregator1, aggregator2) => aggregator1.merge(aggregator2)
- )
+ (aggregator1, aggregator2) => aggregator1.merge(aggregator2),
+ depth = 2)
updateNumRows(summary.count)
summary
}
GradientDescent.scala

@@ -25,6 +25,7 @@ import org.apache.spark.annotation.{Experimental, DeveloperApi}
import org.apache.spark.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.mllib.linalg.{Vectors, Vector}
+ import org.apache.spark.mllib.rdd.RDDFunctions._

/**
* Class used to solve an optimization problem using Gradient Descent.
@@ -177,14 +178,14 @@ object GradientDescent extends Logging {
// Sample a subset (fraction miniBatchFraction) of the total data
// compute and sum up the subgradients on this subset (this is one map-reduce)
val (gradientSum, lossSum) = data.sample(false, miniBatchFraction, 42 + i)
- .aggregate((BDV.zeros[Double](n), 0.0))(
+ .treeAggregate((BDV.zeros[Double](n), 0.0))(
seqOp = (c, v) => (c, v) match { case ((grad, loss), (label, features)) =>
val l = gradient.compute(features, label, bcWeights.value, Vectors.fromBreeze(grad))
(grad, loss + l)
},
combOp = (c1, c2) => (c1, c2) match { case ((grad1, loss1), (grad2, loss2)) =>
(grad1 += grad2, loss1 + loss2)
- })
+ }, depth = 2)

/**
* NOTE(Xinghao): lossSum is computed using the weights from the previous iteration
LBFGS.scala

@@ -26,6 +26,7 @@ import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.mllib.linalg.{Vectors, Vector}
+ import org.apache.spark.mllib.rdd.RDDFunctions._

/**
* :: DeveloperApi ::
@@ -199,15 +200,15 @@ object LBFGS extends Logging {
val n = weights.length
val bcWeights = data.context.broadcast(weights)

- val (gradientSum, lossSum) = data.aggregate((BDV.zeros[Double](n), 0.0))(
+ val (gradientSum, lossSum) = data.treeAggregate((BDV.zeros[Double](n), 0.0))(
seqOp = (c, v) => (c, v) match { case ((grad, loss), (label, features)) =>
val l = localGradient.compute(
features, label, Vectors.fromBreeze(bcWeights.value), Vectors.fromBreeze(grad))
(grad, loss + l)
},
combOp = (c1, c2) => (c1, c2) match { case ((grad1, loss1), (grad2, loss2)) =>
(grad1 += grad2, loss1 + loss2)
- })
+ }, depth = 2)

/**
* regVal is sum of weight squares if it's L2 updater;
62 changes: 62 additions & 0 deletions mllib/src/main/scala/org/apache/spark/mllib/rdd/RDDFunctions.scala
@@ -20,7 +20,10 @@ package org.apache.spark.mllib.rdd
import scala.language.implicitConversions
import scala.reflect.ClassTag

+ import org.apache.spark.HashPartitioner
+ import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD
+ import org.apache.spark.util.Utils

/**
* Machine learning specific RDD functions.
@@ -44,6 +47,65 @@ class RDDFunctions[T: ClassTag](self: RDD[T]) {
new SlidingRDD[T](self, windowSize)
}
}

/**
* Reduces the elements of this RDD in a tree pattern.
* @param depth suggested depth of the tree
* @see [[org.apache.spark.rdd.RDD#reduce]]
*/
def treeReduce(f: (T, T) => T, depth: Int): T = {

Contributor: is this used at all? if not maybe we don't need it?
Contributor Author: Not in this PR, but it is useful for testing (simpler than treeAggregate) and it could be used in the future.

require(depth >= 1, s"Depth must be greater than 1 but got $depth.")

Contributor: greater or equal to?
Contributor Author: done.

val cleanF = self.context.clean(f)
val reducePartition: Iterator[T] => Option[T] = iter => {
if (iter.hasNext) {
Some(iter.reduceLeft(cleanF))
} else {
None
}
}
val local = self.mapPartitions(it => Iterator(reducePartition(it)))
val op: (Option[T], Option[T]) => Option[T] = (c, x) => {
if (c.isDefined && x.isDefined) {
Some(cleanF(c.get, x.get))
} else if (c.isDefined) {
c
} else if (x.isDefined) {
x
} else {
None
}
}
RDDFunctions.fromRDD(local).treeAggregate(Option.empty[T])(op, op, depth)
.getOrElse(throw new UnsupportedOperationException("empty collection"))
}
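
A brief editorial note on the Option wrapping above: each partition's partial result is an Option so that an empty partition can be represented without requiring a zero value, and a completely empty RDD still fails the same way RDD.reduce does. A minimal usage sketch, assuming a live SparkContext named sc (as in the test suite below) and the RDDFunctions implicit in scope:

import org.apache.spark.mllib.rdd.RDDFunctions._

// Sum 1..16 across 8 partitions with a two-level tree; any partition that
// happened to be empty would contribute None and simply be skipped.
val sum = sc.parallelize(1 to 16, 8).treeReduce(_ + _, depth = 2)
assert(sum == 136)

// Reducing an RDD with no elements throws, mirroring RDD.reduce:
// sc.parallelize(Seq.empty[Int], 4).treeReduce(_ + _, depth = 2)  // UnsupportedOperationException("empty collection")
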

/**
* Aggregates the elements of this RDD in a tree pattern.

Contributor: "in a tree pattern" -> "in a multi-level aggregation tree pattern".

* @param depth suggested depth of the tree
* @see [[org.apache.spark.rdd.RDD#aggregate]]
*/
def treeAggregate[U: ClassTag](zeroValue: U)(
seqOp: (U, T) => U,

Contributor: the convention is 4 space indent for arguments
Contributor Author: done

combOp: (U, U) => U,
depth: Int): U = {
require(depth >= 1, s"Depth must be greater than 1 but got $depth.")

Contributor: =, not >
Contributor Author: done

if (self.partitions.size == 0) {
return Utils.clone(zeroValue, self.context.env.closureSerializer.newInstance())
}
val cleanSeqOp = self.context.clean(seqOp)
val cleanCombOp = self.context.clean(combOp)
val aggregatePartition = (it: Iterator[T]) => it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp)
var local = self.mapPartitions(it => Iterator(aggregatePartition(it)))

Contributor: I think partial or partiallyAggregated is probably a better name
Contributor Author: renamed to partiallyAggregated

var numPartitions = local.partitions.size
val scale = math.max(math.ceil(math.pow(numPartitions, 1.0 / depth)).toInt, 2)
while (numPartitions > scale + numPartitions / scale) {

Contributor: would be great to add some comments here..
Contributor: also just for the sake of program always terminating, i'd set a cap on max depth (like 8).
Contributor Author: done

numPartitions /= scale
local = local.mapPartitionsWithIndex { (i, iter) =>
iter.map((i % numPartitions, _))
}.reduceByKey(new HashPartitioner(numPartitions), cleanCombOp).values

Contributor: This is beyond your PR -- but @mateiz and I talked about adding some native primitive to shuffle to improve specifically this pattern (basically there is no need to create numPartitions streams within each map task).
Contributor Author: that would be great ~

}
local.reduce(cleanCombOp)
}
}
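
An editorial sketch to complement the reviewer's request for comments on the loop above: a small, Spark-free helper that traces how the partition count shrinks level by level for a given depth. The name levelSizes is mine and not part of the PR; the arithmetic simply mirrors the scale and while-condition logic in treeAggregate.

// Each level shuffles the partial results into numPartitions / scale partitions;
// the loop stops once the remaining partials are few enough to reduce directly.
def levelSizes(partitions: Int, depth: Int): Seq[Int] = {
  require(depth >= 1 && partitions >= 1)
  val scale = math.max(math.ceil(math.pow(partitions, 1.0 / depth)).toInt, 2)
  val sizes = scala.collection.mutable.ArrayBuffer(partitions)
  var numPartitions = partitions
  while (numPartitions > scale + numPartitions / scale) {
    numPartitions /= scale
    sizes += numPartitions
  }
  sizes.toSeq
}

// levelSizes(100, 2)  == Seq(100, 10)        -> one shuffle level, then reduce 10 partials
// levelSizes(1000, 3) == Seq(1000, 100, 10)  -> two shuffle levels
// levelSizes(4, 2)    == Seq(4)              -> too few partitions, no extra level
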

private[mllib]
RDDFunctionsSuite.scala

@@ -46,4 +46,22 @@ class RDDFunctionsSuite extends FunSuite with LocalSparkContext {
val expected = data.flatMap(x => x).sliding(3).toList
assert(sliding.collect().toList === expected)
}

test("treeAggregate") {
val rdd = sc.makeRDD(-1000 until 1000, 10)
def seqOp = (c: Long, x: Int) => c + x
def combOp = (c1: Long, c2: Long) => c1 + c2
for (depth <- 1 until 10) {
val sum = rdd.treeAggregate(0L)(seqOp, combOp, depth)
assert(sum === -1000L)
}
}

test("treeReduce") {
val rdd = sc.makeRDD(-1000 until 1000, 10)
for (depth <- 1 until 10) {
val sum = rdd.treeReduce(_ + _, depth)
assert(sum === -1000)
}
}
}
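
For reference, the expected value in both tests follows directly from the data: -1000 until 1000 covers -1000 through 999, so every k in 1..999 cancels against -k and only the -1000 term remains.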