@@ -22,12 +22,13 @@ import org.apache.kafka.common.TopicPartition
import org.apache.spark.SparkEnv
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.Network.NETWORK_TIMEOUT
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.sources.v2.reader.{Batch, InputPartition, PartitionReaderFactory}


private[kafka010] class KafkaBatch(
strategy: ConsumerStrategy,
sourceOptions: Map[String, String],
sourceOptions: CaseInsensitiveMap[String],
specifiedKafkaParams: Map[String, String],
failOnDataLoss: Boolean,
startingOffsets: KafkaOffsetRangeLimit,
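
The type change in this first hunk (sourceOptions goes from Map[String, String] to CaseInsensitiveMap[String]) is the heart of the PR: CaseInsensitiveMap wraps an ordinary Scala map so that later option lookups ignore how the user cased the key. A minimal usage sketch follows; the option key is illustrative, spark-catalyst is assumed on the classpath, and CaseInsensitiveMap is an internal Catalyst utility rather than a supported public API, so treat this as a behavior sketch.

    import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap

    object CaseInsensitiveMapDemo {
      def main(args: Array[String]): Unit = {
        // Wrap the user-supplied options once; every later lookup ignores key case.
        val options = CaseInsensitiveMap(Map("fetchOffset.numRetries" -> "5"))

        println(options.getOrElse("fetchoffset.numretries", "3"))  // 5
        println(options.getOrElse("FETCHOFFSET.NUMRETRIES", "3"))  // 5
      }
    }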
@@ -46,7 +46,7 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap
* properly read.
*/
class KafkaContinuousStream(
offsetReader: KafkaOffsetReader,
private val offsetReader: KafkaOffsetReader,
Member:

Is private val required for "Use CaseInsensitiveMap for KafkaOffsetReader"?

Contributor Author:

Otherwise this code:

    getField(stream, offsetReaderMethod)

throws an exception along the lines of:

    java.lang.IllegalArgumentException: Can't find a private method named: offsetReader

The other possibility is to add a getter explicitly, which I thought was overkill.

Contributor:

Another approach would be Scala reflection, but that is definitely more verbose than an explicit getter.
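
An aside for readers of this thread: the error quoted above comes from ScalaTest's PrivateMethodTester, which looks up the compiler-generated private accessor that only exists when the constructor parameter is declared private val. Below is a minimal sketch of that pattern; the class and value names are made up for illustration, and only ScalaTest is assumed on the classpath.

    import org.scalatest.PrivateMethodTester

    // Stand-in for the stream classes discussed above: `private val` turns the
    // constructor parameter into a field with a private accessor method.
    class StreamLike(private val offsetReader: String)

    object PrivateValDemo extends PrivateMethodTester {
      private val offsetReaderMethod = PrivateMethod[String](Symbol("offsetReader"))

      def main(args: Array[String]): Unit = {
        val stream = new StreamLike("reader-1")
        // invokePrivate reflectively calls the private accessor generated for the
        // val; without `private val` there is no such method and ScalaTest fails
        // with "Can't find a private method named: offsetReader".
        println(stream.invokePrivate(offsetReaderMethod()))  // prints: reader-1
      }
    }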

kafkaParams: ju.Map[String, Object],
options: CaseInsensitiveStringMap,
metadataPath: String,
@@ -56,7 +56,7 @@ import org.apache.spark.util.UninterruptibleThread
* and not use wrong broker addresses.
*/
private[kafka010] class KafkaMicroBatchStream(
kafkaOffsetReader: KafkaOffsetReader,
private val offsetReader: KafkaOffsetReader,
Member:

  • Is private val required for "Use CaseInsensitiveMap for KafkaOffsetReader"?
  • Is the renaming from kafkaOffsetReader to offsetReader required for "Use CaseInsensitiveMap for KafkaOffsetReader"?

Contributor Author (@gaborgsomogyi, Jul 8, 2019):

Is private val required for "Use CaseInsensitiveMap for KafkaOffsetReader"?

Yeah, please see my previous comment. The same applies here.

Is the renaming from kafkaOffsetReader to offsetReader required for "Use CaseInsensitiveMap for KafkaOffsetReader"?

In short, no. In a bit more detail: if it is not renamed, the test requires something like this:

  private val offsetReaderMicroBatchMethod = PrivateMethod[KafkaOffsetReader]('kafkaOffsetReader)
  private val offsetReaderContinuousMethod = PrivateMethod[KafkaOffsetReader]('offsetReader)
...
    verifyFieldsInMicroBatchStream(KafkaSourceProvider.FETCH_OFFSET_NUM_RETRY, expected, stream => {
      val kafkaOffsetReader = getField(stream, offsetReaderMicroBatchMethod)
      assert(expected.toInt === getField(kafkaOffsetReader, fetchOffsetNumRetriesMethod))
    })
...

General considerations for why I've renamed a couple of variables:

  • The exact same parameters are sometimes named 2-4 different ways, which is hard to track
  • Not renaming them would cause a similar situation in the test code
  • Sometimes a SparkConf config name and its internal Scala variable have different names
  • I've renamed only Spark-internal variables, which shouldn't matter for users

We can roll back the renamings and the code will still work properly, but I think these changes increase readability.

Contributor Author:

The rename has been rolled back. The private val, I think, is essential.

executorKafkaParams: ju.Map[String, Object],
options: CaseInsensitiveStringMap,
metadataPath: String,
@@ -85,7 +85,7 @@ private[kafka010] class KafkaMicroBatchStream(

override def latestOffset(start: Offset): Offset = {
val startPartitionOffsets = start.asInstanceOf[KafkaSourceOffset].partitionToOffsets
val latestPartitionOffsets = kafkaOffsetReader.fetchLatestOffsets(Some(startPartitionOffsets))
val latestPartitionOffsets = offsetReader.fetchLatestOffsets(Some(startPartitionOffsets))
endPartitionOffsets = KafkaSourceOffset(maxOffsetsPerTrigger.map { maxOffsets =>
rateLimit(maxOffsets, startPartitionOffsets, latestPartitionOffsets)
}.getOrElse {
@@ -100,7 +100,7 @@

// Find the new partitions, and get their earliest offsets
val newPartitions = endPartitionOffsets.keySet.diff(startPartitionOffsets.keySet)
val newPartitionInitialOffsets = kafkaOffsetReader.fetchEarliestOffsets(newPartitions.toSeq)
val newPartitionInitialOffsets = offsetReader.fetchEarliestOffsets(newPartitions.toSeq)
if (newPartitionInitialOffsets.keySet != newPartitions) {
// We cannot get from offsets for some partitions. It means they got deleted.
val deletedPartitions = newPartitions.diff(newPartitionInitialOffsets.keySet)
@@ -117,7 +117,7 @@
val deletedPartitions = startPartitionOffsets.keySet.diff(endPartitionOffsets.keySet)
if (deletedPartitions.nonEmpty) {
val message =
if (kafkaOffsetReader.driverKafkaParams.containsKey(ConsumerConfig.GROUP_ID_CONFIG)) {
if (offsetReader.driverKafkaParams.containsKey(ConsumerConfig.GROUP_ID_CONFIG)) {
s"$deletedPartitions are gone. ${KafkaSourceProvider.CUSTOM_GROUP_ID_ERROR_MESSAGE}"
} else {
s"$deletedPartitions are gone. Some data may have been missed."
@@ -172,10 +172,10 @@ private[kafka010] class KafkaMicroBatchStream(
override def commit(end: Offset): Unit = {}

override def stop(): Unit = {
kafkaOffsetReader.close()
offsetReader.close()
}

override def toString(): String = s"KafkaV2[$kafkaOffsetReader]"
override def toString(): String = s"KafkaV2[$offsetReader]"

/**
* Read initial partition offsets from the checkpoint, or decide the offsets and write them to
@@ -195,11 +195,11 @@ private[kafka010] class KafkaMicroBatchStream(
metadataLog.get(0).getOrElse {
val offsets = startingOffsets match {
case EarliestOffsetRangeLimit =>
KafkaSourceOffset(kafkaOffsetReader.fetchEarliestOffsets())
KafkaSourceOffset(offsetReader.fetchEarliestOffsets())
case LatestOffsetRangeLimit =>
KafkaSourceOffset(kafkaOffsetReader.fetchLatestOffsets(None))
KafkaSourceOffset(offsetReader.fetchLatestOffsets(None))
case SpecificOffsetRangeLimit(p) =>
kafkaOffsetReader.fetchSpecificOffsets(p, reportDataLoss)
offsetReader.fetchSpecificOffsets(p, reportDataLoss)
}
metadataLog.add(0, offsets)
logInfo(s"Initial offsets: $offsets")
@@ -212,7 +212,7 @@
limit: Long,
from: PartitionOffsetMap,
until: PartitionOffsetMap): PartitionOffsetMap = {
val fromNew = kafkaOffsetReader.fetchEarliestOffsets(until.keySet.diff(from.keySet).toSeq)
val fromNew = offsetReader.fetchEarliestOffsets(until.keySet.diff(from.keySet).toSeq)
val sizes = until.flatMap {
case (tp, end) =>
// If begin isn't defined, something's wrong, but let alert logic in getBatch handle it
@@ -30,6 +30,7 @@ import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig, KafkaConsume
import org.apache.kafka.common.TopicPartition

import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.types._
import org.apache.spark.util.{ThreadUtils, UninterruptibleThread}

@@ -47,7 +48,7 @@ import org.apache.spark.util.{ThreadUtils, UninterruptibleThread}
private[kafka010] class KafkaOffsetReader(
consumerStrategy: ConsumerStrategy,
val driverKafkaParams: ju.Map[String, Object],
readerOptions: Map[String, String],
readerOptions: CaseInsensitiveMap[String],
driverGroupIdPrefix: String) extends Logging {
/**
* Used to ensure execute fetch operations execute in an UninterruptibleThread
@@ -88,10 +89,10 @@ private[kafka010] class KafkaOffsetReader(
_consumer
}

private val maxOffsetFetchAttempts =
private val fetchOffsetNumRetries =
readerOptions.getOrElse(KafkaSourceProvider.FETCH_OFFSET_NUM_RETRY, "3").toInt

private val offsetFetchAttemptIntervalMs =
private val fetchOffsetRetryIntervalMs =
readerOptions.getOrElse(KafkaSourceProvider.FETCH_OFFSET_RETRY_INTERVAL_MS, "1000").toLong

private def nextGroupId(): String = {
@@ -293,12 +294,12 @@ private[kafka010] class KafkaOffsetReader(
if (incorrectOffsets.nonEmpty) {
logWarning("Found incorrect offsets in some partitions " +
s"(partition, previous offset, fetched offset): $incorrectOffsets")
if (attempt < maxOffsetFetchAttempts) {
if (attempt < fetchOffsetNumRetries) {
logWarning("Retrying to fetch latest offsets because of incorrect offsets")
Thread.sleep(offsetFetchAttemptIntervalMs)
Thread.sleep(fetchOffsetRetryIntervalMs)
}
}
} while (incorrectOffsets.nonEmpty && attempt < maxOffsetFetchAttempts)
} while (incorrectOffsets.nonEmpty && attempt < fetchOffsetNumRetries)

logDebug(s"Got latest offsets for partition : $partitionOffsets")
partitionOffsets
@@ -371,7 +372,7 @@ private[kafka010] class KafkaOffsetReader(
var result: Option[Map[TopicPartition, Long]] = None
var attempt = 1
var lastException: Throwable = null
while (result.isEmpty && attempt <= maxOffsetFetchAttempts
while (result.isEmpty && attempt <= fetchOffsetNumRetries
&& !Thread.currentThread().isInterrupted) {
Thread.currentThread match {
case ut: UninterruptibleThread =>
@@ -389,7 +390,7 @@
lastException = e
logWarning(s"Error in attempt $attempt getting Kafka offsets: ", e)
attempt += 1
Thread.sleep(offsetFetchAttemptIntervalMs)
Thread.sleep(fetchOffsetRetryIntervalMs)
resetConsumer()
}
}
@@ -402,7 +403,7 @@
throw new InterruptedException()
}
if (result.isEmpty) {
assert(attempt > maxOffsetFetchAttempts)
assert(attempt > fetchOffsetNumRetries)
assert(lastException != null)
throw lastException
}
@@ -24,7 +24,7 @@ import org.apache.spark.internal.config.Network.NETWORK_TIMEOUT
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
import org.apache.spark.sql.sources.{BaseRelation, TableScan}
import org.apache.spark.sql.types.StructType
import org.apache.spark.unsafe.types.UTF8String
@@ -33,7 +33,7 @@ import org.apache.spark.unsafe.types.UTF8String
private[kafka010] class KafkaRelation(
override val sqlContext: SQLContext,
strategy: ConsumerStrategy,
sourceOptions: Map[String, String],
sourceOptions: CaseInsensitiveMap[String],
specifiedKafkaParams: Map[String, String],
failOnDataLoss: Boolean,
startingOffsets: KafkaOffsetRangeLimit,
@@ -78,32 +78,32 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
schema: Option[StructType],
providerName: String,
parameters: Map[String, String]): Source = {
validateStreamOptions(parameters)
val caseInsensitiveParameters = CaseInsensitiveMap(parameters)
validateStreamOptions(caseInsensitiveParameters)
// Each running query should use its own group id. Otherwise, the query may be only assigned
// partial data since Kafka will assign partitions to multiple consumers having the same group
// id. Hence, we should generate a unique id for each query.
val uniqueGroupId = streamingUniqueGroupId(parameters, metadataPath)
val uniqueGroupId = streamingUniqueGroupId(caseInsensitiveParameters, metadataPath)

val caseInsensitiveParams = parameters.map { case (k, v) => (k.toLowerCase(Locale.ROOT), v) }
val specifiedKafkaParams = convertToSpecifiedParams(parameters)

val startingStreamOffsets = KafkaSourceProvider.getKafkaOffsetRangeLimit(caseInsensitiveParams,
STARTING_OFFSETS_OPTION_KEY, LatestOffsetRangeLimit)
val startingStreamOffsets = KafkaSourceProvider.getKafkaOffsetRangeLimit(
caseInsensitiveParameters, STARTING_OFFSETS_OPTION_KEY, LatestOffsetRangeLimit)

val kafkaOffsetReader = new KafkaOffsetReader(
strategy(caseInsensitiveParams),
strategy(caseInsensitiveParameters),
kafkaParamsForDriver(specifiedKafkaParams),
parameters,
caseInsensitiveParameters,
driverGroupIdPrefix = s"$uniqueGroupId-driver")

new KafkaSource(
sqlContext,
kafkaOffsetReader,
kafkaParamsForExecutors(specifiedKafkaParams, uniqueGroupId),
parameters,
caseInsensitiveParameters,
metadataPath,
startingStreamOffsets,
failOnDataLoss(caseInsensitiveParams))
failOnDataLoss(caseInsensitiveParameters))
}

override def getTable(options: CaseInsensitiveStringMap): KafkaTable = {
@@ -119,24 +119,24 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
override def createRelation(
sqlContext: SQLContext,
parameters: Map[String, String]): BaseRelation = {
validateBatchOptions(parameters)
val caseInsensitiveParams = parameters.map { case (k, v) => (k.toLowerCase(Locale.ROOT), v) }
val caseInsensitiveParameters = CaseInsensitiveMap(parameters)
validateBatchOptions(caseInsensitiveParameters)
val specifiedKafkaParams = convertToSpecifiedParams(parameters)

val startingRelationOffsets = KafkaSourceProvider.getKafkaOffsetRangeLimit(
caseInsensitiveParams, STARTING_OFFSETS_OPTION_KEY, EarliestOffsetRangeLimit)
caseInsensitiveParameters, STARTING_OFFSETS_OPTION_KEY, EarliestOffsetRangeLimit)
assert(startingRelationOffsets != LatestOffsetRangeLimit)

val endingRelationOffsets = KafkaSourceProvider.getKafkaOffsetRangeLimit(caseInsensitiveParams,
ENDING_OFFSETS_OPTION_KEY, LatestOffsetRangeLimit)
val endingRelationOffsets = KafkaSourceProvider.getKafkaOffsetRangeLimit(
caseInsensitiveParameters, ENDING_OFFSETS_OPTION_KEY, LatestOffsetRangeLimit)
assert(endingRelationOffsets != EarliestOffsetRangeLimit)

new KafkaRelation(
sqlContext,
strategy(caseInsensitiveParams),
sourceOptions = parameters,
strategy(caseInsensitiveParameters),
sourceOptions = caseInsensitiveParameters,
specifiedKafkaParams = specifiedKafkaParams,
failOnDataLoss = failOnDataLoss(caseInsensitiveParams),
failOnDataLoss = failOnDataLoss(caseInsensitiveParameters),
startingOffsets = startingRelationOffsets,
endingOffsets = endingRelationOffsets)
}
@@ -420,23 +420,22 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
}

override def toMicroBatchStream(checkpointLocation: String): MicroBatchStream = {
val parameters = options.asScala.toMap
validateStreamOptions(parameters)
val caseInsensitiveOptions = CaseInsensitiveMap(options.asScala.toMap)
validateStreamOptions(caseInsensitiveOptions)
// Each running query should use its own group id. Otherwise, the query may be only assigned
// partial data since Kafka will assign partitions to multiple consumers having the same group
// id. Hence, we should generate a unique id for each query.
val uniqueGroupId = streamingUniqueGroupId(parameters, checkpointLocation)
val uniqueGroupId = streamingUniqueGroupId(caseInsensitiveOptions, checkpointLocation)

val caseInsensitiveParams = parameters.map { case (k, v) => (k.toLowerCase(Locale.ROOT), v) }
val specifiedKafkaParams = convertToSpecifiedParams(parameters)
val specifiedKafkaParams = convertToSpecifiedParams(caseInsensitiveOptions)

val startingStreamOffsets = KafkaSourceProvider.getKafkaOffsetRangeLimit(
caseInsensitiveParams, STARTING_OFFSETS_OPTION_KEY, LatestOffsetRangeLimit)
caseInsensitiveOptions, STARTING_OFFSETS_OPTION_KEY, LatestOffsetRangeLimit)

val kafkaOffsetReader = new KafkaOffsetReader(
strategy(parameters),
strategy(caseInsensitiveOptions),
kafkaParamsForDriver(specifiedKafkaParams),
parameters,
caseInsensitiveOptions,
driverGroupIdPrefix = s"$uniqueGroupId-driver")

new KafkaMicroBatchStream(
Expand All @@ -445,32 +444,26 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
options,
checkpointLocation,
startingStreamOffsets,
failOnDataLoss(caseInsensitiveParams))
failOnDataLoss(caseInsensitiveOptions))
}

override def toContinuousStream(checkpointLocation: String): ContinuousStream = {
val parameters = options.asScala.toMap
validateStreamOptions(parameters)
val caseInsensitiveOptions = CaseInsensitiveMap(options.asScala.toMap)
validateStreamOptions(caseInsensitiveOptions)
// Each running query should use its own group id. Otherwise, the query may be only assigned
// partial data since Kafka will assign partitions to multiple consumers having the same group
// id. Hence, we should generate a unique id for each query.
val uniqueGroupId = streamingUniqueGroupId(parameters, checkpointLocation)
val uniqueGroupId = streamingUniqueGroupId(caseInsensitiveOptions, checkpointLocation)

val caseInsensitiveParams = parameters.map { case (k, v) => (k.toLowerCase(Locale.ROOT), v) }
val specifiedKafkaParams =
parameters
.keySet
.filter(_.toLowerCase(Locale.ROOT).startsWith("kafka."))
.map { k => k.drop(6).toString -> parameters(k) }
.toMap
val specifiedKafkaParams = convertToSpecifiedParams(caseInsensitiveOptions)
Contributor Author:

This is not a required change, but I thought it would be good to simplify.
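
For context, the inline code removed above filtered option keys starting with "kafka." and dropped the first six characters, so a convertToSpecifiedParams-style helper presumably does the same: turn the user-facing "kafka.*" options into raw Kafka client properties. A small sketch of that idea, inferred from the removed code rather than copied from Spark's actual helper:

    import java.util.Locale

    object SpecifiedParamsSketch {
      // Keep only the options prefixed with "kafka." and strip the prefix,
      // mirroring the inline filter/drop(6) code the helper call replaces.
      def convertToSpecifiedParams(options: Map[String, String]): Map[String, String] =
        options.collect {
          case (k, v) if k.toLowerCase(Locale.ROOT).startsWith("kafka.") =>
            k.drop("kafka.".length) -> v
        }

      def main(args: Array[String]): Unit = {
        val options = Map("kafka.bootstrap.servers" -> "host:9092", "subscribe" -> "topic-a")
        println(convertToSpecifiedParams(options))  // Map(bootstrap.servers -> host:9092)
      }
    }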


val startingStreamOffsets = KafkaSourceProvider.getKafkaOffsetRangeLimit(
caseInsensitiveParams, STARTING_OFFSETS_OPTION_KEY, LatestOffsetRangeLimit)
caseInsensitiveOptions, STARTING_OFFSETS_OPTION_KEY, LatestOffsetRangeLimit)

val kafkaOffsetReader = new KafkaOffsetReader(
strategy(caseInsensitiveParams),
strategy(caseInsensitiveOptions),
kafkaParamsForDriver(specifiedKafkaParams),
parameters,
caseInsensitiveOptions,
driverGroupIdPrefix = s"$uniqueGroupId-driver")

new KafkaContinuousStream(
Expand All @@ -479,7 +472,7 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
options,
checkpointLocation,
startingStreamOffsets,
failOnDataLoss(caseInsensitiveParams))
failOnDataLoss(caseInsensitiveOptions))
}
}
}