@@ -170,18 +170,18 @@ private[kafka010] case class KafkaSource(
170170
171171 // Use the until partitions to calculate offset ranges to ignore partitions that have
172172 // been deleted
173- val sortedTopicPartitions = untilPartitionOffsets.keySet.filter { tp =>
173+ val topicPartitions = untilPartitionOffsets.keySet.filter { tp =>
174174 // Ignore partitions that we don't know the from offsets.
175175 newPartitionOffsets.contains(tp) || fromPartitionOffsets.contains(tp)
176- }.toSeq.sorted(topicPartitionOrdering)
177- logDebug(" Sorted topicPartitions : " + sortedTopicPartitions .mkString(" , " ))
176+ }.toSeq
177+ logDebug(" TopicPartitions : " + topicPartitions .mkString(" , " ))
178178
179179 val sortedExecutors = getSortedExecutorList(sc)
180180 val numExecutors = sortedExecutors.length
181181 logDebug(" Sorted executors: " + sortedExecutors.mkString(" , " ))
182182
183183 // Calculate offset ranges
184- val offsetRanges = sortedTopicPartitions .map { tp =>
184+ val offsetRanges = topicPartitions .map { tp =>
185185 val fromOffset = fromPartitionOffsets.get(tp).getOrElse {
186186 newPartitionOffsets.getOrElse(tp, {
187187 // This should not happen since newPartitionOffsets contains all partitions not in
@@ -191,6 +191,8 @@ private[kafka010] case class KafkaSource(
191191 }
192192 val untilOffset = untilPartitionOffsets(tp)
193193 val preferredLoc = if (numExecutors > 0 ) {
194+ // This allows cached KafkaConsumers in the executors to be re-used to read the same
195+ // partition in every batch.
194196 Some (sortedExecutors(floorMod(tp.hashCode, numExecutors)))
195197 } else None
196198 KafkaSourceRDDOffsetRange (tp, fromOffset, untilOffset, preferredLoc)
@@ -390,16 +392,5 @@ private[kafka010] object KafkaSource {
390392 if (a.host == b.host) { a.executorId > b.executorId } else { a.host > b.host }
391393 }
392394
393- // Sort the partitions and current list of executors to consistently assign each partition
394- // to the executor. This allows cached KafkaConsumers in the executors to be re-used to
395- // read the same partition in every batch.
396- private val topicPartitionOrdering = new Ordering [TopicPartition ] {
397- override def compare (l : TopicPartition , r : TopicPartition ): Int = {
398- implicitly[Ordering [(String , Long )]].compare(
399- (l.topic, l.partition),
400- (r.topic, r.partition))
401- }
402- }
403-
404395 private def floorMod (a : Long , b : Int ): Int = ((a % b).toInt + b) % b
405396}
0 commit comments