-
Notifications
You must be signed in to change notification settings - Fork 29k
SPARK-1941: Update streamlib to 2.7.0 and use HyperLogLogPlus instead of HyperLogLog. #897
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 12 commits
c0ef0c2
e7786cb
1294be6
88cfe77
9221b27
1db1522
6555bfe
acaa524
354deb8
e110d70
9e320c8
41e649a
e367527
f154ea0
4d83f41
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -28,7 +28,7 @@ import scala.collection.mutable | |
| import scala.collection.mutable.ArrayBuffer | ||
| import scala.reflect.ClassTag | ||
|
|
||
| import com.clearspring.analytics.stream.cardinality.HyperLogLog | ||
| import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus | ||
| import org.apache.hadoop.conf.{Configurable, Configuration} | ||
| import org.apache.hadoop.fs.FileSystem | ||
| import org.apache.hadoop.io.SequenceFile.CompressionType | ||
|
|
@@ -46,7 +46,6 @@ import org.apache.spark.Partitioner.defaultPartitioner | |
| import org.apache.spark.SparkContext._ | ||
| import org.apache.spark.partial.{BoundedDouble, PartialResult} | ||
| import org.apache.spark.serializer.Serializer | ||
| import org.apache.spark.util.SerializableHyperLogLog | ||
|
|
||
| /** | ||
| * Extra functions available on RDDs of (key, value) pairs through an implicit conversion. | ||
|
|
@@ -214,39 +213,88 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) | |
| } | ||
|
|
||
| /** | ||
| * :: Experimental :: | ||
| * | ||
| * Return approximate number of distinct values for each key in this RDD. | ||
| * The accuracy of approximation can be controlled through the relative standard deviation | ||
| * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in | ||
| * more accurate counts but increase the memory footprint and vice versa. Uses the provided | ||
| * Partitioner to partition the output RDD. | ||
| * | ||
| * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice: | ||
| * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available | ||
| * <a href="http://dx.doi.org/10.1145/2452376.2452456">here</a>. | ||
| * | ||
| * The relative accuracy is approximately `1.054 / sqrt(2^p)`. Setting a nonzero `sp > p` | ||
| * would trigger sparse representation of registers, which may reduce the memory consumption | ||
| * and increase accuracy when the cardinality is small. | ||
| * | ||
| * @param p The precision value for the normal set. | ||
| * `p` must be a value between 4 and `sp` (32 max). | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. add "if |
||
| * @param sp The precision value for the sparse set, between 0 and 32. | ||
| * If `sp` equals 0, the sparse representation is skipped. | ||
| * @param partitioner Partitioner to use for the resulting RDD. | ||
| */ | ||
| def countApproxDistinctByKey(relativeSD: Double, partitioner: Partitioner): RDD[(K, Long)] = { | ||
| val createHLL = (v: V) => new SerializableHyperLogLog(new HyperLogLog(relativeSD)).add(v) | ||
| val mergeValueHLL = (hll: SerializableHyperLogLog, v: V) => hll.add(v) | ||
| val mergeHLL = (h1: SerializableHyperLogLog, h2: SerializableHyperLogLog) => h1.merge(h2) | ||
| @Experimental | ||
| def countApproxDistinctByKey(p: Int, sp: Int, partitioner: Partitioner): RDD[(K, Long)] = { | ||
| require(p >= 4, s"p ($p) should be >= 4") | ||
| require(sp <= 32, s"sp ($sp) should be <= 32") | ||
| require(sp == 0 || p <= sp, s"p ($p) cannot be greater than sp ($sp)") | ||
| val createHLL = (v: V) => { | ||
| val hll = new HyperLogLogPlus(p, sp) | ||
| hll.offer(v) | ||
| hll | ||
| } | ||
| val mergeValueHLL = (hll: HyperLogLogPlus, v: V) => { | ||
| hll.offer(v) | ||
| hll | ||
| } | ||
| val mergeHLL = (h1: HyperLogLogPlus, h2: HyperLogLogPlus) => { | ||
| h1.addAll(h2) | ||
| h1 | ||
| } | ||
|
|
||
| combineByKey(createHLL, mergeValueHLL, mergeHLL, partitioner).mapValues(_.cardinality()) | ||
| } | ||
|
|
||
| combineByKey(createHLL, mergeValueHLL, mergeHLL, partitioner).mapValues(_.value.cardinality()) | ||
| /** | ||
| * Return approximate number of distinct values for each key in this RDD. | ||
| * | ||
| * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice: | ||
| * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available | ||
| * <a href="http://dx.doi.org/10.1145/2452376.2452456">here</a>. | ||
| * | ||
| * @param relativeSD Relative accuracy. Smaller values create counters that require more space. | ||
| * It should be greater than 0.000017. | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. "should" -> "must"? |
||
| * @param partitioner partitioner of the resulting RDD | ||
| */ | ||
| def countApproxDistinctByKey(relativeSD: Double, partitioner: Partitioner): RDD[(K, Long)] = { | ||
| require(relativeSD > 0.000017, s"accuracy ($relativeSD) should be greater than 0.000017") | ||
| val p = math.ceil(2.0 * math.log(1.054 / relativeSD) / math.log(2)).toInt | ||
| assert(p <= 32) | ||
| countApproxDistinctByKey(if (p < 4) 4 else p, 0, partitioner) | ||
| } | ||
|
|
||
| /** | ||
| * Return approximate number of distinct values for each key in this RDD. | ||
| * The accuracy of approximation can be controlled through the relative standard deviation | ||
| * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in | ||
| * more accurate counts but increase the memory footprint and vice versa. HashPartitions the | ||
| * output RDD into numPartitions. | ||
| * | ||
| * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice: | ||
| * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available | ||
| * <a href="http://dx.doi.org/10.1145/2452376.2452456">here</a>. | ||
| * | ||
| * @param relativeSD Relative accuracy. Smaller values create counters that require more space. | ||
| * It should be greater than 0.000017. | ||
| * @param numPartitions number of partitions of the resulting RDD | ||
| */ | ||
| def countApproxDistinctByKey(relativeSD: Double, numPartitions: Int): RDD[(K, Long)] = { | ||
| countApproxDistinctByKey(relativeSD, new HashPartitioner(numPartitions)) | ||
| } | ||
|
|
||
| /** | ||
| * Return approximate number of distinct values for each key in this RDD. | ||
| * The accuracy of approximation can be controlled through the relative standard deviation | ||
| * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in | ||
| * more accurate counts but increase the memory footprint and vice versa. The default value of | ||
| * relativeSD is 0.05. Hash-partitions the output RDD using the existing partitioner/parallelism | ||
| * level. | ||
| * Return approximate number of distinct values for each key in this RDD. | ||
| * | ||
| * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice: | ||
| * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available | ||
| * <a href="http://dx.doi.org/10.1145/2452376.2452456">here</a>. | ||
| * | ||
| * @param relativeSD Relative accuracy. Smaller values create counters that require more space. | ||
| * It should be greater than 0.000017. | ||
| */ | ||
| def countApproxDistinctByKey(relativeSD: Double = 0.05): RDD[(K, Long)] = { | ||
| countApproxDistinctByKey(relativeSD, defaultPartitioner(self)) | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -19,12 +19,11 @@ package org.apache.spark.rdd | |
|
|
||
| import java.util.Random | ||
|
|
||
| import scala.collection.Map | ||
| import scala.collection.mutable | ||
| import scala.collection.{mutable, Map} | ||
| import scala.collection.mutable.ArrayBuffer | ||
| import scala.reflect.{classTag, ClassTag} | ||
|
|
||
| import com.clearspring.analytics.stream.cardinality.HyperLogLog | ||
| import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus | ||
| import org.apache.hadoop.io.BytesWritable | ||
| import org.apache.hadoop.io.compress.CompressionCodec | ||
| import org.apache.hadoop.io.NullWritable | ||
|
|
@@ -41,7 +40,7 @@ import org.apache.spark.partial.CountEvaluator | |
| import org.apache.spark.partial.GroupedCountEvaluator | ||
| import org.apache.spark.partial.PartialResult | ||
| import org.apache.spark.storage.StorageLevel | ||
| import org.apache.spark.util.{BoundedPriorityQueue, SerializableHyperLogLog, Utils} | ||
| import org.apache.spark.util.{BoundedPriorityQueue, Utils} | ||
| import org.apache.spark.util.collection.OpenHashMap | ||
| import org.apache.spark.util.random.{BernoulliSampler, PoissonSampler} | ||
|
|
||
|
|
@@ -921,15 +920,48 @@ abstract class RDD[T: ClassTag]( | |
| * :: Experimental :: | ||
| * Return approximate number of distinct elements in the RDD. | ||
| * | ||
| * The accuracy of approximation can be controlled through the relative standard deviation | ||
| * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in | ||
| * more accurate counts but increase the memory footprint and vice versa. The default value of | ||
| * relativeSD is 0.05. | ||
| * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice: | ||
| * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available | ||
| * <a href="http://dx.doi.org/10.1145/2452376.2452456">here</a>. | ||
| * | ||
| * The relative accuracy is approximately `1.054 / sqrt(2^p)`. Setting a nonzero `sp > p` | ||
| * would trigger sparse representation of registers, which may reduce the memory consumption | ||
| * and increase accuracy when the cardinality is small. | ||
| * | ||
| * @param p The precision value for the normal set. | ||
| * <code>p</code> must be a value between 4 and <code>sp</code>. | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Let us put the following into the doc. The relative standard error is approximately |
||
| * @param sp The precision value for the sparse set, between 0 and 32. | ||
| * If <code>sp</code> equals 0, the sparse representation is skipped. | ||
| */ | ||
| @Experimental | ||
| def countApproxDistinct(p: Int, sp: Int): Long = { | ||
| require(p >= 4, s"p ($p) must be at least 4") | ||
| require(sp <= 32, s"sp ($sp) cannot be greater than 32") | ||
| require(sp == 0 || p <= sp, s"p ($p) cannot be greater than sp ($sp)") | ||
| val zeroCounter = new HyperLogLogPlus(p, sp) | ||
| aggregate(zeroCounter)( | ||
| (hll: HyperLogLogPlus, v: T) => { | ||
| hll.offer(v) | ||
| hll | ||
| }, | ||
| (h1: HyperLogLogPlus, h2: HyperLogLogPlus) => { | ||
| h1.addAll(h2) | ||
| h1 | ||
| }).cardinality() | ||
| } | ||
|
|
||
| /** | ||
| * Return approximate number of distinct elements in the RDD. | ||
| * | ||
| * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice: | ||
| * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available | ||
| * <a href="http://dx.doi.org/10.1145/2452376.2452456">here</a>. | ||
| * | ||
| * @param relativeSD Relative accuracy. Smaller values create counters that require more space. | ||
| */ | ||
| def countApproxDistinct(relativeSD: Double = 0.05): Long = { | ||
| val zeroCounter = new SerializableHyperLogLog(new HyperLogLog(relativeSD)) | ||
| aggregate(zeroCounter)(_.add(_), _.merge(_)).value.cardinality() | ||
| val p = math.ceil(2.0 * math.log(1.054 / relativeSD) / math.log(2)).toInt | ||
| countApproxDistinct(p, 0) | ||
| } | ||
|
|
||
| /** | ||
|
|
||
This file was deleted.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Note that I changed the return type from JavaRDD[(K, Long)] to JavaPairRDD[K, Long], because that is what it should've been.
However, in order to maintain complete API stability, I can change it back and just deprecated the old methods. The new methods certainly should return JavaPairRDD.