@@ -43,14 +43,6 @@ def print_(self):
4343 #hack to call print function in DStream
4444 getattr (self ._jdstream , "print" )()
4545
46- def pyprint (self ):
47- """
48- Print the first ten elements of each RDD generated in this DStream. This is an output
49- operator, so this DStream will be registered as an output stream and there materialized.
50-
51- """
52- self ._jdstream .pyprint ()
53-
5446 def filter (self , f ):
5547 """
5648 Return DStream containing only the elements that satisfy predicate.
@@ -190,6 +182,38 @@ def getNumPartitions(self):
190182 # TODO: remove hardcoding. RDD has NumPartitions but DStream does not have.
191183 return 2
192184
def foreachRDD(self, func):
    """
    Hand ``func`` to the JVM side so it is attached to this DStream.

    ``func`` is wrapped in an ``RDDFunction`` together with this stream's
    context and RDD deserializer, and a JVM ``PythonForeachDStream`` is then
    constructed from the underlying Scala DStream and that wrapper.

    NOTE(review): ``pyprint`` passes a callback taking ``(rdd, time)``, so
    ``func`` is presumably expected to accept those two arguments -- confirm
    against ``RDDFunction``.

    Returns nothing.
    """
    # Imported locally, exactly as in the original implementation.
    from utils import RDDFunction

    callback = RDDFunction(self.ctx, self._jrdd_deserializer, func)
    jvm = self.ctx._jvm
    # The constructed object is not kept on the Python side.
    jvm.PythonForeachDStream(self._jdstream.dstream(), callback)
191+
def pyprint(self, num=10):
    """
    Print the first ``num`` elements (ten by default) of each RDD generated
    in this DStream. This is an output operator, so this DStream will be
    registered as an output stream and then materialized.

    The printing is done by a callback handed to ``foreachRDD``; nothing is
    printed by this call itself.

    :param num: maximum number of elements to show per RDD. When an RDD
                yields more than ``num`` elements a trailing "..." line is
                printed.
    """
    def takeAndPrint(rdd, time):
        # Ask for one element more than is displayed so that truncation can
        # be detected without fetching the whole RDD.
        taken = rdd.take(num + 1)
        # Single-argument, parenthesised print calls behave the same as a
        # Python 2 print statement and as the Python 3 print function.
        print("-------------------------------------------")
        print("Time: %s" % (str(time)))
        print("-------------------------------------------")
        for record in taken[:num]:
            print(record)
        if len(taken) > num:
            print("...")
        # Blank separator line between batches.
        print("")

    self.foreachRDD(takeAndPrint)
210+
211+
212+ #def transform(self, func):
213+ # from utils import RDDFunction
214+ # wrapped_func = RDDFunction(self.ctx, self._jrdd_deserializer, func)
215+ # jdstream = self.ctx._jvm.PythonTransformedDStream(self._jdstream.dstream(), wrapped_func).toJavaDStream
216+ # return DStream(jdstream, self._ssc, ...) ## DO NOT KNOW HOW
193217
194218class PipelinedDStream (DStream ):
195219 def __init__ (self , prev , func , preservesPartitioning = False ):
@@ -209,7 +233,6 @@ def pipeline_func(split, iterator):
209233 self ._prev_jdstream = prev ._prev_jdstream # maintain the pipeline
210234 self ._prev_jrdd_deserializer = prev ._prev_jrdd_deserializer
211235 self .is_cached = False
212- self .is_checkpointed = False
213236 self ._ssc = prev ._ssc
214237 self .ctx = prev .ctx
215238 self .prev = prev
@@ -246,4 +269,5 @@ def _jdstream(self):
246269 return self ._jdstream_val
247270
def _is_pipelinable(self):
    """
    Tell whether this DStream may be pipelined.

    Returns False when ``self.is_cached`` is truthy and True otherwise.
    """
    if self.is_cached:
        return False
    return True
273+
0 commit comments