diff --git a/README.md b/README.md index a8d9bf96..8f709207 100644 --- a/README.md +++ b/README.md @@ -1246,8 +1246,8 @@ when an activity class is referenced in a workflow before it has been explicitly > Temporalio::Workflow::Unsafe.illegal_call_tracing_disabled block. This comes from `bootsnap` via `zeitwork` because it is lazily loading a class/module at workflow runtime. It is not -good to lazily load code durnig a workflow run because it can be side effecting. Workflows and the classes they -reference should not be eagerly loaded. +good to lazily load code during a workflow run because it can be side effecting. Workflows and the classes they +reference should be eagerly loaded. To resolve this, either always eagerly load (e.g. `config.eager_load = true`) or explicitly `require` what is used by a workflow at the top of the file. diff --git a/temporalio/Cargo.lock b/temporalio/Cargo.lock index fe87d94b..436db1af 100644 --- a/temporalio/Cargo.lock +++ b/temporalio/Cargo.lock @@ -312,9 +312,9 @@ dependencies = [ [[package]] name = "bitflags" -version = "2.9.4" +version = "2.9.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2261d10cca569e4643e526d8dc2e62e433cc8aba21ab764233731f8d369bf394" +checksum = "6a65b545ab31d687cff52899d4890855fec459eb6afe0da6417b8a18da87aa29" [[package]] name = "bumpalo" @@ -351,11 +351,10 @@ checksum = "37b2a672a2cb129a2e41c10b1224bb368f9f37a2b16b612598138befd7b37eb5" [[package]] name = "cc" -version = "1.2.37" +version = "1.2.33" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "65193589c6404eb80b450d618eaf9a2cafaaafd57ecce47370519ef674a7bd44" +checksum = "3ee0f8803222ba5a7e2777dd72ca451868909b1ac410621b676adf07280e9b5f" dependencies = [ - "find-msvc-tools", "jobserver", "libc", "shlex", @@ -384,9 +383,9 @@ checksum = "613afe47fcd5fac7ccf1db93babcb082c5994d996f20b8b159f2ad1658eb5724" [[package]] name = "chrono" -version = "0.4.42" +version = "0.4.41" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "145052bdd345b87320e369255277e3fb5152762ad123a901ef5c262dd38fe8d2" +checksum = "c469d952047f47f91b68d1cba3f10d63c11d73e4636f24f08daf0278abf01c4d" dependencies = [ "num-traits", "serde", @@ -432,9 +431,9 @@ dependencies = [ [[package]] name = "clap" -version = "4.5.47" +version = "4.5.45" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7eac00902d9d136acd712710d71823fb8ac8004ca445a89e73a41d45aa712931" +checksum = "1fc0e74a703892159f5ae7d3aac52c8e6c392f5ae5f359c70b5881d60aaac318" dependencies = [ "clap_builder", "clap_derive", @@ -442,9 +441,9 @@ dependencies = [ [[package]] name = "clap_builder" -version = "4.5.47" +version = "4.5.44" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2ad9bbf750e73b5884fb8a211a9424a1906c1e156724260fdae972f31d70e1d6" +checksum = "b3e7f4214277f3c7aa526a59dd3fbe306a370daee1f8b7b8c987069cd8e888a8" dependencies = [ "anstream", "anstyle", @@ -454,9 +453,9 @@ dependencies = [ [[package]] name = "clap_derive" -version = "4.5.47" +version = "4.5.45" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bbfd7eae0b0f1a6e63d4b13c9c478de77c2eb546fba158ad50b4203dc24b9f9c" +checksum = "14cb31bb0a7d536caef2639baa7fad459e15c3144efefa6dbd1c84562c4739f6" dependencies = [ "heck", "proc-macro2", @@ -753,7 +752,7 @@ dependencies = [ "libc", "option-ext", "redox_users", - "windows-sys 0.61.0", + "windows-sys 0.60.2", ] [[package]] @@ -787,18 +786,18 @@ checksum = "48c757948c5ede0e46177b7add2e67155f70e33c07fea8284df6576da70b3719" [[package]] name = "enum-iterator" -version = "2.3.0" +version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a4549325971814bda7a44061bf3fe7e487d447cba01e4220a4b454d630d7a016" +checksum = "c280b9e6b3ae19e152d8e31cf47f18389781e119d4013a2a2bb0180e5facc635" dependencies = [ "enum-iterator-derive", ] [[package]] name = "enum-iterator-derive" -version = "1.5.0" +version = "1.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "685adfa4d6f3d765a26bc5dbc936577de9abf756c1feeb3089b01dd395034842" +checksum = "a1ab991c1362ac86c61ab6f556cff143daa22e5a15e4e189df818b2fd19fe65b" dependencies = [ "proc-macro2", "quote", @@ -825,23 +824,22 @@ checksum = "877a4ace8713b0bcf2a4e7eec82529c029f1d0619886d18145fea96c3ffe5c0f" [[package]] name = "erased-serde" -version = "0.4.8" +version = "0.4.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "259d404d09818dec19332e31d94558aeb442fea04c817006456c24b5460bbd4b" +checksum = "e004d887f51fcb9fef17317a2f3525c887d8aa3f4f50fed920816a688284a5b7" dependencies = [ "serde", - "serde_core", "typeid", ] [[package]] name = "errno" -version = "0.3.14" +version = "0.3.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "39cab71617ae0d63f51a36d69f866391735b51691dbda63cf6f96d042b63efeb" +checksum = "778e2ac28f6c47af28e4907f13ffd1e1ddbd400980a9abd7c8df189bf578a5ad" dependencies = [ "libc", - "windows-sys 0.61.0", + "windows-sys 0.60.2", ] [[package]] @@ -862,12 +860,6 @@ dependencies = [ "windows-sys 0.60.2", ] -[[package]] -name = "find-msvc-tools" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7fd99930f64d146689264c637b5af2f0233a933bef0d8570e2526bf9e083192d" - [[package]] name = "fixedbitset" version = "0.5.7" @@ -899,9 +891,9 @@ checksum = "d9c4f5dac5e15c24eb999c26181a6ca40b39fe946cbe4c263c7209467bc83af2" [[package]] name = "form_urlencoded" -version = "1.2.2" +version = "1.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cb4cb245038516f5f85277875cdaa4f7d2c9a0fa0468de06ed190163b1581fcf" +checksum = "e13624c2627564efccf4934284bdd98cbaa14e79b0b5a141218e507b3a823456" dependencies = [ "percent-encoding", ] @@ -1051,7 +1043,7 @@ dependencies = [ "js-sys", "libc", "r-efi", - "wasi 0.14.7+wasi-0.2.4", + "wasi 0.14.2+wasi-0.2.4", "wasm-bindgen", ] @@ -1102,7 +1094,7 @@ dependencies = [ "futures-core", "futures-sink", "http", - "indexmap 2.11.3", + "indexmap 2.10.0", "slab", "tokio", "tokio-util", @@ -1209,9 +1201,9 @@ checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9" [[package]] name = "humantime" -version = "2.3.0" +version = "2.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "135b12329e5e3ce057a9f972339ea52bc954fe1e9358ef27f95e89716fbc5424" +checksum = "9b112acc8b3adf4b107a8ec20977da0273a8c386765a3ec0229bd500a1443f9f" [[package]] name = "hyper" @@ -1268,9 +1260,9 @@ dependencies = [ [[package]] name = "hyper-util" -version = "0.1.17" +version = "0.1.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3c6995591a8f1380fcb4ba966a252a4b29188d51d2b89e3a252f5305be65aea8" +checksum = "8d9b05277c7e8da2c93a568989bb6207bef0112e8d17df7a6eda4a3cf143bc5e" dependencies = [ "base64 0.22.1", "bytes", @@ -1384,9 +1376,9 @@ checksum = "b9e0384b61958566e926dc50660321d12159025e767c18e043daf26b70104c39" [[package]] name = "idna" -version = "1.1.0" +version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3b0875f23caa03898994f6ddc501886a45c7d3d62d04d2d90788d47be1b1e4de" +checksum = "686f825264d630750a544639377bae737628043f20d38bbc029e8f29ea968a7e" dependencies = [ "idna_adapter", "smallvec", @@ -1415,9 +1407,9 @@ dependencies = [ [[package]] name = "indexmap" -version = "2.11.3" +version = "2.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "92119844f513ffa41556430369ab02c295a3578af21cf945caa3e9e0c2481ac3" +checksum = "fe4cd85333e22411419a0bcae1297d25e58c9443848b11dc6a86fefe8c78a661" dependencies = [ "equivalent", "hashbrown 0.15.5", @@ -1443,9 +1435,9 @@ dependencies = [ [[package]] name = "io-uring" -version = "0.7.10" +version = "0.7.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "046fa2d4d00aea763528b4950358d0ead425372445dc8ff86312b3c69ff7727b" +checksum = "d93587f37623a1a17d94ef2bc9ada592f5465fe7732084ab7beefabe5c77c0c4" dependencies = [ "bitflags", "cfg-if", @@ -1509,9 +1501,9 @@ checksum = "4a5f13b858c8d314ee3e8f639011f7ccefe71f97f96e50151fb991f267928e2c" [[package]] name = "jobserver" -version = "0.1.34" +version = "0.1.33" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9afb3de4395d6b3e67a780b6de64b51c978ecf11cb9a462c66be7d4ca9039d33" +checksum = "38f262f097c174adebe41eb73d66ae9c06b2844fb0da69969647bbddd9b0538a" dependencies = [ "getrandom 0.3.3", "libc", @@ -1519,9 +1511,9 @@ dependencies = [ [[package]] name = "js-sys" -version = "0.3.80" +version = "0.3.77" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "852f13bec5eba4ba9afbeb93fd7c13fe56147f055939ae21c43a29a0ecb2702e" +checksum = "1cfaf33c695fc6e08064efbc1f72ec937429614f25eef83af942d0e227c3a28f" dependencies = [ "once_cell", "wasm-bindgen", @@ -1563,9 +1555,9 @@ dependencies = [ [[package]] name = "libredox" -version = "0.1.10" +version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "416f7e718bdb06000964960ffa43b4335ad4012ae8b99060261aa4a8088d5ccb" +checksum = "391290121bad3d37fbddad76d8f5d1c1c314cfc646d143d7e07a3086ddff0ce3" dependencies = [ "bitflags", "libc", @@ -1583,9 +1575,9 @@ dependencies = [ [[package]] name = "linux-raw-sys" -version = "0.11.0" +version = "0.9.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "df1d3c3b53da64cf5760482273a98e575c651a67eec7f77df96b5b642de8f039" +checksum = "cd945864f07fe9f5371a27ad7b52a172b4b499999f1d97574c9fa68373937e12" [[package]] name = "litemap" @@ -1605,15 +1597,15 @@ dependencies = [ [[package]] name = "log" -version = "0.4.28" +version = "0.4.27" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "34080505efa8e45a4b816c349525ebe327ceaa8559756f0356cba97ef3bf7432" +checksum = "13dc2df351e3202783a1fe0d44375f7295ffb4049267b0f3018346dc122a1d94" [[package]] name = "lru" -version = "0.16.1" +version = "0.16.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bfe949189f46fabb938b3a9a0be30fdd93fd8a09260da863399a8cf3db756ec8" +checksum = "86ea4e65087ff52f3862caff188d489f1fab49a0cb09e01b2e3f1a617b10aaed" dependencies = [ "hashbrown 0.15.5", ] @@ -1940,9 +1932,9 @@ dependencies = [ [[package]] name = "percent-encoding" -version = "2.3.2" +version = "2.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9b4f627cb1b25917193a259e49bdad08f671f8d9708acfd5fe0a8c1455d87220" +checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e" [[package]] name = "petgraph" @@ -1951,7 +1943,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3672b37090dbd86368a4145bc067582552b29c27377cad4e0a306c97f9bd7772" dependencies = [ "fixedbitset", - "indexmap 2.11.3", + "indexmap 2.10.0", ] [[package]] @@ -2046,9 +2038,9 @@ dependencies = [ [[package]] name = "potential_utf" -version = "0.1.3" +version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "84df19adbe5b5a0782edcab45899906947ab039ccf4573713735ee7de1e6b08a" +checksum = "e5a7c30837279ca13e7c867e9e40053bc68740f988cb07f7ca6df43cc734b585" dependencies = [ "zerovec", ] @@ -2100,9 +2092,9 @@ dependencies = [ [[package]] name = "proc-macro-crate" -version = "3.4.0" +version = "3.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "219cb19e96be00ab2e37d6e299658a0cfa83e52429179969b0f0121b4ac46983" +checksum = "edce586971a4dfaa28950c6f18ed55e0406c1ab88bbce2c6f6293a7aaba73d35" dependencies = [ "toml_edit", ] @@ -2266,9 +2258,9 @@ dependencies = [ [[package]] name = "quinn" -version = "0.11.9" +version = "0.11.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b9e20a958963c291dc322d98411f541009df2ced7b5a4f2bd52337638cfccf20" +checksum = "626214629cda6781b6dc1d316ba307189c85ba657213ce642d9c77670f8202c8" dependencies = [ "bytes", "cfg_aliases", @@ -2277,7 +2269,7 @@ dependencies = [ "quinn-udp", "rustc-hash 2.1.1", "rustls", - "socket2 0.6.0", + "socket2 0.5.10", "thiserror 2.0.16", "tokio", "tracing", @@ -2286,9 +2278,9 @@ dependencies = [ [[package]] name = "quinn-proto" -version = "0.11.13" +version = "0.11.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f1906b49b0c3bc04b5fe5d86a77925ae6524a19b816ae38ce1e426255f1d8a31" +checksum = "49df843a9161c85bb8aae55f101bc0bac8bcafd637a620d9122fd7e0b2f7422e" dependencies = [ "bytes", "getrandom 0.3.3", @@ -2307,16 +2299,16 @@ dependencies = [ [[package]] name = "quinn-udp" -version = "0.5.14" +version = "0.5.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "addec6a0dcad8a8d96a771f815f0eaf55f9d1805756410b39f5fa81332574cbd" +checksum = "fcebb1209ee276352ef14ff8732e24cc2b02bbac986cd74a4c81bcb2f9881970" dependencies = [ "cfg_aliases", "libc", "once_cell", - "socket2 0.6.0", + "socket2 0.5.10", "tracing", - "windows-sys 0.60.2", + "windows-sys 0.59.0", ] [[package]] @@ -2395,9 +2387,9 @@ dependencies = [ [[package]] name = "raw-cpuid" -version = "11.6.0" +version = "11.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "498cd0dc59d73224351ee52a95fee0f1a617a2eae0e7d9d720cc622c73a54186" +checksum = "c6df7ab838ed27997ba19a4664507e6f82b41fe6e20be42929332156e5e85146" dependencies = [ "bitflags", ] @@ -2474,9 +2466,9 @@ dependencies = [ [[package]] name = "regex" -version = "1.11.2" +version = "1.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "23d7fd106d8c02486a8d64e778353d1cffe08ce79ac2e82f540c86d0facf6912" +checksum = "b544ef1b4eac5dc2db33ea63606ae9ffcfac26c1416a2806ae0bf5f56b201191" dependencies = [ "aho-corasick", "memchr", @@ -2486,9 +2478,9 @@ dependencies = [ [[package]] name = "regex-automata" -version = "0.4.10" +version = "0.4.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6b9458fa0bfeeac22b5ca447c63aaf45f28439a709ccd244698632f9aa6394d6" +checksum = "809e8dc61f6de73b46c85f4c96486310fe304c434cfa43669d7b40f711150908" dependencies = [ "aho-corasick", "memchr", @@ -2497,9 +2489,9 @@ dependencies = [ [[package]] name = "regex-syntax" -version = "0.8.6" +version = "0.8.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "caf4aa5b0f434c91fe5c7f1ecb6a5ece2130b02ad2a590589dda5146df959001" +checksum = "2b15c43186be67a4fd63bee50d0303afffcef381492ebe2c5d87f324e1b8815c" [[package]] name = "relative-path" @@ -2656,15 +2648,15 @@ version = "0.1.0" [[package]] name = "rustix" -version = "1.1.2" +version = "1.0.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cd15f8a2c5551a84d56efdc1cd049089e409ac19a3072d5037a17fd70719ff3e" +checksum = "11181fbabf243db407ef8df94a6ce0b2f9a733bd8be4ad02b4eda9602296cac8" dependencies = [ "bitflags", "errno", "libc", "linux-raw-sys", - "windows-sys 0.61.0", + "windows-sys 0.60.2", ] [[package]] @@ -2706,9 +2698,9 @@ dependencies = [ [[package]] name = "rustls-webpki" -version = "0.103.6" +version = "0.103.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8572f3c2cb9934231157b45499fc41e1f58c589fdfb81a844ba873265e80f8eb" +checksum = "0a17884ae0c1b773f1ccd2bd4a8c72f16da897310a98b0e84bf349ad5ead92fc" dependencies = [ "ring", "rustls-pki-types", @@ -2738,11 +2730,11 @@ dependencies = [ [[package]] name = "schannel" -version = "0.1.28" +version = "0.1.27" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "891d81b926048e76efe18581bf793546b4c0eaf8448d72be8de2bbee5fd166e1" +checksum = "1f29ebaa345f945cec9fbbc532eb307f0fdad8161f281b6369539c8d84876b3d" dependencies = [ - "windows-sys 0.61.0", + "windows-sys 0.59.0", ] [[package]] @@ -2753,9 +2745,9 @@ checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" [[package]] name = "security-framework" -version = "3.4.0" +version = "3.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "60b369d18893388b345804dc0007963c99b7d665ae71d275812d828c6f089640" +checksum = "80fb1d92c5028aa318b4b8bd7302a5bfcf48be96a37fc6fc790f806b0004ee0c" dependencies = [ "bitflags", "core-foundation", @@ -2766,9 +2758,9 @@ dependencies = [ [[package]] name = "security-framework-sys" -version = "2.15.0" +version = "2.14.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cc1f0cbffaac4852523ce30d8bd3c5cdc873501d96ff467ca09b6767bb8cd5c0" +checksum = "49db231d56a190491cb4aeda9527f1ad45345af50b0851622a7adb8c03b01c32" dependencies = [ "core-foundation-sys", "libc", @@ -2776,9 +2768,9 @@ dependencies = [ [[package]] name = "semver" -version = "1.0.27" +version = "1.0.26" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d767eb0aabc880b29956c35734170f26ed551a859dbd361d140cdbeca61ab1e2" +checksum = "56e6fa9c48d24d85fb3de5ad847117517440f6beceb7798af16b4a87d616b8d0" [[package]] name = "seq-macro" @@ -2788,28 +2780,18 @@ checksum = "1bc711410fbe7399f390ca1c3b60ad0f53f80e95c5eb935e52268a0e2cd49acc" [[package]] name = "serde" -version = "1.0.225" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fd6c24dee235d0da097043389623fb913daddf92c76e9f5a1db88607a0bcbd1d" -dependencies = [ - "serde_core", - "serde_derive", -] - -[[package]] -name = "serde_core" -version = "1.0.225" +version = "1.0.219" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "659356f9a0cb1e529b24c01e43ad2bdf520ec4ceaf83047b83ddcc2251f96383" +checksum = "5f0e2c6ed6606019b4e29e69dbaba95b11854410e5347d525002456dbbb786b6" dependencies = [ "serde_derive", ] [[package]] name = "serde_derive" -version = "1.0.225" +version = "1.0.219" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0ea936adf78b1f766949a4977b91d2f5595825bd6ec079aa9543ad2685fc4516" +checksum = "5b0276cf7f2c73365f7157c8123c21cd9a50fbbd844757af28ca1f5925fc2a00" dependencies = [ "proc-macro2", "quote", @@ -2818,24 +2800,23 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.145" +version = "1.0.143" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "402a6f66d8c709116cf22f558eab210f5a50187f702eb4d7e5ef38d9a7f1c79c" +checksum = "d401abef1d108fbd9cbaebc3e46611f4b1021f714a0597a71f41ee463f5f4a5a" dependencies = [ "itoa", "memchr", "ryu", "serde", - "serde_core", ] [[package]] name = "serde_spanned" -version = "1.0.1" +version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2789234a13a53fc4be1b51ea1bab45a3c338bdb884862a257d10e5a74ae009e6" +checksum = "40734c41988f7306bb04f0ecf60ec0f3f1caa34290e4e8ea471dcd3346483b83" dependencies = [ - "serde_core", + "serde", ] [[package]] @@ -3024,15 +3005,15 @@ checksum = "1ac9aa371f599d22256307c24a9d748c041e548cbf599f35d890f9d365361790" [[package]] name = "tempfile" -version = "3.22.0" +version = "3.21.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "84fa4d11fadde498443cca10fd3ac23c951f0dc59e080e9f4b93d4df4e4eea53" +checksum = "15b61f8f20e3a6f7e0649d825294eaf317edce30f82cf6026e7e4cb9222a7d1e" dependencies = [ "fastrand", "getrandom 0.3.3", "once_cell", "rustix", - "windows-sys 0.61.0", + "windows-sys 0.60.2", ] [[package]] @@ -3205,7 +3186,9 @@ dependencies = [ name = "temporalio_bridge" version = "0.1.0" dependencies = [ + "async-trait", "futures", + "log", "magnus", "parking_lot", "prost", @@ -3355,9 +3338,9 @@ dependencies = [ [[package]] name = "tokio-rustls" -version = "0.26.3" +version = "0.26.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "05f63835928ca123f1bef57abbcd23bb2ba0ac9ae1235f1e65bda0d06e7786bd" +checksum = "8e727b36a1a0e8b74c376ac2211e40c2c8af09fb4013c60d910495810f008e9b" dependencies = [ "rustls", "tokio", @@ -3389,14 +3372,14 @@ dependencies = [ [[package]] name = "toml" -version = "0.9.6" +version = "0.9.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ae2a4cf385da23d1d53bc15cdfa5c2109e93d8d362393c801e87da2f72f0e201" +checksum = "75129e1dc5000bfbaa9fee9d1b21f974f9fbad9daec557a521ee6e080825f6e8" dependencies = [ - "indexmap 2.11.3", - "serde_core", + "indexmap 2.10.0", + "serde", "serde_spanned", - "toml_datetime", + "toml_datetime 0.7.0", "toml_parser", "toml_writer", "winnow", @@ -3404,22 +3387,27 @@ dependencies = [ [[package]] name = "toml_datetime" -version = "0.7.1" +version = "0.6.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a197c0ec7d131bfc6f7e82c8442ba1595aeab35da7adbf05b6b73cd06a16b6be" +checksum = "22cddaf88f4fbc13c51aebbf5f8eceb5c7c5a9da2ac40a13519eb5b0a0e8f11c" + +[[package]] +name = "toml_datetime" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bade1c3e902f58d73d3f294cd7f20391c1cb2fbcb643b73566bc773971df91e3" dependencies = [ - "serde_core", + "serde", ] [[package]] name = "toml_edit" -version = "0.23.5" +version = "0.22.27" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c2ad0b7ae9cfeef5605163839cb9221f453399f15cfb5c10be9885fcf56611f9" +checksum = "41fe8c660ae4257887cf66394862d21dbca4a6ddd26f04a3560410406a2f819a" dependencies = [ - "indexmap 2.11.3", - "toml_datetime", - "toml_parser", + "indexmap 2.10.0", + "toml_datetime 0.6.11", "winnow", ] @@ -3541,7 +3529,7 @@ checksum = "d039ad9159c98b70ecfd540b2573b97f7f52c3e8d9f8ad57a24b916a536975f9" dependencies = [ "futures-core", "futures-util", - "indexmap 2.11.3", + "indexmap 2.10.0", "pin-project-lite", "slab", "sync_wrapper", @@ -3639,9 +3627,9 @@ checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" [[package]] name = "trybuild" -version = "1.0.111" +version = "1.0.110" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0ded9fdb81f30a5708920310bfcd9ea7482ff9cba5f54601f7a19a877d5c2392" +checksum = "32e257d7246e7a9fd015fb0b28b330a8d4142151a33f03e6a497754f4b1f6a8e" dependencies = [ "dissimilar", "glob", @@ -3685,9 +3673,9 @@ dependencies = [ [[package]] name = "unicode-ident" -version = "1.0.19" +version = "1.0.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f63a545481291138910575129486daeaf8ac54aee4387fe7906919f7830c7d9d" +checksum = "5a5f39404a5da50712a4c1eecf25e90dd62b613502b7e925fd4e4d19b5c96512" [[package]] name = "unicode-xid" @@ -3703,14 +3691,13 @@ checksum = "8ecb6da28b8a351d773b68d5825ac39017e680750f980f3a1a85cd8dd28a47c1" [[package]] name = "url" -version = "2.5.7" +version = "2.5.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "08bc136a29a3d1758e07a9cca267be308aeebf5cfd5a10f3f67ab2097683ef5b" +checksum = "32f8b686cadd1473f4bd0117a5d28d36b1ade384ea9b5069a1c40aefed7fda60" dependencies = [ "form_urlencoded", "idna", "percent-encoding", - "serde", ] [[package]] @@ -3727,9 +3714,9 @@ checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" [[package]] name = "uuid" -version = "1.18.1" +version = "1.18.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2f87b8aa10b915a06587d0dec516c282ff295b475d94abf425d62b57710070a2" +checksum = "f33196643e165781c20a5ead5582283a7dacbb87855d867fbc2df3f81eddc1be" dependencies = [ "getrandom 0.3.3", "js-sys", @@ -3775,40 +3762,30 @@ checksum = "ccf3ec651a847eb01de73ccad15eb7d99f80485de043efb2f370cd654f4ea44b" [[package]] name = "wasi" -version = "0.14.7+wasi-0.2.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "883478de20367e224c0090af9cf5f9fa85bed63a95c1abf3afc5c083ebc06e8c" -dependencies = [ - "wasip2", -] - -[[package]] -name = "wasip2" -version = "1.0.1+wasi-0.2.4" +version = "0.14.2+wasi-0.2.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0562428422c63773dad2c345a1882263bbf4d65cf3f42e90921f787ef5ad58e7" +checksum = "9683f9a5a998d873c0d21fcbe3c083009670149a8fab228644b8bd36b2c48cb3" dependencies = [ - "wit-bindgen", + "wit-bindgen-rt", ] [[package]] name = "wasm-bindgen" -version = "0.2.103" +version = "0.2.100" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ab10a69fbd0a177f5f649ad4d8d3305499c42bab9aef2f7ff592d0ec8f833819" +checksum = "1edc8929d7499fc4e8f0be2262a241556cfc54a0bea223790e71446f2aab1ef5" dependencies = [ "cfg-if", "once_cell", "rustversion", "wasm-bindgen-macro", - "wasm-bindgen-shared", ] [[package]] name = "wasm-bindgen-backend" -version = "0.2.103" +version = "0.2.100" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0bb702423545a6007bbc368fde243ba47ca275e549c8a28617f56f6ba53b1d1c" +checksum = "2f0a0651a5c2bc21487bde11ee802ccaf4c51935d0d3d42a6101f98161700bc6" dependencies = [ "bumpalo", "log", @@ -3820,9 +3797,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-futures" -version = "0.4.53" +version = "0.4.50" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a0b221ff421256839509adbb55998214a70d829d3a28c69b4a6672e9d2a42f67" +checksum = "555d470ec0bc3bb57890405e5d4322cc9ea83cebb085523ced7be4144dac1e61" dependencies = [ "cfg-if", "js-sys", @@ -3833,9 +3810,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro" -version = "0.2.103" +version = "0.2.100" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fc65f4f411d91494355917b605e1480033152658d71f722a90647f56a70c88a0" +checksum = "7fe63fc6d09ed3792bd0897b314f53de8e16568c2b3f7982f468c0bf9bd0b407" dependencies = [ "quote", "wasm-bindgen-macro-support", @@ -3843,9 +3820,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro-support" -version = "0.2.103" +version = "0.2.100" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ffc003a991398a8ee604a401e194b6b3a39677b3173d6e74495eb51b82e99a32" +checksum = "8ae87ea40c9f689fc23f209965b6fb8a99ad69aeeb0231408be24920604395de" dependencies = [ "proc-macro2", "quote", @@ -3856,9 +3833,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-shared" -version = "0.2.103" +version = "0.2.100" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "293c37f4efa430ca14db3721dfbe48d8c33308096bd44d80ebaa775ab71ba1cf" +checksum = "1a05d73b933a847d6cccdda8f838a22ff101ad9bf93e33684f39c1f5f0eece3d" dependencies = [ "unicode-ident", ] @@ -3878,9 +3855,9 @@ dependencies = [ [[package]] name = "web-sys" -version = "0.3.80" +version = "0.3.77" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fbe734895e869dc429d78c4b433f8d17d95f8d05317440b4fad5ab2d33e596dc" +checksum = "33b6dd2ef9186f1f2072e409e99cd22a975331a6b3591b12c764e0e55c60d5d2" dependencies = [ "js-sys", "wasm-bindgen", @@ -3914,11 +3891,11 @@ checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" [[package]] name = "winapi-util" -version = "0.1.11" +version = "0.1.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c2a7b1c03c876122aa43f3020e6c3c3ee5c05081c9a00739faf7503aeba10d22" +checksum = "0978bf7171b3d90bac376700cb56d606feb40f251a475a5d6634613564460b22" dependencies = [ - "windows-sys 0.61.0", + "windows-sys 0.60.2", ] [[package]] @@ -3936,7 +3913,7 @@ dependencies = [ "windows-collections", "windows-core", "windows-future", - "windows-link 0.1.3", + "windows-link", "windows-numerics", ] @@ -3957,7 +3934,7 @@ checksum = "c0fdd3ddb90610c7638aa2b3a3ab2904fb9e5cdbecc643ddb3647212781c4ae3" dependencies = [ "windows-implement", "windows-interface", - "windows-link 0.1.3", + "windows-link", "windows-result", "windows-strings", ] @@ -3969,7 +3946,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fc6a41e98427b19fe4b73c550f060b59fa592d7d686537eebf9385621bfbad8e" dependencies = [ "windows-core", - "windows-link 0.1.3", + "windows-link", "windows-threading", ] @@ -4001,12 +3978,6 @@ version = "0.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5e6ad25900d524eaabdbbb96d20b4311e1e7ae1699af4fb28c17ae66c80d798a" -[[package]] -name = "windows-link" -version = "0.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "45e46c0661abb7180e7b9c281db115305d49ca1709ab8242adf09666d2173c65" - [[package]] name = "windows-numerics" version = "0.2.0" @@ -4014,7 +3985,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9150af68066c4c5c07ddc0ce30421554771e528bde427614c61038bc2c92c2b1" dependencies = [ "windows-core", - "windows-link 0.1.3", + "windows-link", ] [[package]] @@ -4023,7 +3994,7 @@ version = "0.3.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "56f42bd332cc6c8eac5af113fc0c1fd6a8fd2aa08a0119358686e5160d0586c6" dependencies = [ - "windows-link 0.1.3", + "windows-link", ] [[package]] @@ -4032,7 +4003,7 @@ version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "56e6c93f3a0c3b36176cb1327a4958a0353d5d166c2a35cb268ace15e91d3b57" dependencies = [ - "windows-link 0.1.3", + "windows-link", ] [[package]] @@ -4062,15 +4033,6 @@ dependencies = [ "windows-targets 0.53.3", ] -[[package]] -name = "windows-sys" -version = "0.61.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e201184e40b2ede64bc2ea34968b28e33622acdbbf37104f0e4a33f7abe657aa" -dependencies = [ - "windows-link 0.2.0", -] - [[package]] name = "windows-targets" version = "0.52.6" @@ -4093,7 +4055,7 @@ version = "0.53.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d5fe6031c4041849d7c496a8ded650796e7b6ecc19df1a431c1a363342e5dc91" dependencies = [ - "windows-link 0.1.3", + "windows-link", "windows_aarch64_gnullvm 0.53.0", "windows_aarch64_msvc 0.53.0", "windows_i686_gnu 0.53.0", @@ -4110,7 +4072,7 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b66463ad2e0ea3bbf808b7f1d371311c80e115c0b71d60efc142cafbcfb057a6" dependencies = [ - "windows-link 0.1.3", + "windows-link", ] [[package]] @@ -4211,18 +4173,21 @@ checksum = "271414315aff87387382ec3d271b52d7ae78726f5d44ac98b4f4030c91880486" [[package]] name = "winnow" -version = "0.7.13" +version = "0.7.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "21a0236b59786fed61e2a80582dd500fe61f18b5dca67a4a067d0bc9039339cf" +checksum = "f3edebf492c8125044983378ecb5766203ad3b4c2f7a922bd7dd207f6d443e95" dependencies = [ "memchr", ] [[package]] -name = "wit-bindgen" -version = "0.46.0" +name = "wit-bindgen-rt" +version = "0.39.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f17a85883d4e6d00e8a97c586de764dabcc06133f7f1d55dce5cdc070ad7fe59" +checksum = "6f42320e61fe2cfd34354ecb597f86f413484a798ba44a8ca1165c58d42da6c1" +dependencies = [ + "bitflags", +] [[package]] name = "writeable" @@ -4266,18 +4231,18 @@ dependencies = [ [[package]] name = "zerocopy" -version = "0.8.27" +version = "0.8.26" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0894878a5fa3edfd6da3f88c4805f4c8558e2b996227a3d864f47fe11e38282c" +checksum = "1039dd0d3c310cf05de012d8a39ff557cb0d23087fd44cad61df08fc31907a2f" dependencies = [ "zerocopy-derive", ] [[package]] name = "zerocopy-derive" -version = "0.8.27" +version = "0.8.26" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "88d2b8d9c68ad2b9e4340d7832716a4d21a22a1154777ad56ea55c51a9cf3831" +checksum = "9ecf5b4cc5364572d7f4c329661bcc82724222973f2cab6f050a4e5c22f75181" dependencies = [ "proc-macro2", "quote", @@ -4354,7 +4319,7 @@ dependencies = [ "bzip2", "crc32fast", "flate2", - "indexmap 2.11.3", + "indexmap 2.10.0", "memchr", "zopfli", "zstd", @@ -4398,9 +4363,9 @@ dependencies = [ [[package]] name = "zstd-sys" -version = "2.0.16+zstd.1.5.7" +version = "2.0.15+zstd.1.5.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "91e19ebc2adc8f83e43039e79776e3fda8ca919132d68a1fed6a5faca2683748" +checksum = "eb81183ddd97d0c74cedf1d50d85c8d08c1b8b68ee863bdee9e706eedba1a237" dependencies = [ "cc", "pkg-config", diff --git a/temporalio/ext/Cargo.toml b/temporalio/ext/Cargo.toml index 73eb3475..285bc4e8 100644 --- a/temporalio/ext/Cargo.toml +++ b/temporalio/ext/Cargo.toml @@ -10,7 +10,9 @@ publish = false crate-type = ["cdylib"] [dependencies] +async-trait = "0.1" futures = "0.3" +log = "0.4" magnus = "0.7" parking_lot = "0.12" prost = "0.13" diff --git a/temporalio/ext/sdk-core b/temporalio/ext/sdk-core index b3acbe2e..d21a257e 160000 --- a/temporalio/ext/sdk-core +++ b/temporalio/ext/sdk-core @@ -1 +1 @@ -Subproject commit b3acbe2e83c2a2b798aeb3d0261d75a721988667 +Subproject commit d21a257e7bdd1e039e386f561a3c87b4efc317ce diff --git a/temporalio/ext/src/lib.rs b/temporalio/ext/src/lib.rs index 9979acc1..623c7c7f 100644 --- a/temporalio/ext/src/lib.rs +++ b/temporalio/ext/src/lib.rs @@ -46,6 +46,14 @@ macro_rules! id { }}; } +#[macro_export] +macro_rules! lazy_id { + ($str:expr) => {{ + static VAL: magnus::value::LazyId = magnus::value::LazyId::new($str); + &VAL + }}; +} + #[magnus::init] fn init(ruby: &Ruby) -> Result<(), Error> { Lazy::force(&ROOT_ERR, ruby); diff --git a/temporalio/ext/src/runtime.rs b/temporalio/ext/src/runtime.rs index a2f01cb4..88efaee1 100644 --- a/temporalio/ext/src/runtime.rs +++ b/temporalio/ext/src/runtime.rs @@ -294,6 +294,29 @@ impl RuntimeHandle { }); } + /// Same as spawn, but does not spawn in a background Tokio task and does not provide a + /// non-GVL awaitable. This simply submits the callback to the Ruby thread inline and + /// returns. + pub(crate) fn spawn_sync_inline(&self, with_gvl: F) + where + F: FnOnce(Ruby) -> Result<(), Error> + Send + 'static, + { + // Ignore fail to send in rare case that the runtime/handle is + // dropped before this Tokio future runs + let _ = self + .async_command_tx + .clone() + .send(AsyncCommand::RunCallback(Box::new( + move || match Ruby::get() { + Ok(ruby) => with_gvl(ruby), + Err(err) => { + log_error!("Unable to get Ruby instance in async callback: {}", err); + Ok(()) + } + }, + ))); + } + pub(crate) fn fork_check(&self, action: &'static str) -> Result<(), Error> { if self.pid != std::process::id() { Err(error!( diff --git a/temporalio/ext/src/worker.rs b/temporalio/ext/src/worker.rs index 2de1b489..3fba95eb 100644 --- a/temporalio/ext/src/worker.rs +++ b/temporalio/ext/src/worker.rs @@ -1,6 +1,7 @@ use std::{ cell::RefCell, collections::{HashMap, HashSet}, + marker::PhantomData, sync::Arc, time::Duration, }; @@ -8,15 +9,20 @@ use std::{ use crate::{ ROOT_MOD, client::Client, - enter_sync, error, id, new_error, + enter_sync, error, id, lazy_id, new_error, runtime::{AsyncCommand, Runtime, RuntimeHandle}, - util::{AsyncCallback, Struct}, + util::{AsyncCallback, SendSyncBoxValue, Struct}, }; use futures::{StreamExt, stream::BoxStream}; use futures::{future, stream}; use magnus::{ - DataTypeFunctions, Error, IntoValue, RArray, RString, RTypedData, Ruby, TypedData, Value, - class, function, method, prelude::*, typed_data, + DataTypeFunctions, Error, Exception, IntoValue, RArray, RClass, RModule, RString, RTypedData, + Ruby, TypedData, Value, + block::Proc, + class, function, method, + prelude::*, + typed_data, + value::{Lazy, LazyId}, }; use prost::Message; use temporal_sdk_core::{ @@ -27,14 +33,18 @@ use temporal_sdk_core::{ use temporal_sdk_core_api::{ errors::{PollError, WorkflowErrorType}, worker::{ - PollerBehavior, SlotKind, WorkerDeploymentOptions, WorkerDeploymentVersion, - WorkerVersioningStrategy, + PollerBehavior, SlotInfo, SlotInfoTrait, SlotKind, SlotKindType, SlotMarkUsedContext, + SlotReleaseContext, SlotReservationContext, SlotSupplier, SlotSupplierPermit, + WorkerDeploymentOptions, WorkerDeploymentVersion, WorkerVersioningStrategy, }, }; use temporal_sdk_core_protos::coresdk::workflow_completion::WorkflowActivationCompletion; -use temporal_sdk_core_protos::coresdk::{ActivityHeartbeat, ActivityTaskCompletion}; +use temporal_sdk_core_protos::coresdk::{ + ActivityHeartbeat, ActivitySlotInfo, ActivityTaskCompletion, LocalActivitySlotInfo, + NexusSlotInfo, WorkflowSlotInfo, +}; use temporal_sdk_core_protos::temporal::api::history::v1::History; -use tokio::sync::mpsc::{Sender, channel}; +use tokio::sync::mpsc::{Sender, UnboundedSender, channel, unbounded_channel}; use tokio_stream::wrappers::ReceiverStream; pub fn init(ruby: &Ruby) -> Result<(), Error> { @@ -105,7 +115,7 @@ impl Worker { let worker = temporal_sdk_core::init_worker( &client.runtime_handle.core, - build_config(options)?, + build_config(options, &client.runtime_handle)?, client.core.clone().into_inner(), ) .map_err(|err| error!("Failed creating worker: {}", err))?; @@ -419,7 +429,7 @@ impl WorkflowReplayer { let (tx, rx) = channel(1); let core_worker = temporal_sdk_core::init_replay_worker(ReplayWorkerInput::new( - build_config(options)?, + build_config(options, &runtime.handle)?, ReceiverStream::new(rx), )) .map_err(|err| error!("Failed creating worker: {}", err))?; @@ -450,7 +460,7 @@ impl WorkflowReplayer { } } -fn build_config(options: Struct) -> Result { +fn build_config(options: Struct, runtime_handle: &RuntimeHandle) -> Result { WorkerConfigBuilder::default() .namespace(options.member::(id!("namespace"))?) .task_queue(options.member::(id!("task_queue"))?) @@ -522,6 +532,7 @@ fn build_config(options: Struct) -> Result { options .child(id!("tuner"))? .ok_or_else(|| error!("Missing tuner"))?, + runtime_handle, )?)) .workflow_failure_errors( if options.member::(id!("nondeterminism_as_workflow_fail"))? { @@ -541,24 +552,39 @@ fn build_config(options: Struct) -> Result { .map_err(|err| error!("Invalid worker options: {}", err)) } -fn build_tuner(options: Struct) -> Result { +fn extract_poller_behavior(poller_behavior: Struct) -> Result { + Ok(if poller_behavior.member::(id!("initial")).is_ok() { + PollerBehavior::Autoscaling { + minimum: poller_behavior.member::(id!("minimum"))?, + maximum: poller_behavior.member::(id!("maximum"))?, + initial: poller_behavior.member::(id!("initial"))?, + } + } else { + PollerBehavior::SimpleMaximum(poller_behavior.member::(id!("simple_maximum"))?) + }) +} + +fn build_tuner(options: Struct, runtime_handle: &RuntimeHandle) -> Result { let (workflow_slot_options, resource_slot_options) = build_tuner_slot_options( options .child(id!("workflow_slot_supplier"))? .ok_or_else(|| error!("Missing workflow slot options"))?, None, + runtime_handle, )?; let (activity_slot_options, resource_slot_options) = build_tuner_slot_options( options .child(id!("activity_slot_supplier"))? .ok_or_else(|| error!("Missing activity slot options"))?, resource_slot_options, + runtime_handle, )?; let (local_activity_slot_options, resource_slot_options) = build_tuner_slot_options( options .child(id!("local_activity_slot_supplier"))? .ok_or_else(|| error!("Missing local activity slot options"))?, resource_slot_options, + runtime_handle, )?; let mut opts_build = TunerHolderOptionsBuilder::default(); @@ -575,16 +601,21 @@ fn build_tuner(options: Struct) -> Result { .map_err(|err| error!("Failed building tuner options: {}", err)) } -fn build_tuner_slot_options( +fn build_tuner_slot_options( options: Struct, prev_slots_options: Option, + runtime_handle: &RuntimeHandle, ) -> Result<(SlotSupplierOptions, Option), Error> { if let Some(slots) = options.member::>(id!("fixed_size"))? { Ok((SlotSupplierOptions::FixedSize { slots }, prev_slots_options)) } else if let Some(resource) = options.child(id!("resource_based"))? { build_tuner_resource_options(resource, prev_slots_options) + } else if let Some(custom) = options.member::>(id!("custom"))? { + build_custom_slot_supplier(custom, runtime_handle).map(|v| (v, None)) } else { - Err(error!("Slot supplier must be fixed size or resource based")) + Err(error!( + "Slot supplier must be fixed size, resource based, or custom" + )) } } @@ -615,14 +646,557 @@ fn build_tuner_resource_options( )) } -fn extract_poller_behavior(poller_behavior: Struct) -> Result { - Ok(if poller_behavior.member::(id!("initial")).is_ok() { - PollerBehavior::Autoscaling { - minimum: poller_behavior.member::(id!("minimum"))?, - maximum: poller_behavior.member::(id!("maximum"))?, - initial: poller_behavior.member::(id!("initial"))?, +fn build_custom_slot_supplier( + supplier: Value, + runtime_handle: &RuntimeHandle, +) -> Result, Error> { + Ok(SlotSupplierOptions::Custom(Arc::new(CustomSlotSupplier::< + SK, + > { + supplier: Arc::new(SendSyncBoxValue::new(supplier)), + runtime_handle: runtime_handle.clone(), + _phantom: PhantomData, + }))) +} + +struct CustomSlotSupplier { + supplier: Arc>, + runtime_handle: RuntimeHandle, + _phantom: PhantomData, +} + +struct CancelGuard { + f: Option, +} + +impl CancelGuard { + fn new(f: F) -> Self { + Self { f: Some(f) } + } + + fn defuse(mut self) { + self.f.take(); + } +} + +impl Drop for CancelGuard { + fn drop(&mut self) { + if let Some(f) = self.f.take() { + f(); + } + } +} + +#[async_trait::async_trait] +impl SlotSupplier for CustomSlotSupplier { + type SlotKind = SK; + + async fn reserve_slot(&self, ctx: &dyn SlotReservationContext) -> SlotSupplierPermit { + // Do this in an infinite loop until we get success (log and sleep on error) + let ctx = SlotReserveContextHolder::new(SK::kind(), ctx); + loop { + match self.attempt_reserve_slot(ctx.clone()).await { + Ok(permit) => { + return SlotSupplierPermit::with_user_data(Arc::new(permit)); + } + Err(err) => { + log::error!("Slot reserve failed: {err}"); + tokio::time::sleep(std::time::Duration::from_secs(1)).await + } + } } + } + + fn try_reserve_slot(&self, ctx: &dyn SlotReservationContext) -> Option { + let res = self.call_supplier_sync( + lazy_id!("try_reserve_slot"), + SlotReserveContextHolder::new(SK::kind(), ctx), + |v| { + if v.is_nil() { + None + } else { + Some(SendSyncBoxValue::new(v)) + } + }, + ); + match res { + Ok(v) => v.map(|v| SlotSupplierPermit::with_user_data(Arc::new(v))), + Err(err) => { + log::error!("Slot try-reserve failed: {err}"); + None + } + } + } + + fn mark_slot_used(&self, ctx: &dyn SlotMarkUsedContext) { + let res = self.call_supplier_sync( + lazy_id!("mark_slot_used"), + SlotMarkUsedContextHolder::new(ctx), + |_| (), + ); + if let Err(err) = res { + log::error!("Mark slot used failed: {err}"); + } + } + + fn release_slot(&self, ctx: &dyn SlotReleaseContext) { + let res = self.call_supplier_sync( + lazy_id!("release_slot"), + SlotReleaseContextHolder::new(ctx), + |_| (), + ); + if let Err(err) = res { + log::error!("Mark slot used failed: {err}"); + } + } +} + +impl CustomSlotSupplier { + async fn attempt_reserve_slot( + &self, + ctx: SlotReserveContextHolder, + ) -> Result, String> { + // Make a call with channels getting info + let (result_tx, mut result_rx) = unbounded_channel(); + let (cancel_proc_ready_tx, mut cancel_proc_ready_rx) = unbounded_channel(); + let result_err_tx = result_tx.clone(); + self.start_supplier_call( + lazy_id!("reserve_slot"), + ctx, + move |_, val| { + let _ = result_tx.send(Ok(SendSyncBoxValue::new(val))); + }, + move |err| { + let _ = result_err_tx.send(Err(err)); + }, + Some(cancel_proc_ready_tx), + ); + + // Wait for cancel proc ready or result. Note, we may not get a cancel proc in the case + // that its tx is dropped before the result tx is recorded, so we just ignore and move on. + let (cancel_proc, result) = tokio::select! { + cancel_proc = cancel_proc_ready_rx.recv() => (cancel_proc, None), + result = result_rx.recv() => (None, Some(result.ok_or_else(|| "Unable to get result".to_string()))), + }; + // If we got a result, return it + if let Some(result) = result { + return result.flatten(); + } + + // Now that we have a cancel proc, we can setup the guard + let runtime_handle = self.runtime_handle.clone(); + let cancel_guard = cancel_proc.map(move |cancel_proc| { + CancelGuard::new(move || { + runtime_handle.spawn(async {}, move |ruby, _| { + // TODO(cretz): Should do this in a Ruby thread to not block main Ruby + // reactor loop? + let cancel_proc = cancel_proc.value(&ruby); + if let Err(err) = cancel_proc.call::<_, Value>(()) { + log::error!("Failed canceling: {err}"); + } + Ok(()) + }); + }) + }); + + // Wait for result + let result: Result, String> = result_rx + .recv() + .await + .ok_or_else(|| "Unable to get result".to_string()) + .flatten(); + + // Defuse guard (we don't want it to run cancel proc), and return + if let Some(cancel_guard) = cancel_guard { + cancel_guard.defuse(); + } + result + } + + fn call_supplier_sync( + &self, + method: &'static LazyId, + ctx: impl TryIntoValue + Send + 'static, + map_result: impl Fn(Value) -> R + Send + 'static, + ) -> Result { + // Make call + let (result_tx, result_rx) = std::sync::mpsc::channel(); + let result_err_tx = result_tx.clone(); + self.start_supplier_call( + method, + ctx, + move |_, val| { + let _ = result_tx.send(Ok(map_result(val))); + }, + move |err| { + let _ = result_err_tx.send(Err(err)); + }, + None, + ); + + // Wait for result. We recognize this blocks, but we expect users not to block. We cannot + // use Tokio's blocking_recv or block_on because we're already in a Tokio runtime and so + // they panic. We also can't use tokio::task::block_in_place because this is sometimes + // started on a non-Tokio-started thread from a workflow context, and block_in_place fails + // even in legitimate Tokio contexts if the thread wasn't started by Tokio. So, we just wait + // and block this thread, but we have told users they should not block on these sync calls. + result_rx + .recv() + .map_err(|err| format!("Call canceled: {err}")) + .flatten() + } + + fn start_supplier_call( + &self, + method: &'static LazyId, + ctx: impl TryIntoValue + Send + 'static, + on_success: impl Fn(&Ruby, Value) + Send + 'static, + on_error: impl Fn(String) + Clone + Send + 'static, // May be called multiple times + cancel_proc_ready_tx: Option>>, + ) { + // Run in Ruby. We have to use spawn_sync_inline instead of spawn here because spawn starts + // a new Tokio task and we cannot always start a new Tokio task because we're not always in + // a multi-threaded situation where it can be waited on. + let supplier = self.supplier.clone(); + self.runtime_handle.spawn_sync_inline(move |ruby| { + if let Err(err) = start_supplier_call_in_ruby( + ruby, + supplier, + method, + ctx, + on_success, + on_error.clone(), + cancel_proc_ready_tx, + ) { + // May have already sent a result here, but it's harmless to try + on_error(format!("Raised exception: {err}")); + } + Ok(()) + }); + } +} + +fn start_supplier_call_in_ruby( + ruby: Ruby, + supplier: Arc>, + method: &'static LazyId, + ctx: impl TryIntoValue, + on_success: impl Fn(&Ruby, Value) + Send + 'static, + on_error: impl Fn(String) + Send + 'static, + cancel_proc_ready_tx: Option>>, +) -> Result<(), Error> { + // Context + let method = LazyId::get_inner_with(method, &ruby); + let ctx = ctx.try_into_value(&ruby)?; + + // Put functions in options so we can take it just once in closure + let mut on_success = Some(on_success); + let mut on_error = Some(on_error); + + // Create proc for callback + let proc = ruby.proc_from_fn(move |ruby, args, _block| { + let result = args + .first() + .copied() + .unwrap_or_else(|| ruby.qnil().as_value()); + // If it's an exception, that's an error, otherwise it's a success + if let Some(err) = Exception::from_value(result) { + if let Some(on_error) = on_error.take() { + on_error(format!("Raised exception: {err}")); + } + } else if let Some(on_success) = on_success.take() { + on_success(ruby, result); + } else { + log::error!("Custom slot supplier callback unexpectedly invoked multiple times"); + } + }); + + // Build arg set. If there is a cancel_proc_ready, we create a cancellation and provide it to + // the sender. + let mut maybe_cancellation = None; + if let Some(cancel_proc_ready_tx) = cancel_proc_ready_tx { + // Create cancellation and provide cancellation as second arg and send back cancel proc + let cancellation = ruby + .get_inner(&CANCELLATION_CLASS) + .funcall(id!("new"), ())?; + let cancel_proc = RArray::to_ary(cancellation)?.entry::(1)?; + let _ = cancel_proc_ready_tx.send(SendSyncBoxValue::new( + Proc::from_value(cancel_proc).expect("Expecting proc"), + )); + maybe_cancellation = Some(cancellation) + }; + + // Call custom supplier with the block, it can be a two-arg form with context or one-arg w/out + let supplier = supplier.value(&ruby); + if let Some(cancellation) = maybe_cancellation { + supplier.funcall_with_block::<_, _, Value>(method, (ctx, cancellation), proc)?; } else { - PollerBehavior::SimpleMaximum(poller_behavior.member::(id!("simple_maximum"))?) - }) + supplier.funcall_with_block::<_, _, Value>(method, (ctx,), proc)?; + } + Ok(()) +} + +pub static CANCELLATION_CLASS: Lazy = Lazy::new(|ruby| { + ruby.class_object() + .const_get::<_, RModule>("Temporalio") + .unwrap() + .const_get::<_, RClass>("Cancellation") + .unwrap() +}); + +pub static CUSTOM_CLASS: Lazy = Lazy::new(|ruby| { + ruby.class_object() + .const_get::<_, RModule>("Temporalio") + .unwrap() + .const_get::<_, RClass>("Worker") + .unwrap() + .const_get::<_, RClass>("Tuner") + .unwrap() + .const_get::<_, RClass>("SlotSupplier") + .unwrap() + .const_get::<_, RClass>("Custom") + .unwrap() +}); + +pub static RESERVE_CONTEXT_CLASS: Lazy = Lazy::new(|ruby| { + ruby.get_inner(&CUSTOM_CLASS) + .const_get::<_, RClass>("ReserveContext") + .unwrap() +}); + +pub static MARK_USED_CONTEXT_CLASS: Lazy = Lazy::new(|ruby| { + ruby.get_inner(&CUSTOM_CLASS) + .const_get::<_, RClass>("MarkUsedContext") + .unwrap() +}); + +pub static RELEASE_CONTEXT_CLASS: Lazy = Lazy::new(|ruby| { + ruby.get_inner(&CUSTOM_CLASS) + .const_get::<_, RClass>("ReleaseContext") + .unwrap() +}); + +pub static SLOT_INFO_MODULE: Lazy = Lazy::new(|ruby| { + ruby.get_inner(&CUSTOM_CLASS) + .const_get::<_, RModule>("SlotInfo") + .unwrap() +}); + +pub static SLOT_INFO_WORKFLOW_CLASS: Lazy = Lazy::new(|ruby| { + ruby.get_inner(&SLOT_INFO_MODULE) + .const_get::<_, RClass>("Workflow") + .unwrap() +}); + +pub static SLOT_INFO_ACTIVITY_CLASS: Lazy = Lazy::new(|ruby| { + ruby.get_inner(&SLOT_INFO_MODULE) + .const_get::<_, RClass>("Activity") + .unwrap() +}); + +pub static SLOT_INFO_LOCAL_ACTIVITY_CLASS: Lazy = Lazy::new(|ruby| { + ruby.get_inner(&SLOT_INFO_MODULE) + .const_get::<_, RClass>("LocalActivity") + .unwrap() +}); + +pub static SLOT_INFO_NEXUS_CLASS: Lazy = Lazy::new(|ruby| { + ruby.get_inner(&SLOT_INFO_MODULE) + .const_get::<_, RClass>("Nexus") + .unwrap() +}); + +trait TryIntoValue { + fn try_into_value(self, ruby: &Ruby) -> Result; +} + +#[derive(Clone)] +struct SlotReserveContextHolder { + slot_kind: SlotKindType, + task_queue: String, + worker_identity: String, + worker_deployment_name: String, + worker_build_id: String, + is_sticky: bool, +} + +impl SlotReserveContextHolder { + fn new(slot_kind: SlotKindType, ctx: &dyn SlotReservationContext) -> Self { + Self { + slot_kind, + task_queue: ctx.task_queue().to_string(), + worker_identity: ctx.worker_identity().to_string(), + worker_deployment_name: ctx + .worker_deployment_version() + .clone() + .map(|v| v.deployment_name) + .unwrap_or_default(), + worker_build_id: ctx + .worker_deployment_version() + .clone() + .map(|v| v.build_id) + .unwrap_or_default(), + is_sticky: ctx.is_sticky(), + } + } +} + +impl TryIntoValue for SlotReserveContextHolder { + fn try_into_value(self, ruby: &Ruby) -> Result { + ruby.get_inner(&RESERVE_CONTEXT_CLASS).funcall( + id!("new"), + ( + // Slot type + match self.slot_kind { + SlotKindType::Workflow => id!("workflow"), + SlotKindType::Activity => id!("activity"), + SlotKindType::LocalActivity => id!("local_activity"), + SlotKindType::Nexus => id!("nexus"), + }, + // Task queue + self.task_queue, + // Worker identity + self.worker_identity, + // Worker deployment name + self.worker_deployment_name, + // Worker build ID + self.worker_build_id, + // Sticky + self.is_sticky, + ), + ) + } +} + +struct SlotMarkUsedContextHolder { + slot_info: SlotInfoHolder, + permit: Option>>, +} + +impl SlotMarkUsedContextHolder { + fn new(ctx: &dyn SlotMarkUsedContext) -> Self { + Self { + slot_info: SlotInfoHolder::new(ctx.info().downcast()), + permit: ctx + .permit() + .user_data::>>() + .cloned(), + } + } +} + +impl TryIntoValue for SlotMarkUsedContextHolder { + fn try_into_value(self, ruby: &Ruby) -> Result { + ruby.get_inner(&MARK_USED_CONTEXT_CLASS).funcall( + id!("new"), + ( + // Slot info + self.slot_info.try_into_value(ruby)?, + // Permit + self.permit + .map(|v| v.value(ruby)) + .unwrap_or_else(|| ruby.qnil().as_value()), + ), + ) + } +} + +struct SlotReleaseContextHolder { + slot_info: Option, + permit: Option>>, +} + +impl SlotReleaseContextHolder { + fn new(ctx: &dyn SlotReleaseContext) -> Self { + Self { + slot_info: ctx.info().map(|i| SlotInfoHolder::new(i.downcast())), + permit: ctx + .permit() + .user_data::>>() + .cloned(), + } + } +} + +impl TryIntoValue for SlotReleaseContextHolder { + fn try_into_value(self, ruby: &Ruby) -> Result { + ruby.get_inner(&RELEASE_CONTEXT_CLASS).funcall( + id!("new"), + ( + // Slot info + if let Some(slot_info) = self.slot_info { + Some(slot_info.try_into_value(ruby)?) + } else { + None + }, + // Permit + self.permit + .map(|v| v.value(ruby)) + .unwrap_or_else(|| ruby.qnil().as_value()), + ), + ) + } +} + +enum SlotInfoHolder { + Workflow(WorkflowSlotInfo), + Activity(ActivitySlotInfo), + LocalActivity(LocalActivitySlotInfo), + Nexus(NexusSlotInfo), +} + +impl SlotInfoHolder { + fn new(slot_info: SlotInfo) -> Self { + match slot_info { + SlotInfo::Workflow(v) => SlotInfoHolder::Workflow(v.clone()), + SlotInfo::Activity(v) => SlotInfoHolder::Activity(v.clone()), + SlotInfo::LocalActivity(v) => SlotInfoHolder::LocalActivity(v.clone()), + SlotInfo::Nexus(v) => SlotInfoHolder::Nexus(v.clone()), + } + } +} + +impl TryIntoValue for SlotInfoHolder { + fn try_into_value(self, ruby: &Ruby) -> Result { + match self { + SlotInfoHolder::Workflow(v) => { + ruby.get_inner(&SLOT_INFO_WORKFLOW_CLASS).funcall( + id!("new"), + ( + // Workflow type + v.workflow_type, + // Sticky + v.is_sticky, + ), + ) + } + SlotInfoHolder::Activity(v) => { + ruby.get_inner(&SLOT_INFO_ACTIVITY_CLASS).funcall( + id!("new"), + ( + // Activity type + v.activity_type, + ), + ) + } + SlotInfoHolder::LocalActivity(v) => { + ruby.get_inner(&SLOT_INFO_LOCAL_ACTIVITY_CLASS).funcall( + id!("new"), + ( + // Activity type + v.activity_type, + ), + ) + } + SlotInfoHolder::Nexus(v) => ruby.get_inner(&SLOT_INFO_NEXUS_CLASS).funcall( + id!("new"), + ( + // Service + v.service, + // Operation + v.operation, + ), + ), + } + } } diff --git a/temporalio/lib/temporalio/internal/bridge/worker.rb b/temporalio/lib/temporalio/internal/bridge/worker.rb index 1677edd0..a890033e 100644 --- a/temporalio/lib/temporalio/internal/bridge/worker.rb +++ b/temporalio/lib/temporalio/internal/bridge/worker.rb @@ -40,6 +40,7 @@ class Worker TunerSlotSupplierOptions = Struct.new( :fixed_size, :resource_based, + :custom, keyword_init: true ) @@ -103,6 +104,55 @@ def complete_activity_task_in_background(proto) # TODO(cretz): Log error on this somehow? async_complete_activity_task(proto.to_proto, queue) end + + class CustomSlotSupplier + def initialize(slot_supplier:, thread_pool:) + @slot_supplier = slot_supplier + @thread_pool = thread_pool + end + + def reserve_slot(context, cancellation, &block) + run_user_code do + @slot_supplier.reserve_slot(context, cancellation) { |v| block.call(v) } + rescue Exception => e # rubocop:disable Lint/RescueException + block.call(e) + end + end + + def try_reserve_slot(context, &block) + run_user_code do + block.call(@slot_supplier.try_reserve_slot(context)) + rescue Exception => e # rubocop:disable Lint/RescueException + block.call(e) + end + end + + def mark_slot_used(context, &block) + run_user_code do + block.call(@slot_supplier.mark_slot_used(context)) + rescue Exception => e # rubocop:disable Lint/RescueException + block.call(e) + end + end + + def release_slot(context, &block) + run_user_code do + block.call(@slot_supplier.release_slot(context)) + rescue Exception => e # rubocop:disable Lint/RescueException + block.call(e) + end + end + + private + + def run_user_code(&) + if @thread_pool + @thread_pool.execute(&) + else + yield + end + end + end end end end diff --git a/temporalio/lib/temporalio/worker/tuner.rb b/temporalio/lib/temporalio/worker/tuner.rb index 60f8916b..42f208aa 100644 --- a/temporalio/lib/temporalio/worker/tuner.rb +++ b/temporalio/lib/temporalio/worker/tuner.rb @@ -22,10 +22,11 @@ def initialize(slots) # rubocop:disable Lint/MissingSuper end # @!visibility private - def _to_bridge_options + def _to_bridge_options(_tuner) Internal::Bridge::Worker::TunerSlotSupplierOptions.new( fixed_size: slots, - resource_based: nil + resource_based: nil, + custom: nil ) end end @@ -36,7 +37,7 @@ def _to_bridge_options class ResourceBased < SlotSupplier attr_reader :tuner_options, :slot_options - # Create a reosurce-based slot supplier. + # Create a resource-based slot supplier. # # @param tuner_options [ResourceBasedTunerOptions] General tuner options. # @param slot_options [ResourceBasedSlotOptions] Slot-supplier-specific tuner options. @@ -46,7 +47,7 @@ def initialize(tuner_options:, slot_options:) # rubocop:disable Lint/MissingSupe end # @!visibility private - def _to_bridge_options + def _to_bridge_options(_tuner) Internal::Bridge::Worker::TunerSlotSupplierOptions.new( fixed_size: nil, resource_based: Internal::Bridge::Worker::TunerResourceBasedSlotSupplierOptions.new( @@ -55,14 +56,175 @@ def _to_bridge_options min_slots: slot_options.min_slots, max_slots: slot_options.max_slots, ramp_throttle: slot_options.ramp_throttle + ), + custom: nil + ) + end + end + + # A slot supplier that has callbacks invoked to handle slot supplying. + # + # Users should be cautious when implementing this and make sure it is heavily tested and the documentation for + # every method is well understood. + # + # @note WARNING: This API is experimental. + class Custom < SlotSupplier + # Context provided for slot reservation on custom slot supplier. + # + # @!attribute slot_type + # @return [:workflow, :activity, :local_activity, :nexus] Slot type. + # @!attribute task_queue + # @return [String] Task queue. + # @!attribute worker_identity + # @return [String] Worker identity. + # @!attribute worker_deployment_name + # @return [String] Worker deployment name or empty string if not applicable. + # @!attribute worker_build_id + # @return [String] Worker build ID or empty string if not applicable. + # @!attribute sticky? + # @return [Boolean] True if this reservation is for a sticky workflow task. + ReserveContext = Data.define( + :slot_type, + :task_queue, + :worker_identity, + :worker_deployment_name, + :worker_build_id, + :sticky? + ) + + # Context provided for marking a slot used. + # + # @!attribute slot_info + # @return [SlotInfo::Workflow, SlotInfo::Activity, SlotInfo::LocalActivity, SlotInfo::Nexus] Information + # about the slot. This is never nil. + # @!attribute permit + # @return [Object] Object that was provided as the permit on reserve. + MarkUsedContext = Data.define( + :slot_info, + :permit + ) + + # Context provided for releasing a slot. + # + # @!attribute slot_info + # @return [SlotInfo::Workflow, SlotInfo::Activity, SlotInfo::LocalActivity, SlotInfo::Nexus, nil] + # Information about the slot. This may be nil if the slot was never used. + # @!attribute permit + # @return [Object] Object that was provided as the permit on reserve. + ReleaseContext = Data.define( + :slot_info, + :permit + ) + + # Reserve a slot. + # + # This can/should block and must provide the permit to the (code) block. The permit is any object (including + # nil) that will be given on the context for mark_slot_used and release_slot. + # + # Just returning from this call is not enough to reserve the slot, a permit must be provided to the block + # (e.g. via yield or block.call). If the call completes, the system will still wait on the block (so this can + # be backgrounded by passing the block to something else). Reservations may be canceled via the given + # cancellation. Users can do things like add_cancel_callback, but it is very important that the code in the + # callback is fast as it is run on the same reactor thread as many other Temporal Ruby worker calls. + # + # @note WARNING: This call should never raise an exception. Any exception raised is ignored and this is called + # again after 1 second. + # + # @param context [ReserveContext] Contextual information about this reserve call. + # @param cancellation [Cancellation] Cancellation that is canceled when the reservation is no longer being + # asked for. + # @yield [Object] Confirm reservation and provide a permit. + def reserve_slot(context, cancellation, &) + raise NotImplementedError + end + + # Try to reserve a slot. + # + # @note WARNING: This should never block, this should return immediately with a permit, or nil if the + # reservation could not occur. + # + # @note WARNING: This call should never raise an exception. Any exception raised is ignored and the slot + # reservation attempt fails (i.e. same as if this method returned nil). + # + # @param context [ReserveContext] Contextual information about this reserve call. + # @return [Object, nil] A non-nil object to perform the reservation successfully, a nil to fail the + # reservation. + def try_reserve_slot(context) + raise NotImplementedError + end + + # Mark a slot as used. + # + # Due to the nature of Temporal polling, slots are reserved before they are used and may never get used. This + # call is made as just a notification when a slot is actually used. + # + # @note WARNING: This should never block, this should return immediately. + # + # @note WARNING: This call should never raise an exception. Any exception raised is ignored. + # + # @param context [MarkUsedContext] Contextual information about this reserve call. + def mark_slot_used(context) + raise NotImplementedError + end + + # Release a previously reserved slot. + # + # @note WARNING: This should never block, this should return immediately. + # + # @note WARNING: This call should never raise an exception. Any exception raised is ignored. + # + # @param context [ReleaseContext] Contextual information about this reserve call. + def release_slot(context) + raise NotImplementedError + end + + # @!visibility private + def _to_bridge_options(tuner) + Internal::Bridge::Worker::TunerSlotSupplierOptions.new( + fixed_size: nil, + resource_based: nil, + custom: Internal::Bridge::Worker::CustomSlotSupplier.new( + slot_supplier: self, + thread_pool: tuner.custom_slot_supplier_thread_pool ) ) end + + # Slot information. + module SlotInfo + # Information about a workflow slot. + # + # @!attribute workflow_type + # @return [String] Workflow type. + # @!attribute sticky? + # @return [Boolean] Whether the slot was for a sticky task. + Workflow = Data.define(:workflow_type, :sticky?) + + # Information about an activity slot. + # + # @!attribute activity_type + # @return [String] Activity type. + Activity = Data.define(:activity_type) + + # Information about a local activity slot. + # + # @!attribute activity_type + # @return [String] Activity type. + LocalActivity = Data.define(:activity_type) + + # Information about a Nexus slot. + # + # @!attribute service + # @return [String] Nexus service. + # @!attribute operation + # @return [String] Nexus operation. + Nexus = Data.define(:service, :operation) + end end # @!visibility private - def _to_bridge_options - raise ArgumentError, 'Tuner slot suppliers must be instances of Fixed or ResourceBased' + def _to_bridge_options(_tuner) + raise ArgumentError, 'Tuner slot suppliers must be instances of Fixed, ResourceBased, or Custom' end end @@ -75,10 +237,9 @@ def _to_bridge_options # @!attribute target_cpu_usage # @return [Float] A value between 0 and 1 that represents the target (system) CPU usage. This can be set to 1.0 # if desired, but it's recommended to leave some headroom for other processes. - ResourceBasedTunerOptions = Struct.new( + ResourceBasedTunerOptions = Data.define( :target_memory_usage, - :target_cpu_usage, - keyword_init: true + :target_cpu_usage ) # Options for a specific slot type being used with {SlotSupplier::ResourceBased}. @@ -94,11 +255,10 @@ def _to_bridge_options # # This value matters because how many resources a task will use cannot be determined ahead of time, and thus # the system should wait to see how much resources are used before issuing more slots. - ResourceBasedSlotOptions = Struct.new( + ResourceBasedSlotOptions = Data.define( :min_slots, :max_slots, - :ramp_throttle, - keyword_init: true + :ramp_throttle ) # Create a fixed-size tuner with the provided number of slots. @@ -161,27 +321,36 @@ def self.create_resource_based( # @return [SlotSupplier] Slot supplier for local activities. attr_reader :local_activity_slot_supplier + # @return [ThreadPool, nil] Thread pool for custom slot suppliers. + attr_reader :custom_slot_supplier_thread_pool + # Create a tuner from 3 slot suppliers. # # @param workflow_slot_supplier [SlotSupplier] Slot supplier for workflows. # @param activity_slot_supplier [SlotSupplier] Slot supplier for activities. # @param local_activity_slot_supplier [SlotSupplier] Slot supplier for local activities. + # @param custom_slot_supplier_thread_pool [ThreadPool, nil] Thread pool to make all custom slot supplier calls on. + # If there are no custom slot suppliers, this parameter is ignored. Technically users may set this to nil which + # will not use a thread pool to make slot supplier calls, but that is dangerous and not advised because even the + # slightest blocking call can slow down the system. def initialize( workflow_slot_supplier:, activity_slot_supplier:, - local_activity_slot_supplier: + local_activity_slot_supplier:, + custom_slot_supplier_thread_pool: ThreadPool.default ) @workflow_slot_supplier = workflow_slot_supplier @activity_slot_supplier = activity_slot_supplier @local_activity_slot_supplier = local_activity_slot_supplier + @custom_slot_supplier_thread_pool = custom_slot_supplier_thread_pool end # @!visibility private def _to_bridge_options Internal::Bridge::Worker::TunerOptions.new( - workflow_slot_supplier: workflow_slot_supplier._to_bridge_options, - activity_slot_supplier: activity_slot_supplier._to_bridge_options, - local_activity_slot_supplier: local_activity_slot_supplier._to_bridge_options + workflow_slot_supplier: workflow_slot_supplier._to_bridge_options(self), + activity_slot_supplier: activity_slot_supplier._to_bridge_options(self), + local_activity_slot_supplier: local_activity_slot_supplier._to_bridge_options(self) ) end end diff --git a/temporalio/sig/temporalio/internal/bridge/worker.rbs b/temporalio/sig/temporalio/internal/bridge/worker.rbs index 747b0f00..3381e642 100644 --- a/temporalio/sig/temporalio/internal/bridge/worker.rbs +++ b/temporalio/sig/temporalio/internal/bridge/worker.rbs @@ -63,10 +63,12 @@ module Temporalio class TunerSlotSupplierOptions attr_accessor fixed_size: Integer? attr_accessor resource_based: TunerResourceBasedSlotSupplierOptions? + attr_accessor custom: CustomSlotSupplier? def initialize: ( fixed_size: Integer?, - resource_based: TunerResourceBasedSlotSupplierOptions? + resource_based: TunerResourceBasedSlotSupplierOptions?, + custom: CustomSlotSupplier? ) -> void end @@ -94,6 +96,32 @@ module Temporalio def complete_activity_task_in_background: (untyped proto) -> void + class CustomSlotSupplier + def initialize: ( + slot_supplier: Temporalio::Worker::Tuner::SlotSupplier::Custom, + thread_pool: Temporalio::Worker::ThreadPool? + ) -> void + + def reserve_slot: ( + Temporalio::Worker::Tuner::SlotSupplier::Custom::ReserveContext context, + Temporalio::Cancellation cancellation + ) { (untyped) -> void } -> void + + def try_reserve_slot: ( + Temporalio::Worker::Tuner::SlotSupplier::Custom::ReserveContext context + ) { (untyped) -> void } -> void + + def mark_slot_used: ( + Temporalio::Worker::Tuner::SlotSupplier::Custom::MarkUsedContext context + ) { (untyped) -> void } -> void + + def release_slot: ( + Temporalio::Worker::Tuner::SlotSupplier::Custom::ReleaseContext context + ) { (untyped) -> void } -> void + + def run_user_code: [T] { -> T } -> T + end + # Defined in Rust def self.new: (Client client, Options options) -> Worker diff --git a/temporalio/sig/temporalio/worker/tuner.rbs b/temporalio/sig/temporalio/worker/tuner.rbs index 23c08a63..f46b9073 100644 --- a/temporalio/sig/temporalio/worker/tuner.rbs +++ b/temporalio/sig/temporalio/worker/tuner.rbs @@ -18,12 +18,58 @@ module Temporalio ) -> void end - def _to_bridge_options: -> Internal::Bridge::Worker::TunerSlotSupplierOptions + class Custom < SlotSupplier + def reserve_slot: (ReserveContext context, Cancellation cancellation) { (untyped) -> void } -> void + + def try_reserve_slot: (ReserveContext context) -> untyped + + def mark_slot_used: (MarkUsedContext context) -> void + + def release_slot: (ReleaseContext context) -> void + + class ReserveContext + attr_reader slot_type: :workflow | :activity | :local_activity | :nexus + attr_reader task_queue: String + attr_reader worker_identity: String + attr_reader worker_deployment_name: String + attr_reader worker_build_id: String + attr_reader sticky?: bool + end + + class MarkUsedContext + attr_reader slot_info: SlotInfo::Workflow | SlotInfo::Activity | SlotInfo::LocalActivity | SlotInfo::Nexus + attr_reader permit: untyped + end + + class ReleaseContext + attr_reader slot_info: SlotInfo::Workflow | SlotInfo::Activity | SlotInfo::LocalActivity | SlotInfo::Nexus | nil + attr_reader permit: untyped + end + + module SlotInfo + class Workflow + attr_reader workflow_type: String + attr_reader sticky?: bool + end + class Activity + attr_reader activity_type: String + end + class LocalActivity + attr_reader activity_type: String + end + class Nexus + attr_reader service: String + attr_reader operation: String + end + end + end + + def _to_bridge_options: (Tuner tuner) -> Internal::Bridge::Worker::TunerSlotSupplierOptions end class ResourceBasedTunerOptions - attr_accessor target_memory_usage: Float - attr_accessor target_cpu_usage: Float + attr_reader target_memory_usage: Float + attr_reader target_cpu_usage: Float def initialize: ( target_memory_usage: Float, @@ -32,9 +78,9 @@ module Temporalio end class ResourceBasedSlotOptions - attr_accessor min_slots: Integer? - attr_accessor max_slots: Integer? - attr_accessor ramp_throttle: Float? + attr_reader min_slots: Integer? + attr_reader max_slots: Integer? + attr_reader ramp_throttle: Float? def initialize: ( min_slots: Integer?, @@ -60,11 +106,13 @@ module Temporalio attr_reader workflow_slot_supplier: SlotSupplier attr_reader activity_slot_supplier: SlotSupplier attr_reader local_activity_slot_supplier: SlotSupplier + attr_reader custom_slot_supplier_thread_pool: ThreadPool? def initialize: ( workflow_slot_supplier: SlotSupplier, activity_slot_supplier: SlotSupplier, - local_activity_slot_supplier: SlotSupplier + local_activity_slot_supplier: SlotSupplier, + ?custom_slot_supplier_thread_pool: ThreadPool? ) -> void def _to_bridge_options: -> Internal::Bridge::Worker::TunerOptions diff --git a/temporalio/test/sig/worker_test.rbs b/temporalio/test/sig/worker_test.rbs new file mode 100644 index 00000000..bf86e00b --- /dev/null +++ b/temporalio/test/sig/worker_test.rbs @@ -0,0 +1,7 @@ +class WorkerTest < Test + class TrackingSlotSupplier < Temporalio::Worker::Tuner::SlotSupplier::Custom + attr_reader events: Array[[Symbol, untyped]] + + def add_event: (Symbol method, untyped context) -> void + end +end \ No newline at end of file diff --git a/temporalio/test/sig/workflow_utils.rbs b/temporalio/test/sig/workflow_utils.rbs index 3a8d896d..817d09e3 100644 --- a/temporalio/test/sig/workflow_utils.rbs +++ b/temporalio/test/sig/workflow_utils.rbs @@ -19,8 +19,9 @@ module WorkflowUtils ?task_timeout: duration?, ?on_worker_run: Proc?, ?unsafe_workflow_io_enabled: bool, + ?priority: Temporalio::Priority, ?start_workflow_client: Temporalio::Client, - ?priority: Temporalio::Priority + ?tuner: Temporalio::Worker::Tuner ) -> Object? | [T] ( singleton(Temporalio::Workflow::Definition) workflow, @@ -42,8 +43,9 @@ module WorkflowUtils ?task_timeout: duration?, ?on_worker_run: Proc?, ?unsafe_workflow_io_enabled: bool, + ?priority: Temporalio::Priority, ?start_workflow_client: Temporalio::Client, - ?priority: Temporalio::Priority + ?tuner: Temporalio::Worker::Tuner ) { (Temporalio::Client::WorkflowHandle, Temporalio::Worker) -> T } -> T def assert_eventually_task_fail: ( diff --git a/temporalio/test/worker_test.rb b/temporalio/test/worker_test.rb index 1207ae36..a8deeca3 100644 --- a/temporalio/test/worker_test.rb +++ b/temporalio/test/worker_test.rb @@ -239,4 +239,224 @@ def test_can_run_with_autoscaling_poller_behavior end end end + + class BasicPermit < Object; end + + class TrackingSlotSupplier < Temporalio::Worker::Tuner::SlotSupplier::Custom + attr_reader :events + + def reserve_slot(context, _cancellation, &) + add_event(:reserve_slot, context) + yield BasicPermit.new + end + + def try_reserve_slot(context) + add_event(:try_reserve_slot, context) + BasicPermit.new + end + + def mark_slot_used(context) + add_event(:mark_slot_used, context) + end + + def release_slot(context) + add_event(:release_slot, context) + end + + private + + def add_event(method, context) + (@events ||= []) << [method, context] + end + end + + class SlotSupplierActivity < Temporalio::Activity::Definition + def execute; end + end + + class SlotSupplierWorkflow < Temporalio::Workflow::Definition + def execute + Temporalio::Workflow.execute_activity(SlotSupplierActivity, start_to_close_timeout: 10) + Temporalio::Workflow.execute_local_activity(SlotSupplierActivity, start_to_close_timeout: 10) + end + end + + def test_custom_slot_supplier_simple + supplier = TrackingSlotSupplier.new + execute_workflow( + SlotSupplierWorkflow, + activities: [SlotSupplierActivity], + tuner: Temporalio::Worker::Tuner.new( + workflow_slot_supplier: supplier, + activity_slot_supplier: supplier, + local_activity_slot_supplier: supplier + ) + ) + # Assert the events are as expected... + events = supplier.events + + # Had to get reserved for workflow (sticky and non), activity, and local activity + assert(events.any? { |(m, e)| m == :reserve_slot && e.slot_type == :workflow && !e.sticky? }) + assert(events.any? { |(m, e)| m == :reserve_slot && e.slot_type == :workflow && e.sticky? }) + assert(events.any? { |(m, e)| m == :reserve_slot && e.slot_type == :activity }) + assert(events.any? { |(m, e)| m == :reserve_slot && e.slot_type == :local_activity }) + + # Since the activity was eager, had to get a try reserve for it + assert(events.any? { |(m, e)| m == :try_reserve_slot && e.slot_type == :activity }) + + # # Had to get mark used for workflow (sticky and non), activity, and local activity + assert(events.any? do |(m, e)| + m == :mark_slot_used && e.slot_info.is_a?(Temporalio::Worker::Tuner::SlotSupplier::Custom::SlotInfo::Workflow) && + e.slot_info.workflow_type == 'SlotSupplierWorkflow' && !e.slot_info.sticky? && e.permit.is_a?(BasicPermit) + end) + assert(events.any? do |(m, e)| + m == :mark_slot_used && e.slot_info.is_a?(Temporalio::Worker::Tuner::SlotSupplier::Custom::SlotInfo::Workflow) && + e.slot_info.workflow_type == 'SlotSupplierWorkflow' && e.slot_info.sticky? && e.permit.is_a?(BasicPermit) + end) + assert(events.any? do |(m, e)| + m == :mark_slot_used && e.slot_info.is_a?(Temporalio::Worker::Tuner::SlotSupplier::Custom::SlotInfo::Activity) && + e.slot_info.activity_type == 'SlotSupplierActivity' && e.permit.is_a?(BasicPermit) + end) + assert(events.any? do |(m, e)| + m == :mark_slot_used && + e.slot_info.is_a?(Temporalio::Worker::Tuner::SlotSupplier::Custom::SlotInfo::LocalActivity) && + e.slot_info.activity_type == 'SlotSupplierActivity' && e.permit.is_a?(BasicPermit) + end) + + # Must be a release for every reserve + assert_equal( + events.count { |(m, _)| m == :reserve_slot || m == :try_reserve_slot }, + events.count { |(m, _)| m == :release_slot } + ) + end + + class BlockingSlotSupplier < Temporalio::Worker::Tuner::SlotSupplier::Custom + attr_reader :canceled_contexts + + def reserve_slot(context, cancellation, &) + # We'll block on every reserve, waiting for queue to get resolved + queue = Queue.new + (@waiting ||= []) << [context, queue] + cancellation.add_cancel_callback do + (@canceled_contexts ||= []) << context + queue.push(StandardError.new('Canceled')) + end + yield queue.pop + end + + def try_reserve_slot(_context) + # No try-reserve + None + end + + def mark_slot_used(_context) + # Do nothing + end + + def release_slot(_context) + # Do nothing + end + + def resolve_a_reserve(slot_type) + waiting_idx = @waiting&.index { |(context, _)| context.slot_type == slot_type } + raise 'Not found' unless waiting_idx + + _, queue = @waiting.delete_at(waiting_idx) + queue << BasicPermit.new + end + + def waiting_contexts + @waiting.map(&:first) + end + end + + class BlockingSlotSupplierWorkflow < Temporalio::Workflow::Definition + def execute + Temporalio::Workflow.execute_activity(SlotSupplierActivity, start_to_close_timeout: 10) + end + end + + def test_custom_slot_supplier_blocking + supplier = BlockingSlotSupplier.new + waiting_contexts = execute_workflow( + BlockingSlotSupplierWorkflow, + activities: [SlotSupplierActivity], + tuner: Temporalio::Worker::Tuner.new( + workflow_slot_supplier: supplier, + activity_slot_supplier: supplier, + local_activity_slot_supplier: supplier + ), + # Core does not progress if you don't let sticky be reserved, and we want to manually control every slot + max_cached_workflows: 0 + ) do |handle| + # Make sure history has not started a task + assert handle.fetch_history_events.none?(&:workflow_task_started_event_attributes) + # Some slower test runners don't start asking for reserve until after we've reached here, so we will wait until + # we see reservation requests for workflow and activity + assert_eventually do + assert(supplier.waiting_contexts.any? { |ctx| ctx.slot_type == :workflow }) + assert(supplier.waiting_contexts.any? { |ctx| ctx.slot_type == :activity }) + end + # Let one reserve call for workflow task through + supplier.resolve_a_reserve(:workflow) + # Make sure history has a task completed + assert_eventually { assert handle.fetch_history_events.any?(&:workflow_task_completed_event_attributes) } + + # Now resolve one activity and wait for history to show activity completion + supplier.resolve_a_reserve(:activity) + assert_eventually { assert handle.fetch_history_events.any?(&:activity_task_completed_event_attributes) } + + # Now resolve one workflow task to let that progress + supplier.resolve_a_reserve(:workflow) + + # Confirm the workflow completes successfully + handle.result + + # Collect the waiting contexts + supplier.waiting_contexts + end + + # Confirm we canceled all waiting reservations + assert_equal waiting_contexts.size, supplier.canceled_contexts.size + waiting_contexts.each { |w| assert_includes supplier.canceled_contexts, w } + end + + class RaisingSlotSupplier < Temporalio::Worker::Tuner::SlotSupplier::Custom + def reserve_slot(context, _cancellation, &) + # We'll only raise on non-workflows + raise 'Intentional error' unless context.slot_type == :workflow + + yield BasicPermit.new + end + + def try_reserve_slot(_context) + raise 'Intentional error' + end + + def mark_slot_used(_context) + raise 'Intentional error' + end + + def release_slot(_context) + raise 'Intentional error' + end + end + + class SlotSupplierRaisingWorkflow < Temporalio::Workflow::Definition + def execute + 'done' + end + end + + def test_custom_slot_supplier_raising + supplier = RaisingSlotSupplier.new + assert_equal 'done', execute_workflow( + SlotSupplierRaisingWorkflow, + tuner: Temporalio::Worker::Tuner.new( + workflow_slot_supplier: supplier, + activity_slot_supplier: supplier, + local_activity_slot_supplier: supplier + ) + ) + end end diff --git a/temporalio/test/workflow_utils.rb b/temporalio/test/workflow_utils.rb index f642685a..539c013d 100644 --- a/temporalio/test/workflow_utils.rb +++ b/temporalio/test/workflow_utils.rb @@ -32,7 +32,8 @@ def execute_workflow( on_worker_run: nil, unsafe_workflow_io_enabled: false, priority: Temporalio::Priority.default, - start_workflow_client: client + start_workflow_client: client, + tuner: Temporalio::Worker::Tuner.create_fixed ) worker = Temporalio::Worker.new( client:, @@ -45,7 +46,8 @@ def execute_workflow( workflow_payload_codec_thread_pool:, max_heartbeat_throttle_interval:, interceptors:, - unsafe_workflow_io_enabled: + unsafe_workflow_io_enabled:, + tuner: ) worker.run do on_worker_run&.call