Skip to content

Commit ff0be9e

Browse files
Mikhail Gorbovekrivokonmapr
authored andcommitted
Spark 2.0.1 MAPR-streams Python API (apache#73)
1 parent a62a161 commit ff0be9e

8 files changed

Lines changed: 634 additions & 63 deletions

File tree

examples/src/main/python/streaming/direct_kafka_wordcount.py

Lines changed: 25 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -31,25 +31,41 @@
3131
from __future__ import print_function
3232

3333
import sys
34-
3534
from pyspark import SparkContext
3635
from pyspark.streaming import StreamingContext
37-
from pyspark.streaming.kafka import KafkaUtils
36+
from pyspark.streaming.kafka09 import KafkaUtils
37+
from pyspark.streaming.kafka09 import ConsumerStrategies
38+
from pyspark.streaming.kafka09 import LocationStrategies
3839

3940
if __name__ == "__main__":
40-
if len(sys.argv) != 3:
41-
print("Usage: direct_kafka_wordcount.py <broker_list> <topic>", file=sys.stderr)
42-
sys.exit(-1)
41+
42+
if len(sys.argv) < 4:
43+
print("Usage: direct_kafka_wordcount.py <broker_list> <topic> <group_id> " +
44+
"<offset_reset> <batch_interval> <poll_timeout>", file=sys.stderr)
45+
exit(-1)
46+
47+
brokers, topic, group_id, offset_reset, batch_interval, poll_timeout = sys.argv[1:]
4348

4449
sc = SparkContext(appName="PythonStreamingDirectKafkaWordCount")
45-
ssc = StreamingContext(sc, 2)
50+
ssc = StreamingContext(sc, int(batch_interval))
51+
52+
kafkaParams = {
53+
"bootstrap.servers": brokers,
54+
"group.id": group_id,
55+
"key.deserializer": "org.apache.kafka.common.serialization.ByteArrayDeserializer",
56+
"value.deserializer": "org.apache.kafka.common.serialization.ByteArrayDeserializer",
57+
"auto.offset.reset": offset_reset,
58+
"enable.auto.commit": "false",
59+
"spark.kafka.poll.time": poll_timeout
60+
}
4661

47-
brokers, topic = sys.argv[1:]
48-
kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers})
62+
consumerStrategy = ConsumerStrategies.Subscribe(sc, [topic], kafkaParams)
63+
locationStrategy = LocationStrategies.PreferConsistent(sc)
64+
kvs = KafkaUtils.createDirectStream(ssc, locationStrategy, consumerStrategy)
4965
lines = kvs.map(lambda x: x[1])
5066
counts = lines.flatMap(lambda line: line.split(" ")) \
5167
.map(lambda word: (word, 1)) \
52-
.reduceByKey(lambda a, b: a+b)
68+
.reduceByKey(lambda a, b: a + b)
5369
counts.pprint()
5470

5571
ssc.start()

examples/src/main/scala/org/apache/spark/examples/streaming/V10DirectKafkaWordCount.scala renamed to examples/src/main/scala/org/apache/spark/examples/streaming/V09DirectKafkaWordCount.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ import org.apache.spark.streaming.kafka09.{ConsumerStrategies, KafkaUtils, Locat
4242
* topic1,topic2 my-consumer-group latest batch-interval pollTimeout
4343
*/
4444

45-
object V10DirectKafkaWordCount {
45+
object V09DirectKafkaWordCount {
4646
def main(args: Array[String]) {
4747
if (args.length < 4) {
4848
System.err.println(s"""
@@ -90,7 +90,7 @@ object V10DirectKafkaWordCount {
9090

9191
val consumerStrategy = ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams)
9292
val messages = KafkaUtils.createDirectStream[String, String](
93-
ssc, LocationStrategies.PreferConsistent, consumerStrategy)
93+
ssc, LocationStrategies.PreferConsistent(), consumerStrategy)
9494

9595
// Get the lines, split them into words, count the words and print
9696
val lines = messages.map(_.value())

external/kafka-0-9/src/main/scala/org/apache/spark/streaming/kafka09/DirectKafkaInputDStream.scala

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,6 @@ import org.apache.spark.streaming.scheduler.rate.RateEstimator
4444
* per second that each '''partition''' will accept.
4545
* @param locationStrategy In most cases, pass in [[PreferConsistent]],
4646
* see [[LocationStrategy]] for more details.
47-
* @param executorKafkaParams Kafka
4847
* <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
4948
* configuration parameters</a>.
5049
* Requires "bootstrap.servers" to be set with Kafka broker(s),
@@ -110,7 +109,7 @@ private[spark] class DirectKafkaInputDStream[K, V](
110109
}
111110

112111
// Keep this consistent with how other streams are named (e.g. "Flume polling stream [2]")
113-
private[streaming] override def name: String = s"Kafka 0.10 direct stream [$id]"
112+
private[streaming] override def name: String = s"Kafka 0.09 direct stream [$id]"
114113

115114
protected[streaming] override val checkpointData =
116115
new DirectKafkaInputDStreamCheckpointData

external/kafka-0-9/src/main/scala/org/apache/spark/streaming/kafka09/KafkaUtils.scala

Lines changed: 124 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -17,20 +17,25 @@
1717

1818
package org.apache.spark.streaming.kafka09
1919

20-
import java.{ util => ju }
20+
import java.{util => ju}
21+
import java.io.OutputStream
22+
import java.lang.{Integer => JInt, Long => JLong}
2123

24+
import com.google.common.base.Charsets.UTF_8
25+
import net.razorvine.pickle.{IObjectPickler, Opcodes, Pickler}
2226
import org.apache.kafka.clients.consumer._
2327
import org.apache.kafka.common.TopicPartition
2428

2529
import org.apache.spark.SparkContext
2630
import org.apache.spark.annotation.Experimental
27-
import org.apache.spark.api.java.{ JavaRDD, JavaSparkContext }
28-
import org.apache.spark.api.java.function.{ Function0 => JFunction0 }
31+
import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
32+
import org.apache.spark.api.java.function.{Function0 => JFunction0}
33+
import org.apache.spark.api.python.SerDeUtil
2934
import org.apache.spark.internal.Logging
3035
import org.apache.spark.rdd.RDD
3136
import org.apache.spark.streaming.StreamingContext
32-
import org.apache.spark.streaming.api.java.{ JavaInputDStream, JavaStreamingContext }
33-
import org.apache.spark.streaming.dstream._
37+
import org.apache.spark.streaming.api.java.{JavaDStream, JavaInputDStream, JavaStreamingContext}
38+
import org.apache.spark.streaming.dstream.InputDStream
3439

3540
/**
3641
* :: Experimental ::
@@ -76,22 +81,21 @@ object KafkaUtils extends Logging {
7681
}
7782

7883
/**
79-
* :: Experimental ::
80-
* Java constructor for a batch-oriented interface for consuming from Kafka.
81-
* Starting and ending offsets are specified in advance,
82-
* so that you can control exactly-once semantics.
83-
* @param keyClass Class of the keys in the Kafka records
84-
* @param valueClass Class of the values in the Kafka records
85-
* @param kafkaParams Kafka
86-
* <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
87-
* configuration parameters</a>. Requires "bootstrap.servers" to be set
88-
* with Kafka broker(s) specified in host1:port1,host2:port2 form.
89-
* @param offsetRanges offset ranges that define the Kafka data belonging to this RDD
90-
* @param locationStrategy In most cases, pass in LocationStrategies.preferConsistent,
91-
* see [[LocationStrategies]] for more details.
92-
* @tparam K type of Kafka message key
93-
* @tparam V type of Kafka message value
94-
*/
84+
* :: Experimental ::
85+
* Java constructor for a batch-oriented interface for consuming from Kafka.
86+
* Starting and ending offsets are specified in advance,
87+
* so that you can control exactly-once semantics.
88+
*
89+
* @param kafkaParams Kafka
90+
* <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
91+
* configuration parameters</a>. Requires "bootstrap.servers" to be set
92+
* with Kafka broker(s) specified in host1:port1,host2:port2 form.
93+
* @param offsetRanges offset ranges that define the Kafka data belonging to this RDD
94+
* @param locationStrategy In most cases, pass in LocationStrategies.preferConsistent,
95+
* see [[LocationStrategies]] for more details.
96+
* @tparam K type of Kafka message key
97+
* @tparam V type of Kafka message value
98+
*/
9599
@Experimental
96100
def createRDD[K, V](
97101
jsc: JavaSparkContext,
@@ -104,19 +108,20 @@ object KafkaUtils extends Logging {
104108
}
105109

106110
/**
107-
* :: Experimental ::
108-
* Scala constructor for a DStream where
109-
* each given Kafka topic/partition corresponds to an RDD partition.
110-
* The spark configuration spark.streaming.kafka.maxRatePerPartition gives the maximum number
111-
* of messages
112-
* per second that each '''partition''' will accept.
113-
* @param locationStrategy In most cases, pass in LocationStrategies.preferConsistent,
114-
* see [[LocationStrategies]] for more details.
115-
* @param consumerStrategy In most cases, pass in ConsumerStrategies.subscribe,
116-
* see [[ConsumerStrategies]] for more details
117-
* @tparam K type of Kafka message key
118-
* @tparam V type of Kafka message value
119-
*/
111+
* :: Experimental ::
112+
* Scala constructor for a DStream where
113+
* each given Kafka topic/partition corresponds to an RDD partition.
114+
* The spark configuration spark.streaming.kafka.maxRatePerPartition gives the maximum number
115+
* of messages
116+
* per second that each '''partition''' will accept.
117+
*
118+
* @param locationStrategy In most cases, pass in LocationStrategies.preferConsistent,
119+
* see [[LocationStrategies]] for more details.
120+
* @param consumerStrategy In most cases, pass in ConsumerStrategies.subscribe,
121+
* see [[ConsumerStrategies]] for more details
122+
* @tparam K type of Kafka message key
123+
* @tparam V type of Kafka message value
124+
*/
120125
@Experimental
121126
def createDirectStream[K, V](
122127
ssc: StreamingContext,
@@ -127,18 +132,17 @@ object KafkaUtils extends Logging {
127132
}
128133

129134
/**
130-
* :: Experimental ::
131-
* Java constructor for a DStream where
132-
* each given Kafka topic/partition corresponds to an RDD partition.
133-
* @param keyClass Class of the keys in the Kafka records
134-
* @param valueClass Class of the values in the Kafka records
135-
* @param locationStrategy In most cases, pass in LocationStrategies.preferConsistent,
136-
* see [[LocationStrategies]] for more details.
137-
* @param consumerStrategy In most cases, pass in ConsumerStrategies.subscribe,
138-
* see [[ConsumerStrategies]] for more details
139-
* @tparam K type of Kafka message key
140-
* @tparam V type of Kafka message value
141-
*/
135+
* :: Experimental ::
136+
* Java constructor for a DStream where
137+
* each given Kafka topic/partition corresponds to an RDD partition.
138+
*
139+
* @param locationStrategy In most cases, pass in LocationStrategies.preferConsistent,
140+
* see [[LocationStrategies]] for more details.
141+
* @param consumerStrategy In most cases, pass in ConsumerStrategies.subscribe,
142+
* see [[ConsumerStrategies]] for more details
143+
* @tparam K type of Kafka message key
144+
* @tparam V type of Kafka message value
145+
*/
142146
@Experimental
143147
def createDirectStream[K, V](
144148
jssc: JavaStreamingContext,
@@ -151,8 +155,8 @@ object KafkaUtils extends Logging {
151155
}
152156

153157
/**
154-
* Tweak kafka params to prevent issues on executors
155-
*/
158+
* Tweak kafka params to prevent issues on executors
159+
*/
156160
private[kafka09] def fixKafkaParams(kafkaParams: ju.HashMap[String, Object]): Unit = {
157161
logWarning(s"overriding ${ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG} to false for executor")
158162
kafkaParams.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false: java.lang.Boolean)
@@ -177,3 +181,75 @@ object KafkaUtils extends Logging {
177181
}
178182
}
179183
}
184+
185+
object KafkaUtilsPythonHelper {
186+
private var initialized = false
187+
188+
def initialize(): Unit = {
189+
SerDeUtil.initialize()
190+
synchronized {
191+
if (!initialized) {
192+
new PythonMessageAndMetadataPickler().register()
193+
initialized = true
194+
}
195+
}
196+
}
197+
198+
initialize()
199+
200+
def picklerIterator(iter: Iterator[ConsumerRecord[Array[Byte], Array[Byte]]]
201+
): Iterator[Array[Byte]] = {
202+
new SerDeUtil.AutoBatchedPickler(iter)
203+
}
204+
205+
class PythonMessageAndMetadataPickler extends IObjectPickler {
206+
private val module = "pyspark.streaming.kafka"
207+
208+
def register(): Unit = {
209+
Pickler.registerCustomPickler(classOf[ConsumerRecord[Any, Any]], this)
210+
Pickler.registerCustomPickler(this.getClass, this)
211+
}
212+
213+
def pickle(obj: Object, out: OutputStream, pickler: Pickler) {
214+
if (obj == this) {
215+
out.write(Opcodes.GLOBAL)
216+
out.write(s"$module\nKafkaMessageAndMetadata\n".getBytes(UTF_8))
217+
} else {
218+
pickler.save(this)
219+
val msgAndMetaData = obj.asInstanceOf[ConsumerRecord[Array[Byte], Array[Byte]]]
220+
out.write(Opcodes.MARK)
221+
pickler.save(msgAndMetaData.topic)
222+
pickler.save(msgAndMetaData.partition)
223+
pickler.save(msgAndMetaData.offset)
224+
pickler.save(msgAndMetaData.key)
225+
pickler.save(msgAndMetaData.value)
226+
out.write(Opcodes.TUPLE)
227+
out.write(Opcodes.REDUCE)
228+
}
229+
}
230+
}
231+
232+
// def createRDDWithoutMessageHandler(
233+
// jsc: JavaSparkContext,
234+
// kafkaParams: JMap[String, String],
235+
// offsetRanges: JList[OffsetRange],
236+
// leaders: JMap[TopicAndPartition, Broker]): JavaRDD[(Array[Byte], Array[Byte])] = {
237+
// val messageHandler =
238+
// (mmd: MessageAndMetadata[Array[Byte], Array[Byte]]) => (mmd.key, mmd.message)
239+
// new JavaRDD(createRDD(jsc, kafkaParams, offsetRanges, leaders, messageHandler))
240+
// }
241+
242+
@Experimental
243+
def createDirectStream(
244+
jssc: JavaStreamingContext,
245+
locationStrategy: LocationStrategy,
246+
consumerStrategy: ConsumerStrategy[Array[Byte], Array[Byte]]
247+
): JavaDStream[(Array[Byte], Array[Byte])] = {
248+
249+
val dStream = KafkaUtils.createDirectStream[Array[Byte], Array[Byte]](
250+
jssc.ssc, locationStrategy, consumerStrategy)
251+
.map(cm => (cm.key, cm.value))
252+
253+
new JavaDStream[(Array[Byte], Array[Byte])](dStream)
254+
}
255+
}

external/kafka-0-9/src/main/scala/org/apache/spark/streaming/kafka09/LocationStrategy.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ import org.apache.spark.annotation.Experimental
3030
* :: Experimental ::
3131
* Choice of how to schedule consumers for a given TopicPartition on an executor.
3232
* See [[LocationStrategies]] to obtain instances.
33-
* Kafka 0.10 consumers prefetch messages, so it's important for performance
33+
* Kafka 0.9 consumers prefetch messages, so it's important for performance
3434
* to keep cached consumers on appropriate executors, not recreate them for every partition.
3535
* Choice of location is only a preference, not an absolute; partitions may be scheduled elsewhere.
3636
*/

0 commit comments

Comments
 (0)