@@ -22,13 +22,15 @@ def count(self):
         """

         """
-        #TODO make sure count implementation, thiis different from what pyspark does
-        return self._mapPartitions(lambda i: [sum(1 for _ in i)]).map(lambda x: (None, 1))
+        pass
+        #TODO: make sure count implementation, this is different from what pyspark does
+        #return self._mapPartitions(lambda i: [sum(1 for _ in i)]).map(lambda x: (None, 1))

     def _sum(self):
         """
         """
-        return self._mapPartitions(lambda x: [sum(x)]).reduce(operator.add)
+        pass
+        #return self._mapPartitions(lambda x: [sum(x)]).reduce(operator.add)

     def print_(self):
         """
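Note: the commented-out count follows the usual per-partition pattern: count inside each partition, then sum the partial counts. A minimal local sketch of that pattern, with plain lists standing in for a DStream's distributed partitions (illustrative only, not the real API):

import operator
from functools import reduce

partitions = [[1, 2, 3], [4, 5], [6]]   # stand-in for distributed partitions

# mapPartitions step: emit one partial count per partition
partial_counts = [sum(1 for _ in part) for part in partitions]

# reduce step: fold the partial counts into a global total
total = reduce(operator.add, partial_counts)
assert total == 6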
@@ -85,7 +87,6 @@ def _mapPartitionsWithIndex(self, f, preservesPartitioning=False):
         """
         return PipelinedDStream(self, f, preservesPartitioning)

-
     def reduceByKey(self, func, numPartitions=None):
         """
         Merge the value for each key using an associative reduce function.
@@ -121,7 +122,7 @@ def combineLocally(iterator):
                 else:
                     combiners[k] = mergeValue(combiners[k], v)
             return combiners.iteritems()
-        locally_combined = self.mapPartitions(combineLocally)
+        locally_combined = self._mapPartitions(combineLocally)
         shuffled = locally_combined.partitionBy(numPartitions)
         def _mergeCombiners(iterator):
             combiners = {}
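Note: combineLocally and _mergeCombiners implement the usual two-phase aggregation: pre-aggregate map-side within each partition, shuffle, then merge the partial combiners. A local sketch of the map-side phase (function and parameter names are illustrative, not the DStream API):

def combine_locally(iterator, create_combiner, merge_value):
    # pre-aggregate within a single partition before the shuffle
    combiners = {}
    for k, v in iterator:
        if k not in combiners:
            combiners[k] = create_combiner(v)
        else:
            combiners[k] = merge_value(combiners[k], v)
    return combiners.items()

# word-count style usage over one partition's records
pairs = [("a", 1), ("b", 1), ("a", 1)]
combined = dict(combine_locally(pairs, lambda v: v, lambda c, v: c + v))
assert combined == {"a": 2, "b": 1}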
@@ -131,12 +132,11 @@ def _mergeCombiners(iterator):
                 else:
                     combiners[k] = mergeCombiners(combiners[k], v)
             return combiners.iteritems()
-        return shuffled.mapPartitions(_mergeCombiners)
+        return shuffled._mapPartitions(_mergeCombiners)

     def partitionBy(self, numPartitions, partitionFunc=None):
         """
         Return a copy of the DStream partitioned using the specified partitioner.
-
         """
         if numPartitions is None:
             numPartitions = self.ctx._defaultReducePartitions()
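Note: partitionBy routes each (key, value) pair to a shuffle partition via a hash of the key. A minimal local sketch of that routing (partition_by and its hash default are illustrative, not the actual partitioner):

def partition_by(records, num_partitions, partition_func=hash):
    # bucket each pair by hashing its key modulo the partition count
    buckets = [[] for _ in range(num_partitions)]
    for k, v in records:
        buckets[partition_func(k) % num_partitions].append((k, v))
    return buckets

buckets = partition_by([("a", 1), ("b", 2), ("c", 3)], 2)
assert sum(len(b) for b in buckets) == 3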