[Parquet] Add ParquetMetadataPushDecoder #8080
Conversation
ff4a158 to 8ae1311
parquet/src/errors.rs (outdated)
This is the one part of this PR I am not sure about
Since the ParquetError is (now) marked as #[non_exhaustive] I don't think this is technically a breaking change. However, it would be really nice to avoid a new enum -- I will comment about this later in the PR
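For readers less familiar with `#[non_exhaustive]`: downstream crates are already required to include a wildcard arm when matching on the error, so a new variant compiles without any changes on their side. A minimal illustration (the variant names below are only for illustration):

```rust
use parquet::errors::ParquetError;

// Because `ParquetError` is #[non_exhaustive], code outside the parquet crate
// must already contain a catch-all arm, so adding a variant (e.g. a
// hypothetical `NeedMoreDataRange`) falls into the `other` arm below rather
// than breaking the build.
fn describe(err: &ParquetError) -> String {
    match err {
        ParquetError::EOF(msg) => format!("unexpected EOF: {msg}"),
        ParquetError::General(msg) => format!("general error: {msg}"),
        // required by #[non_exhaustive]; new variants fall through here
        other => format!("other parquet error: {other}"),
    }
}
```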
I think I had this error in my original draft of ParquetMetaDataReader (3c340b7) but got talked out of it 😅 (#6392 (comment))
I should have known you were right and gone with your instinct!
FWIW I could remove ParquetError::NeedMoreDataRange if I reworked the decoder to have a proper state machine internally (rather than calling into the existing ParquetMetaDataDecoder).
That is certainly my long term plan. If we like this API I will invest some time into seeing if I can sketch out what it would look like
parquet/src/lib.rs (outdated)
This is a new public API for returning results or requesting more data. It is also used in the parquet push decoder -- you can see it all wired up here: #7997
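For readers skimming the thread, here is a sketch of the shape this type takes, inferred from how it is used in the `try_decode` snippet quoted later in this conversation (doc comments and derives omitted):

```rust
use std::ops::Range;

/// Sketch inferred from usage in this thread: the outcome of one decode
/// attempt -- either more bytes are needed (and which ranges), a value was
/// produced, or the decoder has already finished.
pub enum DecodeResult<T> {
    /// The decoder needs the listed byte ranges before it can make progress
    NeedsData(Vec<Range<u64>>),
    /// Decoding succeeded and produced a value
    Data(T),
    /// The decoder has already returned its result
    Finished,
}
```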
parquet/src/util/push_buffers.rs (outdated)
This is the in-memory buffer that holds the data needed for decoding. It is also used by the parquet push decoder in #7997.
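As a rough mental model, a sketch inferred from the `try_decode` snippet later in this thread; the internal representation and the `offset` field are assumptions, not the reviewed code:

```rust
use bytes::Bytes;
use std::ops::Range;

/// Sketch only: an in-memory, possibly non-contiguous set of byte ranges
/// from the underlying file, addressed by their absolute file offsets.
struct PushBuffers {
    /// total length of the underlying file (not of the buffered data)
    file_len: u64,
    /// (start offset in the file, bytes for that range)
    buffers: Vec<(u64, Bytes)>,
    /// virtual read position, used by a `std::io::Read` impl
    /// (see the sketch a little further down)
    offset: u64,
}

impl PushBuffers {
    fn file_len(&self) -> u64 {
        self.file_len
    }

    /// Is the requested range fully covered by a single buffered range?
    /// (ranges are not coalesced, so adjacent pushes are not merged)
    fn has_range(&self, range: &Range<u64>) -> bool {
        self.buffers
            .iter()
            .any(|(start, b)| *start <= range.start && range.end <= start + b.len() as u64)
    }
}
```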
parquet/src/util/push_buffers.rs (outdated)
This std::io::Read impl is needed so the current thrift decoder can read bytes
I suspect when @etseidl is done with his custom thrift decoder we can remove this impl
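To make that concrete, here is a rough sketch of what such an impl can look like, assuming the hypothetical `PushBuffers` shape sketched above (with its `offset` cursor); the actual code in this PR may differ:

```rust
use std::io::Read;

// Sketch only: serve `read` calls from whichever buffered range contains the
// current virtual offset, so a reader that expects contiguous bytes (like the
// current thrift decoder) can consume data that was pushed as separate ranges.
impl Read for PushBuffers {
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        for (start, bytes) in &self.buffers {
            let end = start + bytes.len() as u64;
            if (*start..end).contains(&self.offset) {
                // copy as many bytes as this single range can provide
                let begin = (self.offset - start) as usize;
                let len = buf.len().min(bytes.len() - begin);
                buf[..len].copy_from_slice(&bytes[begin..begin + len]);
                // advance the virtual offset past the bytes we returned
                self.offset += len as u64;
                return Ok(len);
            }
        }
        Err(std::io::Error::new(
            std::io::ErrorKind::UnexpectedEof,
            "read offset is not covered by any pushed range",
        ))
    }
}
```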
I am quite pleased with this API and I think the examples show the main IO patterns that we would want:
- Feed exactly the byte ranges needed
- "prefetch" a bunch of the data to avoid multiple IOs
- Read via the standard library AsyncRead trait, which has been asked for several times
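To illustrate the first pattern, here is a rough sketch of the calling convention: the caller owns all IO, and the decoder only reports which ranges it still needs. The constructor and `push_range` method names are placeholders for whatever the final API exposes, `fetch` stands in for the caller's own IO, and `use` statements for the new types are omitted since their paths are introduced by this PR:

```rust
use std::ops::Range;
use bytes::Bytes;
use parquet::errors::ParquetError;
use parquet::file::metadata::ParquetMetaData;

// Sketch only: drive the push decoder to completion by feeding exactly the
// byte ranges it asks for. Names for the new API are illustrative.
fn read_metadata(
    file_len: u64,
    fetch: impl Fn(Range<u64>) -> Bytes, // caller-provided IO
) -> Result<ParquetMetaData, ParquetError> {
    let mut decoder = ParquetMetaDataPushDecoder::try_new(file_len)?; // hypothetical constructor
    loop {
        match decoder.try_decode()? {
            // the decoder asks for exactly the ranges it needs next
            DecodeResult::NeedsData(ranges) => {
                for range in ranges {
                    let data = fetch(range.clone());
                    decoder.push_range(range, data); // hypothetical method name
                }
            }
            // decoding finished successfully
            DecodeResult::Data(metadata) => return Ok(metadata),
            // the decoder was already driven to completion
            DecodeResult::Finished => {
                return Err(ParquetError::General("metadata already decoded".into()))
            }
        }
    }
}
```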
I am probably too exuberant, but I am really pleased with this API and how it works
Internally the current implementation just calls out to the existing ParquetMetaDataReader
However, I think long term it would make more sense to reverse the logic: have the decoding machinery live in this decoder and refactor ParquetMetaDataReader to use the push decoder.
Given this PR is already quite large, I figured this would be a good follow-on to do
I wholeheartedly endorse this idea. 😄
I am also quite pleased with these tests, as I think they make clear what "IO" happens for the different operations
8ae1311 to dfd6435
@etseidl I would especially be interested in your feedback on this API / idea, given your plans for the parquet metadata decoding revamp
Thanks @alamb, this seems exciting. I want to try playing with it in one of my branches to see if there are any ergonomic issues.
Ok, I'm starting to grok this. I merged this branch into my current thrift branch, and changed:

```rust
pub fn try_decode(
    &mut self,
) -> std::result::Result<DecodeResult<ParquetMetaData>, ParquetError> {
    if self.done {
        return Ok(DecodeResult::Finished);
    }
    // need to have the last 8 bytes of the file to decode the metadata
    let file_len = self.buffers.file_len();
    if !self.buffers.has_range(&(file_len - 8..file_len)) {
        #[expect(clippy::single_range_in_vec_init)]
        return Ok(DecodeResult::NeedsData(vec![file_len - 8..file_len]));
    }
    // Try to parse the metadata from the buffers we have.
    // If we don't have enough data, it will return a `ParquetError::NeedMoreData`
    // with the number of bytes needed to complete the metadata parsing.
    // If we have enough data, it will return `Ok(())` and we can
    let footer_bytes = self
        .buffers
        .get_bytes(file_len - FOOTER_SIZE as u64, FOOTER_SIZE)?;
    let mut footer = [0_u8; FOOTER_SIZE];
    footer_bytes.as_ref().copy_to_slice(&mut footer);
    let footer = ParquetMetaDataReader::decode_footer_tail(&footer)?;
    let metadata_len = footer.metadata_length();
    let footer_metadata_len = FOOTER_SIZE + metadata_len;
    let footer_start = file_len - footer_metadata_len as u64;
    let footer_end = file_len - FOOTER_SIZE as u64;
    if !self.buffers.has_range(&(footer_start..footer_end)) {
        #[expect(clippy::single_range_in_vec_init)]
        return Ok(DecodeResult::NeedsData(vec![footer_start..file_len]));
    }
    let metadata_bytes = self.buffers.get_bytes(footer_start, metadata_len)?;
    let metadata = ParquetMetaDataReader::decode_file_metadata(&metadata_bytes)?;
    self.done = true;
    Ok(DecodeResult::Data(metadata))
}
```

No page indexes yet, but this seems pretty nice 👍 Once I have the page indexes converted the parser should get pretty simple.
I am also thinking of going one step further and rewriting the entire metadata decoder as an explicit state machine, roughly in the pattern you describe, which would avoid having to use
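For illustration only, a very rough sketch of what an explicit state machine could look like (the type, state, and field names here are hypothetical, not the planned design):

```rust
use parquet::file::metadata::ParquetMetaData;

// Hypothetical sketch: each call to `try_decode` would inspect the current
// state, check whether the required bytes have been pushed, and either
// advance to the next state or report the ranges it still needs.
enum MetadataDecodeState {
    /// waiting for the final 8 bytes of the file (metadata length + magic)
    ReadingFooter,
    /// footer decoded; waiting for `metadata_len` bytes of thrift metadata
    ReadingMetadata { metadata_len: usize },
    /// metadata decoded; optionally waiting for page index bytes
    ReadingPageIndexes { metadata: ParquetMetaData },
    /// terminal state: the metadata has been returned to the caller
    Finished,
}
```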
…aData` (#8111)

# Which issue does this PR close?

**Note: this targets a feature branch, not main**

- Part of #5854.

# Rationale for this change

# What changes are included in this PR?

This PR completes reading of the `FileMetaData` and `RowGroupMetaData` pieces of the `ParquetMetaData`. Column indexes and encryption will be follow-on work.

This replaces the macro for generating structs with a more general one that can take visibility and lifetime specifiers. Also (temporarily) adds a new function `ParquetMetaDataReader::decode_file_metadata` which should be a drop-in replacement for `ParquetMetaDataReader::decode_metadata`.

Still todo:
1. Add some tests that verify this produces the same output as `ParquetMetaDataReader::decode_metadata`
2. Read column indexes with new decoder
3. Read page headers with new decoder
4. Integrate with @alamb's push decoder work #8080

# Are these changes tested?

Not yet

# Are there any user-facing changes?

Yes
I believe @mbutrovich said he was planning to review this PR
etseidl left a comment
Just some nits I found when reading the rendered documentation. Looking good so far 👍
```rust
/// 1. Zero copy
/// 2. non contiguous ranges of bytes
///
/// # Non Coalescing
```
❤️
Co-authored-by: Ed Seidl <[email protected]>
mbutrovich left a comment
Minor nits, this is looking great @alamb!
mbutrovich left a comment
Thanks for the quick revision, @alamb. This is looking great!
Thanks for the reviews @mbutrovich and @etseidl -- I plan to merge this PR sometime this week, and then I'll try and make a POC to rework the current metadata decoder to use this one instead of the other way around
```rust
if found {
    // If we found the range, we can return the number of bytes read
    // advance our offset
    self.offset += buf.len() as u64;
```
Would we be able to avoid re-iterating ranges coming before the virtual offset on subsequent calls to read if we kept the index of the range/buffers Vecs as part of the offset?
I guess this would only be impactful if there were many ranges, so feel free to ignore this suggestion if the complexity outweighs the benefit
That is an interesting idea -- I am not sure the current API requires any particular order of the ranges (so I don't think it is guaranteed that the bytes for the next range are next after the current found index).
However, the linear scan is likely to be inefficient if we are storing many ranges -- maybe we should keep them in some sort of sorted structure (like a BTreeSet, for example) 🤔
Oh I didn't realize the ranges weren't in a sorted order. I think BTreeSet could work. We could probably start the iter from the last range we saw by calling ranges.range((Included(&last_range), Unbounded)).
That said, I don't want to hold up this PR. I'm fine with the current solution for now & we could always optimize the linear scan down the line if we find we're storing a lot of ranges.
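For illustration, a small sketch of that lookup, using a `BTreeMap` keyed by start offset (rather than a `BTreeSet`) so the byte buffers have somewhere to live; the function, field, and variable names are hypothetical:

```rust
use std::collections::BTreeMap;
use std::ops::Bound::{Included, Unbounded};

// Hypothetical sketch: resume the search from the last range we matched
// instead of re-scanning every range on each `read` call.
fn find_containing(
    ranges: &BTreeMap<u64, Vec<u8>>, // start offset -> buffered bytes
    last_start: u64,                 // start offset of the last matched range
    offset: u64,                     // current virtual read offset
) -> Option<(u64, &Vec<u8>)> {
    ranges
        .range((Included(&last_start), Unbounded))
        .find(|(start, buf)| **start <= offset && offset < **start + buf.len() as u64)
        .map(|(start, buf)| (*start, buf))
}
```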
I was out last week so I am delayed
Co-authored-by: albertlockett <[email protected]>
…rs into alamb/push_metadata_decoder
I'll plan to merge this PR tomorrow unless anyone else would like time to comment
Thanks everyone. I am hoping to try and prototype moving the decoder logic shortly, too
FWIW I am starting to refactor the actual decoding state machine here:
The PR to move the actual decoding into its own state machine is now ready for review:
…coder (#8340)

# Which issue does this PR close?

- part of #8000
- Follow on to #8080
- Closes #8439

# Rationale for this change

The current ParquetMetadataDecoder intermixes three things:
1. The state machine for decoding parquet metadata (footer, then metadata, then (optional) indexes)
2. orchestrating IO (aka calling read, etc)
3. Decoding thrift-encoded bytes into objects

This makes it almost impossible to add features like "only decode a subset of the columns in the ColumnIndex" and other potentially advanced use cases. Now that we have a "push" style API for metadata decoding that avoids IO, the next step is to extract the actual work into this API so that the existing ParquetMetadataDecoder just calls into the PushDecoder.

# What changes are included in this PR?

1. Extract the decoding state machine into PushMetadataDecoder
2. Extract thrift parsing into its own `parser` module
3. Update ParquetMetadataDecoder to use the PushMetadataDecoder
4. Extract the bytes --> object code into its own module

This almost certainly will conflict with @etseidl's plans in thrift-remodel.

# Are these changes tested?

By existing tests

# Are there any user-facing changes?

Not really -- this is an internal change that will make it easier to add features like "only decode a subset of the columns in the ColumnIndex", for example
# Which issue does this PR close?

- Part of #8000
- closes #7983

# Rationale for this change

This PR is the first part of separating IO and decode operations in the rust parquet decoder. Decoupling IO and CPU enables several important use cases:
1. Different IO patterns (e.g. not buffering the entire row group at once)
2. Different IO APIs, e.g. io_uring, OpenDAL, etc.
3. Deliberate prefetching within a file
4. Avoiding code duplication between the `ParquetRecordBatchStreamBuilder` and `ParquetRecordBatchReaderBuilder`

# What changes are included in this PR?

1. Add new `ParquetDecoderBuilder`, `ParquetDecoder`, and tests

It is effectively an explicit version of the state machine that is used in the existing async reader (where the state machine is encoded as Rust `async` / `await` structures).

# Are these changes tested?

Yes -- there are extensive tests for the new code.

Note that this PR actually adds a **3rd** path for control flow (when I claim this will remove duplication!) In follow-on PRs I will convert the existing readers to use this new pattern, similarly to the sequence I did for the metadata decoder:
- #8080
- #8340

Here is a preview of a PR that consolidates the async reader to use the push decoder internally (and removes one duplicate):
- #8159
- closes #8022

# Are there any user-facing changes?

Yes, a new API, but no changes to the existing APIs

---------

Co-authored-by: Matthijs Brobbel <[email protected]>
Co-authored-by: Adrian Garcia Badaracco <[email protected]>
Which issue does this PR close?
Rationale for this change
Metadata is needed when implementing a push decoder for Parquet:
If we want to truly separate IO and CPU we also need a way to decode the metadata without explicit IO; hence this PR, which provides a way to decode metadata "push style", where the decoder tells you what bytes are needed. It follows the same API as the parquet push decoder
This PR also introduces some of the common infrastructure needed in the parquet push decoder
What changes are included in this PR?
- `PushBuffers` to hold byte ranges
- `DecodeResult` to communicate back to the caller
- `ParquetMetaDataPushDecoder` for decoding metadata

Are these changes tested?
Yes, there are several fully working doc tests that show how to use this API
Are there any user-facing changes?
There is a new API