Conversation

@vladii commented Jul 15, 2015

Disable partial aggregation automatically when the reduction factor is low. Once the Aggregator has seen enough rows during partial aggregation without observing any reduction, it should simply turn partial aggregation off. This reduces memory usage for high-cardinality aggregations.
This one is for Spark core.
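
At a high level, the idea is to sample the first spark.partialAgg.interval records, compare the number of unique keys against the number of records sampled, and skip map-side combining when the ratio suggests little reduction. A minimal sketch of such a check (the class name, method names, and the exact comparison against spark.partialAgg.reduction are illustrative assumptions, not the patch's actual code):

```scala
import scala.collection.mutable

// Illustrative sketch only: sample the first `interval` records and decide
// whether partial (map-side) aggregation is worth keeping.
class PartialAggDecision[K](interval: Int, reductionThreshold: Double) {
  private val sampledKeys = mutable.HashSet.empty[K]
  private var recordsSeen = 0L

  def observe(key: K): Unit = {
    recordsSeen += 1
    sampledKeys += key
  }

  /** True while more records are needed before a decision can be made. */
  def stillSampling: Boolean = recordsSeen < interval

  /**
   * Keep partial aggregation only if the sample shows enough duplication,
   * i.e. uniqueKeys / recordsSeen <= reductionThreshold. The comparison
   * direction is an assumption inferred from the test configurations below.
   */
  def keepPartialAggregation: Boolean =
    recordsSeen > 0 && sampledKeys.size.toDouble / recordsSeen <= reductionThreshold
}
```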


Replace println with logInfo

@mccheah commented Jul 15, 2015

I noticed in HashShuffleReader, this block of code in .read(), which is reading on the reduce side:

val aggregatedIter: Iterator[Product2[K, C]] = if (dep.aggregator.isDefined) {
  if (dep.mapSideCombine) {
    // We are reading values that are already combined
    val combinedKeyValuesIterator = interruptibleIter.asInstanceOf[Iterator[(K, C)]]
    dep.aggregator.get.combineCombinersByKey(combinedKeyValuesIterator, context)
  } else {
    // We don't know the value type, but also don't care -- the dependency *should*
    // have made sure its compatible w/ this aggregator, which will convert the value
    // type to the combined type C
    val keyValuesIterator = interruptibleIter.asInstanceOf[Iterator[(K, Nothing)]]
    dep.aggregator.get.combineValuesByKey(keyValuesIterator, context)
  }
} // ... (the else branch for the no-aggregator case is omitted from this quote)

How is this being handled, given that we turn off map-side combine even though dep.mapSideCombine is true?

@mccheah commented Jul 15, 2015

We should write a unit test at the RDD level as well. Use RDD.combineByKey() and verify that the end-to-end results are correct, given the partial aggregation logic added here. I think you can just add another test in RDDSuite.scala.
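
For illustration, an end-to-end check along those lines could look like the sketch below (a hypothetical standalone program, not the actual test added in this PR; a test in RDDSuite.scala would use the suite's shared SparkContext instead):

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical smoke test: the result must be the same whether the map side
// keeps or disables partial aggregation.
object CombineByKeySmokeTest {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[2]").setAppName("combineByKey-end-to-end"))
    try {
      // Plenty of duplicate keys, so both code paths get exercised meaningfully.
      val pairs = sc.parallelize((1 to 10000).map(i => (i % 10, 1)), 4)
      val counts = pairs
        .combineByKey((v: Int) => v, (c: Int, v: Int) => c + v, (c1: Int, c2: Int) => c1 + c2)
        .collectAsMap()
      assert(counts.size == 10)
      assert(counts.values.forall(_ == 1000))
    } finally {
      sc.stop()
    }
  }
}
```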

@vladii (Author) commented Jul 15, 2015

Regarding your first comment, I THINK the answer is:

  • dep.mapSideCombine is true and we do the aggregation - it's ok.
  • dep.mapSideCombine is false - obviously we don't do the aggregation, so it's ok.
  • dep.mapSideCombine is true and we DON'T do the aggregation. Our data has type (K, V), but it leaves for the reducer with type (K, C) (each V is encapsulated in a single-element C), exactly as when we do the aggregation. Probably a little less CPU-efficient :-). What do you think?

About the second one: I'll do that soon!

@mccheah commented Jul 15, 2015

My question is how it gets to the reducer with (K, C) type. If it does get there as (K, C), then it's definitely okay. I just thought map-side combine was also responsible for creating the initial Cs, but I can see how the map side could do that without map-side combine as well - I just want some code locations to look at to know for sure.

@vladii (Author) commented Jul 15, 2015

ShuffleManager:

    /**
     * Get a reader for a range of reduce partitions (startPartition to endPartition-1, inclusive).
     * Called on executors by reduce tasks.
     */
    def getReader[K, C](
        handle: ShuffleHandle,
        startPartition: Int,
        endPartition: Int,
        context: TaskContext): ShuffleReader[K, C]

ShuffleReader:

    /** Read the combined key-values for this reduce task */
    def read(): Iterator[Product2[K, C]]

mccheah and others added 3 commits August 6, 2015 11:11
The first iteration of the disable-map-side-combine-on-pre-aggregation
feature failed to take into consideration that the reduce side would
expect to read values of type [K, C] if mapSideCombine is set to true.
This patch makes SortFileWriter and HashFileWriter handle that case by
explicitly applying the user's createCombiner function to all rows if
they requested mapSideCombine but we determined it was not the best
option in this stage.
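
In other words, when map-side combine was requested but we decide to skip it, the writer still has to hand (K, C) pairs to the shuffle. Conceptually that is just wrapping each value with the user's createCombiner, roughly like this (an illustrative helper, not the patch's exact code):

```scala
// Illustrative: turn a (K, V) iterator into a (K, C) iterator by building a
// single-element combiner per record, so the reduce side can still run
// combineCombinersByKey as if map-side combine had happened.
def wrapWithCombiner[K, V, C](
    records: Iterator[Product2[K, V]],
    createCombiner: V => C): Iterator[(K, C)] = {
  records.map(kv => (kv._1, createCombiner(kv._2)))
}
```
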
@vladii (Author) commented Aug 7, 2015

Some perf testing

Cluster resources:
4 workers (8 cores each) => 32 cores
16 GB memory per worker

Spark Command:

./bin/spark-shell --executor-memory 16G --master spark://master:7077 --conf spark.partialAgg.interval=YY --conf spark.partialAgg.reduction=XX -i MyTest.txt

MyTest.txt code:

    // Count rows per key; the key is the first '|'-separated field of each row.
    val cc = sc.textFile("hdfs://file-200GB")
    val headerAndRows = cc.map(line => line.split("\\|"))
    val pairs = headerAndRows.map(line => (line(0), 1))
    // The combiner is a (sum, count) pair per key.
    val combined = pairs.combineByKey(
      (value: Int) => (value, 1),
      (x: (Int, Int), value: Int) => (x._1 + value, x._2 + 1),
      (x: (Int, Int), y: (Int, Int)) => (x._1 + y._1, x._2 + y._2))
    combined.count

Result of the above code: Long = 30000

  1. Test 1
     -- spark.partialAgg.interval = 10000
     -- spark.partialAgg.reduction = 0.5
     -- Enable Pre-Aggregation: true (~1175 calls to write() from SortShuffleWriter)
     -- Time: 6.5 min

  2. Test 2
     -- spark.partialAgg.interval = 10000
     -- spark.partialAgg.reduction = 0.0
     -- Enable Pre-Aggregation: false (~1133 calls to write() from SortShuffleWriter)
     -- Time: 8.3 min

  3. Test 3
     -- spark.partialAgg.interval = 2
     -- spark.partialAgg.reduction = 0.0
     -- Enable Pre-Aggregation: false (~1150 calls to write() from SortShuffleWriter)
     -- Time: 8.4 min

  4. Test 4
     -- spark.partialAgg.interval = 2
     -- spark.partialAgg.reduction = 1.0
     -- Enable Pre-Aggregation: true (~1186 calls to write() from SortShuffleWriter)
     -- Time: 6.3 min

@mccheah commented Aug 7, 2015

Looks like testing at a glance shows some speedup =)

So we have a dataset with 30,000 unique keys, that's what this test shows us, right? How does this compare to the total number of rows in the dataset?

Also, what are the implications of the numbers you have found? Can you explain how they line up with the behavior you're expecting?

@vladii (Author) commented Aug 7, 2015

Yep, there is a speed-up between doing pre-aggregation and not, mainly because there are 30,000 unique keys and over 700,000,000 rows in the test file.
Why did I choose those numbers for my tests? First of all, I wanted to show that the "decide whether to do pre-aggregation" phase doesn't add significant CPU overhead. That is reflected in Tests 1 and 4, where Pre-Aggregation is true but in one case the sampling Map stores 10,000 elements and in the other only 2 (which is negligible), and likewise in Tests 2 and 3. The memory overhead is also small, and the feature should give a significant positive boost when the number of unique keys approaches the number of rows.

@mccheah commented Aug 7, 2015

" (~1133 calls to write() from SortShuffleWriter)"

How is this influenced by the pre-aggregation logic?

@vladii (Author) commented Aug 7, 2015

It isn't influenced by the pre-aggregation logic. I posted it to show how many times the ShuffleAggregationManager class is instantiated and the pre-aggregation logic is applied (iterating through the Map etc.).

@mccheah commented Aug 7, 2015

I thought this would be influenced by how much partial aggregation is done on the map side. For example, if you stop partial aggregation partway through, you have more unique keys and values to write to the SortShuffleWriter, right?

This is more just for understanding what the end behavior actually was.

@vladii (Author) commented Aug 7, 2015

Hmm, actually no. SortShuffleWriter is called with an Iterator over (K, V) pairs, and that is where it decides whether to do the pre-aggregation or not. To decide, it instantiates ShuffleAggregationManager and iterates through the first spark.partialAgg.interval elements of that iterator (and this is why the number of calls to write() matters - a Map is allocated and iterated through every time that happens).
What I wanted to show is that there isn't a significant overhead (in either CPU or memory) when applying our logic. The original issue description (from the Spark JIRA) is: "Once we see enough number of rows in partial aggregation and don't observe any reduction, Aggregator should just turn off partial aggregation. This reduces memory usage for high cardinality aggregations." Memory usage was the main problem, but most of the time reducing memory usage comes at a cost (CPU or otherwise) -- these tests try to show that there is no significant overhead here.
Maybe I should do another test with 100,000,000 rows and 100,000,000 unique keys and repeat those 4 test cases, but I think the results (at least the time spent executing the tasks) will be the same.
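
For reference, the buffering described above can be pictured roughly like this (a minimal sketch with made-up names; the real ShuffleAggregationManager in this patch differs):

```scala
import scala.collection.mutable

// Illustrative: buffer the first `interval` records so they can be inspected
// for the enable/disable decision and then replayed, followed by the untouched
// remainder, when the data is handed to the shuffle writer.
class SampledRecords[K, V](records: Iterator[Product2[K, V]], interval: Int) {
  private val buffered = mutable.ArrayBuffer.empty[Product2[K, V]]

  while (buffered.size < interval && records.hasNext) {
    buffered += records.next()
  }

  /** The sampled prefix, available for the reduction-factor check. */
  def sampled: Seq[Product2[K, V]] = buffered

  /** The full stream: sampled prefix first, then everything not yet consumed. */
  def restoredIterator: Iterator[Product2[K, V]] = buffered.iterator ++ records
}
```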


You should remove this. I don't see this header on any other files in Spark.


Yup, sorry! :-D

@mccheah commented Aug 11, 2015

CC @punya for a final sign-off / review. Note that this is basically just adapting this thing: apache#1191


noob question: why Product2[K, V] rather than (K, V)?

@vladii (Author) commented Aug 12, 2015

I've tested the code against an LZO-compressed table (with the same Spark code I posted in a previous comment here) with 1,000,000,000 rows and 1,000,000,000 unique keys. The table looks like this:

key1|vlad1
key2|vlad2
key3|vlad3
...

It seems there is a large performance improvement when pre-aggregation is disabled (which is what our code does here). Here are the results:

  1. Test 1
    -- spark.partialAgg.reduction = 0.0
    -- spark.partialAgg.interval = 10000
    Enable Pre-Aggregation: false (~47 calls to write).
    Time: 7.9 min

  2. Test 2
    -- spark.partialAgg.reduction = 1.0
    -- spark.partialAgg.interval = 10000
    Enable Pre-Aggregation: true (~49 calls to write).
    Time: 13.1 min

  3. Test 3
    -- spark.partialAgg.reduction = 1.0
    -- spark.partialAgg.interval = 2
    Enable Pre-Aggregation: true (~49 calls to write).
    Time: 13 min

Conclusion: with pre-aggregation disabled the job takes about 8 minutes instead of 13. Also, spark.partialAgg.interval doesn't affect the time very much (Test 2 has a big interval and Test 3 a small one, and there isn't a significant difference between them).


iteratedElements could potentially OOM if we store 10k large items in memory. Perhaps we should store it in a size-tracking collection, and stop sampling either when we hit 10k items or when the size-tracking collection gets too big?

Talking to @mccheah, the other option is to do it "inline"; you can talk to him directly if you want some insight into that.


The inline logic is what the original commit did, but that's pretty hard to do. I like the size-tracking idea, but size tracking in and of itself isn't free, so you should benchmark that. Size tracking is most expensive when you have an RDD of composite objects (i.e. not primitives; think of an RDD of HashSet objects).


Hi! Yep, we were aware of that, and I think there are 2 possible solutions:

  1. I think I have a mathematical algorithm (probabilistic, assuming the keys are uniformly distributed) which can stop iterating after a bounded number of steps (in this case, at MOST 10k). I have to think about it a little more (actually, to remember it :-) ) and I'll post it here.
  2. Instead of using a MutableList, maybe we can switch to ExternalAppendOnlyMap or ExternalList (which Matt created in some of his previous commits).

I'll be thinking about that :-)


You should just use a size-tracking collection. The collection doesn't have to spill, which significantly simplifies your implementation. The size-tracking collection can report its estimated size in memory, and when it hits some memory threshold you take what you have so far as the sample and run your heuristic on it.
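
A rough picture of that cap (illustrative only; sizeOf stands in for whatever estimate the size-tracking collection reports internally):

```scala
import scala.collection.mutable

// Illustrative: stop sampling when either the item cap or an approximate
// memory cap is reached. `sizeOf` is a stand-in for the size estimation that a
// size-tracking collection would provide.
def sampleWithMemoryCap[T](
    records: Iterator[T],
    maxItems: Int,
    maxBytes: Long,
    sizeOf: T => Long): (Seq[T], Iterator[T]) = {
  val buffered = mutable.ArrayBuffer.empty[T]
  var estimatedBytes = 0L
  while (records.hasNext && buffered.size < maxItems && estimatedBytes < maxBytes) {
    val record = records.next()
    buffered += record
    estimatedBytes += sizeOf(record)
  }
  // Hand back the sample plus an iterator that replays it before the remainder.
  (buffered, buffered.iterator ++ records)
}
```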

@vladii (Author) commented Aug 27, 2015

Thank you for your comment, Justin! Do you mean implementing a mechanism like the one in the Spillable class (set the initial memory threshold based on spark.shuffle.spill.initialMemoryThreshold and then tryToAcquire() more memory until it reaches myMemoryThreshold)? Or do you think it's ok to do what I've just done in my last commit...? :-)

@vladii closed this Aug 27, 2015
@vladii reopened this Aug 27, 2015
@justinuang

Np! I think it's okay to just set it to be as large as initialMemoryThreshold, and then stop if it ever gets that big. This is mainly a safety mechanism, so we don't need to be very smart about how we acquire more! Haha, I think being lazy might be warranted here.

@vladii (Author) commented Aug 27, 2015

Ok! Thank you for your time! (thumbsup)

@vladii (Author) commented Aug 28, 2015

This is what SizeTrackingVector actually does - it takes an exact measurement once every N elements added and estimates the size after each addition.
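
Roughly, that pattern is: take an exact measurement only occasionally and extrapolate per insert in between. A simplified sketch of the idea (sizeOfCollection stands in for a real estimator such as Spark's SizeEstimator; the sampling schedule here is simplified compared to the real SizeTracker):

```scala
import scala.collection.mutable

// Simplified sketch of "measure occasionally, extrapolate on every insert".
class TrackedBuffer[T](sizeOfCollection: Seq[T] => Long) {
  private val items = mutable.ArrayBuffer.empty[T]
  private var lastMeasuredSize = 0L
  private var lastMeasuredCount = 0
  private var bytesPerItem = 0.0
  private var nextSampleAt = 1

  def append(item: T): Unit = {
    items += item
    if (items.size >= nextSampleAt) {
      val measured = sizeOfCollection(items)
      val added = items.size - lastMeasuredCount
      if (added > 0) {
        bytesPerItem = (measured - lastMeasuredSize).toDouble / added
      }
      lastMeasuredSize = measured
      lastMeasuredCount = items.size
      // Measure less and less often as the buffer grows.
      nextSampleAt = math.max(nextSampleAt + 1, (nextSampleAt * 1.5).toInt)
    }
  }

  /** Cheap estimate: last exact measurement plus extrapolation for newer items. */
  def estimateSize: Long =
    lastMeasuredSize + (bytesPerItem * (items.size - lastMeasuredCount)).toLong
}
```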

@mccheah commented Aug 28, 2015

Would be good to get some performance numbers on this, can you do that?

@vladii (Author) commented Aug 28, 2015

Yes, I can.

@vladii (Author) commented Aug 28, 2015

I've tested this briefly, and there doesn't seem to be a big perf difference between the SizeTracking-collection approach and the older one. I tested with a file of 10,000,000 rows, each row containing a key-value pair (both strings). Both methods ran locally in about 28 seconds.
I'm aware this isn't the most rigorous testing, but I don't have time for more :-(

mccheah pushed a commit that referenced this pull request Apr 26, 2016
## What changes were proposed in this pull request?

This PR brings the support for chained Python UDFs, for example

```sql
select udf1(udf2(a))
select udf1(udf2(a) + 3)
select udf1(udf2(a) + udf3(b))
```

Also, directly chained unary Python UDFs are put into a single batch of Python UDFs; others may require multiple batches.

For example,
```python
>>> sqlContext.sql("select double(double(1))").explain()
== Physical Plan ==
WholeStageCodegen
:  +- Project [pythonUDF#10 AS double(double(1))#9]
:     +- INPUT
+- !BatchPythonEvaluation double(double(1)), [pythonUDF#10]
   +- Scan OneRowRelation[]
>>> sqlContext.sql("select double(double(1) + double(2))").explain()
== Physical Plan ==
WholeStageCodegen
:  +- Project [pythonUDF#19 AS double((double(1) + double(2)))#16]
:     +- INPUT
+- !BatchPythonEvaluation double((pythonUDF#17 + pythonUDF#18)), [pythonUDF#17,pythonUDF#18,pythonUDF#19]
   +- !BatchPythonEvaluation double(2), [pythonUDF#17,pythonUDF#18]
      +- !BatchPythonEvaluation double(1), [pythonUDF#17]
         +- Scan OneRowRelation[]
```

TODO: will support multiple unrelated Python UDFs in one batch (another PR).

## How was this patch tested?

Added new unit tests for chained UDFs.

Author: Davies Liu <davies@databricks.com>

Closes apache#12014 from davies/py_udfs.
@robert3005 closed this Sep 24, 2016
@robert3005 deleted the feature/vladio branch September 24, 2016 04:08
ash211 pushed a commit that referenced this pull request Feb 16, 2017
* Documentation for the current state of the world.

* Adding navigation links from other pages

* Address comments, add TODO for things that should be fixed

* Address comments, mostly making images section clearer

* Virtual runtime -> container runtime
mccheah added a commit that referenced this pull request Apr 27, 2017
* Documentation for the current state of the world.

* Adding navigation links from other pages

* Address comments, add TODO for things that should be fixed

* Address comments, mostly making images section clearer

* Virtual runtime -> container runtime