@@ -46,13 +46,25 @@ import org.apache.spark.rdd.RDD
4646 * partition, we can collect and operate locally. Locally, we can now adjust each distance by the
4747 * appropriate constant (the cumulative sum of number of elements in the prior partitions divided by
4848 * thedata set size). Finally, we take the maximum absolute value, and this is the statistic.
49+ *
50+ * In the case of the 2-sample variant, the approach is slightly different. We calculate 2
51+ * empirical CDFs corresponding to the distribution under sample 1 and under sample 2. Within each
52+ * partition, we can calculate the maximum difference of the local empirical CDFs, which is off from
53+ * the global value by some constant. Similarly to the 1-sample variant, we can simply adjust this
54+ * difference once we have collected the possible candidate extrema. However, in this case we don't
55+ * collect the number of elements in a partition, but rather an adjustment constant, that we can
56+ * cumulatively sum once we've collected results on the driver, and that when divided by
57+ * |sample 1| * |sample 2| provides the adjustment necessary to the difference between the 2
58+ * empirical CDFs in a given partition and thus the adjustment necessary to the potential extrema
59+ * candidates. The constant that we collect per partition thus corresponds to
60+ * |sample 2| * |sample 1 in partition| - |sample 1| * |sample 2 in partition|.
4961 */
5062private [stat] object KolmogorovSmirnovTest extends Logging {
5163
5264 // Null hypothesis for the type of KS test to be included in the result.
5365 object NullHypothesis extends Enumeration {
5466 type NullHypothesis = Value
55- val OneSampleTwoSided = Value (" Sample follows theoretical distribution" )
67+ val OneSampleTwoSided = Value (" Sample follows the theoretical distribution" )
5668 val TwoSampleTwoSided = Value (" Both samples follow the same distribution" )
5769 }
5870
@@ -218,7 +230,8 @@ private[stat] object KolmogorovSmirnovTest extends Logging {
218230
219231 /**
220232 * Calculates maximum distance candidates and counts of elements from each sample within one
221- * partition for the two-sample, two-sided Kolmogorov-Smirnov test implementation
233+ * partition for the two-sample, two-sided Kolmogorov-Smirnov test implementation. Function
234+ * is package private for testing convenience.
222235 * @param partData `Iterator[(Double, Boolean)]` the data in 1 partition of the co-sorted RDDs,
223236 * each element is additionally tagged with a boolean flag for sample 1 membership
224237 * @param n1 `Double` sample 1 size
@@ -235,24 +248,27 @@ private[stat] object KolmogorovSmirnovTest extends Logging {
235248 * portion that is attributable to each partition so that following partitions can
236249 * use it to cumulatively adjust their values.
237250 */
238- private def searchTwoSampleCandidates (
251+ private [stat] def searchTwoSampleCandidates (
239252 partData : Iterator [(Double , Boolean )],
240253 n1 : Double ,
241254 n2 : Double ): Iterator [(Double , Double , Double )] = {
242255 // fold accumulator: local minimum, local maximum, index for sample 1, index for sample2
243- case class ExtremaAndIndices (min : Double , max : Double , ix1 : Int , ix2 : Int )
244- val initAcc = ExtremaAndIndices (Double .MaxValue , Double .MinValue , 0 , 0 )
256+ case class ExtremaAndRunningIndices (min : Double , max : Double , ix1 : Int , ix2 : Int )
257+ val initAcc = ExtremaAndRunningIndices (Double .MaxValue , Double .MinValue , 0 , 0 )
245258 // traverse the data in the partition and calculate distances and counts
246259 val pResults = partData.foldLeft(initAcc) { case (acc, (v, isSample1)) =>
247260 val (add1, add2) = if (isSample1) (1 , 0 ) else (0 , 1 )
248261 val cdf1 = (acc.ix1 + add1) / n1
249262 val cdf2 = (acc.ix2 + add2) / n2
250263 val dist = cdf1 - cdf2
251- ExtremaAndIndices (
264+ ExtremaAndRunningIndices (
252265 math.min(acc.min, dist),
253266 math.max(acc.max, dist),
254- acc.ix1 + add1, acc.ix2 + add2)
267+ acc.ix1 + add1, acc.ix2 + add2
268+ )
255269 }
270+ // If partition has no data, then pResults will match the fold accumulator
271+ // we must filter this out to avoid having the statistic spoiled by the accumulation values
256272 val results = if (pResults == initAcc) {
257273 Array [(Double , Double , Double )]()
258274 } else {
@@ -263,14 +279,15 @@ private[stat] object KolmogorovSmirnovTest extends Logging {
263279
264280 /**
265281 * Adjust candidate extremes by the appropriate constant. The resulting maximum corresponds to
266- * the two-sample, two-sided Kolmogorov-Smirnov test
282+ * the two-sample, two-sided Kolmogorov-Smirnov test. Function is package private for testing
283+ * convenience.
267284 * @param localData `Array[(Double, Double, Double)]` contains the candidate extremes from each
268285 * partition, along with the numerator for the necessary constant adjustments
269286 * @param n `Double` The denominator in the constant adjustment (i.e. (size of sample 1 ) * (size
270287 * of sample 2))
271288 * @return The two-sample, two-sided Kolmogorov-Smirnov statistic
272289 */
273- private def searchTwoSampleStatistic (localData : Array [(Double , Double , Double )], n : Double )
290+ private [stat] def searchTwoSampleStatistic (localData : Array [(Double , Double , Double )], n : Double )
274291 : Double = {
275292 // maximum distance and numerator for constant adjustment
276293 val initAcc = (Double .MinValue , 0.0 )
@@ -282,7 +299,7 @@ private[stat] object KolmogorovSmirnovTest extends Logging {
282299 val dist2 = math.abs(maxCand + adjConst)
283300 val maxVal = Array (prevMax, dist1, dist2).max
284301 (maxVal, prevCt + ct)
285- }
302+ }
286303 results._1
287304 }
288305
0 commit comments