Commit 5cd7b8e

Rollback KafkaMicroBatchStream renames
1 parent dcc9d56 commit 5cd7b8e

2 files changed: +14 -11 lines changed

external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala

Lines changed: 10 additions & 10 deletions
@@ -56,7 +56,7 @@ import org.apache.spark.util.UninterruptibleThread
  * and not use wrong broker addresses.
  */
 private[kafka010] class KafkaMicroBatchStream(
-    private val offsetReader: KafkaOffsetReader,
+    private val kafkaOffsetReader: KafkaOffsetReader,
     executorKafkaParams: ju.Map[String, Object],
     options: CaseInsensitiveStringMap,
     metadataPath: String,
@@ -85,7 +85,7 @@ private[kafka010] class KafkaMicroBatchStream(
 
   override def latestOffset(start: Offset): Offset = {
     val startPartitionOffsets = start.asInstanceOf[KafkaSourceOffset].partitionToOffsets
-    val latestPartitionOffsets = offsetReader.fetchLatestOffsets(Some(startPartitionOffsets))
+    val latestPartitionOffsets = kafkaOffsetReader.fetchLatestOffsets(Some(startPartitionOffsets))
     endPartitionOffsets = KafkaSourceOffset(maxOffsetsPerTrigger.map { maxOffsets =>
       rateLimit(maxOffsets, startPartitionOffsets, latestPartitionOffsets)
     }.getOrElse {
@@ -100,7 +100,7 @@ private[kafka010] class KafkaMicroBatchStream(
 
     // Find the new partitions, and get their earliest offsets
     val newPartitions = endPartitionOffsets.keySet.diff(startPartitionOffsets.keySet)
-    val newPartitionInitialOffsets = offsetReader.fetchEarliestOffsets(newPartitions.toSeq)
+    val newPartitionInitialOffsets = kafkaOffsetReader.fetchEarliestOffsets(newPartitions.toSeq)
     if (newPartitionInitialOffsets.keySet != newPartitions) {
       // We cannot get from offsets for some partitions. It means they got deleted.
       val deletedPartitions = newPartitions.diff(newPartitionInitialOffsets.keySet)
@@ -117,7 +117,7 @@ private[kafka010] class KafkaMicroBatchStream(
     val deletedPartitions = startPartitionOffsets.keySet.diff(endPartitionOffsets.keySet)
     if (deletedPartitions.nonEmpty) {
       val message =
-        if (offsetReader.driverKafkaParams.containsKey(ConsumerConfig.GROUP_ID_CONFIG)) {
+        if (kafkaOffsetReader.driverKafkaParams.containsKey(ConsumerConfig.GROUP_ID_CONFIG)) {
           s"$deletedPartitions are gone. ${KafkaSourceProvider.CUSTOM_GROUP_ID_ERROR_MESSAGE}"
         } else {
           s"$deletedPartitions are gone. Some data may have been missed."
@@ -172,10 +172,10 @@ private[kafka010] class KafkaMicroBatchStream(
   override def commit(end: Offset): Unit = {}
 
   override def stop(): Unit = {
-    offsetReader.close()
+    kafkaOffsetReader.close()
   }
 
-  override def toString(): String = s"KafkaV2[$offsetReader]"
+  override def toString(): String = s"KafkaV2[$kafkaOffsetReader]"
 
   /**
    * Read initial partition offsets from the checkpoint, or decide the offsets and write them to
@@ -195,11 +195,11 @@ private[kafka010] class KafkaMicroBatchStream(
     metadataLog.get(0).getOrElse {
       val offsets = startingOffsets match {
         case EarliestOffsetRangeLimit =>
-          KafkaSourceOffset(offsetReader.fetchEarliestOffsets())
+          KafkaSourceOffset(kafkaOffsetReader.fetchEarliestOffsets())
         case LatestOffsetRangeLimit =>
-          KafkaSourceOffset(offsetReader.fetchLatestOffsets(None))
+          KafkaSourceOffset(kafkaOffsetReader.fetchLatestOffsets(None))
         case SpecificOffsetRangeLimit(p) =>
-          offsetReader.fetchSpecificOffsets(p, reportDataLoss)
+          kafkaOffsetReader.fetchSpecificOffsets(p, reportDataLoss)
       }
       metadataLog.add(0, offsets)
       logInfo(s"Initial offsets: $offsets")
@@ -212,7 +212,7 @@ private[kafka010] class KafkaMicroBatchStream(
       limit: Long,
       from: PartitionOffsetMap,
       until: PartitionOffsetMap): PartitionOffsetMap = {
-    val fromNew = offsetReader.fetchEarliestOffsets(until.keySet.diff(from.keySet).toSeq)
+    val fromNew = kafkaOffsetReader.fetchEarliestOffsets(until.keySet.diff(from.keySet).toSeq)
     val sizes = until.flatMap {
       case (tp, end) =>
         // If begin isn't defined, something's wrong, but let alert logic in getBatch handle it

external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceProviderSuite.scala

Lines changed: 4 additions & 1 deletion
@@ -33,7 +33,6 @@ class KafkaSourceProviderSuite extends SparkFunSuite with PrivateMethodTester {
   private val expected = "1111"
   private val pollTimeoutMsMethod = PrivateMethod[Long]('pollTimeoutMs)
   private val maxOffsetsPerTriggerMethod = PrivateMethod[Option[Long]]('maxOffsetsPerTrigger)
-  private val offsetReaderMethod = PrivateMethod[KafkaOffsetReader]('offsetReader)
   private val maxOffsetFetchAttemptsMethod = PrivateMethod[Int]('maxOffsetFetchAttempts)
   private val offsetFetchAttemptIntervalMsMethod =
     PrivateMethod[Long]('offsetFetchAttemptIntervalMs)
@@ -50,6 +49,8 @@ class KafkaSourceProviderSuite extends SparkFunSuite with PrivateMethodTester {
   }
 
   test("micro-batch mode - options should be handled as case-insensitive") {
+    val offsetReaderMethod = PrivateMethod[KafkaOffsetReader]('kafkaOffsetReader)
+
     verifyFieldsInMicroBatchStream(KafkaSourceProvider.CONSUMER_POLL_TIMEOUT, expected, stream => {
       assert(expected.toLong === getField(stream, pollTimeoutMsMethod))
     })
@@ -68,6 +69,8 @@ class KafkaSourceProviderSuite extends SparkFunSuite with PrivateMethodTester {
   }
 
   test("continuous mode - options should be handled as case-insensitive") {
+    val offsetReaderMethod = PrivateMethod[KafkaOffsetReader]('offsetReader)
+
     verifyFieldsInContinuousStream(KafkaSourceProvider.CONSUMER_POLL_TIMEOUT, expected, stream => {
       assert(expected.toLong === getField(stream, pollTimeoutMsMethod))
     })
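
The test-side changes follow from how ScalaTest's PrivateMethodTester resolves members: PrivateMethod is keyed by the Symbol name of the accessor, so after the rollback the micro-batch test must look up 'kafkaOffsetReader while the continuous test keeps 'offsetReader, and each test now declares its own lookup. Below is a minimal sketch of that lookup pattern; the Box class, the PrivateLookupSketch object, and the field value are made up for illustration and are not part of this patch.

import org.scalatest.PrivateMethodTester

// Sketch only: a `private val` constructor parameter gets a private accessor
// of the same name, which PrivateMethodTester can invoke reflectively.
class Box(private val kafkaOffsetReader: String) {
  override def toString: String = s"Box($kafkaOffsetReader)"
}

object PrivateLookupSketch extends PrivateMethodTester {
  def main(args: Array[String]): Unit = {
    // The Symbol must match the member name exactly; after the rollback the
    // micro-batch stream exposes 'kafkaOffsetReader rather than 'offsetReader.
    val readerAccessor = PrivateMethod[String]('kafkaOffsetReader)
    val value = (new Box("reader-42")).invokePrivate(readerAccessor())
    assert(value == "reader-42")
  }
}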
