Deduplicate and standardize deserialization logic for streams #13412
Conversation
We discussed this with @alihan-synnada and it looks good to me, but it'd be great to get community review. cc @alamb
ozankabak
left a comment
Let's wait a little bit for more eyes on this, but I carefully went through and it seems like a good first step towards removing code duplication on the read side.
alamb
left a comment
Thank you @alihan-synnada and @ozankabak
I think this PR is really well documented and makes a lot of sense to me.
One thing I noticed is that the ticket talks about the arrow and avro as well. Do you plan to update them in a follow on PR?
Finally, the ticket also mentions parquet -- I think it will be hard to update the parquet reader (or any columnar file format) to use the DecodeTrait. The parquet reader itself drives what IO to do (aka what byte ranges and when) rather than the more row oriented format.
```rust
/// Possible outputs of a [`BatchDeserializer`].
#[derive(Debug, PartialEq)]
pub enum DeserializerOutput {
```
👍 nice
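For readers without the diff context, the three variants this enum carries (named in the PR description) drive the read loop. A minimal sketch with a placeholder payload, since the real `RecordBatch` variant wraps an arrow `RecordBatch` and this toy version should compile without the arrow crates:

```rust
/// Sketch of the deserializer output states described in the PR.
/// `Vec<u64>` stands in for `arrow::record_batch::RecordBatch` so the
/// example is self-contained; it is not the actual DataFusion type.
#[derive(Debug, PartialEq)]
pub enum DeserializerOutput {
    RecordBatch(Vec<u64>), // placeholder for an arrow RecordBatch
    RequiresMoreData,      // caller should feed more bytes
    InputExhausted,        // the input has ended and the buffer is drained
}

/// Toy driver showing how a caller might react to each variant.
pub fn describe(out: &DeserializerOutput) -> &'static str {
    match out {
        DeserializerOutput::RecordBatch(_) => "emit batch downstream",
        DeserializerOutput::RequiresMoreData => "read more input",
        DeserializerOutput::InputExhausted => "stream is done",
    }
}
```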
```rust
pub(crate) trait Decoder: Send + Debug {
    /// See [`arrow::json::reader::Decoder::decode`].
    ///
    /// [`arrow::json::reader::Decoder::decode`]: ::arrow::json::reader::Decoder::decode
```
I double checked and https://docs.rs/arrow-json/53.2.0/arrow_json/reader/struct.Decoder.html seems to describe this interface well.
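The forwarding pattern the PR description mentions for `CsvDecoder` and `JsonDecoder` can be sketched as follows. The names `InnerDecoder` and `JsonLikeDecoder` and the simplified `decode` signature are illustrative stand-ins, not the actual DataFusion or arrow API:

```rust
use std::fmt::Debug;

/// Toy stand-in for an underlying arrow decoder
/// (e.g. arrow_json's reader Decoder).
#[derive(Debug, Default)]
pub struct InnerDecoder {
    pub consumed: usize,
}

impl InnerDecoder {
    /// Pretends to consume every byte it is given.
    pub fn decode(&mut self, buf: &[u8]) -> usize {
        self.consumed += buf.len();
        buf.len()
    }
}

/// Common trait in the spirit of the PR's `Decoder`.
pub trait Decoder: Send + Debug {
    /// Returns the number of bytes consumed from `buf`.
    fn decode(&mut self, buf: &[u8]) -> usize;
}

/// Wrapper analogous to `JsonDecoder`, implemented purely by
/// forwarding to the wrapped arrow decoder.
#[derive(Debug, Default)]
pub struct JsonLikeDecoder(pub InnerDecoder);

impl Decoder for JsonLikeDecoder {
    fn decode(&mut self, buf: &[u8]) -> usize {
        self.0.decode(buf) // forward to the underlying decoder
    }
}
```

Because the wrappers only forward, each format gets the common trait without reimplementing any decoding logic.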
```rust
    ///
    /// This struct is responsible for converting a stream of bytes, which represent
    /// encoded data, into a stream of `RecordBatch` objects, following the specified
    /// schema and formatting options.
```
it might be worth also mentioning here that this handles any buffering of the input that might be required to fulfill the decode interface (which can return RecordBatches before fully consuming the input)
It took me a while to figure out why this was required
Done - thank you for pointing it out
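The buffering concern discussed above can be illustrated with a toy line-delimited deserializer: it may emit a batch before consuming all digested bytes, so any partial trailing record must stay buffered for the next call. The names `LineDeserializer`, `digest`, `next_batch`, and `pending` are hypothetical, not the DataFusion implementation:

```rust
/// Toy deserializer that treats each '\n'-terminated line as one "row"
/// and emits a batch of complete lines, keeping any partial trailing
/// line buffered, like the buffering the decode interface requires.
#[derive(Debug, Default)]
pub struct LineDeserializer {
    buffer: Vec<u8>,
}

impl LineDeserializer {
    /// Accepts a chunk of input (analogous to `digest`).
    pub fn digest(&mut self, chunk: &[u8]) {
        self.buffer.extend_from_slice(chunk);
    }

    /// Emits complete lines, leaving an incomplete tail in the buffer
    /// (analogous to returning a batch before the input is exhausted).
    pub fn next_batch(&mut self) -> Option<Vec<String>> {
        let last_newline = self.buffer.iter().rposition(|&b| b == b'\n')?;
        let complete: Vec<u8> = self.buffer.drain(..=last_newline).collect();
        let lines: Vec<String> = complete
            .split(|&b| b == b'\n')
            .filter(|l| !l.is_empty())
            .map(|l| String::from_utf8_lossy(l).into_owned())
            .collect();
        Some(lines)
    }

    /// Bytes still buffered (a partial record awaiting more input).
    pub fn pending(&self) -> usize {
        self.buffer.len()
    }
}
```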
```diff
 }
 GetResultPayload::Stream(s) => {
-    let mut decoder = config.builder().build_decoder();
+    let decoder = config.builder().build_decoder();
```
That is certainly a lot nicer 😍
Yes, indeed. Not an immediate priority but we would like to tidy up the read side.
I agree -- Parquet will probably stay separate for the time being.
Which issue does this PR close?
None
Rationale for this change
Part of #13411
This PR implements a common `Decoder` trait, the `BatchDeserializer` trait, and the `DecoderDeserializer` struct as described in the issue, along with `CsvDecoder` and `JsonDecoder`, as `arrow-csv` and `arrow-json` decoders are readily available.

What changes are included in this PR?
Note: There are about 290 lines of new tests, so it is about 250 lines of actual code.

- `BatchDeserializer` as a common interface:
  - `digest` consumes the input in chunks
  - `next` attempts to deserialize the digested data and returns a `DeserializerOutput`, which is either a `RecordBatch`, `RequiresMoreData`, or `InputExhausted`
  - `finish` signals the end of the input stream
- A `Decoder` trait, implemented for `CsvDecoder` and `JsonDecoder` by forwarding methods
- A `DecoderDeserializer` that implements `BatchDeserializer` for formats that have a `Decoder` implementation
- A `deserialize_stream` function to deduplicate the deserialization logic

Are these changes tested?
Yes, the changes are covered by new tests added to the CSV and JSON modules.
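The digest/next/finish protocol from the change list can be sketched as a trait with a trivial implementation. The types and the fixed-width "format" below are toy stand-ins (the real trait operates on byte buffers and arrow `RecordBatch`es), shown only to make the state transitions concrete:

```rust
/// Placeholder for the three outcomes named in the PR description;
/// `Vec<u64>` stands in for an arrow RecordBatch.
#[derive(Debug, PartialEq)]
pub enum Output {
    Batch(Vec<u64>),
    RequiresMoreData,
    InputExhausted,
}

/// Sketch of the `BatchDeserializer` protocol from the PR description.
pub trait BatchDeserializer {
    /// Consumes the input in chunks.
    fn digest(&mut self, chunk: &[u8]);
    /// Attempts to deserialize the digested data.
    fn next(&mut self) -> Output;
    /// Signals the end of the input stream.
    fn finish(&mut self);
}

/// Trivial toy implementation: every 4 digested bytes become one "row".
#[derive(Debug, Default)]
pub struct FixedWidthDeserializer {
    buf: Vec<u8>,
    finished: bool,
}

impl BatchDeserializer for FixedWidthDeserializer {
    fn digest(&mut self, chunk: &[u8]) {
        self.buf.extend_from_slice(chunk);
    }
    fn next(&mut self) -> Output {
        let rows = self.buf.len() / 4;
        if rows > 0 {
            // Emit the complete rows; a partial row stays buffered.
            let taken: Vec<u8> = self.buf.drain(..rows * 4).collect();
            Output::Batch(taken.chunks(4).map(|c| c.len() as u64).collect())
        } else if self.finished {
            Output::InputExhausted
        } else {
            Output::RequiresMoreData
        }
    }
    fn finish(&mut self) {
        self.finished = true;
    }
}
```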
Are there any user-facing changes?
No