Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 6 additions & 17 deletions core/src/main/scala/org/apache/spark/partial/CountEvaluator.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.spark.partial

import org.apache.commons.math3.distribution.{PascalDistribution, PoissonDistribution}
import org.apache.commons.math3.distribution.PoissonDistribution

/**
* An ApproximateEvaluator for counts.
Expand Down Expand Up @@ -48,22 +48,11 @@ private[spark] class CountEvaluator(totalOutputs: Int, confidence: Double)
private[partial] object CountEvaluator {

def bound(confidence: Double, sum: Long, p: Double): BoundedDouble = {
// Let the total count be N. A fraction p has been counted already, with sum 'sum',
// as if each element from the total data set had been seen with probability p.
val dist =
if (sum <= 10000) {
// The remaining count, k=N-sum, may be modeled as negative binomial (aka Pascal),
// where there have been 'sum' successes of probability p already. (There are several
// conventions, but this is the one followed by Commons Math3.)
new PascalDistribution(sum.toInt, p)
} else {
// For large 'sum' (certainly, > Int.MaxValue!), use a Poisson approximation, which has
// a different interpretation. "sum" elements have been observed having scanned a fraction
// p of the data. This suggests data is counted at a rate of sum / p across the whole data
// set. The total expected count from the rest is distributed as
// (1-p) Poisson(sum / p) = Poisson(sum*(1-p)/p)
new PoissonDistribution(sum * (1 - p) / p)
}
// "sum" elements have been observed having scanned a fraction
// p of the data. This suggests data is counted at a rate of sum / p across the whole data
// set. The total expected count from the rest is distributed as
// (1-p) Poisson(sum / p) = Poisson(sum*(1-p)/p)
val dist = new PoissonDistribution(sum * (1 - p) / p)
Copy link

@lovasoa lovasoa Jun 14, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@srowen I know it is a little late for a review, but now that we have a single distribution, it would make the code clearer to estimate the total count directly with the Poisson distribution. That is, removing the `1-p` here and the `sum +` in the final BoundedDouble.

// Not quite symmetric; calculate interval straight from discrete distribution
val low = dist.inverseCumulativeProbability((1 - confidence) / 2)
val high = dist.inverseCumulativeProbability((1 + confidence) / 2)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,21 +23,21 @@ class CountEvaluatorSuite extends SparkFunSuite {

test("test count 0") {
val evaluator = new CountEvaluator(10, 0.95)
assert(new BoundedDouble(0.0, 0.0, 0.0, Double.PositiveInfinity) == evaluator.currentResult())
assert(evaluator.currentResult() === new BoundedDouble(0.0, 0.0, 0.0, Double.PositiveInfinity))
evaluator.merge(1, 0)
assert(new BoundedDouble(0.0, 0.0, 0.0, Double.PositiveInfinity) == evaluator.currentResult())
assert(evaluator.currentResult() === new BoundedDouble(0.0, 0.0, 0.0, Double.PositiveInfinity))
}

test("test count >= 1") {
val evaluator = new CountEvaluator(10, 0.95)
evaluator.merge(1, 1)
assert(new BoundedDouble(10.0, 0.95, 1.0, 36.0) == evaluator.currentResult())
assert(evaluator.currentResult() === new BoundedDouble(10.0, 0.95, 5.0, 16.0))
evaluator.merge(1, 3)
assert(new BoundedDouble(20.0, 0.95, 7.0, 41.0) == evaluator.currentResult())
assert(evaluator.currentResult() === new BoundedDouble(20.0, 0.95, 13.0, 28.0))
evaluator.merge(1, 8)
assert(new BoundedDouble(40.0, 0.95, 24.0, 61.0) == evaluator.currentResult())
assert(evaluator.currentResult() === new BoundedDouble(40.0, 0.95, 30.0, 51.0))
(4 to 10).foreach(_ => evaluator.merge(1, 10))
assert(new BoundedDouble(82.0, 1.0, 82.0, 82.0) == evaluator.currentResult())
assert(evaluator.currentResult() === new BoundedDouble(82.0, 1.0, 82.0, 82.0))
}

}