
Conversation

@dongjoon-hyun
Member

dongjoon-hyun commented Aug 3, 2020

What changes were proposed in this pull request?

This PR aims to add StorageLevel.DISK_ONLY_3 as a built-in StorageLevel.

Why are the changes needed?

In a YARN cluster, HDFS usually provides storage with a replication factor of 3. So, technically, we could achieve the effect of StorageLevel.DISK_ONLY_3 by saving the result to HDFS. However, disaggregated clusters and clusters without storage services are on the rise. Previously, in that situation, users could use the similar MEMORY_AND_DISK_2 or a user-created StorageLevel. This PR aims to support those use cases officially for better UX.

Does this PR introduce any user-facing change?

Yes. This provides a new built-in option.

How was this patch tested?

Pass the GitHub Action or Jenkins with the revised test cases.

@SparkQA

SparkQA commented Aug 3, 2020

Test build #126955 has finished for PR 29331 at commit 0cf67c4.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@maropu
Member

maropu commented Aug 3, 2020

retest this please

@SparkQA

SparkQA commented Aug 3, 2020

Test build #126970 has finished for PR 29331 at commit 0cf67c4.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@dongjoon-hyun
Member Author

Thanks, @HyukjinKwon and @maropu . Yes. Official support is more beneficial to users. DISK_ONLY_3 is better than magic code like new StorageLevel(true, false, false, false, 3). Also, this PR includes test coverage for DISK_ONLY_3, which helps users feel safe.
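As an illustration of why a named constant beats the raw constructor call, here is a minimal pure-Python model of the five StorageLevel flags. This is a sketch, not the actual org.apache.spark.storage.StorageLevel class; the field names mirror the Scala constructor's parameters.

```python
# A minimal pure-Python model of Spark's StorageLevel flags.
# NOTE: illustrative sketch only, not the real
# org.apache.spark.storage.StorageLevel class.
from dataclasses import dataclass

@dataclass(frozen=True)
class StorageLevel:
    use_disk: bool
    use_memory: bool
    use_off_heap: bool
    deserialized: bool
    replication: int = 1

# The named constant is self-describing ...
DISK_ONLY_3 = StorageLevel(True, False, False, False, 3)

# ... while the positional "magic code" form forces the reader to
# remember what each boolean means, even though both are equal:
assert DISK_ONLY_3 == StorageLevel(
    use_disk=True, use_memory=False, use_off_heap=False,
    deserialized=False, replication=3)
```

The named constant also makes the replication factor greppable, which the bare boolean tuple does not.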

@SparkQA

SparkQA commented Aug 3, 2020

Test build #126996 has finished for PR 29331 at commit 480a480.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@dongjoon-hyun
Member Author

Rebased to master.

@SparkQA

SparkQA commented Aug 3, 2020

Test build #127006 has finished for PR 29331 at commit cc1a7a3.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@dongjoon-hyun
Member Author

dongjoon-hyun commented Aug 4, 2020

Interesting. Since the last commit only changes R/Python/doc and one Java file, CachedTableSuite should not be affected. I'll take a look at what is different from the last run.

Locally, CachedTableSuite passed. I'm still looking at Jenkins.

@dongjoon-hyun
Member Author

I found the root cause of random failures in the master branch. Here is the PR.

@dongjoon-hyun
Member Author

Retest this please

@SparkQA

SparkQA commented Aug 4, 2020

Test build #127054 has finished for PR 29331 at commit cc1a7a3.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@dongjoon-hyun
Member Author

All tests passed again. Could you review this, @HyukjinKwon , @maropu , @viirya , @dbtsai , @holdenk ?

Member

@srowen left a comment

I get the value of 3x replication for persistent data; this is in theory persistence for data that is already recreatable, right? cached data? or am I totally forgetting where else this can be used?

If so, this doesn't seem as necessary, and even DISK_ONLY_2 feels like overkill.
I suppose one argument we've made in the past is that the 2x replication is to make the cached data available as local data in more places, to improve locality. That could be an argument.

I don't feel strongly about it either way. But would MEMORY_AND_DISK_3 then make sense?

@dongjoon-hyun
Member Author

dongjoon-hyun commented Aug 5, 2020

I get the value of 3x replication for persistent data; this is in theory persistence for data that is already recreatable, right?

Right, @srowen .

cached data? or am I totally forgetting where else this can be used?

Yes. This cuts the lineage and works like an HDFS replacement. Previously, this could be achieved by storing the RDD back to HDFS. Now, that is difficult in a disaggregated cluster.

If so, this doesn't seem as necessary, and even DISK_ONLY_2 feels like overkill.

It's not overkill. HDFS replication is not only for reliability: 3x HDFS replication also improves read throughput 3x. Are you sure one executor can serve that traffic, @srowen ?

I suppose one argument we've made in the past is that the 2x replication is to make the cached data available as local data in more places, to improve locality. That could be an argument.

Improving locality is just a small fraction of the benefit. The throughput improvement and reduced FetchFailedExceptions are the real benefits. If we don't have HDFS, this is the only viable option.

But would MEMORY_AND_DISK_3 then make sense?

MEMORY_AND_DISK_3 is not recommended here because it carries the additional assumption that all the data fits into memory. It turned out that this has a severe side effect when executor memory is not enough. Why load the data into memory if Spark will push it back down to disk anyway?

This PR aims to serve data, conceptually like HDFS, inside Spark to support an HDFS-service-free ecosystem.
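The distinction argued here can be sketched with illustrative flag tuples modeled after Spark's StorageLevel fields. This is not the Spark API, and MEMORY_AND_DISK_3 is a hypothetical level that does not exist as a built-in:

```python
# Illustrative flag tuples modeled after Spark's StorageLevel fields
# (not the Spark API; MEMORY_AND_DISK_3 is hypothetical, not a built-in).
from collections import namedtuple

Level = namedtuple("Level", "use_disk use_memory deserialized replication")

DISK_ONLY_3 = Level(use_disk=True, use_memory=False,
                    deserialized=False, replication=3)
MEMORY_AND_DISK_3 = Level(use_disk=True, use_memory=True,
                          deserialized=True, replication=3)  # hypothetical

# The objection above: if the data will end up spilled to disk anyway,
# the use_memory=True flag only adds memory pressure on the executors.
assert DISK_ONLY_3.use_memory is False
assert MEMORY_AND_DISK_3.use_memory is True
```

The two levels differ only in the memory-related flags, which is exactly the memory-competition side effect the comment describes.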

@holdenk
Contributor

holdenk commented Aug 5, 2020

I think the motivation here was to deal with a workload with a lot of executor failures and to avoid a lot of recomputes, more than locality.

@dongjoon-hyun
Member Author

Thank you, @holdenk !

@Ngone51
Member

Ngone51 commented Aug 6, 2020

The throughput improvement and reduced FetchFailedExceptions are the real benefits. If we don't have HDFS, this is the only viable option.

IIUC, FetchFailedException is only raised when we try to fetch shuffle blocks, while StorageLevel is only related to RDD blocks. So I have no idea how DISK_ONLY_3 could help reduce FetchFailedException.

And how do we get the throughput improvement by using DISK_ONLY_3? Higher task parallelism? Or something else?

I think the motivation here was to deal with a workload with a lot of executor failures and to avoid a lot of recomputes, more than locality.

If that's the case, I think we should care more about shuffle data, which has only a single copy on disk. Shuffle data loss leads to a stage recompute, which is worse than the task recompute caused by RDD block loss.

I don't object to the change here, but I just want to figure out what real case it tries to improve.

@dongjoon-hyun
Member Author

dongjoon-hyun commented Aug 6, 2020

Thank you, @Ngone51 . The user scenario looks like this. The job has a very long lineage. In a disaggregated cluster, executors sometimes die for various reasons (including maintenance and preemption), causing bad effects like FetchFailedException and frequent retries (not only of the direct parent, but also of the ancestors). This is the same as you wrote. So, the user tries to cut the lineage by using cache after the shuffle stage. But it turns out that cache can cause memory competition as a side effect. Although Spark can spill to disk, they don't want to load the data into memory from the beginning. They inevitably decided to choose disk only. In short, they are using DISK_ONLY_1 and DISK_ONLY_2 and are currently asking for DISK_ONLY_3. It depends on their decision for each individual dataset.

The rationale for DISK_ONLY_3 is that they want the same concept as the existing HDFS service.

@dongjoon-hyun
Member Author

dongjoon-hyun commented Aug 7, 2020

Hi, @HyukjinKwon , @maropu , @srowen , @Ngone51 .
Please let me know if you have any other concerns. This is just a new alias which doesn't cause any negative effects on the existing Spark ecosystem.

@dongjoon-hyun
Member Author

Retest this please.

@SparkQA

SparkQA commented Aug 7, 2020

Test build #127208 has finished for PR 29331 at commit cc1a7a3.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon
Member

cc @tgravescs too

@dongjoon-hyun
Member Author

Thank you, @HyukjinKwon .

@dongjoon-hyun
Member Author

Retest this please.

@SparkQA

SparkQA commented Aug 9, 2020

Test build #127242 has finished for PR 29331 at commit cc1a7a3.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@dongjoon-hyun
Member Author

Retest this please.

@SparkQA

SparkQA commented Aug 10, 2020

Test build #127244 has finished for PR 29331 at commit cc1a7a3.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@dongjoon-hyun
Member Author

Retest this please

Member

@Ngone51 left a comment

Thank you for your explanation @dongjoon-hyun . LGTM.

@dongjoon-hyun
Member Author

Thank you so much, @Ngone51

@SparkQA

SparkQA commented Aug 10, 2020

Test build #127257 has finished for PR 29331 at commit cc1a7a3.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor

Do we consider writing out the DataFrame as parquet/orc/... to reliable storage to cut the RDD lineage?

@dongjoon-hyun
Member Author

Do we consider writing out the DataFrame as parquet/orc/... to reliable storage to cut the RDD lineage?

Thank you for the review, @cloud-fan . No, this approach doesn't consider additional reliable storage. This PR depends on pure Spark features only.

@cloud-fan
Contributor

Since we already have DISK_ONLY_2, I'm fine with adding DISK_ONLY_3.

I'm just giving a different proposal for this use case. The RDD lineage model relies on recomputing, so Spark can cache data on unreliable storage. I think caching with multiple copies diverges from the original idea. If you don't want to trigger recomputing, you can save data to reliable storage, which is usually better than 3 full copies (an object store is cheaper, and HDFS has erasure coding to save space).
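The storage-cost comparison behind this proposal can be made concrete with back-of-the-envelope arithmetic. This is illustrative only (not a Spark or HDFS API); RS(6,3) is used here as an example Reed-Solomon erasure-coding layout:

```python
# Back-of-the-envelope storage-cost arithmetic behind this trade-off
# (illustrative only, not a Spark or HDFS API).

def replication_overhead(copies: int) -> float:
    """Bytes stored per byte of data when keeping n full copies."""
    return float(copies)

def erasure_coding_overhead(data_units: int, parity_units: int) -> float:
    """Bytes stored per byte of data under Reed-Solomon erasure coding."""
    return (data_units + parity_units) / data_units

# Three full copies cost 3.0x the data size ...
assert replication_overhead(3) == 3.0
# ... while an RS(6,3) erasure-coding layout costs only 1.5x.
assert erasure_coding_overhead(6, 3) == 1.5
```

This is why reliable storage with erasure coding can be cheaper per byte than 3x in-cluster replication, at the cost of depending on an external storage service.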

@dongjoon-hyun
Member Author

Thank you for the advice, @cloud-fan . Of course, we know that the architectural goal of Apache Spark is not to be a reliable storage system. Reliable storage will always be considered the first solution when it's available.

@dongjoon-hyun
Member Author

Thank you all. This is nothing new, but an alias of an existing Spark feature. This will not cause much confusion about what Apache Spark aims for. Merged to master.

dongjoon-hyun deleted the SPARK-32517 branch August 10, 2020 14:33
Contributor

@tgravescs left a comment

Sorry for coming in late here; a few questions/nits:

**Note:** *In Python, stored objects will always be serialized with the [Pickle](https://docs.python.org/2/library/pickle.html) library,
so it does not matter whether you choose a serialized level. The available storage levels in Python include `MEMORY_ONLY`, `MEMORY_ONLY_2`,
`MEMORY_AND_DISK`, `MEMORY_AND_DISK_2`, `DISK_ONLY`, and `DISK_ONLY_2`.*
`MEMORY_AND_DISK`, `MEMORY_AND_DISK_2`, `DISK_ONLY`, `DISK_ONLY_2`, and `DISK_ONLY_3`.*
Contributor

it looks like we need to update the table above as well?
it might be nice to say what happens if you specify a level > 1 but you don't have that many executors.

Member Author

Let me rephrase the request.

  1. Adding both DISK_ONLY_2 and DISK_ONLY_3 to the above table.
  2. Adding a description about the corner case for MEMORY_ONLY_2, MEMORY_AND_DISK_2, DISK_ONLY_2, DISK_ONLY_3

Is there something more I can do, @tgravescs ?

Contributor

That's it.

"caching in memory, serialized, replicated" -> StorageLevel.MEMORY_ONLY_SER_2,
"caching on disk, replicated" -> StorageLevel.DISK_ONLY_2,
"caching on disk, replicated 2" -> StorageLevel.DISK_ONLY_2,
"caching on disk, replicated 3" -> StorageLevel.DISK_ONLY_3,
Contributor

so what happens if there aren't 3 executors? do we have a test that needs updating?

Member Author

so what happens if there aren't 3 executors?

The number of copies becomes 2 and this test case fails, as expected.

do we have a test that needs updating?

Yes. This test suite is updated at line 41.
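A simplified model of the best-effort behavior described above (an illustration with a hypothetical helper, not Spark's actual replication code): a block cannot have more copies than there are executors to hold them.

```python
# A simplified model of best-effort replication (an illustration, not
# Spark's actual implementation): a block cannot have more copies than
# there are executors available to hold them.

def effective_replication(requested: int, num_executors: int) -> int:
    """Hypothetical helper: cap the requested copies at cluster size."""
    return min(requested, num_executors)

# With only 2 executors, a DISK_ONLY_3 request yields 2 copies,
# which is why the test fails when fewer than 3 executors exist.
assert effective_replication(3, 2) == 2
# With enough executors, all 3 requested copies can be placed.
assert effective_replication(3, 5) == 3
```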

holdenk pushed a commit to holdenk/spark that referenced this pull request Oct 27, 2020

Closes apache#29331 from dongjoon-hyun/SPARK-32517.

Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
(cherry picked from commit b421bf0)
Signed-off-by: Dongjoon Hyun <[email protected]>