From 3b7b5973d38a85246f6fb36d54d7e1dc74f4ad79 Mon Sep 17 00:00:00 2001 From: thuong Date: Sat, 11 Oct 2025 00:04:10 +0700 Subject: [PATCH 1/5] release: bump toolchain chanel version --- rust-toolchain.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rust-toolchain.toml b/rust-toolchain.toml index 73869c5fb..2ed7e3112 100644 --- a/rust-toolchain.toml +++ b/rust-toolchain.toml @@ -1,4 +1,4 @@ # rust-toolchain.toml [toolchain] -channel = "1.88.0" +channel = "1.90.0" components = ["clippy", "rustfmt"] From d8dbace393ffedf2986d495b54e7b05966c585ba Mon Sep 17 00:00:00 2001 From: thuong Date: Sun, 26 Oct 2025 05:23:09 +0000 Subject: [PATCH 2/5] sync from upstream repo --- Cargo.lock | 123 ++++++++++++------ Cargo.toml | 18 +-- deny.toml | 19 +++ src/moonlink/src/lib.rs | 4 +- .../src/lsn_state.rs} | 15 ++- src/moonlink/src/storage.rs | 2 +- .../src/storage/cache/metadata/moka_cache.rs | 1 + .../src/storage/filesystem/storage_config.rs | 1 + src/moonlink/src/storage/mooncake_table.rs | 4 +- .../storage/mooncake_table/batch_ingestion.rs | 10 +- .../storage/mooncake_table/data_batches.rs | 8 +- src/moonlink/src/storage/table/iceberg.rs | 8 +- .../storage/table/iceberg/catalog_utils.rs | 6 +- .../src/storage/table/iceberg/glue_catalog.rs | 10 +- .../table/iceberg/iceberg_table_config.rs | 12 +- .../src/storage/table/iceberg/rest_catalog.rs | 10 +- src/moonlink/src/table_handler/chaos_test.rs | 2 +- src/moonlink_connectors/src/lib.rs | 1 - src/moonlink_connectors/src/pg_replicate.rs | 23 ++-- .../src/pg_replicate/moonlink_sink.rs | 74 +++++++---- .../src/pg_replicate/table_init.rs | 15 ++- .../src/replication_connection.rs | 2 +- src/moonlink_connectors/src/rest_ingest.rs | 7 +- .../src/rest_ingest/decimal_utils.rs | 4 +- .../src/rest_ingest/moonlink_rest_sink.rs | 24 ++-- src/moonlink_service/src/test.rs | 2 +- 26 files changed, 252 insertions(+), 153 deletions(-) rename src/{moonlink_connectors/src/replication_state.rs => moonlink/src/lsn_state.rs} (89%) diff --git a/Cargo.lock b/Cargo.lock index edf2d2f1c..5e2efced6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1044,9 +1044,9 @@ dependencies = [ [[package]] name = "backon" -version = "1.5.2" +version = "1.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "592277618714fbcecda9a02ba7a8781f319d26532a88553bbacc77ba5d2b3a8d" +checksum = "cffb0e931875b666fc4fcb20fee52e9bbd1ef836fd9e9e04ec21555f9f85f7ef" dependencies = [ "fastrand", "gloo-timers", @@ -1458,7 +1458,7 @@ dependencies = [ "num-traits", "serde", "wasm-bindgen", - "windows-link", + "windows-link 0.1.3", ] [[package]] @@ -3093,11 +3093,12 @@ checksum = "4443176a9f2c162692bd3d352d745ef9413eec5782a80d8fd6f8a1ac692a07f7" [[package]] name = "fastbloom" -version = "0.12.1" +version = "0.14.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "29ec576c163744bef8707859f6aeb322bcf56b8da61215d99f77d6e33160ff01" +checksum = "18c1ddb9231d8554c2d6bdf4cfaabf0c59251658c68b6c95cd52dd0c513a912a" dependencies = [ "getrandom 0.3.3", + "libm", "rand 0.9.2", "siphasher", ] @@ -3208,6 +3209,12 @@ version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d9c4f5dac5e15c24eb999c26181a6ca40b39fe946cbe4c263c7209467bc83af2" +[[package]] +name = "foldhash" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77ce24cb58228fbb8aa041425bb1050850ac19177686ea6e0f41a70416f56fdb" + [[package]] name = "foreign-types" version = "0.3.2" @@ -3554,7 +3561,18 @@ checksum = "9229cfe53dfd69f0609a49f65461bd93001ea1ef889cd5529dd176593f5338a1" dependencies = [ "allocator-api2", "equivalent", - "foldhash", + "foldhash 0.1.5", +] + +[[package]] +name = "hashbrown" +version = "0.16.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5419bdc4f6a9207fbeba6d11b604d481addf78ecd10c11ad51e76c2f6482748d" +dependencies = [ + "allocator-api2", + "equivalent", + "foldhash 0.2.0", ] [[package]] @@ -3861,7 +3879,7 @@ dependencies = [ [[package]] name = "iceberg" version = "0.7.0" -source = "git+https://github.com/apache/iceberg-rust?rev=1de331593ea877e3a8b570e328d26f8362ad2035#1de331593ea877e3a8b570e328d26f8362ad2035" +source = "git+https://github.com/apache/iceberg-rust?rev=790e0253a6d1ba83a54f51cf8a8ed62a5c2da7f5#790e0253a6d1ba83a54f51cf8a8ed62a5c2da7f5" dependencies = [ "anyhow", "apache-avro 0.20.0", @@ -3916,7 +3934,7 @@ dependencies = [ [[package]] name = "iceberg-catalog-glue" version = "0.7.0" -source = "git+https://github.com/apache/iceberg-rust?rev=1de331593ea877e3a8b570e328d26f8362ad2035#1de331593ea877e3a8b570e328d26f8362ad2035" +source = "git+https://github.com/apache/iceberg-rust?rev=790e0253a6d1ba83a54f51cf8a8ed62a5c2da7f5#790e0253a6d1ba83a54f51cf8a8ed62a5c2da7f5" dependencies = [ "anyhow", "async-trait", @@ -3931,7 +3949,7 @@ dependencies = [ [[package]] name = "iceberg-catalog-rest" version = "0.7.0" -source = "git+https://github.com/apache/iceberg-rust?rev=1de331593ea877e3a8b570e328d26f8362ad2035#1de331593ea877e3a8b570e328d26f8362ad2035" +source = "git+https://github.com/apache/iceberg-rust?rev=790e0253a6d1ba83a54f51cf8a8ed62a5c2da7f5#790e0253a6d1ba83a54f51cf8a8ed62a5c2da7f5" dependencies = [ "async-trait", "chrono", @@ -4137,17 +4155,6 @@ version = "3.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8bb03732005da905c88227371639bf1ad885cc712789c011c31c5fb3ab3ccf02" -[[package]] -name = "io-uring" -version = "0.7.10" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "046fa2d4d00aea763528b4950358d0ead425372445dc8ff86312b3c69ff7727b" -dependencies = [ - "bitflags 2.9.4", - "cfg-if", - "libc", -] - [[package]] name = "ipnet" version = "2.11.0" @@ -4514,11 +4521,11 @@ dependencies = [ [[package]] name = "lru" -version = "0.14.0" +version = "0.16.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9f8cc7106155f10bdf99a6f379688f543ad6596a415375b36a59a054ceda1198" +checksum = "96051b46fc183dc9cd4a223960ef37b9af631b55191852a8274bfef064cda20f" dependencies = [ - "hashbrown 0.15.5", + "hashbrown 0.16.0", ] [[package]] @@ -4712,7 +4719,7 @@ dependencies = [ "fastbloom", "function_name", "futures", - "hashbrown 0.15.5", + "hashbrown 0.16.0", "hmac", "iceberg", "iceberg-catalog-glue", @@ -4751,7 +4758,7 @@ dependencies = [ "tokio", "tokio-bitstream-io", "tracing", - "typed-builder 0.20.1", + "typed-builder 0.22.0", "url", "uuid", ] @@ -7940,23 +7947,20 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.47.1" +version = "1.48.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "89e49afdadebb872d3145a5638b59eb0691ea23e46ca484037cfab3b76b95038" +checksum = "ff360e02eab121e0bc37a2d3b4d4dc622e6eda3a8e5253d5435ecf5bd4c68408" dependencies = [ - "backtrace", "bytes", - "io-uring", "libc", "mio", "parking_lot", "pin-project-lite", "signal-hook-registry", - "slab", "socket2 0.6.0", "tokio-macros", "tracing", - "windows-sys 0.59.0", + "windows-sys 0.61.2", ] [[package]] @@ -7971,9 +7975,9 @@ dependencies = [ [[package]] name = "tokio-macros" -version = "2.5.0" +version = "2.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6e06d43f1345a3bcd39f6a56dbb7dcab2ba47e68e8ac134855e7e2bdbaf8cab8" +checksum = "af407857209536a95c8e56f8231ef2c2e2aff839b22e07a1ffcbc617e9db9fa5" dependencies = [ "proc-macro2", "quote", @@ -8330,6 +8334,15 @@ dependencies = [ "typed-builder-macro 0.20.1", ] +[[package]] +name = "typed-builder" +version = "0.22.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "398a3a3c918c96de527dc11e6e846cd549d4508030b8a33e1da12789c856b81a" +dependencies = [ + "typed-builder-macro 0.22.0", +] + [[package]] name = "typed-builder-macro" version = "0.19.1" @@ -8352,6 +8365,17 @@ dependencies = [ "syn 2.0.106", ] +[[package]] +name = "typed-builder-macro" +version = "0.22.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0e48cea23f68d1f78eb7bc092881b6bb88d3d6b5b7e6234f6f9c911da1ffb221" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.106", +] + [[package]] name = "typenum" version = "1.18.0" @@ -8734,7 +8758,7 @@ dependencies = [ "windows-collections", "windows-core", "windows-future", - "windows-link", + "windows-link 0.1.3", "windows-numerics", ] @@ -8755,7 +8779,7 @@ checksum = "c0fdd3ddb90610c7638aa2b3a3ab2904fb9e5cdbecc643ddb3647212781c4ae3" dependencies = [ "windows-implement", "windows-interface", - "windows-link", + "windows-link 0.1.3", "windows-result", "windows-strings", ] @@ -8767,7 +8791,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fc6a41e98427b19fe4b73c550f060b59fa592d7d686537eebf9385621bfbad8e" dependencies = [ "windows-core", - "windows-link", + "windows-link 0.1.3", "windows-threading", ] @@ -8799,6 +8823,12 @@ version = "0.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5e6ad25900d524eaabdbbb96d20b4311e1e7ae1699af4fb28c17ae66c80d798a" +[[package]] +name = "windows-link" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f0805222e57f7521d6a62e36fa9163bc891acd422f971defe97d64e70d0a4fe5" + [[package]] name = "windows-numerics" version = "0.2.0" @@ -8806,7 +8836,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9150af68066c4c5c07ddc0ce30421554771e528bde427614c61038bc2c92c2b1" dependencies = [ "windows-core", - "windows-link", + "windows-link 0.1.3", ] [[package]] @@ -8815,7 +8845,7 @@ version = "0.5.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5b8a9ed28765efc97bbc954883f4e6796c33a06546ebafacbabee9696967499e" dependencies = [ - "windows-link", + "windows-link 0.1.3", "windows-result", "windows-strings", ] @@ -8826,7 +8856,7 @@ version = "0.3.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "56f42bd332cc6c8eac5af113fc0c1fd6a8fd2aa08a0119358686e5160d0586c6" dependencies = [ - "windows-link", + "windows-link 0.1.3", ] [[package]] @@ -8835,7 +8865,7 @@ version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "56e6c93f3a0c3b36176cb1327a4958a0353d5d166c2a35cb268ace15e91d3b57" dependencies = [ - "windows-link", + "windows-link 0.1.3", ] [[package]] @@ -8874,6 +8904,15 @@ dependencies = [ "windows-targets 0.53.3", ] +[[package]] +name = "windows-sys" +version = "0.61.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ae137229bcbd6cdf0f7b80a31df61766145077ddf49416a728b02cb3921ff3fc" +dependencies = [ + "windows-link 0.2.1", +] + [[package]] name = "windows-targets" version = "0.48.5" @@ -8911,7 +8950,7 @@ version = "0.53.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d5fe6031c4041849d7c496a8ded650796e7b6ecc19df1a431c1a363342e5dc91" dependencies = [ - "windows-link", + "windows-link 0.1.3", "windows_aarch64_gnullvm 0.53.0", "windows_aarch64_msvc 0.53.0", "windows_i686_gnu 0.53.0", @@ -8928,7 +8967,7 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b66463ad2e0ea3bbf808b7f1d371311c80e115c0b71d60efc142cafbcfb057a6" dependencies = [ - "windows-link", + "windows-link 0.1.3", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 975a36065..43e62ea65 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -30,7 +30,7 @@ arrow-ipc = "55" arrow-schema = { version = "55", features = ["serde"] } async-stream = "0.3" async-trait = "0.1" -backon = "1.5" +backon = "1.6" bincode = { version = "2", features = ["serde"] } bitstream-io = "4.9" bytes = "1.0" @@ -41,16 +41,16 @@ crc32fast = "1" datafusion = "49" datafusion-cli = "49" deltalake = "0.28" -fastbloom = "0.12" +fastbloom = "0.14" futures = { version = "0.3", default-features = false } -hashbrown = "0.15" -iceberg = { git = "https://github.com/apache/iceberg-rust", rev = "1de331593ea877e3a8b570e328d26f8362ad2035", default-features = false, features = [ +hashbrown = "0.16" +iceberg = { git = "https://github.com/apache/iceberg-rust", rev = "790e0253a6d1ba83a54f51cf8a8ed62a5c2da7f5", default-features = false, features = [ "storage-fs", ] } -iceberg-catalog-glue = { git = "https://github.com/apache/iceberg-rust", rev = "1de331593ea877e3a8b570e328d26f8362ad2035" } -iceberg-catalog-rest = { git = "https://github.com/apache/iceberg-rust", rev = "1de331593ea877e3a8b570e328d26f8362ad2035" } +iceberg-catalog-glue = { git = "https://github.com/apache/iceberg-rust", rev = "790e0253a6d1ba83a54f51cf8a8ed62a5c2da7f5" } +iceberg-catalog-rest = { git = "https://github.com/apache/iceberg-rust", rev = "790e0253a6d1ba83a54f51cf8a8ed62a5c2da7f5" } itertools = "0.14" -lru = "0.14" +lru = "0.16" moka = { version = "0.12", features = ["future"] } moonlink = { path = "src/moonlink" } moonlink_backend = { path = "src/moonlink_backend" } @@ -112,7 +112,7 @@ serial_test = "3.0" smallvec = "1.15" tempfile = "3" thiserror = "2" -tokio = { version = "1.47", default-features = false, features = [ +tokio = { version = "1.48", default-features = false, features = [ "fs", "io-util", "macros", @@ -129,7 +129,7 @@ tokio-postgres = { git = "https://github.com/Mooncake-labs/rust-postgres.git", r ] } tracing = "0.1" tracing-subscriber = { version = "0.3", features = ["env-filter", "chrono"] } -typed-builder = "0.20" +typed-builder = "0.22" url = "2.5" uuid = { version = "1.18", default-features = false, features = ["v4"] } diff --git a/deny.toml b/deny.toml index 1e98255d1..8be566dbc 100644 --- a/deny.toml +++ b/deny.toml @@ -37,33 +37,52 @@ ignore = [ # TODO(hjiang): Resolve all duplicate dependencies. [bans] skip = [ + { name = "apache-avro" }, { name = "axum" }, { name = "axum-core" }, { name = "base64" }, + { name = "bitflags" }, { name = "bzip2" }, { name = "core-foundation" }, + { name = "darling" }, + { name = "darling_core" }, + { name = "darling_macro" }, { name = "getrandom" }, + { name = "h2" }, { name = "hashbrown" }, { name = "http" }, { name = "http-body" }, + { name = "hyper" }, + { name = "hyper-rustls" }, { name = "indexmap" }, { name = "itertools" }, + { name = "linux-raw-sys" }, { name = "matchit" }, + { name = "nix" }, { name = "ordered-float" }, { name = "petgraph" }, { name = "phf" }, { name = "phf_shared" }, + { name = "prost-derive" }, { name = "quick-xml" }, { name = "rand" }, { name = "rand_chacha" }, { name = "rand_core" }, + { name = "rustix" }, + { name = "rustls" }, + { name = "rustls-native-certs" }, + { name = "rustls-pemfile" }, + { name = "rustls-webpki" }, { name = "security-framework" }, + { name = "spin" }, + { name = "sqlparser" }, { name = "socket2" }, { name = "strum" }, { name = "strum_macros" }, { name = "tonic" }, { name = "thiserror" }, { name = "thiserror-impl" }, + { name = "tokio-rustls" }, { name = "tower" }, { name = "typed-builder" }, { name = "typed-builder-macro" }, diff --git a/src/moonlink/src/lib.rs b/src/moonlink/src/lib.rs index 43db3c0d0..6778f2ea6 100644 --- a/src/moonlink/src/lib.rs +++ b/src/moonlink/src/lib.rs @@ -1,5 +1,6 @@ pub mod error; pub mod event_sync; +pub mod lsn_state; pub mod mooncake_table_id; mod observability; pub mod row; @@ -11,12 +12,13 @@ mod union_read; pub use error::*; pub use event_sync::EventSyncSender; +pub use lsn_state::{CommitState, ReplicationState}; pub use mooncake_table_id::MooncakeTableId; pub use storage::mooncake_table::batch_id_counter::BatchIdCounter; pub use storage::mooncake_table::data_batches::ColumnStoreBuffer; pub use storage::parquet_utils::get_default_parquet_properties; pub use storage::storage_utils::create_data_file; -#[cfg(feature = "catalog-glue")] +#[cfg(all(feature = "catalog-glue", feature = "storage-s3"))] pub use storage::IcebergGlueCatalogConfig; #[cfg(feature = "catalog-rest")] pub use storage::IcebergRestCatalogConfig; diff --git a/src/moonlink_connectors/src/replication_state.rs b/src/moonlink/src/lsn_state.rs similarity index 89% rename from src/moonlink_connectors/src/replication_state.rs rename to src/moonlink/src/lsn_state.rs index 22259734f..a1244db91 100644 --- a/src/moonlink_connectors/src/replication_state.rs +++ b/src/moonlink/src/lsn_state.rs @@ -7,20 +7,20 @@ use tracing::warn; /// Tracks replication progress and notifies listeners when the replicated /// LSN advances. -pub struct ReplicationState { +pub struct LsnState { current: AtomicU64, tx: watch::Sender, } -impl std::fmt::Debug for ReplicationState { +impl std::fmt::Debug for LsnState { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("ReplicationState") + f.debug_struct("LsnState") .field("current", &self.current.load(Ordering::SeqCst)) .finish() } } -impl ReplicationState { +impl LsnState { /// Create a new state initialised to LSN 0. pub fn new() -> Arc { let (tx, _rx) = watch::channel(0); @@ -53,13 +53,16 @@ impl ReplicationState { } } +pub type ReplicationState = LsnState; +pub type CommitState = LsnState; + #[cfg(test)] mod tests { use super::*; #[test] fn mark_without_subscribers_does_not_panic_and_updates_state() { - let state = ReplicationState::new(); + let state = LsnState::new(); assert_eq!(state.now(), 0); // No subscribers have been created; this will panic without the fix. state.mark(42); @@ -68,7 +71,7 @@ mod tests { #[test] fn mark_after_last_subscriber_dropped_does_not_panic() { - let state = ReplicationState::new(); + let state = LsnState::new(); // Create a subscriber and then drop it to simulate shutdown. let rx = state.subscribe(); drop(rx); diff --git a/src/moonlink/src/storage.rs b/src/moonlink/src/storage.rs index 002239bc0..dd4a0b875 100644 --- a/src/moonlink/src/storage.rs +++ b/src/moonlink/src/storage.rs @@ -45,7 +45,7 @@ pub use table::iceberg::base_iceberg_snapshot_fetcher::BaseIcebergSnapshotFetche pub use table::iceberg::cloud_security_config::{AwsSecurityConfig, CloudSecurityConfig}; pub use table::iceberg::iceberg_snapshot_fetcher::IcebergSnapshotFetcher; pub use table::iceberg::iceberg_table_config::FileCatalogConfig as IcebergFileCatalogConfig; -#[cfg(feature = "catalog-glue")] +#[cfg(all(feature = "catalog-glue", feature = "storage-s3"))] pub use table::iceberg::iceberg_table_config::GlueCatalogConfig as IcebergGlueCatalogConfig; #[cfg(feature = "catalog-rest")] pub use table::iceberg::iceberg_table_config::RestCatalogConfig as IcebergRestCatalogConfig; diff --git a/src/moonlink/src/storage/cache/metadata/moka_cache.rs b/src/moonlink/src/storage/cache/metadata/moka_cache.rs index 37a55c59d..c2cb6a6d6 100644 --- a/src/moonlink/src/storage/cache/metadata/moka_cache.rs +++ b/src/moonlink/src/storage/cache/metadata/moka_cache.rs @@ -22,6 +22,7 @@ use moka::future::Cache; /// - **Max size**: limits the number of entries (or total weight if using a custom weigher). /// - **Asynchronous operations**: all API methods are `async`. /// +#[allow(unused)] pub struct MokaCache { pub(crate) cache: Cache, } diff --git a/src/moonlink/src/storage/filesystem/storage_config.rs b/src/moonlink/src/storage/filesystem/storage_config.rs index 4698d4bd2..05ac46cc5 100644 --- a/src/moonlink/src/storage/filesystem/storage_config.rs +++ b/src/moonlink/src/storage/filesystem/storage_config.rs @@ -3,6 +3,7 @@ use crate::MoonlinkSecretType; use crate::MoonlinkTableSecret; use serde::{Deserialize, Serialize}; +#[cfg(feature = "storage-gcs")] #[derive(Clone, Debug, Deserialize, PartialEq, Serialize)] pub struct WriteOption { /// Used to overwrite write option. diff --git a/src/moonlink/src/storage/mooncake_table.rs b/src/moonlink/src/storage/mooncake_table.rs index 9e97f1d70..f100405df 100644 --- a/src/moonlink/src/storage/mooncake_table.rs +++ b/src/moonlink/src/storage/mooncake_table.rs @@ -1587,10 +1587,10 @@ impl MooncakeTable { .await; // Notify on event error. - if iceberg_persistence_res.is_err() { + if let Err(err) = iceberg_persistence_res { table_notify .send(TableEvent::PersistenceSnapshotResult { - persistence_snapshot_result: Err(iceberg_persistence_res.unwrap_err()), + persistence_snapshot_result: Err(err), }) .await .unwrap(); diff --git a/src/moonlink/src/storage/mooncake_table/batch_ingestion.rs b/src/moonlink/src/storage/mooncake_table/batch_ingestion.rs index 3145de4fe..8eb1a1f06 100644 --- a/src/moonlink/src/storage/mooncake_table/batch_ingestion.rs +++ b/src/moonlink/src/storage/mooncake_table/batch_ingestion.rs @@ -303,8 +303,8 @@ mod tests { let file2 = data_dir.join("file2.parquet"); let b1 = batch_with_rows(&[10, 11, 12]); let b2 = batch_with_rows(&[20, 21, 22]); - write_parquet_file(&file1, &[b1.clone()]).await; - write_parquet_file(&file2, &[b2.clone()]).await; + write_parquet_file(&file1, std::slice::from_ref(&b1)).await; + write_parquet_file(&file2, std::slice::from_ref(&b2)).await; let lsn = 200u64; let storage_config = crate::StorageConfig::FileSystem { @@ -365,8 +365,8 @@ mod tests { let file2 = data_dir.join("file2.parquet"); let b1 = batch_with_rows(&[101, 102]); let b2 = batch_with_rows(&[201, 202]); - write_parquet_file(&file1, &[b1.clone()]).await; - write_parquet_file(&file2, &[b2.clone()]).await; + write_parquet_file(&file1, std::slice::from_ref(&b1)).await; + write_parquet_file(&file2, std::slice::from_ref(&b2)).await; let storage_config = crate::StorageConfig::FileSystem { root_directory: context.path().to_str().unwrap().to_string(), @@ -421,7 +421,7 @@ mod tests { let file1 = data_dir.join("file1.parquet"); let b1 = batch_with_rows(&[301, 302, 303]); - write_parquet_file(&file1, &[b1.clone()]).await; + write_parquet_file(&file1, std::slice::from_ref(&b1)).await; let storage_config = crate::StorageConfig::FileSystem { root_directory: context.path().to_str().unwrap().to_string(), diff --git a/src/moonlink/src/storage/mooncake_table/data_batches.rs b/src/moonlink/src/storage/mooncake_table/data_batches.rs index 4c00aa283..f56822938 100644 --- a/src/moonlink/src/storage/mooncake_table/data_batches.rs +++ b/src/moonlink/src/storage/mooncake_table/data_batches.rs @@ -314,11 +314,11 @@ impl ColumnStoreBuffer { #[must_use] pub(super) fn try_delete_at_pos(&mut self, pos: (u64, usize)) -> bool { - let idx = self + if let Ok(idx) = self .in_memory_batches - .binary_search_by_key(&pos.0, |x| x.id); - if idx.is_ok() { - let res = self.in_memory_batches[idx.unwrap()] + .binary_search_by_key(&pos.0, |x| x.id) + { + let res = self.in_memory_batches[idx] .batch .deletions .delete_row(pos.1); diff --git a/src/moonlink/src/storage/table/iceberg.rs b/src/moonlink/src/storage/table/iceberg.rs index b9120a426..b155be6f4 100644 --- a/src/moonlink/src/storage/table/iceberg.rs +++ b/src/moonlink/src/storage/table/iceberg.rs @@ -23,7 +23,7 @@ pub(crate) mod puffin_utils; pub(crate) mod puffin_writer_proxy; mod table_update_proxy; -#[cfg(feature = "catalog-glue")] +#[cfg(all(feature = "catalog-glue", feature = "storage-s3"))] pub(crate) mod glue_catalog; #[cfg(feature = "catalog-rest")] @@ -90,14 +90,14 @@ mod catalog_test_impl; #[cfg(test)] mod iceberg_rest_catalog_test; -#[cfg(feature = "catalog-glue")] +#[cfg(all(feature = "catalog-glue", feature = "storage-s3"))] #[cfg(test)] mod glue_catalog_test_utils; -#[cfg(feature = "catalog-glue")] +#[cfg(all(feature = "catalog-glue", feature = "storage-s3"))] #[cfg(test)] mod glue_catalog_test; -#[cfg(feature = "catalog-glue")] +#[cfg(all(feature = "catalog-glue", feature = "storage-s3"))] #[cfg(test)] mod iceberg_glue_catalog_test; diff --git a/src/moonlink/src/storage/table/iceberg/catalog_utils.rs b/src/moonlink/src/storage/table/iceberg/catalog_utils.rs index 94bdcc289..1d9567f63 100644 --- a/src/moonlink/src/storage/table/iceberg/catalog_utils.rs +++ b/src/moonlink/src/storage/table/iceberg/catalog_utils.rs @@ -1,7 +1,7 @@ #[cfg(test)] use crate::storage::filesystem::accessor::base_filesystem_accessor::BaseFileSystemAccess; use crate::storage::table::iceberg::file_catalog::FileCatalog; -#[cfg(feature = "catalog-glue")] +#[cfg(all(feature = "catalog-glue", feature = "storage-s3"))] use crate::storage::table::iceberg::glue_catalog::GlueCatalog; use crate::storage::table::iceberg::iceberg_table_config::IcebergCatalogConfig; use crate::storage::table::iceberg::iceberg_table_config::IcebergTableConfig; @@ -45,7 +45,7 @@ pub async fn create_catalog( ) .await?, )), - #[cfg(feature = "catalog-glue")] + #[cfg(all(feature = "catalog-glue", feature = "storage-s3"))] IcebergCatalogConfig::Glue { glue_catalog_config, } => Ok(Box::new( @@ -74,7 +74,7 @@ pub async fn create_catalog_without_schema( RestCatalog::new_without_schema(rest_catalog_config, config.data_accessor_config) .await?, )), - #[cfg(feature = "catalog-glue")] + #[cfg(all(feature = "catalog-glue", feature = "storage-s3"))] IcebergCatalogConfig::Glue { glue_catalog_config, } => Ok(Box::new( diff --git a/src/moonlink/src/storage/table/iceberg/glue_catalog.rs b/src/moonlink/src/storage/table/iceberg/glue_catalog.rs index a14dbcb95..572a9ac40 100644 --- a/src/moonlink/src/storage/table/iceberg/glue_catalog.rs +++ b/src/moonlink/src/storage/table/iceberg/glue_catalog.rs @@ -22,7 +22,6 @@ use iceberg_catalog_glue::{ }; use std::collections::{HashMap, HashSet}; -#[derive(Debug)] pub struct GlueCatalog { pub(crate) catalog: IcebergGlueCatalog, /// Similar to opendal operator, which also provides an abstraction above different storage backends. @@ -35,6 +34,15 @@ pub struct GlueCatalog { table_update_proxy: TableUpdateProxy, } +impl std::fmt::Debug for GlueCatalog { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("GlueCatalog") + .field("warehouse_location", &self.warehouse_location) + .field("iceberg_schema", &self.iceberg_schema) + .finish() + } +} + /// Util function to get config properties from iceberg table config. /// If not S3 storage config, return error. fn extract_glue_config_properties( diff --git a/src/moonlink/src/storage/table/iceberg/iceberg_table_config.rs b/src/moonlink/src/storage/table/iceberg/iceberg_table_config.rs index 402b1d137..e4a95f2e4 100644 --- a/src/moonlink/src/storage/table/iceberg/iceberg_table_config.rs +++ b/src/moonlink/src/storage/table/iceberg/iceberg_table_config.rs @@ -1,9 +1,11 @@ -#[cfg(feature = "catalog-glue")] +#[cfg(all(feature = "catalog-glue", feature = "storage-s3"))] use crate::storage::table::iceberg::cloud_security_config::CloudSecurityConfig; use crate::{storage::filesystem::accessor_config::AccessorConfig, StorageConfig}; use serde::{Deserialize, Serialize}; +#[cfg(feature = "catalog-rest")] use std::collections::HashMap; +#[cfg(feature = "catalog-rest")] #[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] pub struct RestCatalogConfig { #[serde(rename = "name")] @@ -33,7 +35,7 @@ pub struct RestCatalogConfig { pub props: HashMap, } -#[cfg(feature = "catalog-glue")] +#[cfg(all(feature = "catalog-glue", feature = "storage-s3"))] #[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] pub struct GlueCatalogConfig { /// ======================== @@ -84,7 +86,7 @@ pub enum IcebergCatalogConfig { rest_catalog_config: RestCatalogConfig, }, - #[cfg(feature = "catalog-glue")] + #[cfg(all(feature = "catalog-glue", feature = "storage-s3"))] #[serde(rename = "glue")] Glue { glue_catalog_config: GlueCatalogConfig, @@ -99,7 +101,7 @@ impl IcebergCatalogConfig { IcebergCatalogConfig::Rest { rest_catalog_config, } => rest_catalog_config.warehouse.clone(), - #[cfg(feature = "catalog-glue")] + #[cfg(all(feature = "catalog-glue", feature = "storage-s3"))] IcebergCatalogConfig::Glue { glue_catalog_config, } => glue_catalog_config.warehouse.clone(), @@ -125,7 +127,7 @@ impl IcebergCatalogConfig { None } - #[cfg(feature = "catalog-glue")] + #[cfg(all(feature = "catalog-glue", feature = "storage-s3"))] pub fn get_glue_catalog_config(&self) -> Option { if let IcebergCatalogConfig::Glue { glue_catalog_config, diff --git a/src/moonlink/src/storage/table/iceberg/rest_catalog.rs b/src/moonlink/src/storage/table/iceberg/rest_catalog.rs index 55324b0e7..7f6aa2ff2 100644 --- a/src/moonlink/src/storage/table/iceberg/rest_catalog.rs +++ b/src/moonlink/src/storage/table/iceberg/rest_catalog.rs @@ -19,7 +19,6 @@ use iceberg_catalog_rest::{ }; use std::collections::{HashMap, HashSet}; -#[derive(Debug)] pub struct RestCatalog { pub(crate) catalog: IcebergRestCatalog, /// Similar to opendal operator, which also provides an abstraction above different storage backends. @@ -32,6 +31,15 @@ pub struct RestCatalog { table_update_proxy: TableUpdateProxy, } +impl std::fmt::Debug for RestCatalog { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("RestCatalog") + .field("warehouse_location", &self.warehouse_location) + .field("iceberg_schema", &self.iceberg_schema) + .finish() + } +} + impl RestCatalog { pub async fn new( mut config: RestCatalogConfig, diff --git a/src/moonlink/src/table_handler/chaos_test.rs b/src/moonlink/src/table_handler/chaos_test.rs index 8560a3f98..28e4c566b 100644 --- a/src/moonlink/src/table_handler/chaos_test.rs +++ b/src/moonlink/src/table_handler/chaos_test.rs @@ -969,7 +969,7 @@ impl TestEnvironment { file.write_all(b"\n").await.unwrap(); written_events_count += 1; - if written_events_count % WRITREN_EVENT_FLUSH_INTERVAL == 0 { + if written_events_count.is_multiple_of(WRITREN_EVENT_FLUSH_INTERVAL) { file.flush().await.unwrap(); } } diff --git a/src/moonlink_connectors/src/lib.rs b/src/moonlink_connectors/src/lib.rs index 2242c1dc4..2231950cf 100644 --- a/src/moonlink_connectors/src/lib.rs +++ b/src/moonlink_connectors/src/lib.rs @@ -2,7 +2,6 @@ pub mod error; pub mod pg_replicate; mod replication_connection; mod replication_manager; -pub mod replication_state; pub mod rest_ingest; pub use error::*; diff --git a/src/moonlink_connectors/src/pg_replicate.rs b/src/moonlink_connectors/src/pg_replicate.rs index a81609745..ee9602581 100644 --- a/src/moonlink_connectors/src/pg_replicate.rs +++ b/src/moonlink_connectors/src/pg_replicate.rs @@ -20,12 +20,11 @@ use crate::pg_replicate::postgres_source::{ }; use crate::pg_replicate::table::{SrcTableId, TableName, TableSchema}; use crate::pg_replicate::table_init::{build_table_components, TableComponents}; -use crate::replication_state::ReplicationState; use crate::Result; use futures::StreamExt; use moonlink::{ - MooncakeTableId, MoonlinkTableConfig, ObjectStorageCache, ReadStateFilepathRemap, TableEvent, - WalManager, + CommitState, MooncakeTableId, MoonlinkTableConfig, ObjectStorageCache, ReadStateFilepathRemap, + ReplicationState, TableEvent, WalManager, }; use native_tls::{Certificate, TlsConnector}; use pg_escape::{quote_identifier, quote_literal}; @@ -66,7 +65,7 @@ pub enum PostgresReplicationCommand { src_table_id: SrcTableId, schema: TableSchema, event_sender: mpsc::Sender, - commit_lsn_tx: watch::Sender, + commit_state: Arc, flush_lsn_rx: watch::Receiver, wal_flush_lsn_rx: watch::Receiver, ready_tx: oneshot::Sender<()>, @@ -247,7 +246,7 @@ impl PostgresConnection { schema: &TableSchema, event_sender: mpsc::Sender, is_recovery: bool, - commit_lsn_tx: watch::Sender, + commit_lsn_tx: Arc, table_base_path: &str, ) -> Result<(bool)> { let src_table_id = schema.src_table_id; @@ -296,9 +295,7 @@ impl PostgresConnection { } // Notify read state manager with the commit LSN for the initial copy boundary. - if let Err(e) = commit_lsn_tx.send(progress.boundary_lsn.into()) { - warn!(error = ?e, table_id = src_table_id, "failed to send initial copy commit lsn"); - } + commit_lsn_tx.mark(progress.boundary_lsn.into()); self.replication_state.mark(progress.boundary_lsn.into()); Ok(true) @@ -398,7 +395,7 @@ impl PostgresConnection { src_table_id: SrcTableId, schema: TableSchema, event_sender: mpsc::Sender, - commit_lsn_tx: watch::Sender, + commit_state: Arc, flush_lsn_rx: watch::Receiver, wal_flush_lsn_rx: watch::Receiver, ) -> Result> { @@ -407,7 +404,7 @@ impl PostgresConnection { src_table_id, schema, event_sender, - commit_lsn_tx, + commit_state, flush_lsn_rx, wal_flush_lsn_rx, ready_tx, @@ -522,7 +519,7 @@ impl PostgresConnection { // Send command to add table to replication let commit_lsn_tx = table_resources - .commit_lsn_tx + .commit_state .take() .expect("commit_lsn_tx is None"); let commit_lsn_tx_for_copy = commit_lsn_tx.clone(); @@ -763,8 +760,8 @@ pub async fn run_event_loop( } }, Some(cmd) = cmd_rx.recv() => match cmd { - PostgresReplicationCommand::AddTable { src_table_id, schema, event_sender, commit_lsn_tx, flush_lsn_rx, wal_flush_lsn_rx, ready_tx } => { - sink.add_table(src_table_id, event_sender, commit_lsn_tx, &schema); + PostgresReplicationCommand::AddTable { src_table_id, schema, event_sender, commit_state, flush_lsn_rx, wal_flush_lsn_rx, ready_tx } => { + sink.add_table(src_table_id, event_sender, commit_state, &schema); flush_lsn_rxs.insert(src_table_id, flush_lsn_rx); wal_flush_lsn_rxs.insert(src_table_id, wal_flush_lsn_rx); stream.as_mut().add_table_schema(schema); diff --git a/src/moonlink_connectors/src/pg_replicate/moonlink_sink.rs b/src/moonlink_connectors/src/pg_replicate/moonlink_sink.rs index b4c78341b..cc0882ecd 100644 --- a/src/moonlink_connectors/src/pg_replicate/moonlink_sink.rs +++ b/src/moonlink_connectors/src/pg_replicate/moonlink_sink.rs @@ -3,8 +3,8 @@ use crate::pg_replicate::{ conversions::{cdc_event::CdcEvent, table_row::TableRow}, table::{SrcTableId, TableSchema}, }; -use crate::replication_state::ReplicationState; use moonlink::TableEvent; +use moonlink::{CommitState, ReplicationState}; use more_asserts as ma; use postgres_replication::protocol::Column as ReplicationColumn; use std::collections::HashMap; @@ -35,7 +35,7 @@ struct ColumnInfo { } pub struct Sink { event_senders: HashMap>, - commit_lsn_txs: HashMap>, + commit_lsn_txs: HashMap>, streaming_transactions_state: HashMap, transaction_state: TransactionState, replication_state: Arc, @@ -98,7 +98,7 @@ impl Sink { &mut self, src_table_id: SrcTableId, event_sender: Sender, - commit_lsn_tx: watch::Sender, + commit_lsn_tx: Arc, table_schema: &TableSchema, ) { self.event_senders.insert(src_table_id, event_sender); @@ -215,10 +215,8 @@ impl Sink { ma::assert_ge!(commit_body.end_lsn(), self.max_keepalive_lsn_seen); for table_id in &self.transaction_state.touched_tables { let event_sender = self.event_senders.get(table_id); - if let Some(commit_lsn_tx) = self.commit_lsn_txs.get(table_id).cloned() { - if let Err(e) = commit_lsn_tx.send(commit_body.end_lsn()) { - warn!(error = ?e, "failed to send commit lsn"); - } + if let Some(commit_lsn_tx) = self.commit_lsn_txs.get(table_id) { + commit_lsn_tx.mark(commit_body.end_lsn()); } if let Some(event_sender) = event_sender { if let Err(e) = Self::send_table_event( @@ -252,10 +250,8 @@ impl Sink { if let Some(tables_in_txn) = self.streaming_transactions_state.get(&xact_id) { for table_id in &tables_in_txn.touched_tables { let event_sender = self.event_senders.get(table_id); - if let Some(commit_lsn_tx) = self.commit_lsn_txs.get(table_id).cloned() { - if let Err(e) = commit_lsn_tx.send(stream_commit_body.end_lsn()) { - warn!(error = ?e, "failed to send stream commit lsn"); - } + if let Some(commit_lsn_tx) = self.commit_lsn_txs.get(table_id) { + commit_lsn_tx.mark(stream_commit_body.end_lsn()); } if let Some(event_sender) = event_sender { if let Err(e) = Self::send_table_event( @@ -443,9 +439,9 @@ mod tests { // Setup one table with event sender and commit lsn channel let table_id: SrcTableId = 1; let (tx, mut rx) = mpsc::channel::(64); - let (commit_tx, _commit_rx) = watch::channel::(0); + let commit_state = CommitState::new(); let schema = make_table_schema(table_id); - sink.add_table(table_id, tx, commit_tx, &schema); + sink.add_table(table_id, tx, commit_state, &schema); // Many inserts for the same (xid, table) pair let xid = Some(42u32); @@ -493,10 +489,10 @@ mod tests { let b: SrcTableId = 12; let (tx_a, mut rx_a) = mpsc::channel::(8); let (tx_b, mut rx_b) = mpsc::channel::(8); - let (commit_tx_a, _rx_a) = watch::channel::(0); - let (commit_tx_b, _rx_b) = watch::channel::(0); - sink.add_table(a, tx_a, commit_tx_a, &make_table_schema(a)); - sink.add_table(b, tx_b, commit_tx_b, &make_table_schema(b)); + let commit_state_a = CommitState::new(); + let commit_state_b = CommitState::new(); + sink.add_table(a, tx_a, commit_state_a.clone(), &make_table_schema(a)); + sink.add_table(b, tx_b, commit_state_b.clone(), &make_table_schema(b)); // Many inserts into A then into B within the same non-streaming transaction for _ in 0..5 { @@ -528,12 +524,17 @@ mod tests { #[tokio::test] async fn cached_sender_cleared_on_drop_table() { let replication_state = ReplicationState::new(); + let commit_state = CommitState::new(); let mut sink = Sink::new(replication_state); let table_id: SrcTableId = 21; let (tx, _rx) = mpsc::channel::(4); - let (commit_tx, _commit_rx) = watch::channel::(0); - sink.add_table(table_id, tx, commit_tx, &make_table_schema(table_id)); + sink.add_table( + table_id, + tx, + commit_state.clone(), + &make_table_schema(table_id), + ); // Populate sender cache let _ = sink.get_event_sender_for(table_id); @@ -547,12 +548,18 @@ mod tests { #[tokio::test] async fn interleaved_streams_do_not_use_stale_cache() { let replication_state = ReplicationState::new(); + let commit_state = CommitState::new(); let mut sink = Sink::new(replication_state); let table_id: SrcTableId = 31; let (tx, mut rx) = mpsc::channel::(16); let (commit_tx, _commit_rx) = watch::channel::(0); - sink.add_table(table_id, tx, commit_tx, &make_table_schema(table_id)); + sink.add_table( + table_id, + tx, + commit_state.clone(), + &make_table_schema(table_id), + ); let xid1 = Some(100u32); let xid2 = Some(200u32); @@ -611,16 +618,17 @@ mod tests { #[tokio::test] async fn cache_updates_on_table_change_same_xid() { let replication_state = ReplicationState::new(); + let commit_state = CommitState::new(); let mut sink = Sink::new(replication_state); let a: SrcTableId = 41; let b: SrcTableId = 42; let (tx_a, mut rx_a) = mpsc::channel::(8); let (tx_b, mut rx_b) = mpsc::channel::(8); - let (commit_tx_a, _rx_a) = watch::channel::(0); - let (commit_tx_b, _rx_b) = watch::channel::(0); - sink.add_table(a, tx_a, commit_tx_a, &make_table_schema(a)); - sink.add_table(b, tx_b, commit_tx_b, &make_table_schema(b)); + let commit_state_a = CommitState::new(); + let commit_state_b = CommitState::new(); + sink.add_table(a, tx_a, commit_state_a.clone(), &make_table_schema(a)); + sink.add_table(b, tx_b, commit_state_b.clone(), &make_table_schema(b)); let xid = Some(777u32); // A then B under same xid @@ -651,12 +659,17 @@ mod tests { #[tokio::test] async fn sender_cache_persists_across_xid_and_stream_like_boundaries() { let replication_state = ReplicationState::new(); + let commit_state = CommitState::new(); let mut sink = Sink::new(replication_state); let table_id: SrcTableId = 51; let (tx, mut rx) = mpsc::channel::(8); - let (commit_tx, _commit_rx) = watch::channel::(0); - sink.add_table(table_id, tx, commit_tx, &make_table_schema(table_id)); + sink.add_table( + table_id, + tx, + commit_state.clone(), + &make_table_schema(table_id), + ); let xid1 = Some(1u32); let xid2 = Some(2u32); @@ -692,12 +705,17 @@ mod tests { #[tokio::test] async fn non_streaming_state_resets_between_transactions() { let replication_state = ReplicationState::new(); + let commit_state = CommitState::new(); let mut sink = Sink::new(replication_state); let table_id: SrcTableId = 61; let (tx, mut rx) = mpsc::channel::(8); - let (commit_tx, _commit_rx) = watch::channel::(0); - sink.add_table(table_id, tx, commit_tx, &make_table_schema(table_id)); + sink.add_table( + table_id, + tx, + commit_state.clone(), + &make_table_schema(table_id), + ); // First transaction: several inserts (non-streaming) for _ in 0..3 { diff --git a/src/moonlink_connectors/src/pg_replicate/table_init.rs b/src/moonlink_connectors/src/pg_replicate/table_init.rs index caabf9f7a..543a473b7 100644 --- a/src/moonlink_connectors/src/pg_replicate/table_init.rs +++ b/src/moonlink_connectors/src/pg_replicate/table_init.rs @@ -1,5 +1,4 @@ use crate::pg_replicate::table::SrcTableId; -use crate::replication_state::ReplicationState; use crate::{Error, Result}; use arrow_schema::Schema as ArrowSchema; use moonlink::event_sync::create_table_event_syncer; @@ -12,6 +11,7 @@ use moonlink::{ MoonlinkTableConfig, MoonlinkTableSecret, ObjectStorageCache, ReadStateManager, StorageConfig, TableEvent, TableEventManager, TableHandler, TableStatusReader, WalConfig, WalManager, }; +use moonlink::{CommitState, ReplicationState}; use std::io::ErrorKind; use std::path::{Path, PathBuf}; @@ -44,7 +44,7 @@ pub struct TableResources { pub read_state_manager: ReadStateManager, pub table_event_manager: TableEventManager, pub table_status_reader: TableStatusReader, - pub commit_lsn_tx: Option>, + pub commit_state: Option>, pub flush_lsn_rx: Option>, pub wal_flush_lsn_rx: Option>, pub wal_file_accessor: Arc, @@ -156,17 +156,18 @@ pub async fn build_table_components( let last_persistence_snapshot_lsn = table.get_persistence_snapshot_lsn(); - let (commit_lsn_tx, commit_lsn_rx) = watch::channel(0u64); + let commit_state = CommitState::new(); // Make a receiver first before possible mark operation, otherwise all receiver initializes with 0. - let replication_lsn_tx = replication_state.subscribe(); + let replication_lsn_rx = replication_state.subscribe(); + let commit_lsn_rx = commit_state.subscribe(); if let Some(persistence_snapshot_lsn) = last_persistence_snapshot_lsn { - commit_lsn_tx.send(persistence_snapshot_lsn).unwrap(); + commit_state.mark(persistence_snapshot_lsn); replication_state.mark(persistence_snapshot_lsn); } let read_state_manager = ReadStateManager::new( &table, - replication_lsn_tx, + replication_lsn_rx, commit_lsn_rx, table_components.read_state_filepath_remap, ); @@ -196,7 +197,7 @@ pub async fn build_table_components( read_state_manager, table_status_reader, table_event_manager, - commit_lsn_tx: Some(commit_lsn_tx), + commit_state: Some(commit_state), flush_lsn_rx: Some(flush_lsn_rx), wal_flush_lsn_rx: Some(wal_flush_lsn_rx), wal_file_accessor, diff --git a/src/moonlink_connectors/src/replication_connection.rs b/src/moonlink_connectors/src/replication_connection.rs index f8e22c11b..e4dec1f2a 100644 --- a/src/moonlink_connectors/src/replication_connection.rs +++ b/src/moonlink_connectors/src/replication_connection.rs @@ -287,7 +287,7 @@ impl ReplicationConnection { std::sync::Arc::new(arrow_schema), table_resources.event_sender.clone(), table_resources - .commit_lsn_tx + .commit_state .take() .expect("commit_lsn_tx not set"), table_resources diff --git a/src/moonlink_connectors/src/rest_ingest.rs b/src/moonlink_connectors/src/rest_ingest.rs index 56b5179e3..23a16b998 100644 --- a/src/moonlink_connectors/src/rest_ingest.rs +++ b/src/moonlink_connectors/src/rest_ingest.rs @@ -8,7 +8,6 @@ pub mod rest_event; pub mod rest_source; pub mod schema_util; -use crate::replication_state::ReplicationState; use crate::rest_ingest::event_request::EventRequest; use crate::rest_ingest::moonlink_rest_sink::RestSink; use crate::rest_ingest::moonlink_rest_sink::TableStatus; @@ -17,6 +16,8 @@ use crate::Result; use apache_avro::schema::Schema as AvroSchema; use arrow_schema::Schema; use futures::StreamExt; +use moonlink::CommitState; +use moonlink::ReplicationState; use moonlink::TableEvent; use more_asserts as ma; use std::sync::atomic::{AtomicU32, Ordering}; @@ -35,7 +36,7 @@ pub enum RestCommand { src_table_id: SrcTableId, schema: Arc, event_sender: mpsc::Sender, - commit_lsn_tx: watch::Sender, + commit_lsn_tx: Arc, flush_lsn_rx: watch::Receiver, wal_flush_lsn_rx: watch::Receiver, /// Persist LSN, only assigned for tables to recovery; used to indicate and update replication LSN. @@ -102,7 +103,7 @@ impl RestApiConnection { src_table_id: SrcTableId, schema: Arc, event_sender: mpsc::Sender, - commit_lsn_tx: watch::Sender, + commit_lsn_tx: Arc, flush_lsn_rx: watch::Receiver, wal_flush_lsn_rx: watch::Receiver, persist_lsn: Option, diff --git a/src/moonlink_connectors/src/rest_ingest/decimal_utils.rs b/src/moonlink_connectors/src/rest_ingest/decimal_utils.rs index bfdff3be0..1c92ee7d6 100644 --- a/src/moonlink_connectors/src/rest_ingest/decimal_utils.rs +++ b/src/moonlink_connectors/src/rest_ingest/decimal_utils.rs @@ -242,9 +242,9 @@ pub fn convert_decimal_to_row_value( if scale < 0 { handle_negative_scale(decimal, scale, precision) } else if scale > precision as i8 { - return handle_fractional_only(decimal, scale, precision); + handle_fractional_only(decimal, scale, precision) } else { - return handle_standard(decimal, scale, precision); + handle_standard(decimal, scale, precision) } } diff --git a/src/moonlink_connectors/src/rest_ingest/moonlink_rest_sink.rs b/src/moonlink_connectors/src/rest_ingest/moonlink_rest_sink.rs index 0df7f8055..844ee4f2e 100644 --- a/src/moonlink_connectors/src/rest_ingest/moonlink_rest_sink.rs +++ b/src/moonlink_connectors/src/rest_ingest/moonlink_rest_sink.rs @@ -1,8 +1,8 @@ -use crate::replication_state::ReplicationState; use crate::rest_ingest::event_request::RowEventOperation; use crate::rest_ingest::rest_event::RestEvent; use crate::rest_ingest::rest_source::SrcTableId; use crate::{Error, Result}; +use moonlink::{CommitState, ReplicationState}; use moonlink::{StorageConfig, TableEvent}; use std::collections::HashMap; use std::sync::Arc; @@ -15,7 +15,7 @@ pub struct TableStatus { pub(crate) _wal_flush_lsn_rx: watch::Receiver, pub(crate) _flush_lsn_rx: watch::Receiver, pub(crate) event_sender: mpsc::Sender, - pub(crate) commit_lsn_tx: watch::Sender, + pub(crate) commit_lsn_tx: Arc, } /// REST-specific sink for handling REST API table events @@ -47,7 +47,7 @@ impl RestSink { ) -> Result<()> { // Update per-table commit LSN. if let Some(persist_lsn) = persist_lsn { - table_status.commit_lsn_tx.send(persist_lsn).unwrap(); + table_status.commit_lsn_tx.mark(persist_lsn); } if self @@ -81,7 +81,7 @@ impl RestSink { /// - Replication LSN is used per-database fn mark_commit(&self, src_table_id: SrcTableId, lsn: u64) -> Result<()> { if let Some(table_status) = self.table_status.get(&src_table_id) { - table_status.commit_lsn_tx.send(lsn).unwrap(); + table_status.commit_lsn_tx.mark(lsn); } else { return Err(crate::Error::rest_api( format!("No table status found for src_table_id: {src_table_id}"), @@ -334,14 +334,14 @@ mod tests { // Create channels for testing let (event_tx, mut event_rx) = mpsc::channel::(10); - let (_commit_lsn_tx, _commit_lsn_rx) = watch::channel(0u64); + let commit_state = CommitState::new(); let (_wal_flush_lsn_tx, _wal_flush_lsn_rx) = watch::channel(0u64); let (_flush_lsn_tx, _flush_lsn_rx) = watch::channel(0u64); let table_status = TableStatus { _wal_flush_lsn_rx, _flush_lsn_rx, event_sender: event_tx, - commit_lsn_tx: _commit_lsn_tx, + commit_lsn_tx: commit_state, }; // Add table to sink @@ -430,14 +430,14 @@ mod tests { // Create channels for testing let (event_tx, mut event_rx) = mpsc::channel::(10); - let (_commit_lsn_tx, _commit_lsn_rx) = watch::channel(0u64); + let commit_state = CommitState::new(); let (_wal_flush_lsn_tx, _wal_flush_lsn_rx) = watch::channel(0u64); let (_flush_lsn_tx, _flush_lsn_rx) = watch::channel(0u64); let table_status = TableStatus { _wal_flush_lsn_rx, _flush_lsn_rx, event_sender: event_tx, - commit_lsn_tx: _commit_lsn_tx, + commit_lsn_tx: commit_state, }; let src_table_id = 1; @@ -507,25 +507,25 @@ mod tests { // Create channels for testing let (event_tx_1, mut event_rx_1) = mpsc::channel::(10); - let (_commit_lsn_tx_1, _commit_lsn_rx_1) = watch::channel(0u64); + let commit_state = CommitState::new(); let (_wal_flush_lsn_tx_1, _wal_flush_lsn_rx_1) = watch::channel(0u64); let (_flush_lsn_tx_1, _flush_lsn_rx_1) = watch::channel(0u64); let table_status_1 = TableStatus { _wal_flush_lsn_rx: _wal_flush_lsn_rx_1, _flush_lsn_rx: _flush_lsn_rx_1, event_sender: event_tx_1, - commit_lsn_tx: _commit_lsn_tx_1, + commit_lsn_tx: commit_state, }; let (event_tx_2, mut event_rx_2) = mpsc::channel::(10); - let (_commit_lsn_tx_2, _commit_lsn_rx_2) = watch::channel(0u64); + let commit_state = CommitState::new(); let (_wal_flush_lsn_tx_2, _wal_flush_lsn_rx_2) = watch::channel(0u64); let (_flush_lsn_tx_2, _flush_lsn_rx_2) = watch::channel(0u64); let table_status_2 = TableStatus { _wal_flush_lsn_rx: _wal_flush_lsn_rx_2, _flush_lsn_rx: _flush_lsn_rx_2, event_sender: event_tx_2, - commit_lsn_tx: _commit_lsn_tx_2, + commit_lsn_tx: commit_state, }; // Add two tables diff --git a/src/moonlink_service/src/test.rs b/src/moonlink_service/src/test.rs index 14ac12aa0..eee9c6bb9 100644 --- a/src/moonlink_service/src/test.rs +++ b/src/moonlink_service/src/test.rs @@ -1066,7 +1066,7 @@ async fn stress_test_kafka_avro_stress_ingest() { } let current_progress = progress_counter_clone.fetch_add(1, std::sync::atomic::Ordering::SeqCst); - if current_progress % 10000 == 0 { + if current_progress.is_multiple_of(10000) { println!( "Overall progress: {}/{} rows ({:.1}%)", current_progress + 1, From 0229180c3eab949e0b7003637c662db3f1733a12 Mon Sep 17 00:00:00 2001 From: thuong Date: Sun, 26 Oct 2025 05:37:47 +0000 Subject: [PATCH 3/5] fix: handle read LSN ordering validation asynchronously --- src/moonlink/src/error.rs | 10 ++++- .../src/union_read/read_state_manager.rs | 43 +++++++++++++++++-- 2 files changed, 49 insertions(+), 4 deletions(-) diff --git a/src/moonlink/src/error.rs b/src/moonlink/src/error.rs index 92d5fd115..7f70b8d3d 100644 --- a/src/moonlink/src/error.rs +++ b/src/moonlink/src/error.rs @@ -46,6 +46,9 @@ pub enum Error { #[error("{0}")] OtelExporterBuildError(ErrorStruct), + + #[error("{0}")] + ReadStateManager(ErrorStruct), } pub type Result = result::Result; @@ -59,6 +62,10 @@ impl Error { pub fn delta_generic_error(message: String) -> Self { Self::DeltaLakeError(ErrorStruct::new(message, ErrorStatus::Permanent)) } + #[track_caller] + pub fn read_validation_error(message: String) -> Self { + Self::ReadStateManager(ErrorStruct::new(message, ErrorStatus::Permanent)) + } } impl From for Error { @@ -216,7 +223,8 @@ impl Error { | Error::JoinError(err) | Error::PbToMoonlinkRowError(err) | Error::OtelExporterBuildError(err) - | Error::Json(err) => err.status, + | Error::Json(err) + | Error::ReadStateManager(err) => err.status, } } } diff --git a/src/moonlink/src/union_read/read_state_manager.rs b/src/moonlink/src/union_read/read_state_manager.rs index c1f8daacd..e4bec5162 100644 --- a/src/moonlink/src/union_read/read_state_manager.rs +++ b/src/moonlink/src/union_read/read_state_manager.rs @@ -1,6 +1,7 @@ use crate::error::Result; use crate::storage::MooncakeTable; use crate::storage::SnapshotTableState; +use crate::Error; use crate::ReadState; use crate::ReadStateFilepathRemap; use more_asserts as ma; @@ -16,6 +17,8 @@ const NO_CACHE_LSN: u64 = u64::MAX; const NO_SNAPSHOT_LSN: u64 = u64::MAX; /// Commit LSN, which indicates there's no commit. const NO_COMMIT_LSN: u64 = 0; +/// Max read snapshot retries +const MAX_READ_SNAPSHOT_RETRIES: u8 = 5; pub struct ReadStateManager { last_read_lsn: AtomicU64, @@ -103,6 +106,7 @@ impl ReadStateManager { let mut table_snapshot_rx = self.table_snapshot_watch_receiver.clone(); let mut replication_lsn_rx = self.replication_lsn_rx.clone(); let last_commit_lsn = self.last_commit_lsn_rx.clone(); + let mut retries_number: u8 = 0; loop { let current_snapshot_lsn = *table_snapshot_rx.borrow(); @@ -114,6 +118,7 @@ impl ReadStateManager { current_snapshot_lsn, current_replication_lsn, last_commit_lsn_val, + &mut retries_number, ) { return self .read_from_snapshot_and_update_cache( @@ -133,18 +138,50 @@ impl ReadStateManager { } } + #[inline] + fn validate_lsn_ordering( + snapshot_lsn: u64, + commit_lsn: u64, + replication_lsn: u64, + ) -> Result<()> { + // validate right ordering lsn: snapshot_lsn<=commit_lsn<=replication_lsn + if snapshot_lsn != NO_SNAPSHOT_LSN + && commit_lsn != NO_COMMIT_LSN + && snapshot_lsn > commit_lsn + { + return Err(Error::read_validation_error(format!( + "snapshot_lsn > commit_lsn: {} > {}", + snapshot_lsn, commit_lsn + ))); + } + if commit_lsn > replication_lsn { + return Err(Error::read_validation_error(format!( + "commit_lsn > replication_lsn: {} > {}", + commit_lsn, replication_lsn + ))); + } + Ok(()) + } + fn can_satisfy_read_from_snapshot( &self, requested_lsn: Option, snapshot_lsn: u64, replication_lsn: u64, commit_lsn: u64, + retries_number: &mut u8, ) -> bool { // Sanity check on read side: iceberg snapshot LSN <= mooncake snapshot LSN <= commit LSN <= replication LSN - if snapshot_lsn != NO_SNAPSHOT_LSN && commit_lsn != NO_COMMIT_LSN { - ma::assert_le!(snapshot_lsn, commit_lsn); + match Self::validate_lsn_ordering(snapshot_lsn, commit_lsn, replication_lsn) { + Ok(_) => {} // continue + Err(err) => { + *retries_number += 1; + if *retries_number >= MAX_READ_SNAPSHOT_RETRIES { + panic!("Error after {} retries: {}", retries_number, err); + } + return false; + } } - ma::assert_le!(commit_lsn, replication_lsn); // Check snapshot readability. let is_snapshot_clean = Self::snapshot_is_clean(snapshot_lsn, commit_lsn); From 321496ef1d8cc6a51db0d71821fedb58e07969db Mon Sep 17 00:00:00 2001 From: thuong Date: Sun, 26 Oct 2025 05:43:01 +0000 Subject: [PATCH 4/5] fix: remove redundant import --- src/moonlink/src/error.rs | 2 +- src/moonlink/src/union_read/read_state_manager.rs | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/src/moonlink/src/error.rs b/src/moonlink/src/error.rs index 7f70b8d3d..98f64777e 100644 --- a/src/moonlink/src/error.rs +++ b/src/moonlink/src/error.rs @@ -223,7 +223,7 @@ impl Error { | Error::JoinError(err) | Error::PbToMoonlinkRowError(err) | Error::OtelExporterBuildError(err) - | Error::Json(err) + | Error::Json(err) | Error::ReadStateManager(err) => err.status, } } diff --git a/src/moonlink/src/union_read/read_state_manager.rs b/src/moonlink/src/union_read/read_state_manager.rs index e4bec5162..da8c4e814 100644 --- a/src/moonlink/src/union_read/read_state_manager.rs +++ b/src/moonlink/src/union_read/read_state_manager.rs @@ -4,7 +4,6 @@ use crate::storage::SnapshotTableState; use crate::Error; use crate::ReadState; use crate::ReadStateFilepathRemap; -use more_asserts as ma; use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; use tokio::sync::{watch, RwLock}; From 1811b42aa709cc6bfa983b020cb166a11b7dea14 Mon Sep 17 00:00:00 2001 From: thuong Date: Sun, 26 Oct 2025 09:05:13 +0000 Subject: [PATCH 5/5] fix: update the row line for loc in test_error_propagation_with_source --- src/moonlink/src/error.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/moonlink/src/error.rs b/src/moonlink/src/error.rs index 98f64777e..0ebf1ab5b 100644 --- a/src/moonlink/src/error.rs +++ b/src/moonlink/src/error.rs @@ -254,7 +254,7 @@ mod tests { if let Error::Io(ref inner) = io_error { let loc = inner.location.as_ref().unwrap(); assert!(loc.contains("src/moonlink/src/error.rs")); - assert!(loc.contains("230")); + assert!(loc.contains("238")); assert!(loc.contains("9")); } }