[HUDI-6226] Support parquet native bloom filters #8716
Conversation
|
@danny0405 could you please review this? Unfortunately my IDE still does not work with Scala, sorry for the poor formatting on the tests. |
FSUtils.registerFileSystem(file, parquetConfig.getHadoopConf()));
ParquetWriter.Builder parquetWriterbuilder = new ParquetWriter.Builder(HoodieWrapperFileSystem.convertToHoodiePath(file, parquetConfig.getHadoopConf())) {
  @Override
  protected ParquetWriter.Builder self() {
Can you show me the code for how Spark adapts to the Parquet SSBF support?
Sorry, I have not fully understood what you want. What is Parquet SSBF? Do you mean how Spark itself handles parquet blooms?
Yes, can you show me that code snippet?
Please see my notes on the lineage in Spark: it basically uses another parquet class to write, which gets the hadoop conf directly (i.e. ParquetOutputFormat), while Hudi uses ParquetWriter directly (a sketch of the relevant conf keys follows the list):
- ParquetOutputWriter in Spark asks for parquet's ParquetOutputFormat, which gets the bloom configs
- ParquetUtils in Spark has a prepareWrite function, which propagates to ParquetOutputWriter
- ParquetWrite in Spark has a prepareWrite function, which propagates to ParquetUtils.prepareWrite
- ParquetTable in Spark uses ParquetWrite
- ParquetDatasourceV2 in Spark uses ParquetTable in getTable (then for read and write)
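For illustration, a minimal sketch of the kind of settings ParquetOutputFormat picks up from the Hadoop conf; the keys are the standard parquet-hadoop bloom filter properties, and the column name "uuid" is hypothetical:

```java
import org.apache.hadoop.conf.Configuration;

public class BloomConfExample {
  public static void main(String[] args) {
    // Hypothetical column "uuid": per-column keys use the "<property>#<column>" form
    // that parquet-hadoop's ParquetOutputFormat parses when building its writer.
    Configuration hadoopConf = new Configuration();
    hadoopConf.set("parquet.bloom.filter.enabled#uuid", "true");          // write a bloom filter for column "uuid"
    hadoopConf.set("parquet.bloom.filter.expected.ndv#uuid", "1000000");  // expected distinct values, used to size the filter
  }
}
```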
Thanks. Another question: can Delta gain benefits from these bloom filters, and how much regression is there on the writer path when enabling the BloomFilter for parquet?
"Delta" do you mean MOR logs files configured in parquet format ? If you mean delta-lake then yes, as well as iceberg they likely rely on spark writer so benefit from bloom.
By regression do you mean performance regression ? Each column configured with bloom will introduce overhead at write time, but faster subsequent reads with predicates on the column. I haven't benchmarked that but I can say blooms will faster reads significantly by skipping lot of parquet scan that hudi stats index won't cover. ie uuids, strings, high cardinality dictionaries
BTW in this PR up to the user to enable blooms so no regression is expected
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieParquetBloom.scala
hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieBaseParquetWriter.java
|
OK understood. Thx
On May 24, 2023 6:23:42 AM UTC, Danny Chan (@danny0405) commented on this pull request:
> @@ -67,6 +89,22 @@ public HoodieBaseParquetWriter(Path file,
this.recordCountForNextSizeCheck = DEFAULT_MINIMUM_RECORD_COUNT_FOR_CHECK;
}
+ protected void handleParquetBloomFilters(ParquetWriter.Builder parquetWriterbuilder, Configuration hadoopConf) {
+ parquetWriterbuilder.withBloomFilterEnabled(ParquetOutputFormat.getBloomFilterEnabled(hadoopConf));
+ // inspired from https://github.com/apache/parquet-mr/blob/master/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java#L458-L464
+ hadoopConf.forEach(conf -> {
+ String key = conf.getKey();
+ if (key.startsWith(BLOOM_FILTER_ENABLED)) {
+ String column = key.substring(BLOOM_FILTER_ENABLED.length() + 1, key.length());
+ parquetWriterbuilder.withBloomFilterEnabled(column, Boolean.valueOf(conf.getValue()));
The writer should be shared by all the engines, not just Spark, so it is necessary to add a basic UT for it.
|
|
For reference, the Delta bloom filter documentation: https://docs.databricks.com/optimizations/bloom-filters.html
|
|
Thanks for sharing. I think the Databricks BloomFilter index mainly serves query optimization purposes, right? Do they also use it to accelerate data skipping during data ingestion, aka the UPSERTS? |
|
No, AFAIK both iceberg and delta use blooms the same way the current PR does: read-time parquet push-down.
Just to mention, spark also provides a [join based on bloom](apache/spark#35789) since 3.3, but in this case it is an on-the-fly built bloom filter, not the parquet bloom filter.
|
|
@parisni Thanks for the clarification, can you supplement the tests and rebase with the latest master code, then re-trigger the tests again? |
|
@danny0405 done for UT + rebase, let me know |
Still seeing many compile failures, can you take a look again ~ |
@danny0405 This is a non-trivial compilation problem. The bloom stuff only works with parquet >= 1.12, so any engine shipping a lower version won't compile. So far I have been working with Spark 3.2, which handles blooms, and did not encounter this. Example:
|
Force-pushed from 85df05e to 0c2ed7d
I checked: Spark 3.0 and Spark 2.4 still use parquet 1.10.1, newer Spark 3.x releases use parquet 1.12.2, and Flink uses parquet 1.12.2; let's try the first approach first. |
Weird, since all Flink-based builds were failing the same way as Spark 2.4. |
|
@danny0405 @yihua can you please guide me on how to implement that parquet writer for each engine? I am comfortable with the reflection way, which is fairly simple, but dealing with every engine is non-trivial since currently the parquet builder is a common class. |
One way is to make the parquet writer abstract and add an interface like |
I have no idea how to inject the proper impls in each client, since so far each client uses the common parquet writer. This looks like a huge refactoring. |
|
We can restart the task when Hudi has dropped spark 2.4, after which the parquet version can be upgraded. |
|
What's the timeline for dropping 2.4?
Also, we could use the reflection way and drop it when parquet gets updated (see the sketch after this list). There are several advantages to this:
1. the feature is important to fill the gap soon with other ACID tables
2. we remove here the use of deprecated parquet constructors
3. the patch is tiny
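For what it's worth, a minimal sketch of what the reflection approach could look like (an assumption of the shape, not the actual code in this PR): ParquetWriter.Builder#withBloomFilterEnabled only exists in parquet >= 1.12, so it is looked up and invoked reflectively and skipped on older parquet.

```java
import java.lang.reflect.Method;

import org.apache.parquet.hadoop.ParquetWriter;

public class ParquetBloomReflectionUtil {

  // Invoke ParquetWriter.Builder#withBloomFilterEnabled(String, boolean) reflectively so the
  // code still compiles and runs against parquet < 1.12 (e.g. 1.10.1 shipped with Spark 2.4),
  // where the method does not exist; in that case the bloom filter is simply not written.
  public static void enableBloomFilterIfAvailable(ParquetWriter.Builder<?, ?> builder,
                                                  String column, boolean enabled) {
    try {
      Method method = builder.getClass()
          .getMethod("withBloomFilterEnabled", String.class, boolean.class);
      method.invoke(builder, column, enabled);
    } catch (NoSuchMethodException e) {
      // Older parquet: native bloom filters are not supported, skip silently.
    } catch (ReflectiveOperationException e) {
      throw new RuntimeException("Could not enable parquet bloom filter for column " + column, e);
    }
  }
}
```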
|
Maybe the 1.0 release; the only reason we have a very old parquet version is Spark 2.4. |
|
ok let me fix the build with reflection and then decide |
|
@danny0405 build and tests are passing now. I propose to bring this to 0.14 and, when Spark 2.4 is dropped, clean this up. I would be glad to write the documentation on how to set up parquet bloom (the same way as parquet modular encryption). |
|
I saw the unit test for the new writer was dropped, was it because of the compatibility issue? |
Yes. In order to check for the bloom filter's existence we relied on a newer version of parquet tools, which breaks the build. By the way, the other test already makes sure of the bloom filter's existence. I might be able to restore the test now and work around it again with reflection, or I could restore it when Spark 2.4 support gets dropped. |
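For reference, a rough sketch of how one might assert the bloom filter's existence with the parquet-hadoop 1.12+ API directly, instead of parquet tools (this is my assumption of an alternative, not the dropped test; the column name passed in is arbitrary):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.BloomFilterReader;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
import org.apache.parquet.hadoop.util.HadoopInputFile;

public class BloomExistenceCheck {

  // Returns true if a native bloom filter was written for the given column
  // in at least one row group of the parquet file.
  public static boolean hasBloomFilter(Path file, Configuration conf, String columnName) throws Exception {
    try (ParquetFileReader reader = ParquetFileReader.open(HadoopInputFile.fromPath(file, conf))) {
      for (BlockMetaData block : reader.getFooter().getBlocks()) {
        BloomFilterReader bloomReader = reader.getBloomFilterDataReader(block);
        for (ColumnChunkMetaData column : block.getColumns()) {
          if (column.getPath().toDotString().equals(columnName)
              && bloomReader.readBloomFilter(column) != null) {
            return true;
          }
        }
      }
    }
    return false;
  }
}
```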
|
hey @parisni: good job on the patch. Curious to know if you have any perf numbers on this, on both the write and read side. What's the perf overhead we are seeing on the write side, and how much improvement are we seeing on the read side with the bloom filter? Also, would you provide a short write-up (what this support is all about, how users can leverage it and what the benefit is) that we can use on our release page? |
|
@nsivabalan
There are existing Spark benchmarks here: basically 20% slower for writes and up to 4x faster for reads. https://github.com/apache/spark/blob/18d0a276c501a102af3e7ed251831983b9148a4f/sql/core/benchmarks/BloomFilterBenchmark-jdk11-results.txt
As for documentation, please consider this PR: #9056
|
Change Logs
Provides support for parquet native bloom filters to improve read performance. Fixes #7117.
Impact
All hudi operations (bulk_insert, insert, upsert, ...) will be able to write parquet bloom filters if configured within the application hadoop conf. The hudi reader already leverages the blooms if the predicates match the columns configured at write time (a configuration sketch follows below).
These bloom filters only work for COW tables, since they need parquet base files; they are unlikely to work with MOR log files even when those are configured to write parquet format instead of avro.
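As an illustration only (column name, table path and data are hypothetical; the Hudi write options shown are just the usual Spark datasource ones), enabling a bloom filter on a high-cardinality column before a Hudi write could look like:

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class HudiBloomWriteExample {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .appName("hudi-parquet-bloom-example")
        .master("local[1]")
        .getOrCreate();

    // Standard parquet-hadoop keys set on the application hadoop conf;
    // "uuid" is a hypothetical high-cardinality column of the table.
    spark.sparkContext().hadoopConfiguration()
        .set("parquet.bloom.filter.enabled#uuid", "true");
    spark.sparkContext().hadoopConfiguration()
        .set("parquet.bloom.filter.expected.ndv#uuid", "1000000");

    Dataset<Row> df = spark.sql("select uuid() as uuid, 1 as value");
    df.write().format("hudi")
        .option("hoodie.table.name", "bloom_example")
        .option("hoodie.datasource.write.recordkey.field", "uuid")
        .option("hoodie.datasource.write.precombine.field", "value")
        .mode("overwrite")
        .save("/tmp/hudi_bloom_table");
  }
}
```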
Risk level (write none, low medium or high below)
If medium or high, explain what verification was done to mitigate the risks.
Documentation Update
The docs will get a new guide, in the same manner as the current encryption page.
Contributor's checklist