@@ -61,19 +61,23 @@ private[kafka010] class KafkaOffsetRangeCalculator(val minPartitions: Option[Int

     // Splits offset ranges with relatively large amount of data to smaller ones.
     val totalSize = offsetRanges.map(_.size).sum
-    val idealRangeSize = totalSize.toDouble / minPartitions.get

     offsetRanges.flatMap { range =>
-      // Split the current range into subranges as close to the ideal range size
-      val numSplitsInRange = math.round(range.size.toDouble / idealRangeSize).toInt
-
-      (0 until numSplitsInRange).map { i =>
-        val splitStart = range.fromOffset + range.size * (i.toDouble / numSplitsInRange)
-        val splitEnd = range.fromOffset + range.size * ((i.toDouble + 1) / numSplitsInRange)
-        KafkaOffsetRange(
-          range.topicPartition, splitStart.toLong, splitEnd.toLong, preferredLoc = None)
+      val tp = range.topicPartition
+      val size = range.size
+      // number of partitions to divvy up this topic partition to
+      val parts = math.max(math.round(size.toDouble / totalSize * minPartitions.get), 1).toInt
Member Author:
This one ensures we never drop a TopicPartition.

Member:
The ratio calculation looks good, but round seems to generate fewer partitions. Is there a reason to choose round instead of ceiling?

Contributor:
Yeah, I'm seeing the same. Suppose each of 4 offsetRanges has a fractional share of 0.25 partitions; round drops each 0.25, so we lose 1 partition. The number of lost partitions may vary.

Conversely, if we use ceil, it may exceed the minimum partitions, and the number of extra partitions may vary. We don't guarantee that this calculator returns a partition count closest to minPartitions, so it seems OK.

If we really wanted to make this strict, we could apply "allocation": calculate the ratio of each offsetRange, allocate partitions to each offsetRange according to that ratio (with a minimum of 1 for safety), and hand out any remaining partitions to some offsetRanges. Not sure we want to deal with that complexity.
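To make the trade-off concrete, here is a small standalone sketch, not code from the PR (the sizes and the minPartitions value are made up for illustration): four equal ranges each deserve 1.25 partitions, so round yields 4 total while ceil would yield 8.

```scala
// Standalone illustration; sizes and minPartitions are invented.
val minPartitions = 5
val sizes = Seq(100L, 100L, 100L, 100L) // four offsetRanges of equal size
val totalSize = sizes.sum

// Each range's ideal share is 100/400 * 5 = 1.25 partitions.
val withRound = sizes.map(s => math.max(math.round(s.toDouble / totalSize * minPartitions), 1).toInt)
val withCeil  = sizes.map(s => math.max(math.ceil(s.toDouble / totalSize * minPartitions).toInt, 1))

println(withRound.sum) // 4: round drops each 0.25, one short of minPartitions
println(withCeil.sum)  // 8: ceil rounds each 1.25 up to 2, three over
```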

Member Author:
Yep, it's a hint. When the number of partitions is less than minPartitions, we will try our best to split. Agreed that the option name minPartitions is not accurate.

Member:
Then, could you update the documentation to be more accurate?

Member Author:
@dongjoon-hyun I think the doc for this method is accurate:

* The number of Spark tasks will be *approximately* `numPartitions`. It can be less or more

Member:
A few days ago, minPartitions was added to the documentation for master/branch-2.4 via #25219.

+      var remaining = size
+      var startOffset = range.fromOffset
+      (0 until parts).map { part =>
+        // Fine to do integer division. Last partition will consume all the round off errors
+        val thisPartition = remaining / (parts - part)
Member Author:
thisPartition will be the same as remaining for the last part. This will ensure we always get a KafkaOffsetRange ending with range.untilOffset.
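To see the invariant, here is a small standalone sketch of the same loop with made-up numbers (size 10, 3 parts), not code from the PR: the last part always equals whatever remains, so the final end offset is exact.

```scala
// Mirrors the splitting loop above with invented numbers.
var remaining = 10L
val parts = 3
val splits = (0 until parts).map { part =>
  val thisPartition = remaining / (parts - part) // 10/3 = 3, then 7/2 = 3, then 4/1 = 4
  remaining -= thisPartition
  thisPartition
}
println(splits) // Vector(3, 3, 4): the last part absorbs all round-off error
```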

+        remaining -= thisPartition
+        val endOffset = math.min(startOffset + thisPartition, range.untilOffset)
+        val offsetRange = KafkaOffsetRange(tp, startOffset, endOffset, None)
+        startOffset = endOffset
+        offsetRange
       }
-    }
+    }.filter(_.size > 0)
Contributor (@HeartSaVioR, Jul 24, 2019):
I'm not sure it's possible, but supposing it is (since we have this filter, and we are doing integer division), we could still end up with fewer than minPartitions even when the ratio-based distribution is calculated correctly.
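As a hedged illustration of one way a zero-size split could arise (assuming an input range whose fromOffset equals untilOffset reaches this code; the offsets below are made up):

```scala
// Hypothetical, standalone: an input range with no new data.
val fromOffset = 42L
val untilOffset = 42L
val size = untilOffset - fromOffset // 0
// The ratio size/totalSize is 0, but math.max(..., 1) still allots one part.
val parts = 1
val thisPartition = size / parts // 0
// The resulting KafkaOffsetRange(tp, 42, 42, None) has size 0 and is dropped
// by .filter(_.size > 0), so the final count can dip below minPartitions.
println((fromOffset, fromOffset + thisPartition)) // (42,42)
```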

   }
 }

@@ -141,6 +141,21 @@ class KafkaOffsetRangeCalculatorSuite extends SparkFunSuite {
         KafkaOffsetRange(tp2, 14, 21, None)))
   }

+  testWithMinPartitions("SPARK-28489: never drop offsets", 6) { calc =>
+    assert(
+      calc.getRanges(
+        fromOffsets = Map(tp1 -> 0, tp2 -> 0, tp3 -> 0),
+        untilOffsets = Map(tp1 -> 10, tp2 -> 10, tp3 -> 1)) ==
+      Seq(
+        KafkaOffsetRange(tp1, 0, 3, None),
+        KafkaOffsetRange(tp1, 3, 6, None),
+        KafkaOffsetRange(tp1, 6, 10, None),
+        KafkaOffsetRange(tp2, 0, 3, None),
+        KafkaOffsetRange(tp2, 3, 6, None),
+        KafkaOffsetRange(tp2, 6, 10, None),
+        KafkaOffsetRange(tp3, 0, 1, None)))
+  }

   private val tp1 = new TopicPartition("t1", 1)
   private val tp2 = new TopicPartition("t2", 1)
   private val tp3 = new TopicPartition("t3", 1)