12 changes: 10 additions & 2 deletions src/moonlink/src/error.rs
@@ -46,6 +46,9 @@ pub enum Error {

#[error("{0}")]
OtelExporterBuildError(ErrorStruct),

#[error("{0}")]
ReadStateManager(ErrorStruct),
}

pub type Result<T> = result::Result<T, Error>;
@@ -59,6 +62,10 @@ impl Error {
pub fn delta_generic_error(message: String) -> Self {
Self::DeltaLakeError(ErrorStruct::new(message, ErrorStatus::Permanent))
}
#[track_caller]
pub fn read_validation_error(message: String) -> Self {
Self::ReadStateManager(ErrorStruct::new(message, ErrorStatus::Permanent))
}
}

impl From<OtelExporterBuildError> for Error {
@@ -216,7 +223,8 @@ impl Error {
| Error::JoinError(err)
| Error::PbToMoonlinkRowError(err)
| Error::OtelExporterBuildError(err)
| Error::Json(err) => err.status,
| Error::Json(err)
| Error::ReadStateManager(err) => err.status,
}
}
}
@@ -246,7 +254,7 @@ mod tests {
if let Error::Io(ref inner) = io_error {
let loc = inner.location.as_ref().unwrap();
assert!(loc.contains("src/moonlink/src/error.rs"));
assert!(loc.contains("230"));
assert!(loc.contains("238"));
assert!(loc.contains("9"));
}
}
44 changes: 40 additions & 4 deletions src/moonlink/src/union_read/read_state_manager.rs
@@ -1,9 +1,9 @@
use crate::error::Result;
use crate::storage::MooncakeTable;
use crate::storage::SnapshotTableState;
use crate::Error;
use crate::ReadState;
use crate::ReadStateFilepathRemap;
use more_asserts as ma;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use tokio::sync::{watch, RwLock};
@@ -16,6 +16,8 @@ const NO_CACHE_LSN: u64 = u64::MAX;
const NO_SNAPSHOT_LSN: u64 = u64::MAX;
/// Commit LSN, which indicates there's no commit.
const NO_COMMIT_LSN: u64 = 0;
/// Max read snapshot retries
const MAX_READ_SNAPSHOT_RETRIES: u8 = 5;

pub struct ReadStateManager {
last_read_lsn: AtomicU64,
@@ -103,6 +105,7 @@ impl ReadStateManager {
let mut table_snapshot_rx = self.table_snapshot_watch_receiver.clone();
let mut replication_lsn_rx = self.replication_lsn_rx.clone();
let last_commit_lsn = self.last_commit_lsn_rx.clone();
let mut retries_number: u8 = 0;

loop {
let current_snapshot_lsn = *table_snapshot_rx.borrow();
@@ -114,6 +117,7 @@
current_snapshot_lsn,
current_replication_lsn,
last_commit_lsn_val,
&mut retries_number,
) {
return self
.read_from_snapshot_and_update_cache(
@@ -133,18 +137,50 @@
}
}

#[inline]
Bug: Unwrapped None Causes Panic in Snapshot Read

Calling requested_lsn.unwrap() at line 131 will panic when requested_lsn is None. This occurs when try_read(None) is called to get the latest snapshot and LSN validation fails in can_satisfy_read_from_snapshot: the validation failure makes the function return false without satisfying the read, so wait_for_relevant_lsn_change is then called with an unwrapped None value. The code should handle the None case explicitly, either by using unwrap_or with a sensible default or by restructuring the logic to avoid waiting when no specific LSN is requested.
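The None handling suggested in the comment above can be sketched roughly as follows (the function name and shape are hypothetical; the real code operates on watch channels inside ReadStateManager):

```rust
// Hypothetical sketch of the suggested fix: when the caller requested no
// specific LSN, fall back to the latest snapshot LSN instead of calling
// unwrap() on a possible None.
fn lsn_to_wait_for(requested_lsn: Option<u64>, latest_snapshot_lsn: u64) -> u64 {
    requested_lsn.unwrap_or(latest_snapshot_lsn)
}

fn main() {
    // A specific request wins; with no request we wait on the latest snapshot.
    assert_eq!(lsn_to_wait_for(Some(42), 7), 42);
    assert_eq!(lsn_to_wait_for(None, 7), 7);
}
```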

fn validate_lsn_ordering(
snapshot_lsn: u64,
commit_lsn: u64,
replication_lsn: u64,
) -> Result<()> {
// Validate LSN ordering: snapshot_lsn <= commit_lsn <= replication_lsn.
if snapshot_lsn != NO_SNAPSHOT_LSN
&& commit_lsn != NO_COMMIT_LSN
&& snapshot_lsn > commit_lsn
{
return Err(Error::read_validation_error(format!(
"snapshot_lsn > commit_lsn: {} > {}",
snapshot_lsn, commit_lsn
)));
}
if commit_lsn > replication_lsn {
Collaborator:
What's the purpose of retry?

Author (@thuongle2210), Oct 26, 2025:
The purpose of the retries is to synchronize state with the next function, wait_for_relevant_lsn_change: the events arrive on independent channels, which leads to asynchrony problems. You can reproduce the issue in my test case by setting const MAX_READ_SNAPSHOT_RETRIES: u8 = 0 to observe the problem.
In my case, with MAX_READ_SNAPSHOT_RETRIES: u8 = 0, I see 36 cases where the problem (replication_lsn < commit_lsn) occurs, causing failures.

Collaborator:
Not sure if it's a good fix or if it's hiding the problem? It reads as a synchronization problem to me, and a retry could also fail.

Author (@thuongle2210), Oct 26, 2025:
Asynchronous problems are common in engineering, especially when handling events in systems like Kafka, Spark, and Flink. There are two main ways to address them:
First, combine all related events into sequential packages and use a single channel (similar to one partition in Kafka).
Second, add a retry mechanism with a timeout or retry limit to handle asynchronous events (such as watermarking).

Collaborator:
Curious why we don't pick (1)?

Author (@thuongle2210), Oct 26, 2025:
That's because MoonLink handles many background events, for example:
CdcEvent::PrimaryKeepAlive
CdcEvent::StreamCommit
CdcEvent::Commit
... etc.

Each message-passing channel can be used for different events. To boost performance and keep the code easy to scale, your team has broken it into many channels that run asynchronously.

Collaborator:
There aren't too many channels: one for replication LSN, another for commit LSN?
Seems like related to

Author (@thuongle2210), Oct 26, 2025:
Let me check. What about table_snapshot_watch_receiver? It is also an independent receiver and operates completely asynchronously.

Author:
Honestly, retries are also a common mechanism for handling such issues. You can configure the retry count to be high or set an eventual timeout. This approach will help the code scale easily in the future.
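The bounded-retry pattern discussed in this thread can be sketched as follows (all names are hypothetical; the real code re-reads the watch channels on each loop iteration instead of invoking a closure, and panics rather than returning an error once the limit is hit):

```rust
// Hypothetical sketch of retrying a transiently failing LSN-ordering check
// up to a fixed limit, as described in the review thread above.
const MAX_RETRIES: u8 = 5;

fn check_ordering(commit_lsn: u64, replication_lsn: u64) -> Result<(), String> {
    if commit_lsn > replication_lsn {
        return Err(format!(
            "commit_lsn > replication_lsn: {} > {}",
            commit_lsn, replication_lsn
        ));
    }
    Ok(())
}

// `reads` stands in for re-reading the (commit_lsn, replication_lsn) watch
// channels; a retry gives the lagging channel a chance to catch up.
fn validate_with_retries(mut reads: impl FnMut() -> (u64, u64)) -> Result<(), String> {
    let mut retries: u8 = 0;
    loop {
        let (commit, repl) = reads();
        match check_ordering(commit, repl) {
            Ok(()) => return Ok(()),
            Err(err) => {
                retries += 1;
                if retries >= MAX_RETRIES {
                    return Err(format!("error after {} retries: {}", retries, err));
                }
            }
        }
    }
}

fn main() {
    // Simulate a replication LSN that lags on the first read and catches up.
    let mut calls = 0;
    let result = validate_with_retries(|| {
        calls += 1;
        if calls == 1 { (10, 5) } else { (10, 10) }
    });
    assert!(result.is_ok());
}
```

The limit keeps a genuine ordering violation from looping forever: once the budget is exhausted, the error surfaces instead of being retried silently.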

return Err(Error::read_validation_error(format!(
"commit_lsn > replication_lsn: {} > {}",
commit_lsn, replication_lsn
)));
}
Ok(())
}

fn can_satisfy_read_from_snapshot(
&self,
requested_lsn: Option<u64>,
snapshot_lsn: u64,
replication_lsn: u64,
commit_lsn: u64,
retries_number: &mut u8,
) -> bool {
// Sanity check on read side: iceberg snapshot LSN <= mooncake snapshot LSN <= commit LSN <= replication LSN
if snapshot_lsn != NO_SNAPSHOT_LSN && commit_lsn != NO_COMMIT_LSN {
ma::assert_le!(snapshot_lsn, commit_lsn);
match Self::validate_lsn_ordering(snapshot_lsn, commit_lsn, replication_lsn) {
Ok(_) => {} // continue
Err(err) => {
*retries_number += 1;
if *retries_number >= MAX_READ_SNAPSHOT_RETRIES {
panic!("Error after {} retries: {}", retries_number, err);
}
return false;
}
}
ma::assert_le!(commit_lsn, replication_lsn);

// Check snapshot readability.
let is_snapshot_clean = Self::snapshot_is_clean(snapshot_lsn, commit_lsn);