core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala (21 additions, 7 deletions)
@@ -28,7 +28,7 @@ import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
 import scala.reflect.ClassTag
 
-import com.clearspring.analytics.stream.cardinality.HyperLogLog
+import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus
 import org.apache.hadoop.conf.{Configurable, Configuration}
 import org.apache.hadoop.fs.FileSystem
 import org.apache.hadoop.io.SequenceFile.CompressionType
@@ -46,7 +46,6 @@ import org.apache.spark.Partitioner.defaultPartitioner
 import org.apache.spark.SparkContext._
 import org.apache.spark.partial.{BoundedDouble, PartialResult}
 import org.apache.spark.serializer.Serializer
-import org.apache.spark.util.SerializableHyperLogLog
 
 /**
  * Extra functions available on RDDs of (key, value) pairs through an implicit conversion.
@@ -218,14 +217,29 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
    * The accuracy of approximation can be controlled through the relative standard deviation
    * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in
    * more accurate counts but increase the memory footprint and vice versa. Uses the provided
-   * Partitioner to partition the output RDD.
+   * [[Partitioner]] to partition the output RDD.
+   *
+   * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice:
+   * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available at
+   * [[http://research.google.com/pubs/pub40671.html]].
    */
   def countApproxDistinctByKey(relativeSD: Double, partitioner: Partitioner): RDD[(K, Long)] = {
-    val createHLL = (v: V) => new SerializableHyperLogLog(new HyperLogLog(relativeSD)).add(v)
-    val mergeValueHLL = (hll: SerializableHyperLogLog, v: V) => hll.add(v)
-    val mergeHLL = (h1: SerializableHyperLogLog, h2: SerializableHyperLogLog) => h1.merge(h2)
+    val precision = (math.log((1.106 / relativeSD) * (1.106 / relativeSD)) / math.log(2)).toInt
[Contributor] Where does this magic value of 1.106 come from?

[Contributor, author] I'm not even sure if the math is correct yet. Will update the PR once I confirm it.
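[Note] A plausible answer: streamlib's old HyperLogLog(rsd) constructor converts a relative standard deviation to a register count with this same 1.106 factor, while the textbook HLL error bound is about 1.04/sqrt(m) for m = 2^p registers. A minimal standalone sketch of what the expression computes (the helper name is illustrative):

```scala
// Standalone sketch, not part of the PR. Maps a target relative standard
// deviation to a HyperLogLogPlus precision p, i.e. m = 2^p registers.
// 1.106 matches the factor in streamlib's HyperLogLog(rsd) constructor;
// classic HLL theory puts the standard error at roughly 1.04 / sqrt(m).
def precisionFor(relativeSD: Double): Int =
  (math.log((1.106 / relativeSD) * (1.106 / relativeSD)) / math.log(2)).toInt

// precisionFor(0.05) == 8   -> 2^8  = 256 registers
// precisionFor(0.01) == 13  -> 2^13 = 8192 registers
```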

+    val createHLL = (v: V) => {
+      val hll = new HyperLogLogPlus(precision)
+      hll.offer(v)
+      hll
+    }
+    val mergeValueHLL = (hll: HyperLogLogPlus, v: V) => {
+      hll.offer(v)
+      hll
+    }
+    val mergeHLL = (h1: HyperLogLogPlus, h2: HyperLogLogPlus) => {
+      h1.addAll(h2)
+      h1
+    }
 
-    combineByKey(createHLL, mergeValueHLL, mergeHLL, partitioner).mapValues(_.value.cardinality())
+    combineByKey(createHLL, mergeValueHLL, mergeHLL, partitioner).mapValues(_.cardinality())
   }
 
   /**
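[Note] A minimal usage sketch of the new code path, assuming a live SparkContext named sc; the data and partition count are illustrative:

```scala
import org.apache.spark.HashPartitioner

// Key i carries exactly i distinct values, so each per-key estimate
// should land within roughly relativeSD of i.
val pairs = sc.parallelize(for (i <- 1 to 100; j <- 1 to i) yield (i, j))
val approx = pairs.countApproxDistinctByKey(0.05, new HashPartitioner(4))
approx.collect().foreach { case (k, c) => println(s"key $k: ~$c distinct values") }
```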
core/src/main/scala/org/apache/spark/rdd/RDD.scala (17 additions, 4 deletions)
@@ -24,7 +24,7 @@ import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
 import scala.reflect.{classTag, ClassTag}
 
-import com.clearspring.analytics.stream.cardinality.HyperLogLog
+import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus
 import org.apache.hadoop.io.BytesWritable
 import org.apache.hadoop.io.compress.CompressionCodec
 import org.apache.hadoop.io.NullWritable
@@ -41,7 +41,7 @@ import org.apache.spark.partial.CountEvaluator
 import org.apache.spark.partial.GroupedCountEvaluator
 import org.apache.spark.partial.PartialResult
 import org.apache.spark.storage.StorageLevel
-import org.apache.spark.util.{BoundedPriorityQueue, SerializableHyperLogLog, Utils}
+import org.apache.spark.util.{BoundedPriorityQueue, Utils}
 import org.apache.spark.util.collection.OpenHashMap
 import org.apache.spark.util.random.{BernoulliSampler, PoissonSampler}
 
@@ -925,11 +925,24 @@ abstract class RDD[T: ClassTag](
    * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in
    * more accurate counts but increase the memory footprint and vice versa. The default value of
    * relativeSD is 0.05.
+   *
+   * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice:
+   * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available at
+   * [[http://research.google.com/pubs/pub40671.html]].
    */
   @Experimental
   def countApproxDistinct(relativeSD: Double = 0.05): Long = {
-    val zeroCounter = new SerializableHyperLogLog(new HyperLogLog(relativeSD))
-    aggregate(zeroCounter)(_.add(_), _.merge(_)).value.cardinality()
+    val precision = (math.log((1.106 / relativeSD) * (1.106 / relativeSD)) / math.log(2)).toInt
+    val zeroCounter = new HyperLogLogPlus(precision)
+    aggregate(zeroCounter)(
+      (hll: HyperLogLogPlus, v: T) => {
+        hll.offer(v)
+        hll
+      },
+      (h1: HyperLogLogPlus, h2: HyperLogLogPlus) => {
+        h1.addAll(h2)
+        h1
+      }).cardinality()
   }
 
   /**
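[Note] HyperLogLogPlus#addAll folds the argument's registers into the receiver in place, so the combiner must return h1, the mutated sketch. A minimal standalone sketch of the merge semantics, assuming streamlib 2.7.0:

```scala
import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus

// Two sketches over disjoint ranges; after addAll, h1 estimates the union.
val h1 = new HyperLogLogPlus(12)
val h2 = new HyperLogLogPlus(12)
(1 to 1000).foreach(h1.offer(_))
(1001 to 2000).foreach(h2.offer(_))
h1.addAll(h2) // mutates h1 in place
assert(math.abs(h1.cardinality() - 2000.0) / 2000.0 < 0.05)
```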

core/src/main/scala/org/apache/spark/util/SerializableHyperLogLog.scala (this file was deleted)
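[Note] Presumably the wrapper existed only because streamlib's old HyperLogLog was not Java-serializable; HyperLogLogPlus in 2.7.0 is, so tasks can ship and merge the sketches directly.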

core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
@@ -126,9 +126,7 @@ class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext {
     val stacked = (1 to 100).flatMap(i => (1 to i).map(j => (i, j)))
     val rdd1 = sc.parallelize(stacked)
     val counted1 = rdd1.countApproxDistinctByKey(relativeSD).collect()
-    counted1.foreach{
-      case(k, count) => assert(error(count, k) < relativeSD)
-    }
+    counted1.foreach { case (k, count) => assert(error(count, k) < relativeSD) }
 
     val rnd = new Random()
 
@@ -139,9 +137,7 @@ class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext {
     }
     val rdd2 = sc.parallelize(randStacked)
     val counted2 = rdd2.countApproxDistinctByKey(relativeSD, 4).collect()
-    counted2.foreach{
-      case(k, count) => assert(error(count, k) < relativeSD)
-    }
+    counted2.foreach { case (k, count) => assert(error(count, k) < relativeSD) }
   }
 
   test("join") {
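[Note] The error helper lives outside this hunk; a hypothetical reconstruction consistent with how it is called (for key k the exact distinct count is k):

```scala
// Hypothetical reconstruction of the suite's helper (not shown in the diff):
// relative error of an estimate against the exact distinct count.
def error(est: Long, exact: Long): Double =
  math.abs(est - exact).toDouble / exact
```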
pom.xml (1 addition, 1 deletion)
@@ -284,7 +284,7 @@
       <dependency>
         <groupId>com.clearspring.analytics</groupId>
         <artifactId>stream</artifactId>
-        <version>2.5.1</version>
+        <version>2.7.0</version>
         <exclusions>
           <!-- Only HyperLogLog is used, which doesn't depend on fastutil -->
[Contributor] Please also update the comment.

<exclusion>
project/SparkBuild.scala (1 addition, 1 deletion)
@@ -358,7 +358,7 @@ object SparkBuild extends Build {
       "com.twitter" %% "chill" % chillVersion excludeAll(excludeAsm),
       "com.twitter" % "chill-java" % chillVersion excludeAll(excludeAsm),
       "org.tachyonproject" % "tachyon" % "0.4.1-thrift" excludeAll(excludeHadoop, excludeCurator, excludeEclipseJetty, excludePowermock),
-      "com.clearspring.analytics" % "stream" % "2.5.1" excludeAll(excludeFastutil),
+      "com.clearspring.analytics" % "stream" % "2.7.0" excludeAll(excludeFastutil),
       "org.spark-project" % "pyrolite" % "2.0.1",
       "net.sf.py4j" % "py4j" % "0.8.1"
     ),