@@ -286,13 +286,11 @@ def transform(self, func):
286286 `func` can have one argument of `rdd`, or have two arguments of
287287 (`time`, `rdd`)
288288 """
289- resue = False
290289 if func .func_code .co_argcount == 1 :
291- reuse = True
292290 oldfunc = func
293291 func = lambda t , rdd : oldfunc (rdd )
294292 assert func .func_code .co_argcount == 2 , "func should take one or two arguments"
295- return TransformedDStream (self , func , reuse )
293+ return TransformedDStream (self , func )
296294
297295 def transformWith (self , func , other , keepSerializer = False ):
298296 """
@@ -597,34 +595,30 @@ class TransformedDStream(DStream):
597595 Multiple continuous transformations of DStream can be combined into
598596 one transformation.
599597 """
600- def __init__ (self , prev , func , reuse = False ):
598+ def __init__ (self , prev , func ):
601599 ssc = prev ._ssc
602600 self ._ssc = ssc
603601 self .ctx = ssc ._sc
604602 self ._jrdd_deserializer = self .ctx .serializer
605603 self .is_cached = False
606604 self .is_checkpointed = False
605+ self ._jdstream_val = None
607606
608607 if (isinstance (prev , TransformedDStream ) and
609608 not prev .is_cached and not prev .is_checkpointed ):
610609 prev_func = prev .func
611- old_func = func
612- func = lambda t , rdd : old_func (t , prev_func (t , rdd ))
613- reuse = reuse and prev .reuse
614- prev = prev .prev
615-
616- self .prev = prev
617- self .func = func
618- self .reuse = reuse
619- self ._jdstream_val = None
610+ self .func = lambda t , rdd : func (t , prev_func (t , rdd ))
611+ self .prev = prev .prev
612+ else :
613+ self .prev = prev
614+ self .func = func
620615
621616 @property
622617 def _jdstream (self ):
623618 if self ._jdstream_val is not None :
624619 return self ._jdstream_val
625620
626621 jfunc = TransformFunction (self .ctx , self .func , self .prev ._jrdd_deserializer )
627- jdstream = self .ctx ._jvm .PythonTransformedDStream (self .prev ._jdstream .dstream (),
628- jfunc , self .reuse ).asJavaDStream ()
629- self ._jdstream_val = jdstream
630- return jdstream
622+ dstream = self .ctx ._jvm .PythonTransformedDStream (self .prev ._jdstream .dstream (), jfunc )
623+ self ._jdstream_val = dstream .asJavaDStream ()
624+ return self ._jdstream_val
0 commit comments