Conversation

@kamalbanga kamalbanga commented Aug 14, 2019

No description provided.

@HyukjinKwon

Can you show the benchmark? This is a critical path and performance is prioritized. Also, please file a JIRA; see https://spark.apache.org/contributing.html

@dongjoon-hyun changed the title from "Simpler countByValue using collections' Counter" to "[PYSPARK] Simpler countByValue using collections' Counter" on Aug 14, 2019

srowen commented Aug 15, 2019

This also needs a JIRA. #25429

SparkQA commented Aug 15, 2019

Test build #4830 has finished for PR 25449 at commit 08ea5a0.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

kamalbanga commented Aug 16, 2019

I benchmarked it, and the existing implementation is faster 🤦‍♂

from pyspark import SparkContext, SparkConf
from random import choices
from collections import defaultdict, Counter
from contextlib import contextmanager
from operator import add
import time

MAX_NUM = int(1e9)
RDD_SIZE = int(1e7)
NUM_PARTITIONS = 4

@contextmanager
def timethis(snippet):
    start = time.time()
    yield
    print(f'Duration of {snippet}: {(time.time() - start):.1f} seconds')

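# Proposed approach (this PR): count a partition's elements with collections.Counter.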
def countval1(iterator):
    yield Counter(iterator)

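# Existing approach: the defaultdict counting loop currently used by countByValue.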
def countval2(iterator):
    counts = defaultdict(int)
    for k in iterator:
        counts[k] += 1
    yield counts

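# Combiner from the existing implementation: fold m1's counts into m2.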
def mergeMaps(m1, m2):
    for k, v in m1.items():
        m2[k] += v
    return m2


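# Draw RDD_SIZE integers, with replacement, from [0, MAX_NUM).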
random_integers = choices(population=range(MAX_NUM), k=RDD_SIZE)

sc = SparkContext(conf=SparkConf().setAppName('Benchmark'))
random_rdd = sc.parallelize(random_integers, NUM_PARTITIONS)

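# Counters are merged with operator.add, since Counter supports +.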
with timethis('Spark Counter'):
    agg1 = random_rdd.mapPartitions(countval1).reduce(add)

with timethis('Spark defaultdict'):
    agg2 = random_rdd.mapPartitions(countval2).reduce(mergeMaps)

@srowen srowen left a comment

OK thanks for checking. Sounds like we shouldn't make this change.
If you're into optimizing this... I wonder if it helps to check whether m1 or m2 is larger in mergeMaps, and iterate over the smaller one only? If that's better you can reopen with that change.

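For reference, a minimal sketch of that idea (mergeMapsSmaller is an illustrative name, not code from this PR), assuming both arguments are the per-partition defaultdicts yielded by countval2:

def mergeMapsSmaller(m1, m2):
    # Swap so the loop iterates over whichever map has fewer keys;
    # both inputs are per-partition defaultdict(int)s, so += on a
    # missing key still works after the swap.
    if len(m1) > len(m2):
        m1, m2 = m2, m1
    for k, v in m1.items():
        m2[k] += v
    return m2

Swapping this in for mergeMaps in the benchmark above would show whether the size check pays for itself.
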
@srowen srowen closed this Aug 16, 2019