Skip to content

Commit 731a997

Browse files
tdas authored and Andrew Or committed
[SPARK-6027][SPARK-5546] Fixed --jar and --packages not working for KafkaUtils and improved error message
The problem with SPARK-6027 in short is that JARs like the kafka-assembly.jar does not work in python as the added JAR is not visible in the classloader used by Py4J. Py4J uses Class.forName(), which does not uses the systemclassloader, but the JARs are only visible in the Thread's contextclassloader. So this back uses the context class loader to create the KafkaUtils dstream object. This works for both cases where the Kafka libraries are added with --jars spark-streaming-kafka-assembly.jar or with --packages spark-streaming-kafka Also improves the error message. davies Author: Tathagata Das <[email protected]> Closes #4779 from tdas/kafka-python-fix and squashes the following commits: fb16b04 [Tathagata Das] Removed import c1fdf35 [Tathagata Das] Fixed long line and improved documentation 7b88be8 [Tathagata Das] Fixed --jar not working for KafkaUtils and improved error message (cherry picked from commit aa63f63) Signed-off-by: Andrew Or <[email protected]>
1 parent 62652dc commit 731a997

2 files changed

Lines changed: 55 additions & 16 deletions

File tree

external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ import scala.collection.JavaConversions._
2727

2828
import kafka.common.TopicAndPartition
2929
import kafka.message.MessageAndMetadata
30-
import kafka.serializer.{Decoder, StringDecoder}
30+
import kafka.serializer.{DefaultDecoder, Decoder, StringDecoder}
3131

3232
import org.apache.spark.api.java.function.{Function => JFunction}
3333
import org.apache.spark.{SparkContext, SparkException}
@@ -532,3 +532,30 @@ object KafkaUtils {
532532
)
533533
}
534534
}
535+
536+
/**
537+
* This is a helper class that wraps the KafkaUtils.createStream() into more
538+
* Python-friendly class and function so that it can be easily
539+
* instantiated and called from Python's KafkaUtils (see SPARK-6027).
540+
*
541+
* The zero-arg constructor helps instantiate this class from the Class object
542+
* classOf[KafkaUtilsPythonHelper].newInstance(), and the createStream()
543+
* takes care of known parameters instead of passing them from Python
544+
*/
545+
private class KafkaUtilsPythonHelper {
546+
def createStream(
547+
jssc: JavaStreamingContext,
548+
kafkaParams: JMap[String, String],
549+
topics: JMap[String, JInt],
550+
storageLevel: StorageLevel): JavaPairReceiverInputDStream[Array[Byte], Array[Byte]] = {
551+
KafkaUtils.createStream[Array[Byte], Array[Byte], DefaultDecoder, DefaultDecoder](
552+
jssc,
553+
classOf[Array[Byte]],
554+
classOf[Array[Byte]],
555+
classOf[DefaultDecoder],
556+
classOf[DefaultDecoder],
557+
kafkaParams,
558+
topics,
559+
storageLevel)
560+
}
561+
}

python/pyspark/streaming/kafka.py

Lines changed: 27 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
#
1717

1818
from py4j.java_collections import MapConverter
19-
from py4j.java_gateway import java_import, Py4JError
19+
from py4j.java_gateway import java_import, Py4JError, Py4JJavaError
2020

2121
from pyspark.storagelevel import StorageLevel
2222
from pyspark.serializers import PairDeserializer, NoOpSerializer
@@ -50,8 +50,6 @@ def createStream(ssc, zkQuorum, groupId, topics, kafkaParams={},
5050
:param valueDecoder: A function used to decode value (default is utf8_decoder)
5151
:return: A DStream object
5252
"""
53-
java_import(ssc._jvm, "org.apache.spark.streaming.kafka.KafkaUtils")
54-
5553
kafkaParams.update({
5654
"zookeeper.connect": zkQuorum,
5755
"group.id": groupId,
@@ -63,20 +61,34 @@ def createStream(ssc, zkQuorum, groupId, topics, kafkaParams={},
6361
jparam = MapConverter().convert(kafkaParams, ssc.sparkContext._gateway._gateway_client)
6462
jlevel = ssc._sc._getJavaStorageLevel(storageLevel)
6563

66-
def getClassByName(name):
67-
return ssc._jvm.org.apache.spark.util.Utils.classForName(name)
68-
6964
try:
70-
array = getClassByName("[B")
71-
decoder = getClassByName("kafka.serializer.DefaultDecoder")
72-
jstream = ssc._jvm.KafkaUtils.createStream(ssc._jssc, array, array, decoder, decoder,
73-
jparam, jtopics, jlevel)
74-
except Py4JError, e:
65+
# Use KafkaUtilsPythonHelper to access Scala's KafkaUtils (see SPARK-6027)
66+
helperClass = ssc._jvm.java.lang.Thread.currentThread().getContextClassLoader()\
67+
.loadClass("org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper")
68+
helper = helperClass.newInstance()
69+
jstream = helper.createStream(ssc._jssc, jparam, jtopics, jlevel)
70+
except Py4JJavaError, e:
7571
# TODO: use --jar once it also work on driver
76-
if not e.message or 'call a package' in e.message:
77-
print "No kafka package, please put the assembly jar into classpath:"
78-
print " $ bin/spark-submit --driver-class-path external/kafka-assembly/target/" + \
79-
"scala-*/spark-streaming-kafka-assembly-*.jar"
72+
if 'ClassNotFoundException' in str(e.java_exception):
73+
print """
74+
________________________________________________________________________________________________
75+
76+
Spark Streaming's Kafka libraries not found in class path. Try one of the following.
77+
78+
1. Include the Kafka library and its dependencies with the
79+
spark-submit command as
80+
81+
$ bin/spark-submit --packages org.apache.spark:spark-streaming-kafka:%s ...
82+
83+
2. Download the JAR of the artifact from Maven Central http://search.maven.org/,
84+
Group Id = org.apache.spark, Artifact Id = spark-streaming-kafka-assembly, Version = %s.
85+
Then, include the jar in the spark-submit command as
86+
87+
$ bin/spark-submit --jars <spark-streaming-kafka-assembly.jar> ...
88+
89+
________________________________________________________________________________________________
90+
91+
""" % (ssc.sparkContext.version, ssc.sparkContext.version)
8092
raise e
8193
ser = PairDeserializer(NoOpSerializer(), NoOpSerializer())
8294
stream = DStream(jstream, ssc, ser)

0 commit comments

Comments
 (0)