Skip to content

Commit 6a7ff24

Browse files
committed
[SPARK-17813][SQL][KAFKA] set current offsets on recovery
1 parent 64fca67 commit 6a7ff24

2 files changed

Lines changed: 42 additions & 3 deletions

File tree

external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ private[kafka010] case class KafkaSource(
137137
val offsets = maxOffsetsPerTrigger match {
138138
case None =>
139139
latest
140-
case Some(limit) if !currentPartitionOffsets.isDefined =>
140+
case Some(limit) if currentPartitionOffsets.isEmpty =>
141141
rateLimit(limit, initialPartitionOffsets, latest)
142142
case Some(limit) =>
143143
rateLimit(limit, currentPartitionOffsets.get, latest)
@@ -265,6 +265,12 @@ private[kafka010] case class KafkaSource(
265265

266266
logInfo("GetBatch generating RDD of offset range: " +
267267
offsetRanges.sortBy(_.topicPartition.toString).mkString(", "))
268+
269+
// On recovery, getBatch will get called before getOffset
270+
if (currentPartitionOffsets.isEmpty) {
271+
currentPartitionOffsets = Some(untilPartitionOffsets)
272+
}
273+
268274
sqlContext.createDataFrame(rdd, schema)
269275
}
270276

external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala

Lines changed: 35 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@ import scala.util.Random
2323

2424
import org.apache.kafka.clients.producer.RecordMetadata
2525
import org.apache.kafka.common.TopicPartition
26+
import org.scalatest.concurrent.Eventually._
27+
import org.scalatest.concurrent.PatienceConfiguration.Timeout
2628
import org.scalatest.time.SpanSugar._
2729

2830
import org.apache.spark.sql.execution.streaming._
@@ -153,17 +155,48 @@ class KafkaSourceSuite extends KafkaSourceTest {
153155
val mapped: org.apache.spark.sql.Dataset[_] = kafka.map(kv => kv._2.toInt)
154156

155157
val clock = new StreamManualClock
158+
159+
val waitUntilBatchProcessed = AssertOnQuery { q =>
160+
eventually(Timeout(streamingTimeout)) {
161+
if (!q.exception.isDefined) {
162+
assert(clock.isStreamWaitingAt(clock.getTimeMillis()))
163+
}
164+
}
165+
if (q.exception.isDefined) {
166+
throw q.exception.get
167+
}
168+
true
169+
}
170+
156171
testStream(mapped)(
157172
StartStream(ProcessingTime(100), clock),
158-
AdvanceManualClock(100),
173+
waitUntilBatchProcessed,
159174
// 1 from smallest, 1 from middle, 8 from biggest
160175
CheckAnswer(1, 10, 100, 101, 102, 103, 104, 105, 106, 107),
161176
AdvanceManualClock(100),
177+
waitUntilBatchProcessed,
162178
// smallest now empty, 1 more from middle, 9 more from biggest
163179
CheckAnswer(1, 10, 100, 101, 102, 103, 104, 105, 106, 107,
164180
11, 108, 109, 110, 111, 112, 113, 114, 115, 116
165181
),
166-
StopStream
182+
StopStream,
183+
StartStream(ProcessingTime(100), clock),
184+
waitUntilBatchProcessed,
185+
AdvanceManualClock(100),
186+
waitUntilBatchProcessed,
187+
// smallest now empty, 1 more from middle, 9 more from biggest
188+
CheckAnswer(1, 10, 100, 101, 102, 103, 104, 105, 106, 107,
189+
11, 108, 109, 110, 111, 112, 113, 114, 115, 116,
190+
12, 117, 118, 119, 120, 121, 122, 123, 124, 125
191+
),
192+
AdvanceManualClock(100),
193+
waitUntilBatchProcessed,
194+
// smallest now empty, 1 more from middle, 9 more from biggest
195+
CheckAnswer(1, 10, 100, 101, 102, 103, 104, 105, 106, 107,
196+
11, 108, 109, 110, 111, 112, 113, 114, 115, 116,
197+
12, 117, 118, 119, 120, 121, 122, 123, 124, 125,
198+
13, 126, 127, 128, 129, 130, 131, 132, 133, 134
199+
)
167200
)
168201
}
169202

0 commit comments

Comments (0)