diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala
index c188b4c70ffbd..61ffe31edfd04 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala
@@ -61,19 +61,23 @@ private[kafka010] class KafkaOffsetRangeCalculator(val minPartitions: Option[Int
 
       // Splits offset ranges with relatively large amount of data to smaller ones.
       val totalSize = offsetRanges.map(_.size).sum
-      val idealRangeSize = totalSize.toDouble / minPartitions.get
-
       offsetRanges.flatMap { range =>
-        // Split the current range into subranges as close to the ideal range size
-        val numSplitsInRange = math.round(range.size.toDouble / idealRangeSize).toInt
-
-        (0 until numSplitsInRange).map { i =>
-          val splitStart = range.fromOffset + range.size * (i.toDouble / numSplitsInRange)
-          val splitEnd = range.fromOffset + range.size * ((i.toDouble + 1) / numSplitsInRange)
-          KafkaOffsetRange(
-            range.topicPartition, splitStart.toLong, splitEnd.toLong, preferredLoc = None)
+        val tp = range.topicPartition
+        val size = range.size
+        // number of partitions to divvy up this topic partition to
+        val parts = math.max(math.round(size.toDouble / totalSize * minPartitions.get), 1).toInt
+        var remaining = size
+        var startOffset = range.fromOffset
+        (0 until parts).map { part =>
+          // Fine to do integer division. Last partition will consume all the round off errors
+          val thisPartition = remaining / (parts - part)
+          remaining -= thisPartition
+          val endOffset = math.min(startOffset + thisPartition, range.untilOffset)
+          val offsetRange = KafkaOffsetRange(tp, startOffset, endOffset, None)
+          startOffset = endOffset
+          offsetRange
         }
-      }
+      }.filter(_.size > 0)
     }
   }
 
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculatorSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculatorSuite.scala
index 7ffdaab3e74fb..2374a817422fa 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculatorSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculatorSuite.scala
@@ -141,6 +141,21 @@ class KafkaOffsetRangeCalculatorSuite extends SparkFunSuite {
         KafkaOffsetRange(tp2, 14, 21, None)))
   }
 
+  testWithMinPartitions("SPARK-28489: never drop offsets", 6) { calc =>
+    assert(
+      calc.getRanges(
+        fromOffsets = Map(tp1 -> 0, tp2 -> 0, tp3 -> 0),
+        untilOffsets = Map(tp1 -> 10, tp2 -> 10, tp3 -> 1)) ==
+      Seq(
+        KafkaOffsetRange(tp1, 0, 3, None),
+        KafkaOffsetRange(tp1, 3, 6, None),
+        KafkaOffsetRange(tp1, 6, 10, None),
+        KafkaOffsetRange(tp2, 0, 3, None),
+        KafkaOffsetRange(tp2, 3, 6, None),
+        KafkaOffsetRange(tp2, 6, 10, None),
+        KafkaOffsetRange(tp3, 0, 1, None)))
+  }
+
   private val tp1 = new TopicPartition("t1", 1)
   private val tp2 = new TopicPartition("t2", 1)
   private val tp3 = new TopicPartition("t3", 1)