From 41fce544cadce5ed314b75f368abf79ee7fcd2da Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Mon, 10 Nov 2014 15:54:20 -0800 Subject: [PATCH 01/13] randomSplit() --- .../apache/spark/api/python/PythonRDD.scala | 13 +++++++++ python/pyspark/rdd.py | 28 +++++++++++++++++++ 2 files changed, 41 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala index 45beb8fc8c925..78a5794bd557b 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala @@ -757,6 +757,19 @@ private[spark] object PythonRDD extends Logging { converted.saveAsHadoopDataset(new JobConf(conf)) } } + + /** + * A helper to convert java.util.List[Double] into Array[Double] + * @param list + * @return + */ + def listToArrayDouble(list: JList[Double]): Array[Double] = { + val r = new Array[Double](list.size) + list.zipWithIndex.foreach { + case (v, i) => r(i) = v + } + r + } } private diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index 08d047402625f..f29af793737f8 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -316,6 +316,34 @@ def sample(self, withReplacement, fraction, seed=None): assert fraction >= 0.0, "Negative fraction value: %s" % fraction return self.mapPartitionsWithIndex(RDDSampler(withReplacement, fraction, seed).func, True) + def randomSplit(self, weights, seed=None): + """ + Randomly splits this RDD with the provided weights. + + :param weights: weights for splits, will be normalized if they don't sum to 1 + :param seed: random seed + :return: split RDDs in an list + + >>> rdd = sc.parallelize(range(10), 1) + >>> rdd1, rdd2, rdd3 = rdd.randomSplit([0.4, 0.6, 1.0], 11) + >>> rdd1.collect() + [3, 6] + >>> rdd2.collect() + [0, 5, 7] + >>> rdd3.collect() + [1, 2, 4, 8, 9] + """ + ser = BatchedSerializer(PickleSerializer(), 1) + rdd = self._reserialize(ser) + jweights = ListConverter().convert([float(w) for w in weights], + self.ctx._gateway._gateway_client) + jweights = self.ctx._jvm.PythonRDD.listToArrayDouble(jweights) + if seed is None: + jrdds = rdd._jrdd.randomSplit(jweights) + else: + jrdds = rdd._jrdd.randomSplit(jweights, seed) + return [RDD(jrdd, self.ctx, ser) for jrdd in jrdds] + # this is ported from scala/spark/RDD.scala def takeSample(self, withReplacement, num, seed=None): """ From 1715ee38b619868c284454d5058376e9e0ca09a7 Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Wed, 12 Nov 2014 14:15:47 -0800 Subject: [PATCH 02/13] address comments --- core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala | 2 -- python/pyspark/rdd.py | 2 +- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala index 78a5794bd557b..a36f4a1a221c8 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala @@ -760,8 +760,6 @@ private[spark] object PythonRDD extends Logging { /** * A helper to convert java.util.List[Double] into Array[Double] - * @param list - * @return */ def listToArrayDouble(list: JList[Double]): Array[Double] = { val r = new Array[Double](list.size) diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index f29af793737f8..a4cf1ead5bfa9 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -322,7 +322,7 @@ def randomSplit(self, weights, seed=None): 
:param weights: weights for splits, will be normalized if they don't sum to 1 :param seed: random seed - :return: split RDDs in an list + :return: split RDDs in a list >>> rdd = sc.parallelize(range(10), 1) >>> rdd1, rdd2, rdd3 = rdd.randomSplit([0.4, 0.6, 1.0], 11) From 0d9b256a9b7aa52c37c6a952ffd68bf4441d46e5 Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Wed, 12 Nov 2014 15:00:17 -0800 Subject: [PATCH 03/13] refactor --- .../main/scala/org/apache/spark/api/python/PythonRDD.scala | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala index a36f4a1a221c8..6702baa9e12fb 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala @@ -24,6 +24,7 @@ import java.util.{List => JList, ArrayList => JArrayList, Map => JMap, Collectio import org.apache.spark.input.PortableDataStream import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ import scala.collection.mutable import scala.language.existentials @@ -762,11 +763,7 @@ private[spark] object PythonRDD extends Logging { * A helper to convert java.util.List[Double] into Array[Double] */ def listToArrayDouble(list: JList[Double]): Array[Double] = { - val r = new Array[Double](list.size) - list.zipWithIndex.foreach { - case (v, i) => r(i) = v - } - r + list.asScala.toArray } } From c7a2007245752480dfe96316d22dc8f19ea1b1a2 Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Thu, 13 Nov 2014 11:29:43 -0800 Subject: [PATCH 04/13] switch to python implementation --- .../apache/spark/api/python/PythonRDD.scala | 7 ----- python/pyspark/rdd.py | 30 ++++++++----------- python/pyspark/rddsampler.py | 5 ++-- 3 files changed, 16 insertions(+), 26 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala index 6702baa9e12fb..ffac314d50219 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala @@ -758,13 +758,6 @@ private[spark] object PythonRDD extends Logging { converted.saveAsHadoopDataset(new JobConf(conf)) } } - - /** - * A helper to convert java.util.List[Double] into Array[Double] - */ - def listToArrayDouble(list: JList[Double]): Array[Double] = { - list.asScala.toArray - } } private diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index a4cf1ead5bfa9..bb03adce2463e 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -28,7 +28,7 @@ import warnings import heapq import bisect -from random import Random +import random from math import sqrt, log, isinf, isnan from pyspark.accumulators import PStatsParam @@ -324,25 +324,21 @@ def randomSplit(self, weights, seed=None): :param seed: random seed :return: split RDDs in a list - >>> rdd = sc.parallelize(range(10), 1) - >>> rdd1, rdd2, rdd3 = rdd.randomSplit([0.4, 0.6, 1.0], 11) + >>> rdd = sc.parallelize(range(5), 1) + >>> rdd1, rdd2 = rdd.randomSplit([2.0, 3.0], 101) >>> rdd1.collect() - [3, 6] + [2, 3] >>> rdd2.collect() - [0, 5, 7] - >>> rdd3.collect() - [1, 2, 4, 8, 9] + [0, 1, 4] """ - ser = BatchedSerializer(PickleSerializer(), 1) - rdd = self._reserialize(ser) - jweights = ListConverter().convert([float(w) for w in weights], - self.ctx._gateway._gateway_client) - jweights = self.ctx._jvm.PythonRDD.listToArrayDouble(jweights) + s = 
sum(weights) + cweights = [0.0] + for w in weights: + cweights.append(cweights[-1] + w / s) if seed is None: - jrdds = rdd._jrdd.randomSplit(jweights) - else: - jrdds = rdd._jrdd.randomSplit(jweights, seed) - return [RDD(jrdd, self.ctx, ser) for jrdd in jrdds] + seed = random.randint(0, 2 ** 32 - 1) + return [self.mapPartitionsWithIndex(RDDSampler(False, ub, seed, lb).func, True) + for lb, ub in zip(cweights, cweights[1:])] # this is ported from scala/spark/RDD.scala def takeSample(self, withReplacement, num, seed=None): @@ -369,7 +365,7 @@ def takeSample(self, withReplacement, num, seed=None): if initialCount == 0: return [] - rand = Random(seed) + rand = random.Random(seed) if (not withReplacement) and num >= initialCount: # shuffle current RDD and return diff --git a/python/pyspark/rddsampler.py b/python/pyspark/rddsampler.py index f5c3cfd259a5b..b35fb64570762 100644 --- a/python/pyspark/rddsampler.py +++ b/python/pyspark/rddsampler.py @@ -96,9 +96,10 @@ def shuffle(self, vals): class RDDSampler(RDDSamplerBase): - def __init__(self, withReplacement, fraction, seed=None): + def __init__(self, withReplacement, fraction, seed=None, lowbound=0.0): RDDSamplerBase.__init__(self, withReplacement, seed) self._fraction = fraction + self._lowbound = lowbound def func(self, split, iterator): if self._withReplacement: @@ -111,7 +112,7 @@ def func(self, split, iterator): yield obj else: for obj in iterator: - if self.getUniformSample(split) <= self._fraction: + if self._lowbound <= self.getUniformSample(split) < self._fraction: yield obj From f866bcf7e23a6d888d2113df4da3031bfe91400e Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Thu, 13 Nov 2014 11:31:09 -0800 Subject: [PATCH 05/13] remove unneeded change --- core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala index ffac314d50219..45beb8fc8c925 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala @@ -24,7 +24,6 @@ import java.util.{List => JList, ArrayList => JArrayList, Map => JMap, Collectio import org.apache.spark.input.PortableDataStream import scala.collection.JavaConversions._ -import scala.collection.JavaConverters._ import scala.collection.mutable import scala.language.existentials From 4dfa2cdce6a2eaae5f5e24321e324bd0498ea49f Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Thu, 13 Nov 2014 12:20:56 -0800 Subject: [PATCH 06/13] refactor --- python/pyspark/rdd.py | 4 ++-- python/pyspark/rddsampler.py | 18 +++++++++++++++--- 2 files changed, 17 insertions(+), 5 deletions(-) diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index bb03adce2463e..4b2043ff9b465 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -38,7 +38,7 @@ from pyspark.join import python_join, python_left_outer_join, \ python_right_outer_join, python_full_outer_join, python_cogroup from pyspark.statcounter import StatCounter -from pyspark.rddsampler import RDDSampler, RDDStratifiedSampler +from pyspark.rddsampler import RDDSampler, RDDRangeSampler, RDDStratifiedSampler from pyspark.storagelevel import StorageLevel from pyspark.resultiterable import ResultIterable from pyspark.shuffle import Aggregator, InMemoryMerger, ExternalMerger, \ @@ -337,7 +337,7 @@ def randomSplit(self, weights, seed=None): cweights.append(cweights[-1] + w / s) if seed is None: seed = 
random.randint(0, 2 ** 32 - 1) - return [self.mapPartitionsWithIndex(RDDSampler(False, ub, seed, lb).func, True) + return [self.mapPartitionsWithIndex(RDDRangeSampler(lb, ub, seed).func, True) for lb, ub in zip(cweights, cweights[1:])] # this is ported from scala/spark/RDD.scala diff --git a/python/pyspark/rddsampler.py b/python/pyspark/rddsampler.py index b35fb64570762..4365640040116 100644 --- a/python/pyspark/rddsampler.py +++ b/python/pyspark/rddsampler.py @@ -96,10 +96,9 @@ def shuffle(self, vals): class RDDSampler(RDDSamplerBase): - def __init__(self, withReplacement, fraction, seed=None, lowbound=0.0): + def __init__(self, withReplacement, fraction, seed=None): RDDSamplerBase.__init__(self, withReplacement, seed) self._fraction = fraction - self._lowbound = lowbound def func(self, split, iterator): if self._withReplacement: @@ -112,10 +111,23 @@ def func(self, split, iterator): yield obj else: for obj in iterator: - if self._lowbound <= self.getUniformSample(split) < self._fraction: + if self.getUniformSample(split) <= self._fraction: yield obj +class RDDRangeSampler(RDDSamplerBase): + + def __init__(self, lowerBound, upperBound, seed=None): + RDDSamplerBase.__init__(self, False, seed) + self._lowerBound = lowerBound + self._upperBound = upperBound + + def func(self, split, iterator): + for obj in iterator: + if self._lowerBound <= self.getUniformSample(split) < self._upperBound: + yield obj + + class RDDStratifiedSampler(RDDSamplerBase): def __init__(self, withReplacement, fractions, seed=None): From f5fdf63fe0d0ea091c01a4144585276a7db63625 Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Thu, 13 Nov 2014 12:54:14 -0800 Subject: [PATCH 07/13] fix bug with int in weights --- python/pyspark/rdd.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index 4b2043ff9b465..0e8920281e842 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -325,13 +325,13 @@ def randomSplit(self, weights, seed=None): :return: split RDDs in a list >>> rdd = sc.parallelize(range(5), 1) - >>> rdd1, rdd2 = rdd.randomSplit([2.0, 3.0], 101) + >>> rdd1, rdd2 = rdd.randomSplit([2, 3], 101) >>> rdd1.collect() [2, 3] >>> rdd2.collect() [0, 1, 4] """ - s = sum(weights) + s = float(sum(weights)) cweights = [0.0] for w in weights: cweights.append(cweights[-1] + w / s) From 78bf997f13c6f08129671a9d6a3484620d5b37a2 Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Thu, 13 Nov 2014 13:08:10 -0800 Subject: [PATCH 08/13] fix tests, do not use numpy in randomSplit, no performance gain --- python/pyspark/rdd.py | 6 +++--- python/pyspark/rddsampler.py | 1 + 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index 0e8920281e842..50535d2711708 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -325,11 +325,11 @@ def randomSplit(self, weights, seed=None): :return: split RDDs in a list >>> rdd = sc.parallelize(range(5), 1) - >>> rdd1, rdd2 = rdd.randomSplit([2, 3], 101) + >>> rdd1, rdd2 = rdd.randomSplit([2, 3], 17) >>> rdd1.collect() - [2, 3] + [1, 3] >>> rdd2.collect() - [0, 1, 4] + [0, 2, 4] """ s = float(sum(weights)) cweights = [0.0] diff --git a/python/pyspark/rddsampler.py b/python/pyspark/rddsampler.py index 4365640040116..558dcfd12d46f 100644 --- a/python/pyspark/rddsampler.py +++ b/python/pyspark/rddsampler.py @@ -119,6 +119,7 @@ class RDDRangeSampler(RDDSamplerBase): def __init__(self, lowerBound, upperBound, seed=None): RDDSamplerBase.__init__(self, False, seed) + 
self._use_numpy = False # no performance gain from numpy self._lowerBound = lowerBound self._upperBound = upperBound From 51649f5e5b29ab8db1c6c3fd91c6f625124ab327 Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Thu, 13 Nov 2014 22:40:40 -0800 Subject: [PATCH 09/13] remove numpy in RDDSampler --- python/pyspark/rdd.py | 12 ++++-- python/pyspark/rddsampler.py | 84 ++++++++++-------------------------- 2 files changed, 30 insertions(+), 66 deletions(-) diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index 50535d2711708..9e4b33a8f42ad 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -310,8 +310,13 @@ def distinct(self, numPartitions=None): def sample(self, withReplacement, fraction, seed=None): """ - Return a sampled subset of this RDD (relies on numpy and falls back - on default random generator if numpy is unavailable). + Return a sampled subset of this RDD. + + >>> rdd = sc.parallelize(range(100), 4) + >>> rdd.sample(True, 0.1, 27).count() + 10 + >>> rdd.sample(False, 0.1, 81).count() + 10 """ assert fraction >= 0.0, "Negative fraction value: %s" % fraction return self.mapPartitionsWithIndex(RDDSampler(withReplacement, fraction, seed).func, True) @@ -343,8 +348,7 @@ def randomSplit(self, weights, seed=None): # this is ported from scala/spark/RDD.scala def takeSample(self, withReplacement, num, seed=None): """ - Return a fixed-size sampled subset of this RDD (currently requires - numpy). + Return a fixed-size sampled subset of this RDD. >>> rdd = sc.parallelize(range(0, 10)) >>> len(rdd.takeSample(True, 20, 1)) diff --git a/python/pyspark/rddsampler.py b/python/pyspark/rddsampler.py index 558dcfd12d46f..fd7639ff7a138 100644 --- a/python/pyspark/rddsampler.py +++ b/python/pyspark/rddsampler.py @@ -22,76 +22,34 @@ class RDDSamplerBase(object): def __init__(self, withReplacement, seed=None): - try: - import numpy - self._use_numpy = True - except ImportError: - print >> sys.stderr, ( - "NumPy does not appear to be installed. 
" - "Falling back to default random generator for sampling.") - self._use_numpy = False - - self._seed = seed if seed is not None else random.randint(0, 2 ** 32 - 1) + self._seed = seed if seed is not None else random.randint(0, sys.maxint) self._withReplacement = withReplacement self._random = None - self._split = None - self._rand_initialized = False def initRandomGenerator(self, split): - if self._use_numpy: - import numpy - self._random = numpy.random.RandomState(self._seed ^ split) - else: - self._random = random.Random(self._seed ^ split) + self._random = random.Random(self._seed ^ split) # mixing because the initial seeds are close to each other for _ in xrange(10): self._random.randint(0, 1) - self._split = split - self._rand_initialized = True - - def getUniformSample(self, split): - if not self._rand_initialized or split != self._split: - self.initRandomGenerator(split) - - if self._use_numpy: - return self._random.random_sample() - else: - return self._random.uniform(0.0, 1.0) - - def getPoissonSample(self, split, mean): - if not self._rand_initialized or split != self._split: - self.initRandomGenerator(split) + def getUniformSample(self): + return self._random.random() - if self._use_numpy: - return self._random.poisson(mean) - else: - # here we simulate drawing numbers n_i ~ Poisson(lambda = 1/mean) by - # drawing a sequence of numbers delta_j ~ Exp(mean) - num_arrivals = 1 - cur_time = 0.0 + def getPoissonSample(self, mean): + # here we simulate drawing numbers n_i ~ Poisson(lambda = 1/mean) by + # drawing a sequence of numbers delta_j ~ Exp(mean) + num_arrivals = 0 + cur_time = self._random.expovariate(mean) + while cur_time < 1.0: cur_time += self._random.expovariate(mean) + num_arrivals += 1 - if cur_time > 1.0: - return 0 + return num_arrivals - while(cur_time <= 1.0): - cur_time += self._random.expovariate(mean) - num_arrivals += 1 - - return (num_arrivals - 1) - - def shuffle(self, vals): - if self._random is None: - self.initRandomGenerator(0) # this should only ever called on the master so - # the split does not matter - - if self._use_numpy: - self._random.shuffle(vals) - else: - self._random.shuffle(vals, self._random.random) + def func(self, split, iterator): + raise NotImplementedError class RDDSampler(RDDSamplerBase): @@ -101,17 +59,18 @@ def __init__(self, withReplacement, fraction, seed=None): self._fraction = fraction def func(self, split, iterator): + self.initRandomGenerator(split) if self._withReplacement: for obj in iterator: # For large datasets, the expected number of occurrences of each element in # a sample with replacement is Poisson(frac). We use that to get a count for # each element. 
- count = self.getPoissonSample(split, mean=self._fraction) + count = self.getPoissonSample(self._fraction) for _ in range(0, count): yield obj else: for obj in iterator: - if self.getUniformSample(split) <= self._fraction: + if self.getUniformSample() <= self._fraction: yield obj @@ -119,13 +78,13 @@ class RDDRangeSampler(RDDSamplerBase): def __init__(self, lowerBound, upperBound, seed=None): RDDSamplerBase.__init__(self, False, seed) - self._use_numpy = False # no performance gain from numpy self._lowerBound = lowerBound self._upperBound = upperBound def func(self, split, iterator): + self.initRandomGenerator(split) for obj in iterator: - if self._lowerBound <= self.getUniformSample(split) < self._upperBound: + if self._lowerBound <= self.getUniformSample() < self._upperBound: yield obj @@ -136,15 +95,16 @@ def __init__(self, withReplacement, fractions, seed=None): self._fractions = fractions def func(self, split, iterator): + self.initRandomGenerator(split) if self._withReplacement: for key, val in iterator: # For large datasets, the expected number of occurrences of each element in # a sample with replacement is Poisson(frac). We use that to get a count for # each element. - count = self.getPoissonSample(split, mean=self._fractions[key]) + count = self.getPoissonSample(self._fractions[key]) for _ in range(0, count): yield key, val else: for key, val in iterator: - if self.getUniformSample(split) <= self._fractions[key]: + if self.getUniformSample() <= self._fractions[key]: yield key, val From f58302346ad0e859994fae52299fd32b3d3ef61a Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Fri, 14 Nov 2014 00:37:20 -0800 Subject: [PATCH 10/13] fix tests --- python/pyspark/rdd.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index 9e4b33a8f42ad..57754776faaa2 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -313,8 +313,6 @@ def sample(self, withReplacement, fraction, seed=None): Return a sampled subset of this RDD. 
>>> rdd = sc.parallelize(range(100), 4) - >>> rdd.sample(True, 0.1, 27).count() - 10 >>> rdd.sample(False, 0.1, 81).count() 10 """ From ee17d7846438e270967e38120d0fb6c63523defd Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Wed, 19 Nov 2014 10:58:03 -0800 Subject: [PATCH 11/13] remove = for float --- python/pyspark/rddsampler.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/pyspark/rddsampler.py b/python/pyspark/rddsampler.py index fd7639ff7a138..5928b1d892de0 100644 --- a/python/pyspark/rddsampler.py +++ b/python/pyspark/rddsampler.py @@ -70,7 +70,7 @@ def func(self, split, iterator): yield obj else: for obj in iterator: - if self.getUniformSample() <= self._fraction: + if self.getUniformSample() < self._fraction: yield obj @@ -106,5 +106,5 @@ def func(self, split, iterator): yield key, val else: for key, val in iterator: - if self.getUniformSample() <= self._fractions[key]: + if self.getUniformSample() < self._fractions[key]: yield key, val From 98eb31bafdfd63e7b727591471e5283018adb753 Mon Sep 17 00:00:00 2001 From: Xiangrui Meng Date: Thu, 20 Nov 2014 13:38:37 -0800 Subject: [PATCH 12/13] make poisson sampling slightly faster --- python/pyspark/rddsampler.py | 26 ++++++++++++++++---------- 1 file changed, 16 insertions(+), 10 deletions(-) diff --git a/python/pyspark/rddsampler.py b/python/pyspark/rddsampler.py index 5928b1d892de0..9e7acc28e99dd 100644 --- a/python/pyspark/rddsampler.py +++ b/python/pyspark/rddsampler.py @@ -17,6 +17,7 @@ import sys import random +import math class RDDSamplerBase(object): @@ -37,16 +38,21 @@ def getUniformSample(self): return self._random.random() def getPoissonSample(self, mean): - # here we simulate drawing numbers n_i ~ Poisson(lambda = 1/mean) by - # drawing a sequence of numbers delta_j ~ Exp(mean) - num_arrivals = 0 - cur_time = self._random.expovariate(mean) - - while cur_time < 1.0: - cur_time += self._random.expovariate(mean) - num_arrivals += 1 - - return num_arrivals + # Using Knuth's algorithm described in http://en.wikipedia.org/wiki/Poisson_distribution + if mean < 20.0: # one exp and k+1 random calls + l = math.exp(-mean) + p = self._random.random() + k = 0 + while p > l: + k += 1 + p *= self._random.random() + else: # switch to the log domain, k+1 expovariate (random + log) calls + p = self._random.expovariate(mean) + k = 0 + while p < 1.0: + k += 1 + p += self._random.expovariate(mean) + return k def func(self, split, iterator): raise NotImplementedError From 5c438d7299858afde917f47be63fec17a1d9b359 Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Thu, 20 Nov 2014 14:18:22 -0800 Subject: [PATCH 13/13] fix comment --- python/pyspark/rddsampler.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/python/pyspark/rddsampler.py b/python/pyspark/rddsampler.py index 9e7acc28e99dd..459e1427803cb 100644 --- a/python/pyspark/rddsampler.py +++ b/python/pyspark/rddsampler.py @@ -38,15 +38,18 @@ def getUniformSample(self): return self._random.random() def getPoissonSample(self, mean): - # Using Knuth's algorithm described in http://en.wikipedia.org/wiki/Poisson_distribution - if mean < 20.0: # one exp and k+1 random calls + # Using Knuth's algorithm described in + # http://en.wikipedia.org/wiki/Poisson_distribution + if mean < 20.0: + # one exp and k+1 random calls l = math.exp(-mean) p = self._random.random() k = 0 while p > l: k += 1 p *= self._random.random() - else: # switch to the log domain, k+1 expovariate (random + log) calls + else: + # switch to the log domain, k+1 expovariate (random 
+ log) calls
             p = self._random.expovariate(mean)
             k = 0
             while p < 1.0:
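
The two-branch Poisson sampler that patches 12 and 13 land on is easy to exercise outside Spark. The following is a minimal standalone sketch of the same strategy; the function name and the averaging harness are illustrative, not part of PySpark:

    import math
    import random

    def poisson_sample(rng, mean):
        # Knuth's method: p starts at one uniform draw and k counts the
        # extra draws needed to push the running product below exp(-mean);
        # that count is Poisson(mean) distributed.
        if mean < 20.0:
            l = math.exp(-mean)
            p = rng.random()
            k = 0
            while p > l:
                k += 1
                p *= rng.random()
            return k
        # For larger means exp(-mean) is tiny and the product comparison
        # loses precision, so work in the log domain instead: sum Exp(mean)
        # inter-arrival times and count the arrivals that land before
        # t = 1.0, an equivalent way to draw Poisson(mean).
        p = rng.expovariate(mean)
        k = 0
        while p < 1.0:
            k += 1
            p += rng.expovariate(mean)
        return k

    rng = random.Random(42)
    draws = [poisson_sample(rng, 30.0) for _ in range(100000)]
    print(sum(draws) / float(len(draws)))  # sample mean; should be near 30.0

Both branches cost on the order of mean + 1 random draws per sample; the 20.0 cutoff trades the extra log() inside expovariate() for numerical safety only where the small-mean product form starts losing precision.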
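The randomSplit() design from patch 04 onward rests on one invariant worth spelling out: every split re-samples the whole RDD with an identically seeded generator, so a given element draws the same uniform value in every pass and therefore falls into exactly one cumulative-weight range [lb, ub). Below is a rough single-partition model of that idea in plain Python; random_split and its harness are hypothetical, not the RDD API:

    import random

    def random_split(items, weights, seed, split=0):
        # Normalize the weights into cumulative boundaries
        # 0.0 = c[0] <= c[1] <= ... <= c[n] = 1.0.
        s = float(sum(weights))
        cweights = [0.0]
        for w in weights:
            cweights.append(cweights[-1] + w / s)

        def select(lb, ub):
            # Rebuild the generator from the same seed for every split,
            # mixing a few draws first as RDDSamplerBase does so that
            # nearby seeds decorrelate.
            rng = random.Random(seed ^ split)
            for _ in range(10):
                rng.randint(0, 1)
            # Element i always sees the same uniform u_i, so the half-open
            # ranges [lb, ub) assign it to exactly one split.
            return [x for x in items if lb <= rng.random() < ub]

        return [select(lb, ub) for lb, ub in zip(cweights, cweights[1:])]

    parts = random_split(list(range(10)), [2, 3], seed=7)
    assert sorted(parts[0] + parts[1]) == list(range(10))  # disjoint, exhaustive

Because random() returns values in [0.0, 1.0), the final upper bound of 1.0 always captures the remainder; the same half-open convention is why patch 11 switches the samplers from <= to <.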