Fix/handle async lsn component receiver validation #2180
base: main
Changes from all commits
3b7b597
6cb52c0
f65a7e1
465cddb
d8dbace
15d807e
0229180
321496e
1811b42
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,9 +1,9 @@ | ||
| use crate::error::Result; | ||
| use crate::storage::MooncakeTable; | ||
| use crate::storage::SnapshotTableState; | ||
| use crate::Error; | ||
| use crate::ReadState; | ||
| use crate::ReadStateFilepathRemap; | ||
| use more_asserts as ma; | ||
| use std::sync::atomic::{AtomicU64, Ordering}; | ||
| use std::sync::Arc; | ||
| use tokio::sync::{watch, RwLock}; | ||
|
|
@@ -16,6 +16,8 @@ const NO_CACHE_LSN: u64 = u64::MAX; | |
| const NO_SNAPSHOT_LSN: u64 = u64::MAX; | ||
| /// Commit LSN, which indicates there's no commit. | ||
| const NO_COMMIT_LSN: u64 = 0; | ||
| /// Max read snapshot retries | ||
| const MAX_READ_SNAPSHOT_RETRIES: u8 = 5; | ||
|
|
||
| pub struct ReadStateManager { | ||
| last_read_lsn: AtomicU64, | ||
|
|
@@ -103,6 +105,7 @@ impl ReadStateManager { | |
| let mut table_snapshot_rx = self.table_snapshot_watch_receiver.clone(); | ||
| let mut replication_lsn_rx = self.replication_lsn_rx.clone(); | ||
| let last_commit_lsn = self.last_commit_lsn_rx.clone(); | ||
| let mut retries_number: u8 = 0; | ||
|
|
||
| loop { | ||
| let current_snapshot_lsn = *table_snapshot_rx.borrow(); | ||
|
|
@@ -114,6 +117,7 @@ impl ReadStateManager { | |
| current_snapshot_lsn, | ||
| current_replication_lsn, | ||
| last_commit_lsn_val, | ||
| &mut retries_number, | ||
| ) { | ||
| return self | ||
| .read_from_snapshot_and_update_cache( | ||
|
|
@@ -133,18 +137,50 @@ impl ReadStateManager { | |
| } | ||
| } | ||
|
|
||
| #[inline] | ||
| fn validate_lsn_ordering( | ||
| snapshot_lsn: u64, | ||
| commit_lsn: u64, | ||
| replication_lsn: u64, | ||
| ) -> Result<()> { | ||
| // validate right ordering lsn: snapshot_lsn<=commit_lsn<=replication_lsn | ||
| if snapshot_lsn != NO_SNAPSHOT_LSN | ||
| && commit_lsn != NO_COMMIT_LSN | ||
| && snapshot_lsn > commit_lsn | ||
| { | ||
| return Err(Error::read_validation_error(format!( | ||
| "snapshot_lsn > commit_lsn: {} > {}", | ||
| snapshot_lsn, commit_lsn | ||
| ))); | ||
| } | ||
| if commit_lsn > replication_lsn { | ||
|
Collaborator
What's the purpose of the retry?
Author
The purpose of the retries is to synchronize state from the next function.
Collaborator
Not sure whether this is a good fix or just hides the problem. It reads like a synchronization problem to me, and a retry could also fail.
Author
Asynchronous problems are common in engineering, especially when handling events in systems like Kafka, Spark, and Flink. There are two main ways to address these issues:
Collaborator
Curious, why don't we pick (1)?
Author
That's because MoonLink handles many background events, for example: Each message-passing channel can be used in different events or anywhere else. To boost performance and make the code easier to scale, your team has split the work across many channels and runs them asynchronously.
Collaborator
There aren't too many channels: one for the replication LSN, another for the commit LSN?
Author
Let me check. What about `table_snapshot_watch_receiver`? It is also an independent receiver and operates completely asynchronously.
Author
Honestly, retries are also a common mechanism for handling such issues. You can set a high retry count or add an eventual timeout. This approach will help your code scale easily in the future.
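To make the retry discussion concrete, here is a minimal standalone sketch of a bounded re-sample loop around the ordering check. It mirrors `validate_lsn_ordering` and the constants from this diff, but `read_consistent_lsns` and its `sample` closure are hypothetical names, not part of the PR, and it reports exhaustion as an `Err` rather than panicking, which is an assumption and not what the diff does.

```rust
const NO_SNAPSHOT_LSN: u64 = u64::MAX;
const NO_COMMIT_LSN: u64 = 0;
const MAX_READ_SNAPSHOT_RETRIES: u8 = 5;

/// Checks snapshot_lsn <= commit_lsn <= replication_lsn.
/// The first comparison is skipped when either sentinel value is present.
fn validate_lsn_ordering(
    snapshot_lsn: u64,
    commit_lsn: u64,
    replication_lsn: u64,
) -> Result<(), String> {
    if snapshot_lsn != NO_SNAPSHOT_LSN
        && commit_lsn != NO_COMMIT_LSN
        && snapshot_lsn > commit_lsn
    {
        return Err(format!(
            "snapshot_lsn > commit_lsn: {} > {}",
            snapshot_lsn, commit_lsn
        ));
    }
    if commit_lsn > replication_lsn {
        return Err(format!(
            "commit_lsn > replication_lsn: {} > {}",
            commit_lsn, replication_lsn
        ));
    }
    Ok(())
}

/// Hypothetical helper: re-samples (snapshot, commit, replication) LSNs until
/// they are mutually consistent or the retry budget is spent.
fn read_consistent_lsns<F>(mut sample: F) -> Result<(u64, u64, u64), String>
where
    F: FnMut() -> (u64, u64, u64),
{
    let mut last_err = String::new();
    for _ in 0..MAX_READ_SNAPSHOT_RETRIES {
        let (snapshot, commit, replication) = sample();
        match validate_lsn_ordering(snapshot, commit, replication) {
            Ok(()) => return Ok((snapshot, commit, replication)),
            Err(err) => last_err = err,
        }
    }
    Err(format!(
        "LSN ordering still invalid after {} attempts: {}",
        MAX_READ_SNAPSHOT_RETRIES, last_err
    ))
}
```

As the collaborator points out, a loop like this only bounds the inconsistency window: if the receivers never converge within the budget, the caller still has to handle the failure.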
||
| return Err(Error::read_validation_error(format!( | ||
| "commit_lsn > replication_lsn: {} > {}", | ||
| commit_lsn, replication_lsn | ||
| ))); | ||
| } | ||
| Ok(()) | ||
| } | ||
|
|
||
| fn can_satisfy_read_from_snapshot( | ||
| &self, | ||
| requested_lsn: Option<u64>, | ||
| snapshot_lsn: u64, | ||
| replication_lsn: u64, | ||
| commit_lsn: u64, | ||
| retries_number: &mut u8, | ||
| ) -> bool { | ||
| // Sanity check on read side: iceberg snapshot LSN <= mooncake snapshot LSN <= commit LSN <= replication LSN | ||
| if snapshot_lsn != NO_SNAPSHOT_LSN && commit_lsn != NO_COMMIT_LSN { | ||
| ma::assert_le!(snapshot_lsn, commit_lsn); | ||
| match Self::validate_lsn_ordering(snapshot_lsn, commit_lsn, replication_lsn) { | ||
| Ok(_) => {} // continue | ||
| Err(err) => { | ||
| *retries_number += 1; | ||
| if *retries_number >= MAX_READ_SNAPSHOT_RETRIES { | ||
| panic!("Error after {} retries: {}", retries_number, err); | ||
| } | ||
| return false; | ||
| } | ||
| } | ||
| ma::assert_le!(commit_lsn, replication_lsn); | ||
|
|
||
| // Check snapshot readability. | ||
| let is_snapshot_clean = Self::snapshot_is_clean(snapshot_lsn, commit_lsn); | ||
|
|
||
Bug: Unwrapped `None` Causes Panic in Snapshot Read
Calling `requested_lsn.unwrap()` at line 131 will panic when `requested_lsn` is `None`. This occurs when `try_read(None)` is called to get the latest snapshot and LSN validation fails in `can_satisfy_read_from_snapshot`. The validation failure causes the function to return `false` without satisfying the read, leading to the call to `wait_for_relevant_lsn_change` with an unwrapped `None` value. The code should handle the `None` case explicitly, either by using `unwrap_or` with a sensible default or by restructuring the logic to avoid waiting when no specific LSN is requested.
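One way to read the "restructure the logic" suggestion is to branch on the `Option` explicitly instead of unwrapping it. The sketch below is illustrative only: `NextStep` and `next_step_after_failed_read` are hypothetical names, and the real signature of `wait_for_relevant_lsn_change` is not shown in this diff, so nothing here should be taken as its actual API.

```rust
/// Hypothetical description of what the read loop should do after
/// `can_satisfy_read_from_snapshot` returns false.
#[derive(Debug, PartialEq)]
enum NextStep {
    /// A specific LSN was requested: wait until it becomes readable.
    WaitForLsn(u64),
    /// No LSN was requested (latest-snapshot read): wait for any state change
    /// and re-check, rather than unwrapping a missing value.
    WaitForAnyChange,
}

/// Decides the next step without calling `unwrap()` on the requested LSN.
fn next_step_after_failed_read(requested_lsn: Option<u64>) -> NextStep {
    match requested_lsn {
        Some(lsn) => NextStep::WaitForLsn(lsn),
        None => NextStep::WaitForAnyChange,
    }
}
```

Making the `None` branch a distinct variant forces the caller to decide what a latest-snapshot read should wait on, instead of silently substituting a default LSN.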