@@ -29,6 +29,7 @@ import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySe
 import org.apache.spark.internal.Logging
 import org.apache.spark.kafka010.KafkaConfigUpdater
 import org.apache.spark.sql.{AnalysisException, DataFrame, SaveMode, SQLContext}
+import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.execution.streaming.{Sink, Source}
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.sources.v2._
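
The import added above is the heart of the patch: `CaseInsensitiveMap` is Spark's catalyst-util wrapper that makes key lookups case-insensitive while leaving values alone. A minimal sketch of the behavior the rest of the diff relies on (the option values here are made up for illustration):

```scala
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap

// Option names as a user might type them, in mixed case.
val opts = CaseInsensitiveMap(Map(
  "startingOffsets" -> "earliest",
  "kafka.bootstrap.servers" -> "host1:9092"))

// Lookups succeed regardless of casing at the call site, which is what lets
// the provider drop its manual toLowerCase(Locale.ROOT) normalization pass.
assert(opts("STARTINGOFFSETS") == "earliest")
assert(opts.contains("Kafka.Bootstrap.Servers"))
```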
@@ -393,23 +394,22 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
     override def readSchema(): StructType = KafkaOffsetReader.kafkaSchema
 
     override def toMicroBatchStream(checkpointLocation: String): MicroBatchStream = {
-      val parameters = options.asScala.toMap
-      validateStreamOptions(parameters)
+      val caseInsensitiveOptions = CaseInsensitiveMap(options.asScala.toMap)
+      validateStreamOptions(caseInsensitiveOptions)
       // Each running query should use its own group id. Otherwise, the query may be only assigned
       // partial data since Kafka will assign partitions to multiple consumers having the same group
       // id. Hence, we should generate a unique id for each query.
-      val uniqueGroupId = streamingUniqueGroupId(parameters, checkpointLocation)
+      val uniqueGroupId = streamingUniqueGroupId(caseInsensitiveOptions, checkpointLocation)
 
-      val caseInsensitiveParams = parameters.map { case (k, v) => (k.toLowerCase(Locale.ROOT), v) }
-      val specifiedKafkaParams = convertToSpecifiedParams(parameters)
+      val specifiedKafkaParams = convertToSpecifiedParams(caseInsensitiveOptions)
 
       val startingStreamOffsets = KafkaSourceProvider.getKafkaOffsetRangeLimit(
-        caseInsensitiveParams, STARTING_OFFSETS_OPTION_KEY, LatestOffsetRangeLimit)
+        caseInsensitiveOptions, STARTING_OFFSETS_OPTION_KEY, LatestOffsetRangeLimit)
 
       val kafkaOffsetReader = new KafkaOffsetReader(
-        strategy(parameters),
+        strategy(caseInsensitiveOptions),
         kafkaParamsForDriver(specifiedKafkaParams),
-        parameters,
+        caseInsensitiveOptions,
         driverGroupIdPrefix = s"$uniqueGroupId-driver")
 
       new KafkaMicroBatchStream(
@@ -418,32 +418,26 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
         options,
         checkpointLocation,
         startingStreamOffsets,
-        failOnDataLoss(caseInsensitiveParams))
+        failOnDataLoss(caseInsensitiveOptions))
     }
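
The comment block in this method is the crux of the group-id handling: Kafka splits a topic's partitions among all consumers that share a group id, so two queries (or a query and its restarted self) reusing one id would each silently see only part of the data. A sketch of the kind of scheme `streamingUniqueGroupId` implements; the exact format is internal to KafkaSourceProvider, so treat this as an assumption:

```scala
import java.util.UUID

// Hypothetical reconstruction: one fresh group id per (query, checkpoint) pair,
// random so that concurrent and restarted queries can never collide.
def uniqueGroupId(checkpointLocation: String): String =
  s"spark-kafka-source-${UUID.randomUUID}-${checkpointLocation.hashCode}"
```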

     override def toContinuousStream(checkpointLocation: String): ContinuousStream = {
-      val parameters = options.asScala.toMap
-      validateStreamOptions(parameters)
+      val caseInsensitiveOptions = CaseInsensitiveMap(options.asScala.toMap)
+      validateStreamOptions(caseInsensitiveOptions)
       // Each running query should use its own group id. Otherwise, the query may be only assigned
       // partial data since Kafka will assign partitions to multiple consumers having the same group
       // id. Hence, we should generate a unique id for each query.
-      val uniqueGroupId = streamingUniqueGroupId(parameters, checkpointLocation)
+      val uniqueGroupId = streamingUniqueGroupId(caseInsensitiveOptions, checkpointLocation)
 
-      val caseInsensitiveParams = parameters.map { case (k, v) => (k.toLowerCase(Locale.ROOT), v) }
-      val specifiedKafkaParams =
-        parameters
-          .keySet
-          .filter(_.toLowerCase(Locale.ROOT).startsWith("kafka."))
-          .map { k => k.drop(6).toString -> parameters(k) }
-          .toMap
+      val specifiedKafkaParams = convertToSpecifiedParams(caseInsensitiveOptions)
> Contributor Author: This is not a required change, but I thought it would be good to simplify.
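
For readers following along: the six deleted lines above are exactly the inline body that `convertToSpecifiedParams` encapsulates, so this hunk is a pure deduplication. A sketch of the helper, reconstructed from the removed lines (the real definition lives elsewhere in KafkaSourceProvider):

```scala
import java.util.Locale

// Mirrors the deleted inline code: keep only the "kafka."-prefixed options and
// strip the 6-character prefix so the remainder is a plain Kafka config key.
def convertToSpecifiedParams(parameters: Map[String, String]): Map[String, String] =
  parameters.keySet
    .filter(_.toLowerCase(Locale.ROOT).startsWith("kafka."))
    .map { k => k.drop(6) -> parameters(k) }
    .toMap
```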
 
       val startingStreamOffsets = KafkaSourceProvider.getKafkaOffsetRangeLimit(
-        caseInsensitiveParams, STARTING_OFFSETS_OPTION_KEY, LatestOffsetRangeLimit)
+        caseInsensitiveOptions, STARTING_OFFSETS_OPTION_KEY, LatestOffsetRangeLimit)
 
       val kafkaOffsetReader = new KafkaOffsetReader(
-        strategy(caseInsensitiveParams),
+        strategy(caseInsensitiveOptions),
         kafkaParamsForDriver(specifiedKafkaParams),
-        parameters,
+        caseInsensitiveOptions,
         driverGroupIdPrefix = s"$uniqueGroupId-driver")
 
       new KafkaContinuousStream(
@@ -452,7 +446,7 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
         options,
         checkpointLocation,
         startingStreamOffsets,
-        failOnDataLoss(caseInsensitiveParams))
+        failOnDataLoss(caseInsensitiveOptions))
     }
   }
 }
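
Seen from the user side, the net effect is that option keys now resolve case-insensitively on every path through the provider, rather than only where the old code happened to lowercase them first. A hypothetical reader configuration (assumes a live `spark` session; broker and topic names are made up):

```scala
// Mixed-case option keys now reach the provider through CaseInsensitiveMap,
// so these resolve the same as their all-lowercase forms.
val stream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:9092")
  .option("Subscribe", "events")
  .option("startingOffsets", "earliest")
  .load()
```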