Skip to content

Conversation

@andygrove
Copy link
Member

@andygrove andygrove commented Oct 9, 2024

Which issue does this PR close?

Closes #1006

Rationale for this change

Improved performance

What changes are included in this PR?

  • Add new config option to replace SMJ with SHJ

How are these changes tested?

I manually ran TPC-H and saw improved performance. I will post benchmarks once I have run more tests.

@andygrove andygrove marked this pull request as draft October 9, 2024 18:01
CometFilter [ca_address_sk,ca_state]
CometScan parquet spark_catalog.default.customer_address [ca_address_sk,ca_state]
InputAdapter
BroadcastExchange #7
Copy link
Member Author

@andygrove andygrove Oct 9, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a regression that I am looking into (falling back to Spark for BroadcastHashJoin)

@andygrove
Copy link
Member Author

andygrove commented Oct 9, 2024

Here is a teaser for the performance improvement. This is for TPC-H q11 (SF=100) with broadcast joins disabled (I am looking into a regression with those). I ran the query 5 times each with rule enabled vs disabled.

Rule Off

79.87537693977356,
77.76734256744385,
75.35734295845032,
75.44863200187683,
72.88174152374268

Rule On

39.33945274353027,
36.159271240234375,
35.83299708366394,
35.638232707977295,
35.67777371406555

@parthchandra
Copy link
Contributor

There is a small danger in enabling this without having a good estimate of the size of the build side. ShuffleHashJoin has limits on how much data it can process efficiently. If the build side hash table has no spilling then a large enough build side will cause OOMs and if there is spilling, then SMJ can frequently lead to better performance. We might even see this when we scale the benchmark from SF1 to say SF10.
Is there a way for us to get cardinality and row size for the build side somehow?
Still worth adding this option though.

@parthchandra
Copy link
Contributor

if there is spilling, then SMJ can frequently lead to better performance
I have seen this happen with Spark with some TPC-DS queries at SF10.

conf(s"$COMET_EXEC_CONFIG_PREFIX.replaceSortMergeJoin")
.doc("Whether to replace SortMergeJoin with ShuffledHashJoin for improved performance.")
.booleanConf
.createWithDefault(true)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should have a default value as false for stablility. Spark decides to use SMJ for some reasons including data statistics. If Spark thinks SHJ may not work, I think we better follow it except for explicitly asking by users.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The other accelerators (Spark RAPIDS and Gluten) default this to true. Perhaps we should benchmark at large scale factors before and see if we run into any issues?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess it is okay for the benchmark datasets like TPCDS or TPCH. The cases I worry about is the production ones. But it might be more internal cases.

For OSS, maybe enabling it by default is okay.

Copy link
Member

@viirya viirya Oct 9, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At least, we should add some more descriptions here to mention the risk.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps we should benchmark at large scale factors before and see if we run into any issues?

Agreed. (Also, when I wrote SF1 and SF10 I meant 1TB, and 10TB which is really SF 1000 and SF 10000).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For this PR, I disabled the feature by default. I created the following PR to enable it by default and update the tests. I will add documentation as part of this PR.

#1008

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here is a new follow on issue for enabling by default:

#1011

@andygrove
Copy link
Member Author

Current benchmarks:

tpch_allqueries

Speedup of using HashJoin instead of SortMergeJoin:

tpch_queries_speedup

@codecov-commenter
Copy link

codecov-commenter commented Oct 9, 2024

Codecov Report

❌ Patch coverage is 19.44444% with 29 lines in your changes missing coverage. Please review.
✅ Project coverage is 34.27%. Comparing base (e3ac6cf) to head (1073517).
⚠️ Report is 748 commits behind head on main.

Files with missing lines Patch % Lines
...ain/scala/org/apache/comet/rules/RewriteJoin.scala 0.00% 26 Missing ⚠️
...org/apache/comet/CometSparkSessionExtensions.scala 40.00% 2 Missing and 1 partial ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##               main    #1007      +/-   ##
============================================
- Coverage     34.41%   34.27%   -0.14%     
+ Complexity      886      881       -5     
============================================
  Files           112      113       +1     
  Lines         43479    43514      +35     
  Branches       9656     9663       +7     
============================================
- Hits          14962    14914      -48     
- Misses        25442    25510      +68     
- Partials       3075     3090      +15     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

@andygrove andygrove marked this pull request as ready for review October 10, 2024 03:06
@andygrove
Copy link
Member Author

I will add documentation to this PR today, explaining pros/cons of this feature in our tuning guide.


# note the use of a wildcard in the file name so that this works with both snapshot and final release versions
COPY --from=builder /comet/spark/target/comet-spark-spark${SPARK_VERSION}_$SCALA_VERSION-0.2.0*.jar $SPARK_HOME/jars
COPY --from=builder /comet/spark/target/comet-spark-spark${SPARK_VERSION}_$SCALA_VERSION-*.jar $SPARK_HOME/jars
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unrelated, but ran into this hard-coded version number during testing

@andygrove
Copy link
Member Author

@viirya @parthchandra This is now ready for review. The new option is disabled by default and I added a section to the tuning guide explaining why users may want to enable this new option.

Copy link
Contributor

@parthchandra parthchandra left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lgtm.

@andygrove
Copy link
Member Author

I have run into a deadlock when running TPC-DS benchmarks with this feature, so I am moving to draft while I investigate. It is possibly related to the memory pool issues that we are also working on in other PRs.

@andygrove
Copy link
Member Author

andygrove commented Oct 15, 2024

After upmerging, I no longer see the deadlock, but instead get an error if I have insufficient memory allocated, which is an improvement.

org.apache.comet.CometNativeException (External error: Internal error: 

Partition is still not able to allocate enough memory for the array builders after spilling..

However, when I increase memory, I see queries fail due to #1019.

@andygrove andygrove marked this pull request as ready for review October 18, 2024 20:55
@andygrove
Copy link
Member Author

I have now marked the feature as experimental and explained in the tuning guide that there is no spill to disk so this could result in OOM.

@andygrove
Copy link
Member Author

Fresh benchmarks after upmerging.

tpch_allqueries

@andygrove
Copy link
Member Author

TPC-DS excluding q97 (OOM with ShuffledHashJoin).

tpcds_allqueries

val COMET_REPLACE_SMJ: ConfigEntry[Boolean] =
conf(s"$COMET_EXEC_CONFIG_PREFIX.replaceSortMergeJoin")
.doc("Experimental feature to force Spark to replace SortMergeJoin with ShuffledHashJoin " +
"for improved performance. See tuning guide for more information regarding stability of " +

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add a link to the tuning guide?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. Updated. Thanks @jaceklaskowski

| spark.comet.exec.localLimit.enabled | Whether to enable localLimit by default. | true |
| spark.comet.exec.memoryFraction | The fraction of memory from Comet memory overhead that the native memory manager can use for execution. The purpose of this config is to set aside memory for untracked data structures, as well as imprecise size estimation during memory acquisition. Default value is 0.7. | 0.7 |
| spark.comet.exec.project.enabled | Whether to enable project by default. | true |
| spark.comet.exec.replaceSortMergeJoin | Experimental feature to force Spark to replace SortMergeJoin with ShuffledHashJoin for improved performance. See tuning guide for more information regarding stability of this feature. | false |

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add a link to the tuning guide? 🙏

@andygrove andygrove changed the title perf: Add option to replace SortMergeJoin with ShuffledHashJoin perf: Add experimental feature to replace SortMergeJoin with ShuffledHashJoin Oct 21, 2024
@andygrove andygrove merged commit cb3e977 into apache:main Oct 21, 2024
@andygrove andygrove deleted the replace-smj branch October 21, 2024 19:43
coderfender pushed a commit to coderfender/datafusion-comet that referenced this pull request Dec 13, 2025
## Which issue does this PR close?

<!--
We generally require a GitHub issue to be filed for all bug fixes and
enhancements and this helps us generate change logs for our releases.
You can link an issue to this PR using the GitHub syntax. For example
`Closes apache#123` indicates that this PR will close issue apache#123.
-->

Closes #.

## Rationale for this change

<!--
Why are you proposing this change? If this is already explained clearly
in the issue then this section is not needed.
Explaining clearly why changes are proposed helps reviewers understand
your changes and offer better suggestions for fixes.
-->

## What changes are included in this PR?

<!--
There is no need to duplicate the description in the issue here but it
is sometimes worth providing a summary of the individual changes in this
PR.
-->

```
cb3e977 perf: Add experimental feature to replace SortMergeJoin with ShuffledHashJoin (apache#1007)
3df9d5c fix: Make comet-git-info.properties optional (apache#1027)
4033687 chore: Reserve memory for native shuffle writer per partition (apache#1022)
bd541d6 (public/main) remove hard-coded version number from Dockerfile (apache#1025)
e3ac6cf feat: Implement bloom_filter_agg (apache#987)
8d097d5 (origin/main) chore: Revert "chore: Reserve memory for native shuffle writer per partition (apache#988)" (apache#1020)
591f45a chore: Bump arrow-rs to 53.1.0 and datafusion (apache#1001)
e146cfa chore: Reserve memory for native shuffle writer per partition (apache#988)
abd9f85 fix: Fallback to Spark if named_struct contains duplicate field names (apache#1016)
22613e9 remove legacy comet-spark-shell (apache#1013)
d40c802 clarify that Maven central only has jars for Linux (apache#1009)
837c256 docs: Various documentation improvements (apache#1005)
0667c60 chore: Make parquet reader options Comet options instead of Hadoop options (apache#968)
0028f1e fix: Fallback to Spark if scan has meta columns (apache#997)
b131cc3 feat: Support `GetArrayStructFields` expression (apache#993)
3413397 docs: Update tuning guide (apache#995)
afd28b9 Quality of life fixes for easier hacking (apache#982)
18150fb chore: Don't transform the HashAggregate to CometHashAggregate if Comet shuffle is disabled (apache#991)
a1599e2 chore: Update for 0.3.0 release, prepare for 0.4.0 development (apache#970)
```

## How are these changes tested?

<!--
We typically require tests for all PRs in order to:
1. Prevent the code from being accidentally broken by subsequent changes
2. Serve as another way to document the expected behavior of the code

If tests are not included in your PR, please explain why (for example,
are they covered by existing tests)?
-->
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Add option to replace SortMergeJoin with ShuffleHashJoin

6 participants