1818package org .apache .spark .streaming .api .python
1919
2020import java .io ._
21- import java .io .{ObjectInputStream , IOException }
22- import java .util .{List => JList , ArrayList => JArrayList , Map => JMap , Collections }
21+ import java .util .{List => JList , ArrayList => JArrayList , Map => JMap }
2322
24- import scala .collection .mutable .ArrayBuffer
2523import scala .reflect .ClassTag
2624import scala .collection .JavaConversions ._
2725
@@ -56,7 +54,9 @@ class PythonDStream[T: ClassTag](
5654 override def compute (validTime : Time ): Option [RDD [Array [Byte ]]] = {
5755 parent.getOrCompute(validTime) match {
5856 case Some (rdd) =>
59- val pythonRDD = new PythonRDD (rdd, command, envVars, pythonIncludes, preservePartitoning, pythonExec, broadcastVars, accumulator)
57+ // create PythonRDD to compute Python functions.
58+ val pythonRDD = new PythonRDD (rdd, command, envVars, pythonIncludes,
59+ preservePartitoning, pythonExec, broadcastVars, accumulator)
6060 Some (pythonRDD.asJavaRDD.rdd)
6161 case None => None
6262 }
@@ -81,8 +81,8 @@ DStream[Array[Byte]](prev.ssc){
8181 case Some (rdd)=> Some (rdd)
8282 val pairwiseRDD = new PairwiseRDD (rdd)
8383 /*
84- * Since python operation is executed by Scala after StreamingContext.start.
85- * What PythonPairwiseDStream does is equivalent to python code in pySpark .
84+ * Since the Python function is executed by Scala after StreamingContext.start,
85+ * what PythonPairwiseDStream does is equivalent to the following Python code in pyspark.
8686 *
8787 * with _JavaStackTrace(self.context) as st:
8888 * pairRDD = self.ctx._jvm.PairwiseRDD(keyed._jrdd.rdd()).asJavaPairRDD()
@@ -99,6 +99,7 @@ DStream[Array[Byte]](prev.ssc){
9999 val asJavaDStream = JavaDStream .fromDStream(this )
100100}
101101
102+
102103class PythonForeachDStream (
103104 prev : DStream [Array [Byte ]],
104105 foreachFunction : PythonRDDFunction
@@ -112,29 +113,11 @@ class PythonForeachDStream(
112113 this .register()
113114}
114115
115- class PythonTransformedDStream (
116- prev : DStream [Array [Byte ]],
117- transformFunction : PythonRDDFunction
118- ) extends DStream [Array [Byte ]](prev.ssc) {
119-
120- override def dependencies = List (prev)
121-
122- override def slideDuration : Duration = prev.slideDuration
123-
124- override def compute (validTime : Time ): Option [RDD [Array [Byte ]]] = {
125- prev.getOrCompute(validTime).map(rdd => {
126- transformFunction.call(rdd.toJavaRDD(), validTime.milliseconds).rdd
127- })
128- }
129-
130- val asJavaDStream = JavaDStream .fromDStream(this )
131- }
132116
133117/**
134118 * This is an input stream just for unit tests. This is equivalent to a checkpointable,
135119 * replayable, reliable message queue like Kafka. It requires a sequence as input, and
136120 * returns the i_th element at the i_th batch under manual clock.
137- * This implementation is inspired by QueStream
138121 */
139122
140123class PythonTestInputStream (ssc_ : JavaStreamingContext , inputRDDs : JArrayList [JavaRDD [Array [Byte ]]])
0 commit comments