-
Notifications
You must be signed in to change notification settings - Fork 265
feat: Implement bloom_filter_agg #987
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
…w. Added spark_bit_array_tests.
| fn state(&mut self) -> Result<Vec<ScalarValue>> { | ||
| // TODO(Matt): There might be a more efficient way to do this by transmuting since calling | ||
| // state() on an Accumulator is considered destructive. | ||
| let state_sv = ScalarValue::Binary(Some(self.state_as_bytes())); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
One way to avoid the copy, which may be too ugly , would be to store bloom filter data as an Option<>
So instead of
pub struct SparkBloomFilter {
bits: SparkBitArray,
num_hash_functions: u32,
}Something like
pub struct SparkBloomFilter {
bits: Option<SparkBitArray>
num_hash_functions: u32,
}And then you could basically use Option::take to take the value and leave a None in its place
let Some(bits) = self.bits.take() else {
return Err(invalid state)
};
// do whatever you want now you have the owned `bits`
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #987 +/- ##
============================================
+ Coverage 34.03% 34.41% +0.38%
- Complexity 875 889 +14
============================================
Files 112 112
Lines 43289 43428 +139
Branches 9572 9627 +55
============================================
+ Hits 14734 14947 +213
+ Misses 25521 25437 -84
- Partials 3034 3044 +10
Flags with carried forward coverage won't be shown. Click here to find out more. ☔ View full report in Codecov by Sentry. |
|
Results from the benchmark I just added: |
kazuyukitanimura
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Still looking
native/core/src/execution/datafusion/util/spark_bloom_filter.rs
Outdated
Show resolved
Hide resolved
native/core/src/execution/datafusion/expressions/bloom_filter_agg.rs
Outdated
Show resolved
Hide resolved
|
Just putting notes for the test failure. It's failing one Spark test in The plan is a bit of a monster, but I'll provide it below: This is what it looks like on the main branch: |
kazuyukitanimura
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I feel we can merge this as soon as we pass all the tests. We can work on optimizations separately if necessary.
|
Debugger output from the failing state in CometArrayImporter. The entire subquery runs in native code now, so my guess is that the output from that projection, which looks like it should be a struct with two binary values in it, is wrong. I'm not sure if it's a bug in the projection, or something further downstream. |
|
I do not have time to look at this error yet. I may take a look after the conference. |
|
Can't say I see a huge different in TPC-H or TPC-DS locally, but the plans I looked at were typically building filters over very small relations. |
|
@mbutrovich Is it possible to trace back where |
|
The Spark SQL test failure can be fixed by #1016. |
|
I merged the fix. You can rebase and re-trigger CI now. |
|
Merged in updated main, thanks for the fix! |
spark/src/test/scala/org/apache/spark/sql/benchmark/CometExecBenchmark.scala
Outdated
Show resolved
Hide resolved
| (0..arr.len()).try_for_each(|index| { | ||
| let v = ScalarValue::try_from_array(arr, index)?; | ||
|
|
||
| if let ScalarValue::Int64(Some(value)) = v { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It only supports Int64? Spark BloomFilterAggregate supports Byte, Short, Int, Long and String. If Comet BloomFilterAggregate only support Int64 for now. We need to fallback to Spark for other cases in QueryPlanSerde.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think I was going off of their docs which say it only supports Long.
In their implementation, however, if looks like they can cast the fixed width types directly to Long
https://github.com/apache/spark/blob/b078c0d6e2adf7eb0ee7d4742a6c52864440226e/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/BloomFilterAggregate.scala#L238
and for strings their bloom filter implementation has a putBinary method that we don't currently support. The casts should be easy. I'll look at what putBinary on our bloom filter implementation will take.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah I see what happened. 3.4 only supports Long, which was the Spark source I was working off of. 3.5 added support for other types.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I modified it to only generate a native BloomFilterAgg if the child has LongType. I'll open an issue to support more types in the future.
| // Does it make sense to do a std::mem::take of filter_state here? Unclear to me if a deep | ||
| // copy of filter_state as a Vec<u64> to a Vec<u8> is happening here. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You mean if std::mem::take also does copy?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
filter_state isn't needed after this function call, so ideally I'd be able to move its contents out instead of the byte slice, but because the underlying type of filter_state is u64 vector and I'm stuffing it into a u8 vector I think the alignment requirements won't be the same and I won't get the clean buffer transfer that I want.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
u64 vector alignment should be acceptable for u8 vector alignment. At least locally I saw their alignments are same.
| if (childExpr.isDefined && | ||
| child.dataType | ||
| .isInstanceOf[LongType] && // Spark 3.4 only supports Long, 3.5+ adds more types. | ||
| numItemsExpr.isDefined && | ||
| numBitsExpr.isDefined && | ||
| dataType.isDefined) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
one minor nit (and no need to address for this PR, because this is a general issue that we already have) is that we are testing for a multiple preconditions here, and we fall back if any of them are false (which is correct) but we do not let the user know the specific reason for the fallback, which makes debugging more difficult.
I would eventually like to explore refactoring how we approach this and see if we can add some utilities to make it easier to report fallback reasons, but this is much lower priority than the performance & stability work for now.
|
I tested with TPC-H q5 and see that we are now running the bloom filter agg natively |
andygrove
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @mbutrovich
## Which issue does this PR close? <!-- We generally require a GitHub issue to be filed for all bug fixes and enhancements and this helps us generate change logs for our releases. You can link an issue to this PR using the GitHub syntax. For example `Closes apache#123` indicates that this PR will close issue apache#123. --> Closes #. ## Rationale for this change <!-- Why are you proposing this change? If this is already explained clearly in the issue then this section is not needed. Explaining clearly why changes are proposed helps reviewers understand your changes and offer better suggestions for fixes. --> ## What changes are included in this PR? <!-- There is no need to duplicate the description in the issue here but it is sometimes worth providing a summary of the individual changes in this PR. --> ``` cb3e977 perf: Add experimental feature to replace SortMergeJoin with ShuffledHashJoin (apache#1007) 3df9d5c fix: Make comet-git-info.properties optional (apache#1027) 4033687 chore: Reserve memory for native shuffle writer per partition (apache#1022) bd541d6 (public/main) remove hard-coded version number from Dockerfile (apache#1025) e3ac6cf feat: Implement bloom_filter_agg (apache#987) 8d097d5 (origin/main) chore: Revert "chore: Reserve memory for native shuffle writer per partition (apache#988)" (apache#1020) 591f45a chore: Bump arrow-rs to 53.1.0 and datafusion (apache#1001) e146cfa chore: Reserve memory for native shuffle writer per partition (apache#988) abd9f85 fix: Fallback to Spark if named_struct contains duplicate field names (apache#1016) 22613e9 remove legacy comet-spark-shell (apache#1013) d40c802 clarify that Maven central only has jars for Linux (apache#1009) 837c256 docs: Various documentation improvements (apache#1005) 0667c60 chore: Make parquet reader options Comet options instead of Hadoop options (apache#968) 0028f1e fix: Fallback to Spark if scan has meta columns (apache#997) b131cc3 feat: Support `GetArrayStructFields` expression (apache#993) 3413397 docs: Update tuning guide (apache#995) afd28b9 Quality of life fixes for easier hacking (apache#982) 18150fb chore: Don't transform the HashAggregate to CometHashAggregate if Comet shuffle is disabled (apache#991) a1599e2 chore: Update for 0.3.0 release, prepare for 0.4.0 development (apache#970) ``` ## How are these changes tested? <!-- We typically require tests for all PRs in order to: 1. Prevent the code from being accidentally broken by subsequent changes 2. Serve as another way to document the expected behavior of the code If tests are not included in your PR, please explain why (for example, are they covered by existing tests)? -->

Which issue does this PR close?
Closes #846.
Rationale for this change
What changes are included in this PR?
bloom_filter_agg.rs) that uses DataFusion'sAccumulatortrait. We do not have aGroupsAccumulatorimplementation and leave it as a possible future optimization.planner.rs,QueryPlanSerde.scala)spark_bloom_filter.rs,spark_bit_array.rs)How are these changes tested?
CometExecSuitespark_bit_array.rs