From c20bd14a4bed34644efc11de420a1caeccea329e Mon Sep 17 00:00:00 2001
From: Yuval Itzchakov
Date: Sat, 26 Aug 2017 18:21:17 +0300
Subject: [PATCH 1/3] Avoid using "return" inside `CachedKafkaConsumer.get` as
 it is passed to `org.apache.spark.util.UninterruptibleThread.runUninterruptibly`
 as a function type, which causes a NonLocalReturnControl to be thrown on
 every call

---
 .../sql/kafka010/CachedKafkaConsumer.scala | 17 +++++++++++++----
 1 file changed, 13 insertions(+), 4 deletions(-)

diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
index 7c4f38e02fb2a..38c8346e4b356 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
@@ -112,9 +112,15 @@ private[kafka010] case class CachedKafkaConsumer private(
     // we will move to the next available offset within `[offset, untilOffset)` and retry.
     // If `failOnDataLoss` is `true`, the loop body will be executed only once.
     var toFetchOffset = offset
-    while (toFetchOffset != UNKNOWN_OFFSET) {
+    var consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]] = null
+    // We want to break out of the while loop on a successful fetch to avoid using "return",
+    // which may cause a NonLocalReturnControl exception when this method is used as a function.
+    var isFetchComplete = false
+
+    while (toFetchOffset != UNKNOWN_OFFSET && !isFetchComplete) {
       try {
-        return fetchData(toFetchOffset, untilOffset, pollTimeoutMs, failOnDataLoss)
+        consumerRecord = fetchData(toFetchOffset, untilOffset, pollTimeoutMs, failOnDataLoss)
+        isFetchComplete = true
       } catch {
         case e: OffsetOutOfRangeException =>
           // When there is some error thrown, it's better to use a new consumer to drop all cached
@@ -125,8 +131,11 @@ private[kafka010] case class CachedKafkaConsumer private(
           toFetchOffset = getEarliestAvailableOffsetBetween(toFetchOffset, untilOffset)
       }
     }
-    resetFetchedData()
-    null
+
+    if (isFetchComplete) consumerRecord else {
+      resetFetchedData()
+      null
+    }
   }
 
   /**

From 18b9301553427a7b6c038e144f1be52949d82eb9 Mon Sep 17 00:00:00 2001
From: Yuval Itzchakov
Date: Sun, 27 Aug 2017 10:58:01 +0300
Subject: [PATCH 2/3] Comments after code review

---
 .../org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
index 38c8346e4b356..90ed7b1fba2f8 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
@@ -132,7 +132,9 @@ private[kafka010] case class CachedKafkaConsumer private(
       }
     }
 
-    if (isFetchComplete) consumerRecord else {
+    if (isFetchComplete) {
+      consumerRecord
+    } else {
       resetFetchedData()
       null
     }
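For context on why the `return` was a problem: in Scala 2, a `return` inside a function literal cannot simply pop a stack frame, so the compiler rewrites it as `throw new scala.runtime.NonLocalReturnControl(...)`, caught by a synthetic handler wrapping the enclosing method. Below is a minimal, self-contained sketch of that mechanism — plain Scala, not Spark code; `runWrapped` is a hypothetical stand-in for `runUninterruptibly`:

```scala
object NonLocalReturnDemo {

  // Hypothetical stand-in for UninterruptibleThread.runUninterruptibly: it only
  // runs the body, but its stack frame sits between the `return` statement and
  // the method that `return` targets, so the control exception passes through it.
  def runWrapped[T](body: () => T): T = {
    try {
      body()
    } catch {
      case t: Throwable =>
        // A non-local return surfaces here as scala.runtime.NonLocalReturnControl.
        println(s"wrapper intercepted: ${t.getClass.getName}")
        throw t
    }
  }

  def firstEven(xs: List[Int]): Option[Int] = {
    xs.foreach { x =>
      runWrapped { () =>
        // This `return` targets firstEven, two frames up: it is compiled to a
        // throw that unwinds through runWrapped and is caught by the hidden
        // try/catch the compiler wraps around firstEven's body.
        if (x % 2 == 0) return Some(x)
      }
    }
    None
  }

  def main(args: Array[String]): Unit = {
    // Prints the "wrapper intercepted" line before yielding Some(4): the result
    // is correct, but every successful call pays for an exception throw/catch,
    // which is what the patch above avoids with the isFetchComplete flag.
    println(firstEven(List(1, 3, 4)))
  }
}
```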
From 2b43146ff0155301cad403605f15171a8c6a9149 Mon Sep 17 00:00:00 2001
From: Yuval Itzchakov
Date: Sat, 4 Aug 2018 10:24:24 +0300
Subject: [PATCH 3/3] Fixes SPARK-24987. Kafka consumer wasn't released when
 `fromOffset` was equal to `untilOffset`.

---
 .../scala/org/apache/spark/sql/kafka010/KafkaSourceRDD.scala | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceRDD.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceRDD.scala
index 53bd9a96d1d68..8b4494d2e9a25 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceRDD.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceRDD.scala
@@ -124,8 +124,6 @@ private[kafka010] class KafkaSourceRDD(
       thePart: Partition,
       context: TaskContext): Iterator[ConsumerRecord[Array[Byte], Array[Byte]]] = {
     val sourcePartition = thePart.asInstanceOf[KafkaSourceRDDPartition]
-    val topic = sourcePartition.offsetRange.topic
-    val kafkaPartition = sourcePartition.offsetRange.partition
     val consumer = KafkaDataConsumer.acquire(
       sourcePartition.offsetRange.topicPartition, executorKafkaParams, reuseKafkaConsumer)
 
@@ -138,6 +136,7 @@ private[kafka010] class KafkaSourceRDD(
     if (range.fromOffset == range.untilOffset) {
       logInfo(s"Beginning offset ${range.fromOffset} is the same as ending offset " +
         s"skipping ${range.topic} ${range.partition}")
+      consumer.release()
       Iterator.empty
     } else {
       val underlying = new NextIterator[ConsumerRecord[Array[Byte], Array[Byte]]]() {
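To illustrate the leak the last patch fixes: the consumer is acquired before the empty-range check, and on the `Iterator.empty` path nothing ever pulls from the iterator, so the completion path that normally releases the consumer never runs. Below is a minimal sketch of that acquire/release invariant — the `Handle`/`acquire` types are hypothetical, not the actual KafkaDataConsumer API:

```scala
object AcquireReleaseSketch {

  // Hypothetical pooled resource standing in for a cached Kafka consumer.
  final class Handle {
    private var released = false
    def isReleased: Boolean = released
    def release(): Unit = { released = true; println("handle released") }
  }

  def acquire(): Handle = new Handle

  // Mirrors the shape of KafkaSourceRDD.compute: acquire happens before the
  // empty-range check, so the early-exit branch must release explicitly.
  def compute(fromOffset: Int, untilOffset: Int): Iterator[Int] = {
    val consumer = acquire()
    if (fromOffset == untilOffset) {
      // Nothing will ever consume Iterator.empty, so no completion hook would
      // fire; without this explicit call the handle leaks (SPARK-24987).
      consumer.release()
      Iterator.empty
    } else {
      var cur = fromOffset
      new Iterator[Int] {
        def hasNext: Boolean = {
          val more = cur < untilOffset
          // On the non-empty path, release once the range is exhausted, much
          // like the NextIterator completion path in the real code.
          if (!more && !consumer.isReleased) consumer.release()
          more
        }
        def next(): Int = { val v = cur; cur += 1; v }
      }
    }
  }

  def main(args: Array[String]): Unit = {
    compute(5, 5)                  // prints "handle released": the fixed path
    println(compute(0, 3).toList)  // releases on exhaustion, then List(0, 1, 2)
  }
}
```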