Skip to content

Commit b27ea82

Browse files
authored
Fixed "Single message comes late" (apache#135)
1 parent 93f0b54 commit b27ea82

2 files changed

Lines changed: 9 additions & 7 deletions

File tree

external/kafka-0-9/src/main/scala/org/apache/spark/streaming/kafka09/CachedKafkaConsumer.scala

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,8 @@ class CachedKafkaConsumer[K, V] private(
4949
c
5050
}
5151

52+
val isStreams = topic.startsWith("/") && topic.contains(":")
53+
5254
// TODO if the buffer was kept around as a random-access structure,
5355
// could possibly optimize re-calculating of an RDD in the same batch
5456
protected var buffer = ju.Collections.emptyList[ConsumerRecord[K, V]]().iterator
@@ -68,13 +70,15 @@ class CachedKafkaConsumer[K, V] private(
6870
poll(timeout)
6971
}
7072

71-
if (!buffer.hasNext()) { poll(timeout) }
72-
assert(buffer.hasNext(),
73+
if (!buffer.hasNext) { poll(timeout) }
74+
75+
assert(buffer.hasNext,
7376
s"Failed to get records for $groupId $topic $partition $offset after polling for $timeout")
7477
val record = buffer.next()
7578

7679
nextOffset = offset + 1
77-
record
80+
81+
if (record.offset() == 0 && isStreams && buffer.hasNext) buffer.next() else record
7882
// Offsets in MapR-streams can contain gaps
7983
/* if (record.offset < offset) {
8084
logInfo(s"Buffer miss for $groupId $topic $partition $offset")

external/kafka-0-9/src/main/scala/org/apache/spark/streaming/kafka09/KafkaRDD.scala

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -227,15 +227,13 @@ private[spark] class KafkaRDD[K, V](
227227
// override def hasNext(): Boolean = requestOffset < part.untilOffset
228228
override def getNext(): ConsumerRecord[K, V] = {
229229

230-
val isStreams = consumer.topic.startsWith("/") && consumer.topic.contains(":")
231-
232230
@tailrec
233231
def skipGapsAndGetNext: ConsumerRecord[K, V] = {
234232
if (requestOffset < part.untilOffset) {
235233
val r = consumer.get(requestOffset, pollTimeout)
236234

237-
if (isStreams && r.offset() == 0) {
238-
requestOffset = requestOffset + 1
235+
if (consumer.isStreams && r.offset() == 0) {
236+
requestOffset = part.untilOffset
239237
skipGapsAndGetNext
240238
} else {
241239
requestOffset = r.offset() + 1

0 commit comments

Comments
 (0)