Commit 3d05c7e

ahmed-mahran authored and srowen committed
[SPARK-41008][MLLIB] Dedup isotonic regression duplicate features
### What changes were proposed in this pull request?

Adds a pre-processing step to isotonic regression in mllib to handle duplicate features. This matches the `sklearn` implementation. Input points with duplicate feature values are aggregated into a single point whose label is the weighted average of the labels of those points. All points sharing a unique feature value are aggregated as follows:

- The aggregated label is the weighted average of all labels.
- The aggregated feature is the weighted average of all equal features. Feature values may be equal only up to some resolution due to representation errors; since we cannot know which feature value to use in that case, we compute the weighted average of the features. Ideally, all feature values are exactly equal and the weighted average is just the value at any point.
- The aggregated weight is the sum of all weights.

### Why are the changes needed?

As per the discussion on ticket [SPARK-41008](https://issues.apache.org/jira/browse/SPARK-41008), this is a bug and results should match `sklearn`.

### Does this PR introduce _any_ user-facing change?

There are no changes to the API, documentation, or error messages. However, users should expect results to change.

### How was this patch tested?

Existing test cases for duplicate features failed and were adjusted accordingly. New tests were also added.
Here is a Python snippet that can be used to verify the results:

```python
from sklearn.isotonic import IsotonicRegression

def test(x, y, x_test, isotonic=True):
    ir = IsotonicRegression(out_of_bounds='clip', increasing=isotonic).fit(x, y)
    y_test = ir.predict(x_test)

    def print_array(label, a):
        print(f"{label}: [{', '.join([str(i) for i in a])}]")

    print_array("boundaries", ir.X_thresholds_)
    print_array("predictions", ir.y_thresholds_)
    print_array("y_test", y_test)

test(
    x=[0.6, 0.6, 0.333, 0.333, 0.333, 0.20, 0.20, 0.20, 0.20],
    y=[1, 0, 0, 1, 0, 1, 0, 0, 0],
    x_test=[0.6, 0.6, 0.333, 0.333, 0.333, 0.20, 0.20, 0.20, 0.20]
)
```

srowen zapletal-martin

Closes #38966 from ahmed-mahran/ml-isotonic-reg-dups.

Authored-by: Ahmed Mahran <[email protected]>
Signed-off-by: Sean Owen <[email protected]>
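The aggregation rule described above can also be sketched in plain Python. This is illustrative only: `make_unique` and the `tol` tolerance are hypothetical names, not part of the Spark or sklearn APIs (the actual patch compares features with commons-math `Precision.equals`). It assumes the input is already sorted by feature.

```python
def make_unique(points, tol=1e-10):
    """Merge runs of (approximately) equal features into single points.

    Each input point is a (label, feature, weight) tuple; points must be
    sorted by feature. The merged label and feature are weighted averages;
    the merged weight is the sum of weights. Zero-weight points are dropped.
    """
    out = []
    acc_label = acc_feature = acc_weight = 0.0
    prev_feature = None
    for label, feature, weight in points:
        if weight < 0:
            raise ValueError("Weights must be non-negative")
        if weight == 0:
            continue
        # Flush the accumulated run when a distinctly new feature starts.
        if prev_feature is not None and abs(feature - prev_feature) > tol:
            out.append((acc_label / acc_weight, acc_feature / acc_weight, acc_weight))
            acc_label = acc_feature = acc_weight = 0.0
        # Accumulate weighted sums so labels and features average by weight.
        acc_label += label * weight
        acc_feature += feature * weight
        acc_weight += weight
        prev_feature = feature
    if acc_weight > 0:
        out.append((acc_label / acc_weight, acc_feature / acc_weight, acc_weight))
    return out
```

For instance, the four duplicate points at feature `0.20` in the snippet above (labels 1, 0, 0, 0, unit weights) collapse to a single point with label 0.25 and weight 4.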
1 parent a3a755d commit 3d05c7e

2 files changed

Lines changed: 262 additions & 59 deletions

mllib/src/main/scala/org/apache/spark/mllib/regression/IsotonicRegression.scala

Lines changed: 118 additions & 23 deletions
```diff
@@ -14,7 +14,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.spark.mllib.regression
 
 import java.io.Serializable
@@ -24,6 +23,7 @@ import java.util.Arrays.binarySearch
 import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
 
+import org.apache.commons.math3.util.Precision
 import org.json4s._
 import org.json4s.JsonDSL._
 import org.json4s.jackson.JsonMethods._
@@ -307,6 +307,65 @@ class IsotonicRegression private (private var isotonic: Boolean) extends Seriali
     run(input.rdd.retag.asInstanceOf[RDD[(Double, Double, Double)]])
   }
 
+  /**
+   * Aggregates points of duplicate feature values into a single point using as label the weighted
+   * average of the labels of the points with duplicate feature values. All points for a unique
+   * feature value are aggregated as:
+   *
+   *   - Aggregated label is the weighted average of all labels
+   *   - Aggregated feature is the weighted average of all equal features[1]
+   *   - Aggregated weight is the sum of all weights
+   *
+   * [1] Note: Feature values may be equal only up to a resolution due to representation errors.
+   * Since we cannot know which feature value to use in that case, we compute the weighted
+   * average of the features. Ideally, all feature values will be equal and the weighted average
+   * is just the value at any point.
+   *
+   * @param input
+   *   Input data of tuples (label, feature, weight). Weights must be non-negative.
+   * @return
+   *   Points with unique feature values.
+   */
+  private[regression] def makeUnique(
+      input: Array[(Double, Double, Double)]): Array[(Double, Double, Double)] = {
+
+    val cleanInput = input.filter { case (y, x, weight) =>
+      require(
+        weight >= 0.0,
+        s"Negative weight at point ($y, $x, $weight). Weights must be non-negative")
+      weight > 0
+    }
+
+    if (cleanInput.length <= 1) {
+      cleanInput
+    } else {
+      // whether or not two double features are equal up to a precision
+      @inline def areEqual(a: Double, b: Double): Boolean = Precision.equals(a, b)
+
+      val pointsAccumulator = new IsotonicRegression.PointsAccumulator
+      var (_, prevFeature, _) = cleanInput.head
+
+      // Go through input points, merging all points with approximately equal feature values into
+      // a single point. Equality of features is defined by the areEqual method. The label of the
+      // accumulated point is the weighted average of the labels of all points of equal feature
+      // value. Feature values may be equal only up to a resolution due to representation errors;
+      // since we cannot know which feature value to use in that case, we compute the weighted
+      // average of the features.
+      cleanInput.foreach { case point @ (_, feature, _) =>
+        if (areEqual(feature, prevFeature)) {
+          pointsAccumulator += point
+        } else {
+          pointsAccumulator.appendToOutput()
+          pointsAccumulator := point
+        }
+        prevFeature = feature
+      }
+      // Append the last accumulated point
+      pointsAccumulator.appendToOutput()
+      pointsAccumulator.getOutput
+    }
+  }
+
   /**
    * Performs a pool adjacent violators algorithm (PAV). Implements the algorithm originally
    * described in [1], using the formulation from [2, 3]. Uses an array to keep track of start
@@ -322,35 +381,27 @@ class IsotonicRegression private (private var isotonic: Boolean) extends Seriali
    * functions subject to simple chain constraints." SIAM Journal on Optimization 10.3 (2000):
    * 658-672.
    *
-   * @param input Input data of tuples (label, feature, weight). Weights must
-   *              be non-negative.
+   * @param cleanUniqueInput Input data of tuples (label, feature, weight). Features must be unique
+   *                         and weights must be non-negative.
    * @return Result tuples (label, feature, weight) where labels were updated
    *         to form a monotone sequence as per isotonic regression definition.
    */
   private def poolAdjacentViolators(
-      input: Array[(Double, Double, Double)]): Array[(Double, Double, Double)] = {
+      cleanUniqueInput: Array[(Double, Double, Double)]): Array[(Double, Double, Double)] = {
 
-    val cleanInput = input.filter{ case (y, x, weight) =>
-      require(
-        weight >= 0.0,
-        s"Negative weight at point ($y, $x, $weight). Weights must be non-negative"
-      )
-      weight > 0
-    }
-
-    if (cleanInput.isEmpty) {
+    if (cleanUniqueInput.isEmpty) {
       return Array.empty
     }
 
     // Keeps track of the start and end indices of the blocks. if [i, j] is a valid block from
     // cleanInput(i) to cleanInput(j) (inclusive), then blockBounds(i) = j and blockBounds(j) = i
     // Initially, each data point is its own block.
-    val blockBounds = Array.range(0, cleanInput.length)
+    val blockBounds = Array.range(0, cleanUniqueInput.length)
 
     // Keep track of the sum of weights and sum of weight * y for each block. weights(start)
    // gives the values for the block. Entries that are not at the start of a block
     // are meaningless.
-    val weights: Array[(Double, Double)] = cleanInput.map { case (y, _, weight) =>
+    val weights: Array[(Double, Double)] = cleanUniqueInput.map { case (y, _, weight) =>
       (weight, weight * y)
     }
 
@@ -392,10 +443,10 @@ class IsotonicRegression private (private var isotonic: Boolean) extends Seriali
     // Merge on >= instead of > because it eliminates adjacent blocks with the same average, and we
     // want to compress our output as much as possible. Both give correct results.
     var i = 0
-    while (nextBlock(i) < cleanInput.length) {
+    while (nextBlock(i) < cleanUniqueInput.length) {
       if (average(i) >= average(nextBlock(i))) {
         merge(i, nextBlock(i))
-        while((i > 0) && (average(prevBlock(i)) >= average(i))) {
+        while ((i > 0) && (average(prevBlock(i)) >= average(i))) {
           i = merge(prevBlock(i), i)
         }
       } else {
@@ -406,15 +457,15 @@ class IsotonicRegression private (private var isotonic: Boolean) extends Seriali
 
     // construct the output by walking through the blocks in order
     val output = ArrayBuffer.empty[(Double, Double, Double)]
     i = 0
-    while (i < cleanInput.length) {
+    while (i < cleanUniqueInput.length) {
       // If block size is > 1, a point at the start and end of the block,
       // each receiving half the weight. Otherwise, a single point with
       // all the weight.
-      if (cleanInput(blockEnd(i))._2 > cleanInput(i)._2) {
-        output += ((average(i), cleanInput(i)._2, weights(i)._1 / 2))
-        output += ((average(i), cleanInput(blockEnd(i))._2, weights(i)._1 / 2))
+      if (cleanUniqueInput(blockEnd(i))._2 > cleanUniqueInput(i)._2) {
+        output += ((average(i), cleanUniqueInput(i)._2, weights(i)._1 / 2))
+        output += ((average(i), cleanUniqueInput(blockEnd(i))._2, weights(i)._1 / 2))
       } else {
-        output += ((average(i), cleanInput(i)._2, weights(i)._1))
+        output += ((average(i), cleanUniqueInput(i)._2, weights(i)._1))
       }
       i = nextBlock(i)
     }
@@ -434,12 +485,56 @@ class IsotonicRegression private (private var isotonic: Boolean) extends Seriali
       input: RDD[(Double, Double, Double)]): Array[(Double, Double, Double)] = {
     val keyedInput = input.keyBy(_._2)
     val parallelStepResult = keyedInput
+      // Points with same or adjacent features must collocate within the same partition.
       .partitionBy(new RangePartitioner(keyedInput.getNumPartitions, keyedInput))
       .values
+      // Lexicographically sort points by features then labels.
       .mapPartitions(p => Iterator(p.toArray.sortBy(x => (x._2, x._1))))
+      // Aggregate points with equal features into a single point.
+      .map(makeUnique)
       .flatMap(poolAdjacentViolators)
       .collect()
-      .sortBy(x => (x._2, x._1)) // Sort again because collect() doesn't promise ordering.
+      // Sort again because collect() doesn't promise ordering.
+      .sortBy(x => (x._2, x._1))
     poolAdjacentViolators(parallelStepResult)
   }
 }
+
+object IsotonicRegression {
+  /**
+   * Utility class, holds a buffer of all points with unique features so far, and performs
+   * weighted sum accumulation of points. Hides these details for better readability of the
+   * main algorithm.
+   */
+  class PointsAccumulator {
+    private val output = ArrayBuffer[(Double, Double, Double)]()
+    private var (currentLabel: Double, currentFeature: Double, currentWeight: Double) =
+      (0d, 0d, 0d)
+
+    /** Resets the current value of the point accumulator using the provided point. */
+    def :=(point: (Double, Double, Double)): Unit = {
+      val (label, feature, weight) = point
+      currentLabel = label * weight
+      currentFeature = feature * weight
+      currentWeight = weight
+    }
+
+    /** Accumulates the provided point into the current value of the point accumulator. */
+    def +=(point: (Double, Double, Double)): Unit = {
+      val (label, feature, weight) = point
+      currentLabel += label * weight
+      currentFeature += feature * weight
+      currentWeight += weight
+    }
+
+    /** Appends the current value of the point accumulator to the output. */
+    def appendToOutput(): Unit =
+      output += ((
+        currentLabel / currentWeight,
+        currentFeature / currentWeight,
+        currentWeight))
+
+    /** Returns all accumulated points so far. */
+    def getOutput: Array[(Double, Double, Double)] = output.toArray
+  }
+}
```
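For readers unfamiliar with PAV, the pooling step that `poolAdjacentViolators` implements can be sketched with a simpler stack-based formulation in Python. This is illustrative only: it merges blocks eagerly on a stack instead of using the Scala code's array-based block bookkeeping, and it emits one point per input rather than compressing each block to its endpoint features. It assumes points are sorted by feature with unique features and positive weights, as produced by the dedup step.

```python
def pav(points, increasing=True):
    """Pool adjacent violators over (label, feature, weight) tuples.

    Returns the input points with each label replaced by the weighted
    average of its block, so labels form a monotone sequence.
    """
    if not increasing:
        # Reduce the decreasing case to the increasing one by negating labels.
        points = [(-y, x, w) for y, x, w in points]
    # Each block tracks [weighted label sum, weight sum, point count].
    blocks = []
    for y, _, w in points:
        blocks.append([y * w, w, 1])
        # Merge while the previous block's average violates monotonicity.
        # Merging on >= also collapses adjacent blocks with equal averages,
        # mirroring the comment in the Scala implementation.
        while len(blocks) > 1 and blocks[-2][0] / blocks[-2][1] >= blocks[-1][0] / blocks[-1][1]:
            s, wt, n = blocks.pop()
            blocks[-1][0] += s
            blocks[-1][1] += wt
            blocks[-1][2] += n
    # Expand each block's average back over its constituent points.
    result, i = [], 0
    for s, wt, n in blocks:
        avg = s / wt
        for _ in range(n):
            _, x, pw = points[i]
            result.append((avg if increasing else -avg, x, pw))
            i += 1
    return result
```

The real implementation differs in its output compression: for a block spanning more than one feature it emits only the block's two endpoint features, each carrying half the block's weight, which is enough to reconstruct the piecewise-linear model.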
