Conversation

@thuongle2210 thuongle2210 commented Oct 26, 2025

Summary

This PR resolves a synchronization problem in event handling by adding a retry mechanism. The purpose of the retries is to synchronize state via the next function, wait_for_relevant_lsn_change: the events are received from independent channels, so their updates can be observed out of order.

Related Issues

links to related issues: #2181

Changes

  • Update the mechanism that validates LSN ordering when reading state in ReadingStateManager
  • Update the Error type in the moonlink package

Checklist

  • Code builds correctly
  • Tests have been added or updated
  • Documentation updated if necessary
  • I have reviewed my own changes

@cursor cursor bot left a comment

}
}

#[inline]

Bug: Unwrapped None Causes Panic in Snapshot Read

Calling requested_lsn.unwrap() at line 131 will panic when requested_lsn is None. This occurs when try_read(None) is called to get the latest snapshot and LSN validation fails in can_satisfy_read_from_snapshot. The validation failure causes the function to return false without satisfying the read, leading to the call to wait_for_relevant_lsn_change with an unwrapped None value. The code should handle the None case explicitly, either by using unwrap_or with a sensible default or by restructuring the logic to avoid waiting when no specific LSN is requested.
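
A minimal sketch of the "handle None explicitly" option described above. The names `ReadDecision`, `decide`, and `snapshot_valid` are hypothetical and do not come from the PR; the sketch only shows how a `match` on `Option<u64>` avoids the `unwrap()` call, assuming a read without a requested LSN has nothing specific to wait for.

```rust
/// Outcome of one read attempt. Hypothetical type, not taken from the PR.
#[derive(Debug, PartialEq)]
enum ReadDecision {
    /// The current snapshot can serve the read.
    Serve,
    /// Wait until the state reaches at least this LSN.
    WaitFor(u64),
    /// No specific LSN was requested and validation failed:
    /// retry against the next snapshot instead of waiting on an LSN.
    RetryLatest,
}

/// Decide what to do without ever calling `unwrap()` on `requested_lsn`.
/// `snapshot_valid` stands in for the result of the LSN validation step.
fn decide(requested_lsn: Option<u64>, snapshot_lsn: u64, snapshot_valid: bool) -> ReadDecision {
    match requested_lsn {
        Some(lsn) if snapshot_valid && snapshot_lsn >= lsn => ReadDecision::Serve,
        Some(lsn) => ReadDecision::WaitFor(lsn),
        None if snapshot_valid => ReadDecision::Serve,
        None => ReadDecision::RetryLatest,
    }
}
```

With this shape, the waiting path is only reachable with a concrete LSN, so the `None` case cannot panic.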

snapshot_lsn, commit_lsn
)));
}
if commit_lsn > replication_lsn {
Collaborator

What's the purpose of retry?

Author

@thuongle2210 thuongle2210 Oct 26, 2025

The purpose of the retries is to synchronize state via the next function, wait_for_relevant_lsn_change: the events are received from independent channels, so their updates can be observed out of order. You can reproduce the issue with my test case by setting const MAX_READ_SNAPSHOT_RETRIES: u8 = 0.
In my runs with MAX_READ_SNAPSHOT_RETRIES: u8 = 0, I see 36 cases that hit the problem replication_lsn <= commit_lsn, which causes a failure.

Collaborator

Not sure whether this is a good fix or whether it just hides the problem. It reads like a synchronization problem to me, and a retry could also fail.

Author

@thuongle2210 thuongle2210 Oct 26, 2025

Asynchronous problems are common in engineering, especially when handling events in systems like Kafka, Spark, and Flink. There are two main ways to address these issues:
1. Combine all related events into sequential packages and use a single channel (similar to one partition in Kafka).
2. Add a retry mechanism with a timeout or retry limit to handle asynchronous events (such as watermarking).
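
A minimal sketch of the second approach, a retry with a retry limit. It assumes, based on the diff context in this thread, that a reading is inconsistent when commit_lsn > replication_lsn. The names `ReadError`, `read_with_retries`, and `sample` are hypothetical; `sample` stands in for reading the two independent channels.

```rust
/// Hypothetical error type for this sketch, not the moonlink Error type.
#[derive(Debug, PartialEq)]
enum ReadError {
    /// commit_lsn was still ahead of replication_lsn after the last attempt.
    LsnOutOfOrder { commit_lsn: u64, replication_lsn: u64 },
}

/// Re-sample both LSNs until they are consistent, allowing at most
/// `max_retries` extra attempts after the first one.
fn read_with_retries<F>(max_retries: u8, mut sample: F) -> Result<(u64, u64), ReadError>
where
    F: FnMut() -> (u64, u64), // returns (commit_lsn, replication_lsn)
{
    let mut attempt: u8 = 0;
    loop {
        let (commit_lsn, replication_lsn) = sample();
        if commit_lsn <= replication_lsn {
            // Consistent reading: the commit is covered by replication.
            return Ok((commit_lsn, replication_lsn));
        }
        if attempt >= max_retries {
            // Retry budget exhausted: surface the inconsistency to the caller.
            return Err(ReadError::LsnOutOfOrder { commit_lsn, replication_lsn });
        }
        attempt += 1;
    }
}
```

With `max_retries` set to 0 the first inconsistent reading is returned as an error, which mirrors the reproduction described earlier in the thread. The retry only bounds the wait; it can still fail if the channels never converge within the limit.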

Collaborator

curious why don't we pick (1)?
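
For reference, a rough sketch of what option (1), a single channel, could look like. It uses the standard library's `std::sync::mpsc` rather than whatever channel type moonlink actually uses, and `LsnUpdate` and `latest_update` are hypothetical names. The point is only that when both LSNs travel in one message, a reader never pairs a new commit LSN with a stale replication LSN.

```rust
use std::sync::mpsc;

/// Hypothetical combined update: both LSNs are published together.
#[derive(Debug, Clone, Copy, PartialEq)]
struct LsnUpdate {
    commit_lsn: u64,
    replication_lsn: u64,
}

/// Drain all pending updates without blocking and return the newest one.
/// Returns None when nothing new has been published.
fn latest_update(rx: &mpsc::Receiver<LsnUpdate>) -> Option<LsnUpdate> {
    rx.try_iter().last()
}
```

The trade-off is that every producer of either LSN has to publish through the same sender, which couples event sources that are currently independent.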

Author

@thuongle2210 thuongle2210 Oct 26, 2025

That's because I see MoonLink handles many background events, for example:
CdcEvent::PrimaryKeepAlive
CdcEvent::StreamCommit
CdcEvent::Commit
... etc.

Each message-passing channel can be used by different events, or anywhere else. It looks like your team split the work across many channels that run asynchronously, to boost performance and make the code easier to scale.

Collaborator

There aren't that many channels: one for the replication LSN and another for the commit LSN?
Seems like related to

Author

@thuongle2210 thuongle2210 Oct 26, 2025

Let me check. What about table_snapshot_watch_receiver? It is also an independent receiver and operates completely asynchronously.

Author

Honestly, retries are also a common mechanism to handle such issues. You can configure the retry count to be high or set an eventual timeout. This approach will help your code scale easily in the future.

@github-actions

This PR has been inactive for 14 days and is now marked as stale. If this is still being worked on, please comment to keep it open.

@github-actions github-actions bot added the stale label Nov 24, 2025