Skip to content

Commit a3ce6ef

Browse files
committed
MapR [SPARK-482] Spark streaming app fails to start by UnknownTopicOrPartitionException with checkpoint (apache#453)
1 parent ea35c03 commit a3ce6ef

4 files changed

Lines changed: 36 additions & 6 deletions

File tree

external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,8 @@ private case class Subscribe[K, V](
105105
val shouldSuppress =
106106
aor != null && aor.asInstanceOf[String].toUpperCase(Locale.ROOT) == "NONE"
107107
try {
108-
consumer.poll(0)
108+
// consumer.poll(0)
109+
KafkaUtils.waitForConsumerAssignment(consumer, toSeek.keySet())
109110
} catch {
110111
case x: NoOffsetForPartitionException if shouldSuppress =>
111112
logWarning("Catching NoOffsetForPartitionException since " +
@@ -158,7 +159,8 @@ private case class SubscribePattern[K, V](
158159
val shouldSuppress =
159160
aor != null && aor.asInstanceOf[String].toUpperCase(Locale.ROOT) == "NONE"
160161
try {
161-
consumer.poll(0)
162+
// consumer.poll(0)
163+
KafkaUtils.waitForConsumerAssignment(consumer, toSeek.keySet())
162164
} catch {
163165
case x: NoOffsetForPartitionException if shouldSuppress =>
164166
logWarning("Catching NoOffsetForPartitionException since " +

external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaUtils.scala

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ import net.razorvine.pickle.{IObjectPickler, Opcodes, Pickler}
2525
import org.apache.kafka.clients.consumer._
2626
import org.apache.kafka.common.TopicPartition
2727

28-
import org.apache.spark.SparkContext
28+
import org.apache.spark.{SparkContext, SparkEnv}
2929
import org.apache.spark.annotation.Experimental
3030
import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
3131
import org.apache.spark.api.python.SerDeUtil
@@ -230,6 +230,19 @@ object KafkaUtils extends Logging {
230230
}
231231
}
232232

233+
def waitForConsumerAssignment[K, V](consumer: KafkaConsumer[K, V],
234+
partitions: ju.Set[TopicPartition]): Unit = {
235+
val waitingForAssigmentTimeout = SparkEnv.get.conf.
236+
getLong("spark.mapr.WaitingForAssignmentTimeout", 600000)
237+
238+
var timeout = 0
239+
while ((consumer.assignment().isEmpty || consumer.assignment().size() < partitions.size)
240+
&& timeout < waitingForAssigmentTimeout) {
241+
242+
Thread.sleep(500)
243+
timeout += 500
244+
}
245+
}
233246
}
234247

235248
object KafkaUtilsPythonHelper {

external/kafka-0-9/src/main/scala/org/apache/spark/streaming/kafka09/ConsumerStrategy.scala

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,8 @@ private case class Subscribe[K, V](
106106
val shouldSuppress =
107107
aor != null && aor.asInstanceOf[String].toUpperCase(Locale.ROOT) == "NONE"
108108
try {
109-
consumer.poll(0)
109+
// consumer.poll(0)
110+
KafkaUtils.waitForConsumerAssignment(consumer, toSeek.keySet())
110111
} catch {
111112
case x: NoOffsetForPartitionException if shouldSuppress =>
112113
logWarning("Catching NoOffsetForPartitionException since " +
@@ -159,7 +160,8 @@ private case class SubscribePattern[K, V](
159160
val shouldSuppress =
160161
aor != null && aor.asInstanceOf[String].toUpperCase(Locale.ROOT) == "NONE"
161162
try {
162-
consumer.poll(0)
163+
// consumer.poll(0)
164+
KafkaUtils.waitForConsumerAssignment(consumer, toSeek.keySet())
163165
} catch {
164166
case x: NoOffsetForPartitionException if shouldSuppress =>
165167
logWarning("Catching NoOffsetForPartitionException since " +

external/kafka-0-9/src/main/scala/org/apache/spark/streaming/kafka09/KafkaUtils.scala

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ import net.razorvine.pickle.{IObjectPickler, Opcodes, Pickler}
2525
import org.apache.kafka.clients.consumer._
2626
import org.apache.kafka.common.TopicPartition
2727

28-
import org.apache.spark.SparkContext
28+
import org.apache.spark.{SparkContext, SparkEnv}
2929
import org.apache.spark.annotation.Experimental
3030
import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
3131
import org.apache.spark.api.python.SerDeUtil
@@ -231,6 +231,19 @@ object KafkaUtils extends Logging {
231231
}
232232
}
233233

234+
def waitForConsumerAssignment[K, V](consumer: KafkaConsumer[K, V],
235+
partitions: ju.Set[TopicPartition]): Unit = {
236+
val waitingForAssigmentTimeout = SparkEnv.get.conf.
237+
getLong("spark.mapr.WaitingForAssignmentTimeout", 600000)
238+
239+
var timeout = 0
240+
while ((consumer.assignment().isEmpty || consumer.assignment().size() < partitions.size)
241+
&& timeout < waitingForAssigmentTimeout) {
242+
243+
Thread.sleep(500)
244+
timeout += 500
245+
}
246+
}
234247
}
235248

236249
@deprecated("Use kafka10 package instead of kafka09", "MapR Spark-2.3.2")

0 commit comments

Comments
 (0)