49 commits
55602ee
use external sort in sortBy() and sortByKey()
davies Aug 15, 2014
083d842
sorted based groupByKey()
davies Aug 16, 2014
d05060d
group the same key before shuffle, reduce the comparison during sorting
davies Aug 16, 2014
250be4e
flatten the combined values when dumping into disks
davies Aug 16, 2014
efa23df
refactor, add spark.shuffle.sort=False
davies Aug 16, 2014
b40bae7
bugfix
davies Aug 16, 2014
1ea0669
choose sort based groupByKey() automatically
davies Aug 19, 2014
3ee58e5
switch to sort based groupBy, based on size of data
davies Aug 19, 2014
085aef8
Merge branch 'master' into groupby
davies Aug 19, 2014
11ba318
typo
davies Aug 19, 2014
19f7873
improve tests
davies Aug 19, 2014
644abaf
add license in LICENSE
davies Aug 19, 2014
b2dc3bf
Merge branch 'sort' into groupby
davies Aug 20, 2014
eb53ca6
Merge branch 'master' into sort
davies Aug 20, 2014
f157fe7
Merge branch 'sort' into groupby
davies Aug 20, 2014
0a081c6
Merge branch 'master' into groupby
davies Aug 20, 2014
4b07d39
compress the data while spilling
davies Aug 20, 2014
1f075ed
Merge branch 'master' into sort
davies Aug 20, 2014
905b233
Merge branch 'sort' into groupby
davies Aug 20, 2014
acd8e1b
fix memory when groupByKey().count()
davies Aug 20, 2014
85138e6
Merge branch 'master' into groupby
davies Aug 20, 2014
b48cda5
Merge branch 'master' into groupby
davies Aug 27, 2014
2c1d05b
refactor, minor turning
davies Aug 27, 2014
779ed03
fix merge conflict
davies Aug 27, 2014
fbc504a
Merge branch 'master' into groupby
davies Sep 14, 2014
8ef965e
Merge branch 'master' into groupby
davies Sep 14, 2014
4d4bc86
bugfix
davies Sep 15, 2014
17f4ec6
Merge branch 'master' of github.com:apache/spark into groupby
davies Sep 17, 2014
6540948
address comments:
davies Sep 18, 2014
47918b8
remove unused code
davies Sep 18, 2014
341f1e0
add comments, refactor
davies Sep 18, 2014
0d3395f
Merge branch 'master' of github.com:apache/spark into groupby
davies Sep 25, 2014
1f69f93
fix tests
davies Sep 25, 2014
1578f2e
Merge branch 'master' of github.com:apache/spark into groupby
davies Oct 6, 2014
651f891
simplify GroupByKey
davies Oct 7, 2014
ab5515b
Merge branch 'master' into groupby
Oct 25, 2014
a14b4bd
Merge branch 'master' of github.com:apache/spark into groupby
Nov 4, 2014
70aadcd
Merge branch 'master' of github.com:apache/spark into groupby
Nov 10, 2014
2b9c261
fix typo in comments
Dec 30, 2014
9e2df24
Merge branch 'master' of github.com:apache/spark into groupby
Apr 6, 2015
c6a2f8d
address comments
Apr 6, 2015
d2f053b
add repr for FlattedValuesSerializer
Apr 6, 2015
2a1857a
typo
Apr 7, 2015
e3b8eab
fix narrow dependency
Apr 7, 2015
0dcf320
address comments, rollback changes in ResultIterable
Apr 8, 2015
0b0fde8
address comments
Apr 8, 2015
e78c15c
address comments
Apr 9, 2015
67772dd
fix tests
Apr 9, 2015
af3713a
make sure it's iterator
Apr 9, 2015
13 changes: 7 additions & 6 deletions python/pyspark/join.py
@@ -48,7 +48,7 @@ def dispatch(seq):
vbuf.append(v)
elif n == 2:
wbuf.append(v)
-        return [(v, w) for v in vbuf for w in wbuf]
+        return ((v, w) for v in vbuf for w in wbuf)
return _do_python_join(rdd, other, numPartitions, dispatch)


@@ -62,7 +62,7 @@ def dispatch(seq):
wbuf.append(v)
if not vbuf:
vbuf.append(None)
-        return [(v, w) for v in vbuf for w in wbuf]
+        return ((v, w) for v in vbuf for w in wbuf)
return _do_python_join(rdd, other, numPartitions, dispatch)


@@ -76,7 +76,7 @@ def dispatch(seq):
wbuf.append(v)
if not wbuf:
wbuf.append(None)
-        return [(v, w) for v in vbuf for w in wbuf]
+        return ((v, w) for v in vbuf for w in wbuf)
return _do_python_join(rdd, other, numPartitions, dispatch)


@@ -104,8 +104,9 @@ def make_mapper(i):
rdd_len = len(vrdds)

def dispatch(seq):
-        bufs = [[] for i in range(rdd_len)]
-        for (n, v) in seq:
+        bufs = [[] for _ in range(rdd_len)]
+        for n, v in seq:
             bufs[n].append(v)
-        return tuple(map(ResultIterable, bufs))
+        return tuple(ResultIterable(vs) for vs in bufs)

return union_vrdds.groupByKey(numPartitions).mapValues(dispatch)
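The join.py change is small but deliberate: each dispatch function now returns a generator expression instead of a list comprehension, so the cross product of matching values for one key is produced lazily instead of being materialized in a single list. A minimal sketch of the difference in plain Python (not Spark code; the variable names are illustrative):

vbuf = [1, 2, 3]      # values from the left side for one key
wbuf = ["x", "y"]     # values from the right side for the same key

as_list = [(v, w) for v in vbuf for w in wbuf]   # builds all len(vbuf) * len(wbuf) pairs up front
as_gen = ((v, w) for v in vbuf for w in wbuf)    # yields pairs one at a time

print(len(as_list))   # 6
print(next(as_gen))   # (1, 'x') -- only one pair has been produced so far

For a skewed key with many matches on both sides, the eager version is exactly the kind of allocation this pull request is trying to avoid.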
48 changes: 36 additions & 12 deletions python/pyspark/rdd.py
@@ -41,7 +41,7 @@
from pyspark.storagelevel import StorageLevel
from pyspark.resultiterable import ResultIterable
from pyspark.shuffle import Aggregator, InMemoryMerger, ExternalMerger, \
-    get_used_memory, ExternalSorter
+    get_used_memory, ExternalSorter, ExternalGroupBy
from pyspark.traceback_utils import SCCallSiteSync

from py4j.java_collections import ListConverter, MapConverter
@@ -573,8 +573,8 @@ def sortByKey(self, ascending=True, numPartitions=None, keyfunc=lambda x: x):
if numPartitions is None:
numPartitions = self._defaultReducePartitions()

-        spill = (self.ctx._conf.get("spark.shuffle.spill", 'True').lower() == 'true')
-        memory = _parse_memory(self.ctx._conf.get("spark.python.worker.memory", "512m"))
+        spill = self._can_spill()
+        memory = self._memory_limit()
serializer = self._jrdd_deserializer

def sortPartition(iterator):
@@ -1699,10 +1699,8 @@ def combineByKey(self, createCombiner, mergeValue, mergeCombiners,
numPartitions = self._defaultReducePartitions()

serializer = self.ctx.serializer
-        spill = (self.ctx._conf.get("spark.shuffle.spill", 'True').lower()
-                 == 'true')
-        memory = _parse_memory(self.ctx._conf.get(
-            "spark.python.worker.memory", "512m"))
+        spill = self._can_spill()
+        memory = self._memory_limit()
agg = Aggregator(createCombiner, mergeValue, mergeCombiners)

def combineLocally(iterator):
@@ -1755,21 +1753,28 @@ def createZero():

return self.combineByKey(lambda v: func(createZero(), v), func, func, numPartitions)

+    def _can_spill(self):
+        return self.ctx._conf.get("spark.shuffle.spill", "True").lower() == "true"
+
+    def _memory_limit(self):
+        return _parse_memory(self.ctx._conf.get("spark.python.worker.memory", "512m"))

# TODO: support variant with custom partitioner
def groupByKey(self, numPartitions=None):
"""
Group the values for each key in the RDD into a single sequence.
-        Hash-partitions the resulting RDD with into numPartitions partitions.
+        Hash-partitions the resulting RDD with numPartitions partitions.

Note: If you are grouping in order to perform an aggregation (such as a
sum or average) over each key, using reduceByKey or aggregateByKey will
provide much better performance.

>>> x = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
-        >>> map((lambda (x,y): (x, list(y))), sorted(x.groupByKey().collect()))
+        >>> sorted(x.groupByKey().mapValues(len).collect())
+        [('a', 2), ('b', 1)]
+        >>> sorted(x.groupByKey().mapValues(list).collect())
         [('a', [1, 1]), ('b', [1])]
"""

def createCombiner(x):
return [x]

@@ -1781,8 +1786,27 @@ def mergeCombiners(a, b):
a.extend(b)
return a

-        return self.combineByKey(createCombiner, mergeValue, mergeCombiners,
-                                 numPartitions).mapValues(lambda x: ResultIterable(x))
+        spill = self._can_spill()
+        memory = self._memory_limit()
+        serializer = self._jrdd_deserializer
+        agg = Aggregator(createCombiner, mergeValue, mergeCombiners)
+
+        def combine(iterator):
+            merger = ExternalMerger(agg, memory * 0.9, serializer) \
+                if spill else InMemoryMerger(agg)
+            merger.mergeValues(iterator)
+            return merger.iteritems()
+
+        locally_combined = self.mapPartitions(combine, preservesPartitioning=True)
+        shuffled = locally_combined.partitionBy(numPartitions)
+
+        def groupByKey(it):
+            merger = ExternalGroupBy(agg, memory, serializer)\
+                if spill else InMemoryMerger(agg)
+            merger.mergeCombiners(it)
+            return merger.iteritems()
+
+        return shuffled.mapPartitions(groupByKey, True).mapValues(ResultIterable)

def flatMapValues(self, f):
"""
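The new groupByKey body mirrors combineByKey: values are partially combined within each partition (ExternalMerger, or InMemoryMerger when spilling is disabled), shuffled with partitionBy, and then grouped with the new ExternalGroupBy, which can spill even a single oversized group to disk. A hedged usage sketch of what this enables; the master, app name, configuration values and data size below are illustrative assumptions, not part of the patch:

from pyspark import SparkConf, SparkContext

conf = (SparkConf()
        .setMaster("local[2]")
        .setAppName("groupByKey-spill-demo")
        .set("spark.python.worker.memory", "512m")   # memory budget consulted by the mergers
        .set("spark.shuffle.spill", "true"))         # "false" falls back to InMemoryMerger
sc = SparkContext(conf=conf)

# A single hot key owns every value; scaled up past the worker memory budget,
# the group is expected to spill to disk instead of exhausting the worker.
hot = sc.parallelize(range(10 ** 6), 20).map(lambda x: ("hot", x))
print(hot.groupByKey().mapValues(len).collect())     # [('hot', 1000000)]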
7 changes: 4 additions & 3 deletions python/pyspark/resultiterable.py
@@ -15,15 +15,16 @@
# limitations under the License.
#

-__all__ = ["ResultIterable"]

import collections

+__all__ = ["ResultIterable"]


class ResultIterable(collections.Iterable):

"""
-    A special result iterable. This is used because the standard iterator can not be pickled
+    A special result iterable. This is used because the standard
+    iterator can not be pickled
"""

def __init__(self, data):
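The resultiterable.py edit only reorders the import and rewraps the docstring; the class exists in the first place because a plain iterator or generator cannot be pickled, so grouped values are handed back wrapped in a small iterable object. A rough illustration in plain Python (PicklableIterable is a made-up stand-in, not the real ResultIterable):

import pickle

gen = (x * x for x in range(3))
try:
    pickle.dumps(gen)                    # generators are not picklable
except TypeError as exc:
    print("cannot pickle a generator:", exc)


class PicklableIterable(object):
    """Stand-in for the idea behind ResultIterable: wrap already-materialized data."""

    def __init__(self, data):
        self.data = list(data)

    def __iter__(self):
        return iter(self.data)


clone = pickle.loads(pickle.dumps(PicklableIterable([1, 2, 3])))
print(list(clone))                       # [1, 2, 3]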
25 changes: 24 additions & 1 deletion python/pyspark/serializers.py
@@ -220,6 +220,29 @@ def __repr__(self):
return "BatchedSerializer(%s, %d)" % (str(self.serializer), self.batchSize)


+class FlattenedValuesSerializer(BatchedSerializer):
+
+    """
+    Serializes a stream of list of pairs, split the list of values
+    which contain more than a certain number of objects to make them
+    have similar sizes.
+    """
+    def __init__(self, serializer, batchSize=10):
+        BatchedSerializer.__init__(self, serializer, batchSize)
+
+    def _batched(self, iterator):
+        n = self.batchSize
+        for key, values in iterator:
+            for i in xrange(0, len(values), n):
+                yield key, values[i:i + n]
+
+    def load_stream(self, stream):
+        return self.serializer.load_stream(stream)
+
+    def __repr__(self):
+        return "FlattenedValuesSerializer(%d)" % self.batchSize


class AutoBatchedSerializer(BatchedSerializer):
"""
Choose the size of batch automatically based on the size of object
@@ -251,7 +274,7 @@ def __eq__(self, other):
return (isinstance(other, AutoBatchedSerializer) and
other.serializer == self.serializer and other.bestSize == self.bestSize)

-    def __str__(self):
+    def __repr__(self):
return "AutoBatchedSerializer(%s)" % str(self.serializer)


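FlattenedValuesSerializer._batched splits a (key, values) pair whose value list exceeds batchSize into several (key, chunk) pairs, so spilled records stay roughly the same size even when one key dominates. The batching rule can be sketched in plain Python (an illustration, not the serializer itself):

def flatten_values(iterator, batch_size=10):
    # Re-emit each key with at most batch_size of its values, mirroring _batched.
    for key, values in iterator:
        for i in range(0, len(values), batch_size):
            yield key, values[i:i + batch_size]


pairs = [("hot", list(range(25))), ("cold", [1, 2])]
for key, chunk in flatten_values(pairs):
    print(key, len(chunk))
# hot 10
# hot 10
# hot 5
# cold 2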