[SPARK-28489][SS] Fix a bug that KafkaOffsetRangeCalculator.getRanges may drop offsets #25237
val tp = range.topicPartition
val size = range.size
// number of partitions to divvy up this topic partition to
val parts = math.max(math.round(size.toDouble / totalSize * minPartitions.get), 1).toInt
This one ensures we never drop a TopicPartition.
The ratio calculation looks good, but round seems to generate fewer partitions. Is there a reason to choose round instead of ceiling?
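To make the question concrete, here is a minimal sketch comparing the two rounding choices. `partsByRound` mirrors the expression in the diff; `partsByCeil`, `totalParts`, and the object name are hypothetical helpers introduced only for illustration and are not part of Spark.

```scala
object PartitionCountSketch {
  // Mirrors the expression from the diff: round the proportional share, never below 1.
  def partsByRound(size: Long, totalSize: Long, minPartitions: Int): Int =
    math.max(math.round(size.toDouble / totalSize * minPartitions), 1).toInt

  // Hypothetical alternative: take the ceiling of the proportional share instead.
  def partsByCeil(size: Long, totalSize: Long, minPartitions: Int): Int =
    math.max(math.ceil(size.toDouble / totalSize * minPartitions).toLong, 1L).toInt

  // Sums the per-range partition counts produced by the chosen strategy `f`.
  def totalParts(sizes: Seq[Long], minPartitions: Int)(f: (Long, Long, Int) => Int): Int = {
    val total = sizes.sum
    sizes.map(f(_, total, minPartitions)).sum
  }
}
```

With four equally sized ranges and a hint of 5, each range's share is 1.25, so the round variant totals 4 partitions (one short of the hint) while the ceil variant totals 8 (three over).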
Yeah, I'm seeing the same. Suppose 4 offsetRanges each carry a fractional share of 0.25 partitions: rounding discards all four fractions, so we lose 1 partition. The number of lost partitions may vary.
Conversely, if we use ceil, the result may exceed the minimum partitions, and the number of excess partitions may vary. We don't guarantee that this calculator returns a partition count closest to the minimum partitions, so it seems OK.
If we really want to make this strict, we could apply "allocation": calculate the ratio for each offsetRange, allocate partitions to each offsetRange according to that ratio (with a minimum of 1 for safety), and then allocate extra partitions to some offsetRanges if any partitions remain. Not sure we want to deal with the complexity.
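The "allocation" idea could look roughly like the sketch below. This is one possible reading of the comment, not what the PR implements: it uses a largest-remainder rule to hand out leftover partitions, and `AllocationSketch` and `allocate` are hypothetical names.

```scala
object AllocationSketch {
  // Hypothetical helper: returns one partition count per input range size.
  // The counts sum to max(minPartitions, sizes.length) unless the
  // "at least 1 per range" rule forces the total higher.
  def allocate(sizes: Seq[Long], minPartitions: Int): Seq[Int] = {
    require(sizes.nonEmpty && sizes.forall(_ > 0), "sizes must be non-empty and positive")
    val total = sizes.sum
    val target = math.max(minPartitions, sizes.length)
    // Integer part of each range's proportional share, with a floor of 1.
    val base = sizes.map(s => math.max(s * target / total, 1L).toInt)
    // Fractional part of each share, kept as an integer remainder to avoid floating point.
    val remainders = sizes.map(s => s * target % total)
    val leftover = math.max(target - base.sum, 0)
    // Give one extra partition to the ranges with the largest remainders
    // (sortBy is stable, so ties go to the earlier range).
    val bonus = remainders.zipWithIndex.sortBy { case (r, _) => -r }.take(leftover).map(_._2).toSet
    base.zipWithIndex.map { case (b, i) => if (bonus(i)) b + 1 else b }
  }
}
```

For three ranges of 29 offsets and a hint of 4, this yields counts of 2, 1, and 1. For sizes 1, 1, and 100 with a hint of 3, the minimum-of-1 rule pushes the total to 4, which shows that even this stricter scheme can overshoot.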
Yep, it's a hint. And when the number of partitions is less than minPartitions, we will try our best to split. Agreed that the option name minPartitions is not accurate.
Then, could you update the documentation to be more accurate instead?
@dongjoon-hyun I think the doc for this method is accurate:
Line 38 in c4010a2
* The number of Spark tasks will be *approximately* `numPartitions`. It can be less or more
A few days ago, minPartitions was added to the documentation for master/branch-2.4 via #25219.
var startOffset = range.fromOffset
(0 until parts).map { part =>
  // Fine to do integer division. Last partition will consume all the round off errors
  val thisPartition = remaining / (parts - part)
thisPartition will be the same as remaining for the last part. This will ensure we always get a KafkaOffsetRange ending with range.untilOffset.
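The following sketch reconstructs the splitting loop around the lines visible in the diff to show why the last piece always ends at `untilOffset`. `OffsetSpan` is a hypothetical stand-in for `KafkaOffsetRange`, and the lines not shown in the diff (updating `remaining` and `startOffset`) are assumptions.

```scala
object SplitSketch {
  // Hypothetical stand-in for KafkaOffsetRange, reduced to its two offsets.
  final case class OffsetSpan(fromOffset: Long, untilOffset: Long) {
    def size: Long = untilOffset - fromOffset
  }

  // Splits `range` into `parts` contiguous pieces using integer division.
  def split(range: OffsetSpan, parts: Int): Seq[OffsetSpan] = {
    var remaining = range.size
    var startOffset = range.fromOffset
    (0 until parts).map { part =>
      // For the last part, (parts - part) == 1, so thisPartition == remaining.
      val thisPartition = remaining / (parts - part)
      remaining -= thisPartition
      val endOffset = startOffset + thisPartition
      val piece = OffsetSpan(startOffset, endOffset)
      startOffset = endOffset
      piece
    }
  }
}
```

Splitting offsets 0 to 10 into 3 parts gives pieces of sizes 3, 3, and 4: the pieces are contiguous and the last one absorbs the round-off, so no offset is dropped.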
...-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculatorSuite.scala
Test build #108067 has finished for PR 25237 at commit
Thank you for the fix, @zsxwing. Could you also handle a corner case like the following in this PR? Although this is not a regression (the master branch behaves the same), we currently get fewer partitions than the given minPartitions in some cases. For example, the following test passes.
test("with minPartition = 4") {
  val options = new CaseInsensitiveStringMap(Map("minPartitions" -> "4").asJava)
  val calc = KafkaOffsetRangeCalculator(options)
  assert(
    calc.getRanges(
      fromOffsets = Map(tp1 -> 0, tp2 -> 0, tp3 -> 0),
      untilOffsets = Map(tp1 -> 29, tp2 -> 29, tp3 -> 29)) ==
      Seq(
        KafkaOffsetRange(tp1, 0, 29, None),
        KafkaOffsetRange(tp2, 0, 29, None),
        KafkaOffsetRange(tp3, 0, 29, None)))
}
cc @tdas, @HeartSaVioR, @gaborgsomogyi. Also, cc @gatorsmile since this is reported as a blocker issue for 2.4.4.
offsetRange
}
}
}.filter(_.size > 0)
I'm not sure whether it's possible, but suppose it is (we have this filter, and we are doing integer division): then we could still end up with fewer than minPartitions ranges even if the ratio-based distribution is calculated correctly.
|
Hmm... I'm now reading comment on getRanges. I'm not sure
Lines 32 to 46 in d67b98e
Please ignore my review comments if the javadoc meant it. Looks great. |
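A small sketch of the scenario raised above, assuming a range that holds fewer offsets than the number of parts assigned to it. `pieceSizes` is a hypothetical helper that reproduces only the integer division from the diff and returns the size of each piece before any filtering.

```scala
object EmptyPieceSketch {
  // Sizes produced by the integer division, before `.filter(_.size > 0)` is applied.
  def pieceSizes(size: Long, parts: Int): Seq[Long] = {
    var remaining = size
    (0 until parts).map { part =>
      val thisPartition = remaining / (parts - part)
      remaining -= thisPartition
      thisPartition
    }
  }
}
```

Dividing 2 offsets into 4 parts yields sizes 0, 0, 1, and 1, so the filter would keep only 2 ranges. All offsets are still covered, but the resulting count is below the requested number of parts, which is consistent with treating the option as a hint.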
Test build #108129 has finished for PR 25237 at commit
dongjoon-hyun
left a comment
+1, LGTM. Thank you for the fix. For the documentation, let's update it later.
Merged to master/2.4.
… may drop offsets

## What changes were proposed in this pull request?

`KafkaOffsetRangeCalculator.getRanges` may drop offsets due to round off errors. The test added in this PR is one example.

This PR rewrites the logic in `KafkaOffsetRangeCalculator.getRanges` to ensure it never drops offsets.

## How was this patch tested?

The regression test.

Closes #25237 from zsxwing/fix-range.

Authored-by: Shixiong Zhu <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
(cherry picked from commit b9c2521)
Signed-off-by: Dongjoon Hyun <[email protected]>
#25332 is a follow-up PR to address documentation.
Thanks, @HeartSaVioR! It's merged.
What changes were proposed in this pull request?
KafkaOffsetRangeCalculator.getRanges may drop offsets due to round off errors. The test added in this PR is one example.
This PR rewrites the logic in KafkaOffsetRangeCalculator.getRanges to ensure it never drops offsets.
How was this patch tested?
The regression test.