-
Notifications
You must be signed in to change notification settings - Fork 4.5k
[Spanner Change Streams] Parse proto bytes change stream record in ChangeStreamRecordMapper #37427
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Conversation
Summary of ChangesHello @chenxuesdu, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request enhances the Spanner Change Streams connector to support parsing change stream records that are delivered as proto bytes, particularly in PostgreSQL environments. By introducing specific methods to identify and deserialize these proto bytes, the system can now uniformly process change stream data, whether it originates as JSONB or proto, leveraging existing proto-based mapping logic. This change improves compatibility and robustness for different Spanner database dialects and output formats. Highlights
🧠 New Feature in Public Preview: You can now enable Memory to help Gemini Code Assist learn from your team's feedback. This makes future code reviews more consistent and personalized to your project's style. Click here to enable Memory in your admin console. Using Gemini Code AssistThe full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips. Invoking Gemini You can request assistance from Gemini at any point by creating a comment using either
Customization To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a Limitations & Feedback Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here. You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension. Footnotes
|
| public com.google.spanner.v1.ChangeStreamRecord getProtoChangeStreamRecordFromBytes() { | ||
| recordReadAt = Timestamp.now(); | ||
| // Use getBytes(0) for the BYTES column returned by read_proto_bytes_ TVF | ||
| byte[] protoBytes = resultSet.getBytes(0).toByteArray(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we pass this directly to the function below to avoid an extra copy of the bytes?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated. Thanks.
| && resultSet.getColumnType(0).getCode() == com.google.cloud.spanner.Type.Code.PROTO; | ||
| } | ||
|
|
||
| public com.google.spanner.v1.ChangeStreamRecord getProtoChangeStreamRecordFromBytes() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we add accurate comments for this function, following similar convention for other functions? Thanks.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added comments. Please take a look.
| try { | ||
| return com.google.spanner.v1.ChangeStreamRecord.parseFrom(protoBytes); | ||
| } catch (InvalidProtocolBufferException e) { | ||
| throw new RuntimeException("Failed to parse ChangeStreamRecord proto", e); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe "Failed to parse the proto bytes to ChangeStreamRecord proto".
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated. Thanks.
|
Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment |
a3858ab to
5e5c9aa
Compare
|
Run Java_GCP_IO_Direct PreCommit |
1 similar comment
|
Run Java_GCP_IO_Direct PreCommit |
|
Run Java_GCP_IO_Direct PreCommit |
|
Assigning reviewers: R: @Abacn for label java. Note: If you would like to opt out of this review, comment Available commands:
The PR bot will only process comments in the main thread (not review comments). |
| * Returns the only change stream record proto at the current pointer of the result set. It also | ||
| * updates the timestamp at which the record was read. This function enhances the getProtoMessage | ||
| * function but only focus on the ChangeStreamRecord type. | ||
| * |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you add a similar comment here for GoogleSQl databases like you did for line 136? Thanks.
| if (this.isPostgres()) { | ||
| // In PostgresQL, change stream records are returned as JsonB. | ||
| // For `MUTABLE_KEY_RANGE` option, change stream records are returned as protos. | ||
| if (resultSet.isProtoBytesChangeRecord()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe we have the getBytes(0) reflected here to match up getPgJsonb(0) so that it is very clear
that we should only expect one column from the result set.
nielm
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM - pending existing comments
When parsing the change stream record in postgresql, if the output is proto bytes, map it to proto type, so we can reuse the existing logic to parse the change stream record.