diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index 590e8e1e9c07c..9c226281497f7 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -28,7 +28,7 @@ from subprocess import Popen, PIPE from tempfile import NamedTemporaryFile from threading import Thread -from collections import defaultdict +from collections import defaultdict, Counter from itertools import chain from functools import reduce from math import sqrt, log, isinf, isnan, pow, ceil @@ -1321,16 +1321,9 @@ def countByValue(self): [(1, 2), (2, 3)] """ def countPartition(iterator): - counts = defaultdict(int) - for obj in iterator: - counts[obj] += 1 - yield counts + yield Counter(iterator) - def mergeMaps(m1, m2): - for k, v in m2.items(): - m1[k] += v - return m1 - return self.mapPartitions(countPartition).reduce(mergeMaps) + return self.mapPartitions(countPartition).reduce(operator.add) def top(self, num, key=None): """