diff --git a/ci/Cargo.lock.min b/ci/Cargo.lock.min index 134d85ab..be09b13e 100644 --- a/ci/Cargo.lock.min +++ b/ci/Cargo.lock.min @@ -52,13 +52,13 @@ dependencies = [ [[package]] name = "async-trait" -version = "0.1.83" +version = "0.1.84" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "721cae7de5c34fbb2acd27e21e6d2cf7b886dce0c27388d46c4e6c47ea4318dd" +checksum = "1b1244b10dcd56c92219da4e14caa97e312079e185f04ba3eea25061561dc0a0" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.95", ] [[package]] @@ -81,16 +81,15 @@ dependencies = [ [[package]] name = "aws-lc-sys" -version = "0.24.0" +version = "0.24.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8478a5c29ead3f3be14aff8a202ad965cf7da6856860041bfca271becf8ba48b" +checksum = "923ded50f602b3007e5e63e3f094c479d9c8a9b42d7f4034e4afe456aa48bfd2" dependencies = [ "bindgen", "cc", "cmake", "dunce", "fs_extra", - "libc", "paste", ] @@ -154,7 +153,7 @@ dependencies = [ "regex", "rustc-hash", "shlex", - "syn", + "syn 2.0.95", "which", ] @@ -228,9 +227,9 @@ checksum = "79296716171880943b8470b5f8d03aa55eb2e645a4874bdbb28adb49162e012c" [[package]] name = "bytemuck" -version = "1.20.0" +version = "1.21.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8b37c88a63ffd85d15b406896cc343916d7cf57838a847b3a6f2ca5d39a5695a" +checksum = "ef657dfab802224e671f5818e9a4935f9b1957ed18e58292690cc39e7a4092a3" [[package]] name = "byteorder" @@ -249,9 +248,9 @@ dependencies = [ [[package]] name = "cc" -version = "1.2.4" +version = "1.2.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9157bbaa6b165880c27a4293a474c91cdcf265cc68cc829bf10be0964a391caf" +checksum = "a012a0df96dd6d06ba9a1b29d6402d1a5d77c6befd2566afdc26e10603dc93d7" dependencies = [ "jobserver", "libc", @@ -343,6 +342,12 @@ version = "0.8.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b" +[[package]] +name = "crossbeam-utils" +version = "0.8.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d0a5c400df2834b80a4c3327b3aad3a4c4cd4de0629063962b03235697506a28" + [[package]] name = "darling" version = "0.20.10" @@ -364,7 +369,7 @@ dependencies = [ "proc-macro2", "quote", "strsim", - "syn", + "syn 2.0.95", ] [[package]] @@ -375,7 +380,21 @@ checksum = "d336a2a514f6ccccaa3e09b02d41d35330c07ddf03a62165fcec10bb561c7806" dependencies = [ "darling_core", "quote", - "syn", + "syn 2.0.95", +] + +[[package]] +name = "dashmap" +version = "6.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5041cc499144891f3790297212f32a74fb938e5136a14943f338ef9e0ae276cf" +dependencies = [ + "cfg-if", + "crossbeam-utils", + "hashbrown 0.14.5", + "lock_api", + "once_cell", + "parking_lot_core", ] [[package]] @@ -403,7 +422,7 @@ checksum = "bc2323e10c92e1cf4d86e11538512e6dc03ceb586842970b6332af3d4046a046" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.95", ] [[package]] @@ -424,7 +443,7 @@ checksum = "97369cbbc041bc366949bc74d34658d6cda5621039731c6310521892a3a20ae0" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.95", ] [[package]] @@ -579,7 +598,7 @@ checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.95", ] [[package]] @@ -631,9 +650,9 @@ checksum = "07e28edb80900c19c28f1072f2e8aeca7fa06b23cd4169cefe1af5aa3260783f" [[package]] name = "glob" -version = "0.3.1" +version = "0.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" +checksum = "a8d1add55171497b4705a648c6b583acafb01d58050a51727785f0b2c8e0a2b2" [[package]] name = "hashbrown" @@ -641,6 +660,12 @@ version = "0.12.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888" +[[package]] +name = "hashbrown" +version = "0.14.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1" + [[package]] name = "hashbrown" version = "0.15.2" @@ -763,9 +788,9 @@ dependencies = [ [[package]] name = "hyper-rustls" -version = "0.27.4" +version = "0.27.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f6884a48c6826ec44f524c7456b163cebe9e55a18d7b5e307cb4f100371cc767" +checksum = "2d191583f3da1305256f22463b9bb0471acad48a4e534a5218b9963e9c1f59b2" dependencies = [ "futures-util", "http", @@ -950,7 +975,7 @@ checksum = "1ec89e9337638ecdc08744df490b221a7399bf8d164eb52a665454e60e075ad6" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.95", ] [[package]] @@ -1105,9 +1130,9 @@ dependencies = [ [[package]] name = "libc" -version = "0.2.168" +version = "0.2.169" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5aaeb2981e0606ca11d79718f8bb01164f1d6ed75080182d3abf017e6d244b6d" +checksum = "b5aba8db14291edd000dfcc4d620c7ebfb122c613afb886ca8803fa4e128a20a" [[package]] name = "libloading" @@ -1224,7 +1249,7 @@ checksum = "254a5372af8fc138e36684761d3c0cdb758a4410e938babcff1c860ce14ddbfc" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.95", ] [[package]] @@ -1245,6 +1270,7 @@ dependencies = [ "bytes", "chrono", "chrono-tz", + "dashmap", "deadpool", "delegate", "futures", @@ -1280,7 +1306,7 @@ name = "neo4rs-macros" version = "0.3.0" dependencies = [ "quote", - "syn", + "syn 2.0.95", ] [[package]] @@ -1348,9 +1374,9 @@ dependencies = [ [[package]] name = "object" -version = "0.36.5" +version = "0.36.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "aedf0a2d09c573ed1d8d85b30c119153926a2b36dce0ab28322c09a117a4683e" +checksum = "62948e14d923ea95ea2c7c86c71013138b66525b86bdc08d2dcc262bdb497b87" dependencies = [ "memchr", ] @@ -1412,7 +1438,7 @@ dependencies = [ "regex", "regex-syntax", "structmeta", - "syn", + "syn 2.0.95", ] [[package]] @@ -1518,7 +1544,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "64d1ec885c64d0457d564db4ec299b2dae3f9c02808b8ad9c3a089c591b18033" dependencies = [ "proc-macro2", - "syn", + "syn 2.0.95", ] [[package]] @@ -1532,9 +1558,9 @@ dependencies = [ [[package]] name = "quote" -version = "1.0.37" +version = "1.0.38" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b5b9d34b8991d19d98081b46eacdd8eb58c6f2b201139f7c5f643cc155a633af" +checksum = "0e4dccaaaf89514f546c693ddc140f729f958c247918a13380cccc6078391acc" dependencies = [ "proc-macro2", ] @@ -1726,9 +1752,9 @@ checksum = "f3cb5ba0dc43242ce17de99c180e96db90b235b8a9fdc9543c96d2209116bd9f" [[package]] name = "safe_arch" -version = "0.7.2" +version = "0.7.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c3460605018fdc9612bce72735cba0d27efbcd9904780d44c7e3a9948f96148a" +checksum = "96b02de82ddbe1b636e6170c21be622223aea188ef2e139be0a5b219ec215323" dependencies = [ "bytemuck", ] @@ -1773,9 +1799,9 @@ dependencies = [ [[package]] name = "serde" -version = "1.0.216" +version = "1.0.217" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0b9781016e935a97e8beecf0c933758c97a5520d32930e460142b4cd80c6338e" +checksum = "02fc4265df13d6fa1d00ecff087228cc0a2b5f3c0e87e258d8b94a156e984c70" dependencies = [ "serde_derive", ] @@ -1791,20 +1817,20 @@ dependencies = [ [[package]] name = "serde_derive" -version = "1.0.216" +version = "1.0.217" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "46f859dbbf73865c6627ed570e78961cd3ac92407a2d117204c49232485da55e" +checksum = "5a9bf7cf98d04a2b28aead066b7496853d4779c9cc183c440dbac457641e19a0" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.95", ] [[package]] name = "serde_json" -version = "1.0.133" +version = "1.0.134" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c7fceb2473b9166b2294ef05efcb65a3db80803f0b03ef86a5fc88a2b85ee377" +checksum = "d00f4175c42ee48b15416f6193a959ba3a0d67fc699a0db9ad12df9f83991c7d" dependencies = [ "itoa", "memchr", @@ -1814,13 +1840,13 @@ dependencies = [ [[package]] name = "serde_repr" -version = "0.1.19" +version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6c64451ba24fc7a6a2d60fc75dd9c83c90903b19028d4eff35e88fc1e86564e9" +checksum = "cd02c7587ec314570041b2754829f84d873ced14a96d1fd1823531e11db40573" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 1.0.109", ] [[package]] @@ -1846,9 +1872,9 @@ dependencies = [ [[package]] name = "serde_with" -version = "3.11.0" +version = "3.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8e28bdad6db2b8340e449f7108f020b3b092e8583a9e3fb82713e1d4e71fe817" +checksum = "d6b6f7f2fcb69f747921f79f3926bd1e203fce4fef62c268dd3abfb6d86029aa" dependencies = [ "base64 0.22.1", "chrono", @@ -1864,14 +1890,14 @@ dependencies = [ [[package]] name = "serde_with_macros" -version = "3.11.0" +version = "3.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9d846214a9854ef724f3da161b426242d8de7c1fc7de2f89bb1efcb154dca79d" +checksum = "8d00caa5193a3c8362ac2b73be6b9e768aa5a4b2f721d8f4b339600c3cb51f8e" dependencies = [ "darling", "proc-macro2", "quote", - "syn", + "syn 2.0.95", ] [[package]] @@ -1960,7 +1986,7 @@ dependencies = [ "proc-macro2", "quote", "structmeta-derive", - "syn", + "syn 2.0.95", ] [[package]] @@ -1971,7 +1997,7 @@ checksum = "152a0b65a590ff6c3da95cabe2353ee04e6167c896b28e3b14478c2636c922fc" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.95", ] [[package]] @@ -1982,9 +2008,20 @@ checksum = "13c2bddecc57b384dee18652358fb23172facb8a2c51ccc10d74c157bdea3292" [[package]] name = "syn" -version = "2.0.90" +version = "1.0.109" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "72b64191b275b66ffe2469e8af2c1cfe3bafa67b529ead792a6d0160888b4237" +dependencies = [ + "proc-macro2", + "quote", + "unicode-ident", +] + +[[package]] +name = "syn" +version = "2.0.95" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "919d3b74a5dd0ccd15aeb8f93e7006bd9e14c295087c9896a110f490752bcf31" +checksum = "46f71c0377baf4ef1cc3e3402ded576dccc315800fbc62dfc7fe04b009773b4a" dependencies = [ "proc-macro2", "quote", @@ -1999,7 +2036,7 @@ checksum = "c8af7666ab7b6390ab78131fb5b0fce11d6b7a6951602017c35fa82800708971" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.95", ] [[package]] @@ -2035,7 +2072,7 @@ dependencies = [ "cfg-if", "proc-macro2", "quote", - "syn", + "syn 2.0.95", ] [[package]] @@ -2046,7 +2083,7 @@ checksum = "5c89e72a01ed4c579669add59014b9a524d609c0c88c6a585ce37485879f6ffb" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.95", "test-case-core", ] @@ -2105,7 +2142,7 @@ checksum = "4fee6c4efc90059e10f81e6d42c60a18f76588c3d74cb83a0b242a2b6c7504c1" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.95", ] [[package]] @@ -2175,7 +2212,7 @@ checksum = "693d596312e88961bc67d7f1f97af8a70227d9f90c31bba5806eec004978d752" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.95", ] [[package]] @@ -2351,7 +2388,7 @@ dependencies = [ "log", "proc-macro2", "quote", - "syn", + "syn 2.0.95", "wasm-bindgen-shared", ] @@ -2373,7 +2410,7 @@ checksum = "30d7a95b763d3c45903ed6c81f156801839e5ee968bb07e534c44df0fcd330c2" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.95", "wasm-bindgen-backend", "wasm-bindgen-shared", ] @@ -2398,9 +2435,9 @@ dependencies = [ [[package]] name = "wide" -version = "0.7.30" +version = "0.7.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "58e6db2670d2be78525979e9a5f9c69d296fd7d670549fe9ebf70f8708cb5019" +checksum = "cc0ca27312d1e9218687a4e804cd4b7410bf54125d54d13e5170efbf09066d24" dependencies = [ "bytemuck", "safe_arch", @@ -2659,7 +2696,7 @@ checksum = "2380878cad4ac9aac1e2435f3eb4020e8374b5f13c296cb75b4620ff8e229154" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.95", "synstructure", ] @@ -2681,7 +2718,7 @@ checksum = "fa4f8080344d4671fb4e831a13ad1e68092748387dfc4f55e356242fae12ce3e" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.95", ] [[package]] @@ -2701,7 +2738,7 @@ checksum = "595eed982f7d355beb85837f651fa22e90b3c044842dc7f2c2842c086f295808" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.95", "synstructure", ] @@ -2730,5 +2767,5 @@ checksum = "6eafa6dfb17584ea3e2bd6e76e0cc15ad7af12b09abdd1ca55961bed9b1063c6" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.95", ] diff --git a/ci/Cargo.lock.msrv b/ci/Cargo.lock.msrv index 134d85ab..e2d28b1f 100644 --- a/ci/Cargo.lock.msrv +++ b/ci/Cargo.lock.msrv @@ -52,9 +52,9 @@ dependencies = [ [[package]] name = "async-trait" -version = "0.1.83" +version = "0.1.84" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "721cae7de5c34fbb2acd27e21e6d2cf7b886dce0c27388d46c4e6c47ea4318dd" +checksum = "1b1244b10dcd56c92219da4e14caa97e312079e185f04ba3eea25061561dc0a0" dependencies = [ "proc-macro2", "quote", @@ -81,16 +81,15 @@ dependencies = [ [[package]] name = "aws-lc-sys" -version = "0.24.0" +version = "0.24.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8478a5c29ead3f3be14aff8a202ad965cf7da6856860041bfca271becf8ba48b" +checksum = "923ded50f602b3007e5e63e3f094c479d9c8a9b42d7f4034e4afe456aa48bfd2" dependencies = [ "bindgen", "cc", "cmake", "dunce", "fs_extra", - "libc", "paste", ] @@ -228,9 +227,9 @@ checksum = "79296716171880943b8470b5f8d03aa55eb2e645a4874bdbb28adb49162e012c" [[package]] name = "bytemuck" -version = "1.20.0" +version = "1.21.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8b37c88a63ffd85d15b406896cc343916d7cf57838a847b3a6f2ca5d39a5695a" +checksum = "ef657dfab802224e671f5818e9a4935f9b1957ed18e58292690cc39e7a4092a3" [[package]] name = "byteorder" @@ -249,9 +248,9 @@ dependencies = [ [[package]] name = "cc" -version = "1.2.4" +version = "1.2.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9157bbaa6b165880c27a4293a474c91cdcf265cc68cc829bf10be0964a391caf" +checksum = "a012a0df96dd6d06ba9a1b29d6402d1a5d77c6befd2566afdc26e10603dc93d7" dependencies = [ "jobserver", "libc", @@ -343,6 +342,12 @@ version = "0.8.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b" +[[package]] +name = "crossbeam-utils" +version = "0.8.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d0a5c400df2834b80a4c3327b3aad3a4c4cd4de0629063962b03235697506a28" + [[package]] name = "darling" version = "0.20.10" @@ -378,6 +383,20 @@ dependencies = [ "syn", ] +[[package]] +name = "dashmap" +version = "6.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5041cc499144891f3790297212f32a74fb938e5136a14943f338ef9e0ae276cf" +dependencies = [ + "cfg-if", + "crossbeam-utils", + "hashbrown 0.14.5", + "lock_api", + "once_cell", + "parking_lot_core", +] + [[package]] name = "deadpool" version = "0.12.1" @@ -631,9 +650,9 @@ checksum = "07e28edb80900c19c28f1072f2e8aeca7fa06b23cd4169cefe1af5aa3260783f" [[package]] name = "glob" -version = "0.3.1" +version = "0.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" +checksum = "a8d1add55171497b4705a648c6b583acafb01d58050a51727785f0b2c8e0a2b2" [[package]] name = "hashbrown" @@ -641,6 +660,12 @@ version = "0.12.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888" +[[package]] +name = "hashbrown" +version = "0.14.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1" + [[package]] name = "hashbrown" version = "0.15.2" @@ -763,9 +788,9 @@ dependencies = [ [[package]] name = "hyper-rustls" -version = "0.27.4" +version = "0.27.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f6884a48c6826ec44f524c7456b163cebe9e55a18d7b5e307cb4f100371cc767" +checksum = "2d191583f3da1305256f22463b9bb0471acad48a4e534a5218b9963e9c1f59b2" dependencies = [ "futures-util", "http", @@ -1105,9 +1130,9 @@ dependencies = [ [[package]] name = "libc" -version = "0.2.168" +version = "0.2.169" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5aaeb2981e0606ca11d79718f8bb01164f1d6ed75080182d3abf017e6d244b6d" +checksum = "b5aba8db14291edd000dfcc4d620c7ebfb122c613afb886ca8803fa4e128a20a" [[package]] name = "libloading" @@ -1245,6 +1270,7 @@ dependencies = [ "bytes", "chrono", "chrono-tz", + "dashmap", "deadpool", "delegate", "futures", @@ -1348,9 +1374,9 @@ dependencies = [ [[package]] name = "object" -version = "0.36.5" +version = "0.36.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "aedf0a2d09c573ed1d8d85b30c119153926a2b36dce0ab28322c09a117a4683e" +checksum = "62948e14d923ea95ea2c7c86c71013138b66525b86bdc08d2dcc262bdb497b87" dependencies = [ "memchr", ] @@ -1532,9 +1558,9 @@ dependencies = [ [[package]] name = "quote" -version = "1.0.37" +version = "1.0.38" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b5b9d34b8991d19d98081b46eacdd8eb58c6f2b201139f7c5f643cc155a633af" +checksum = "0e4dccaaaf89514f546c693ddc140f729f958c247918a13380cccc6078391acc" dependencies = [ "proc-macro2", ] @@ -1726,9 +1752,9 @@ checksum = "f3cb5ba0dc43242ce17de99c180e96db90b235b8a9fdc9543c96d2209116bd9f" [[package]] name = "safe_arch" -version = "0.7.2" +version = "0.7.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c3460605018fdc9612bce72735cba0d27efbcd9904780d44c7e3a9948f96148a" +checksum = "96b02de82ddbe1b636e6170c21be622223aea188ef2e139be0a5b219ec215323" dependencies = [ "bytemuck", ] @@ -1773,9 +1799,9 @@ dependencies = [ [[package]] name = "serde" -version = "1.0.216" +version = "1.0.217" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0b9781016e935a97e8beecf0c933758c97a5520d32930e460142b4cd80c6338e" +checksum = "02fc4265df13d6fa1d00ecff087228cc0a2b5f3c0e87e258d8b94a156e984c70" dependencies = [ "serde_derive", ] @@ -1791,9 +1817,9 @@ dependencies = [ [[package]] name = "serde_derive" -version = "1.0.216" +version = "1.0.217" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "46f859dbbf73865c6627ed570e78961cd3ac92407a2d117204c49232485da55e" +checksum = "5a9bf7cf98d04a2b28aead066b7496853d4779c9cc183c440dbac457641e19a0" dependencies = [ "proc-macro2", "quote", @@ -1802,9 +1828,9 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.133" +version = "1.0.134" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c7fceb2473b9166b2294ef05efcb65a3db80803f0b03ef86a5fc88a2b85ee377" +checksum = "d00f4175c42ee48b15416f6193a959ba3a0d67fc699a0db9ad12df9f83991c7d" dependencies = [ "itoa", "memchr", @@ -1846,9 +1872,9 @@ dependencies = [ [[package]] name = "serde_with" -version = "3.11.0" +version = "3.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8e28bdad6db2b8340e449f7108f020b3b092e8583a9e3fb82713e1d4e71fe817" +checksum = "d6b6f7f2fcb69f747921f79f3926bd1e203fce4fef62c268dd3abfb6d86029aa" dependencies = [ "base64 0.22.1", "chrono", @@ -1864,9 +1890,9 @@ dependencies = [ [[package]] name = "serde_with_macros" -version = "3.11.0" +version = "3.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9d846214a9854ef724f3da161b426242d8de7c1fc7de2f89bb1efcb154dca79d" +checksum = "8d00caa5193a3c8362ac2b73be6b9e768aa5a4b2f721d8f4b339600c3cb51f8e" dependencies = [ "darling", "proc-macro2", @@ -1982,9 +2008,9 @@ checksum = "13c2bddecc57b384dee18652358fb23172facb8a2c51ccc10d74c157bdea3292" [[package]] name = "syn" -version = "2.0.90" +version = "2.0.95" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "919d3b74a5dd0ccd15aeb8f93e7006bd9e14c295087c9896a110f490752bcf31" +checksum = "46f71c0377baf4ef1cc3e3402ded576dccc315800fbc62dfc7fe04b009773b4a" dependencies = [ "proc-macro2", "quote", @@ -2398,9 +2424,9 @@ dependencies = [ [[package]] name = "wide" -version = "0.7.30" +version = "0.7.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "58e6db2670d2be78525979e9a5f9c69d296fd7d670549fe9ebf70f8708cb5019" +checksum = "cc0ca27312d1e9218687a4e804cd4b7410bf54125d54d13e5170efbf09066d24" dependencies = [ "bytemuck", "safe_arch", diff --git a/lib/Cargo.toml b/lib/Cargo.toml index 1d1a1e9a..c6449da5 100644 --- a/lib/Cargo.toml +++ b/lib/Cargo.toml @@ -31,6 +31,7 @@ unstable-bolt-protocol-impl-v2 = [ backoff = { version = "0.4.0", features = ["tokio"] } bytes = { version = "1.5.0", features = ["serde"] } chrono-tz = "0.10.0" +dashmap = "6.1.0" delegate = "0.13.0" futures = { version = "0.3.0" } log = "0.4.0" diff --git a/lib/src/bolt/request/mod.rs b/lib/src/bolt/request/mod.rs index b97baea0..258a609c 100644 --- a/lib/src/bolt/request/mod.rs +++ b/lib/src/bolt/request/mod.rs @@ -6,6 +6,7 @@ mod hello; mod pull; mod reset; mod rollback; +mod route; pub use commit::Commit; pub use discard::Discard; diff --git a/lib/src/bolt/request/route.rs b/lib/src/bolt/request/route.rs new file mode 100644 index 00000000..81079e1b --- /dev/null +++ b/lib/src/bolt/request/route.rs @@ -0,0 +1,202 @@ +use crate::bolt::{ExpectedResponse, Summary}; +use crate::connection::{NeoUrl, Routing}; +use crate::routing::{Extra, Route, RouteExtra, RoutingTable}; +use serde::ser::{SerializeMap, SerializeStructVariant}; +use serde::{Deserialize, Serialize}; +use std::fmt::{format, Display, Formatter}; + +#[derive(Debug, Clone, PartialEq, Eq, Deserialize)] +pub struct Response { + pub rt: RoutingTable, +} + +impl<'a> ExpectedResponse for Route<'a> { + type Response = Summary; +} + +impl Serialize for Routing { + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + match self { + Routing::No => serializer.serialize_none(), + Routing::Yes(routing) => { + let mut map = serializer.serialize_map(Some(routing.len()))?; + for (k, v) in routing { + map.serialize_entry(k.value.as_str(), v.value.as_str())?; + } + map.end() + } + } + } +} + +impl Serialize for Route<'_> { + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + let mut structure = serializer.serialize_struct_variant("Request", 0x66, "ROUTE", 3)?; + structure.serialize_field("routing", &self.routing)?; + structure.serialize_field("bookmarks", &self.bookmarks)?; + match self.extra { + RouteExtra::V4_3(ref db) => { + structure.serialize_field("db", db)?; + } + RouteExtra::V4_4(ref extra) => { + structure.serialize_field("extra", extra)?; + } + } + structure.end() + } +} + +impl Serialize for Extra { + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + let mut map = serializer.serialize_map(Some(2))?; + map.serialize_entry("db", &self.db)?; + map.serialize_entry("imp_user", &self.imp_user)?; + map.end() + } +} + +#[cfg(test)] +mod tests { + use crate::bolt::request::route::Response; + use crate::bolt::{Message, MessageResponse}; + use crate::connection::Routing; + use crate::packstream::bolt; + use crate::routing::{Route, RouteBuilder}; + use crate::{Database, Version}; + + #[test] + fn serialize() { + let route = RouteBuilder::new( + Routing::Yes(vec![("address".into(), "localhost:7687".into())]), + vec!["bookmark"], + ) + .with_db(Database::from("neo4j")) + .build(Version::V4_3); + let bytes = route.to_bytes().unwrap(); + + let expected = bolt() + .structure(3, 0x66) + .tiny_map(1) + .tiny_string("address") + .tiny_string("localhost:7687") + .tiny_list(1) + .tiny_string("bookmark") + .tiny_string("neo4j") + .build(); + + assert_eq!(bytes, expected); + } + + #[test] + fn serialize_no_db() { + let builder = RouteBuilder::new( + Routing::Yes(vec![("address".into(), "localhost:7687".into())]), + vec!["bookmark"], + ); + let route = builder.build(Version::V4_3); + let serialized = route.to_bytes().unwrap(); + + let expected = bolt() + .structure(3, 0x66) + .tiny_map(1) + .tiny_string("address") + .tiny_string("localhost:7687") + .tiny_list(1) + .tiny_string("bookmark") + .null() + .build(); + + assert_eq!(serialized, expected); + } + + #[test] + fn serialize_no_db_v4_4() { + let builder = RouteBuilder::new( + Routing::Yes(vec![("address".into(), "localhost:7687".into())]), + vec!["bookmark"], + ); + let route = builder.build(Version::V4_4); + let serialized = route.to_bytes().unwrap(); + + let expected = bolt() + .structure(3, 0x66) + .tiny_map(1) + .tiny_string("address") + .tiny_string("localhost:7687") + .tiny_list(1) + .tiny_string("bookmark") + .tiny_map(2) + .tiny_string("db") + .null() + .tiny_string("imp_user") + .null() + .build(); + + assert_eq!(serialized, expected); + } + + #[test] + fn serialize_with_db_v4_4() { + let builder = RouteBuilder::new( + Routing::Yes(vec![("address".into(), "localhost:7687".into())]), + vec!["bookmark"], + ); + let route = builder + .with_db("neo4j".into()) + .with_imp_user("Foo") + .build(Version::V4_4); + let serialized = route.to_bytes().unwrap(); + + let expected = bolt() + .structure(3, 0x66) + .tiny_map(1) + .tiny_string("address") + .tiny_string("localhost:7687") + .tiny_list(1) + .tiny_string("bookmark") + .tiny_map(2) + .tiny_string("db") + .tiny_string("neo4j") + .tiny_string("imp_user") + .tiny_string("Foo") + .build(); + + assert_eq!(serialized, expected); + } + + #[test] + fn parse() { + let data = bolt() + .tiny_map(1) + .tiny_string("rt") + .tiny_map(3) + .tiny_string("ttl") + .int64(1000) + .tiny_string("db") + .tiny_string("neo4j") + .tiny_string("servers") + .tiny_list(1) + .tiny_map(2) + .tiny_string("addresses") + .tiny_list(1) + .tiny_string("localhost:7687") + .tiny_string("role") + .tiny_string("ROUTE") + .build(); + + let response = Response::parse(data).unwrap(); + + assert_eq!(response.rt.ttl, 1000); + assert_eq!(response.rt.db.unwrap().as_ref(), "neo4j"); + assert_eq!(response.rt.servers.len(), 1); + } +} diff --git a/lib/src/config.rs b/lib/src/config.rs index 93f442f1..db7e8fc3 100644 --- a/lib/src/config.rs +++ b/lib/src/config.rs @@ -1,5 +1,7 @@ use crate::auth::{ClientCertificate, ConnectionTLSConfig}; use crate::errors::{Error, Result}; +#[cfg(feature = "unstable-bolt-protocol-impl-v2")] +use serde::{Deserialize, Deserializer, Serialize}; use std::path::Path; use std::{ops::Deref, sync::Arc}; @@ -11,6 +13,27 @@ const DEFAULT_MAX_CONNECTIONS: usize = 16; #[derive(Clone, Debug, PartialEq, Eq)] pub struct Database(Arc); +#[cfg(feature = "unstable-bolt-protocol-impl-v2")] +impl Serialize for Database { + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + (*self.0).serialize(serializer) + } +} + +#[cfg(feature = "unstable-bolt-protocol-impl-v2")] +impl<'de> Deserialize<'de> for Database { + fn deserialize(deserializer: D) -> Result + where + D: Deserializer<'de>, + { + let s = String::deserialize(deserializer)?; + Ok(Database::from(s)) + } +} + impl From<&str> for Database { fn from(s: &str) -> Self { Database(s.into()) diff --git a/lib/src/connection.rs b/lib/src/connection.rs index a7686722..3cb851bd 100644 --- a/lib/src/connection.rs +++ b/lib/src/connection.rs @@ -1,10 +1,16 @@ use crate::auth::ConnectionTLSConfig; -#[cfg(feature = "unstable-bolt-protocol-impl-v2")] -use crate::bolt::{ - ExpectedResponse, Hello, HelloBuilder, Message, MessageResponse, Reset, Summary, -}; #[cfg(not(feature = "unstable-bolt-protocol-impl-v2"))] use crate::messages::HelloBuilder; +#[cfg(feature = "unstable-bolt-protocol-impl-v2")] +use { + crate::bolt::{ + ExpectedResponse, Hello, HelloBuilder, Message, MessageResponse, Reset, Summary, + }, + log::debug, +}; + +#[cfg(feature = "unstable-bolt-protocol-impl-v2")] +use crate::routing::{Route, RoutingTable}; use crate::{ connection::stream::ConnectionStream, errors::{Error, Result}, @@ -13,12 +19,12 @@ use crate::{ BoltMap, BoltString, BoltType, }; use bytes::{BufMut, Bytes, BytesMut}; -use log::warn; +use log::{info, warn}; use rustls::client::danger::{HandshakeSignatureValid, ServerCertVerified, ServerCertVerifier}; use rustls::crypto::CryptoProvider; use rustls::pki_types::{CertificateDer, UnixTime}; use rustls::{DigitallySignedStruct, SignatureScheme}; -use std::fmt::{Debug, Formatter}; +use std::fmt::{Debug, Display, Formatter}; use std::{fs::File, io::BufReader, mem, sync::Arc}; use tokio::{ io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, BufStream}, @@ -49,6 +55,10 @@ impl Connection { Ok(connection) } + pub fn version(&self) -> Version { + self.version + } + pub(crate) async fn prepare(info: &ConnectionInfo) -> Result { let mut stream = match &info.host { Host::Domain(domain) => TcpStream::connect((&**domain, info.port)).await?, @@ -76,6 +86,7 @@ impl Connection { let mut response = [0, 0, 0, 0]; stream.read_exact(&mut response).await?; let version = Version::parse(response)?; + info!("Connected to Neo4j with version {}", version); Ok(version) } @@ -110,11 +121,23 @@ impl Connection { match hello { Summary::Success(_msg) => Ok(()), - Summary::Ignored => todo!(), + Summary::Ignored => Err(Error::RequestIgnoredError), Summary::Failure(msg) => Err(Error::AuthenticationError(msg.message)), } } + #[cfg(feature = "unstable-bolt-protocol-impl-v2")] + pub async fn route(&mut self, route: Route<'_>) -> Result { + debug!("Routing request: {}", route); + let route = self.send_recv_as(route).await?; + + match route { + Summary::Success(msg) => Ok(msg.metadata.rt), + Summary::Ignored => Err(Error::RequestIgnoredError), + Summary::Failure(msg) => Err(Error::RoutingTableError((msg.code, msg.message))), + } + } + pub async fn reset(&mut self) -> Result<()> { #[cfg(not(feature = "unstable-bolt-protocol-impl-v2"))] { @@ -234,29 +257,7 @@ impl Connection { } } -pub(crate) struct ConnectionInfo { - user: Arc, - password: Arc, - host: Host>, - port: u16, - routing: Routing, - encryption: Option<(TlsConnector, ServerName<'static>)>, -} - -impl std::fmt::Debug for ConnectionInfo { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("ConnectionInfo") - .field("user", &self.user) - .field("password", &"***") - .field("host", &self.host) - .field("port", &self.port) - .field("routing", &self.routing) - .field("encryption", &self.encryption.is_some()) - .finish_non_exhaustive() - } -} - -#[derive(Debug, Clone)] +#[derive(Debug, Clone, PartialEq, Eq)] pub(crate) enum Routing { No, Yes(Vec<(BoltString, BoltString)>), @@ -276,6 +277,44 @@ impl From for Option { } } +impl Display for Routing { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + match self { + Routing::No => f.write_str(""), + Routing::Yes(routing) => { + let routing = routing + .iter() + .map(|(k, v)| format!("{}: \"{}\"", k, v)) + .collect::>() + .join(", "); + write!(f, "{}", routing) + } + } + } +} + +pub(crate) struct ConnectionInfo { + pub user: Arc, + pub password: Arc, + pub host: Host>, + pub port: u16, + pub routing: Routing, + pub encryption: Option<(TlsConnector, ServerName<'static>)>, +} + +impl Debug for ConnectionInfo { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("ConnectionInfo") + .field("user", &self.user) + .field("password", &"***") + .field("host", &self.host) + .field("port", &self.port) + .field("routing", &self.routing) + .field("encryption", &self.encryption.is_some()) + .finish_non_exhaustive() + } +} + impl ConnectionInfo { pub(crate) fn new( uri: &str, @@ -300,8 +339,8 @@ impl ConnectionInfo { .transpose()?; let routing = if routing { - log::warn!(concat!( - "This driver does not yet implement client-side routing. ", + warn!(concat!( + "Client-side routing is in experimental mode.", "It is possible that operations against a cluster (such as Aura) will fail." )); Routing::Yes(url.routing_context()) @@ -386,23 +425,24 @@ impl ConnectionInfo { #[cfg(feature = "unstable-bolt-protocol-impl-v2")] pub(crate) fn to_hello(&self, version: Version) -> Hello { - let routing = match self.routing { - Routing::No => &[], - Routing::Yes(ref routing) => routing.as_slice(), - }; - let routing = routing - .iter() - .map(|(k, v)| (k.value.as_str(), v.value.as_str())); - HelloBuilder::new(&self.user, &self.password) - .with_routing(routing) - .build(version) + match self.routing { + Routing::No => HelloBuilder::new(&self.user, &self.password).build(version), + Routing::Yes(ref routing) => HelloBuilder::new(&self.user, &self.password) + .with_routing( + routing + .iter() + .map(|(k, v)| (k.value.as_str(), v.value.as_str())), + ) + .build(version), + } } } -struct NeoUrl(Url); +#[derive(Clone, Debug)] +pub struct NeoUrl(Url); impl NeoUrl { - fn parse(uri: &str) -> Result { + pub(crate) fn parse(uri: &str) -> Result { let url = match Url::parse(uri) { Ok(url) if url.has_host() => url, // missing scheme @@ -415,46 +455,55 @@ impl NeoUrl { Ok(Self(url)) } - fn scheme(&self) -> &str { + pub(crate) fn scheme(&self) -> &str { self.0.scheme() } - fn host(&self) -> Host<&str> { + pub(crate) fn host(&self) -> Host<&str> { self.0.host().unwrap() } - fn port(&self) -> u16 { + pub(crate) fn port(&self) -> u16 { self.0.port().unwrap_or(7687) } fn routing_context(&mut self) -> Vec<(BoltString, BoltString)> { - Vec::new() + vec![( + "address".into(), + format!("{}:{}", self.0.host().unwrap(), self.port()).into(), + )] } fn warn_on_unexpected_components(&self) { if !self.0.username().is_empty() || self.0.password().is_some() { - log::warn!(concat!( + warn!(concat!( "URI contained auth credentials, which are ignored.", "Credentials are passed outside of the URI" )); } if !matches!(self.0.path(), "" | "/") { - log::warn!("URI contained a path, which is ignored."); + warn!("URI contained a path, which is ignored."); } if self.0.query().is_some() { - log::warn!(concat!( + warn!(concat!( "This client does not yet support client-side routing.", "The routing context passed as a query to the URI is ignored." )); } if self.0.fragment().is_some() { - log::warn!("URI contained a fragment, which is ignored."); + warn!("URI contained a fragment, which is ignored."); } } } +impl Display for NeoUrl { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.0) + } +} + mod stream { use pin_project_lite::pin_project; use tokio::{ diff --git a/lib/src/errors.rs b/lib/src/errors.rs index 6efd65f1..10c9a656 100644 --- a/lib/src/errors.rs +++ b/lib/src/errors.rs @@ -93,6 +93,18 @@ pub enum Error { #[error("{0}")] DeserializationError(DeError), + + #[error("Failed to fetch the routing table [{}]: {}", _0.0, _0.1)] + RoutingTableError((String, String)), + + #[error("The request has been ignored by the server. This can happen if the server is under pressure or there was an issue with the memory.")] + RequestIgnoredError, + + #[error("{0}")] + RoutingTableRefreshFailed(String), + + #[error("{0}")] + ServerUnavailableError(String), } #[derive(Copy, Clone, Debug, PartialEq, Eq)] diff --git a/lib/src/graph.rs b/lib/src/graph.rs index 1f590608..f950f4a7 100644 --- a/lib/src/graph.rs +++ b/lib/src/graph.rs @@ -1,5 +1,12 @@ -use std::time::Duration; +#[cfg(feature = "unstable-bolt-protocol-impl-v2")] +use { + crate::connection::{ConnectionInfo, Routing}, + crate::graph::ConnectionPoolManager::Routed, + crate::routing::RoutedConnectionManager, +}; +use crate::graph::ConnectionPoolManager::Direct; +use crate::pool::ManagedConnection; use crate::{ config::{Config, ConfigBuilder, Database, LiveConfig}, errors::Result, @@ -7,7 +14,36 @@ use crate::{ query::Query, stream::DetachedRowStream, txn::Txn, + Operation, }; +use backoff::{Error, ExponentialBackoff}; +use std::time::Duration; + +#[derive(Clone)] +enum ConnectionPoolManager { + #[cfg(feature = "unstable-bolt-protocol-impl-v2")] + Routed(RoutedConnectionManager), + Direct(ConnectionPool), +} + +impl ConnectionPoolManager { + #[allow(unused_variables)] + async fn get(&self, operation: Option) -> Result { + match self { + #[cfg(feature = "unstable-bolt-protocol-impl-v2")] + Routed(manager) => manager.get(operation).await, + Direct(pool) => pool.get().await.map_err(crate::Error::from), + } + } + + fn backoff(&self) -> ExponentialBackoff { + match self { + #[cfg(feature = "unstable-bolt-protocol-impl-v2")] + Routed(manager) => manager.backoff(), + Direct(pool) => pool.manager().backoff(), + } + } +} /// A neo4j database abstraction. /// This type can be cloned and shared across threads, internal resources @@ -15,7 +51,7 @@ use crate::{ #[derive(Clone)] pub struct Graph { config: LiveConfig, - pool: ConnectionPool, + pool: ConnectionPoolManager, } /// Returns a [`Query`] which provides methods like [`Query::param`] to add parameters to the query @@ -28,9 +64,36 @@ impl Graph { /// /// You can build a config using [`ConfigBuilder::default()`]. pub async fn connect(config: Config) -> Result { - let pool = create_pool(&config).await?; - let config = config.into_live_config(); - Ok(Graph { config, pool }) + #[cfg(feature = "unstable-bolt-protocol-impl-v2")] + { + let info = ConnectionInfo::new( + &config.uri, + &config.user, + &config.password, + &config.tls_config, + )?; + if matches!(info.routing, Routing::Yes(_)) { + let pool = Routed(RoutedConnectionManager::new(&config).await?); + Ok(Graph { + config: config.into_live_config(), + pool, + }) + } else { + let pool = Direct(create_pool(&config).await?); + Ok(Graph { + config: config.into_live_config(), + pool, + }) + } + } + #[cfg(not(feature = "unstable-bolt-protocol-impl-v2"))] + { + let pool = Direct(create_pool(&config).await?); + Ok(Graph { + config: config.into_live_config(), + pool, + }) + } } /// Connects to the database with default configurations @@ -53,7 +116,19 @@ impl Graph { /// /// Transactions will not be automatically retried on any failure. pub async fn start_txn(&self) -> Result { - self.impl_start_txn_on(self.config.db.clone()).await + self.impl_start_txn_on(self.config.db.clone(), Operation::Write) + .await + } + + /// Starts a new transaction on the configured database specifying the desired operation. + /// All queries that needs to be run/executed within the transaction + /// should be executed using either [`Txn::run`] or [`Txn::execute`] + /// + /// Transactions will not be automatically retried on any failure. + #[cfg(feature = "unstable-bolt-protocol-impl-v2")] + pub async fn start_txn_as(&self, operation: Operation) -> Result { + self.impl_start_txn_on(self.config.db.clone(), operation) + .await } /// Starts a new transaction on the provided database. @@ -62,12 +137,14 @@ impl Graph { /// /// Transactions will not be automatically retried on any failure. pub async fn start_txn_on(&self, db: impl Into) -> Result { - self.impl_start_txn_on(Some(db.into())).await + self.impl_start_txn_on(Some(db.into()), Operation::Write) + .await } - async fn impl_start_txn_on(&self, db: Option) -> Result { - let connection = self.pool.get().await?; - Txn::new(db, self.config.fetch_size, connection).await + #[allow(unused_variables)] + async fn impl_start_txn_on(&self, db: Option, operation: Operation) -> Result { + let connection = self.pool.get(Some(operation.clone())).await?; + Txn::new(db, self.config.fetch_size, connection, operation).await } /// Runs a query on the configured database using a connection from the connection pool, @@ -82,7 +159,8 @@ impl Graph { /// /// use [`Graph::execute`] when you are interested in the result stream pub async fn run(&self, q: Query) -> Result<()> { - self.impl_run_on(self.config.db.clone(), q).await + self.impl_run_on(self.config.db.clone(), q, Operation::Write) + .await } /// Runs a query on the provided database using a connection from the connection pool. @@ -96,20 +174,48 @@ impl Graph { /// Use [`Graph::run`] for cases where you just want a write operation /// /// use [`Graph::execute`] when you are interested in the result stream + #[cfg(feature = "unstable-bolt-protocol-impl-v2")] + pub async fn run_on( + &self, + db: impl Into, + q: Query, + operation: Operation, + ) -> Result<()> { + self.impl_run_on(Some(db.into()), q, operation).await + } + + #[cfg(not(feature = "unstable-bolt-protocol-impl-v2"))] pub async fn run_on(&self, db: impl Into, q: Query) -> Result<()> { - self.impl_run_on(Some(db.into()), q).await + self.impl_run_on(Some(db.into()), q, Operation::Write).await } - async fn impl_run_on(&self, db: Option, q: Query) -> Result<()> { + #[allow(unused_variables)] + async fn impl_run_on( + &self, + db: Option, + q: Query, + operation: Operation, + ) -> Result<()> { backoff::future::retry_notify( - self.pool.manager().backoff(), + self.pool.backoff(), || { let pool = &self.pool; - let query = &q; - let db = db.as_deref(); + let mut query = q.clone(); + let operation = operation.clone(); + if let Some(db) = db.as_deref() { + query = query.extra("db", db); + } + query = query.extra( + "mode", + match operation { + Operation::Read => "r", + Operation::Write => "w", + }, + ); async move { - let mut connection = pool.get().await.map_err(crate::Error::from)?; - query.run_retryable(db, &mut connection).await + let mut connection = + pool.get(Some(operation)).await.map_err(Error::Permanent)?; // an error when retrieving a connection is considered permanent + query.run_retryable(&mut connection).await } }, Self::log_retry, @@ -117,14 +223,26 @@ impl Graph { .await } - /// Executes a query on the configured database and returns a [`DetachedRowStream`] + /// Executes a READ/WRITE query on the configured database and returns a [`DetachedRowStream`] /// /// This operation retires the query on certain failures. /// All errors with the `Transient` error class as well as a few other error classes are considered retryable. /// This includes errors during a leader election or when the transaction resources on the server (memory, handles, ...) are exhausted. /// Retries happen with an exponential backoff until a retry delay exceeds 60s, at which point the query fails with the last error as it would without any retry. pub async fn execute(&self, q: Query) -> Result { - self.impl_execute_on(self.config.db.clone(), q).await + self.impl_execute_on(self.config.db.clone(), q, Operation::Write) + .await + } + + /// Executes a query READ on the configured database and returns a [`DetachedRowStream`] + /// + /// This operation retires the query on certain failures. + /// All errors with the `Transient` error class as well as a few other error classes are considered retryable. + /// This includes errors during a leader election or when the transaction resources on the server (memory, handles, ...) are exhausted. + /// Retries happen with an exponential backoff until a retry delay exceeds 60s, at which point the query fails with the last error as it would without any retry. + pub async fn execute_read(&self, q: Query) -> Result { + self.impl_execute_on(self.config.db.clone(), q, Operation::Read) + .await } /// Executes a query on the provided database and returns a [`DetachedRowStream`] @@ -133,21 +251,55 @@ impl Graph { /// All errors with the `Transient` error class as well as a few other error classes are considered retryable. /// This includes errors during a leader election or when the transaction resources on the server (memory, handles, ...) are exhausted. /// Retries happen with an exponential backoff until a retry delay exceeds 60s, at which point the query fails with the last error as it would without any retry. + #[cfg(feature = "unstable-bolt-protocol-impl-v2")] + pub async fn execute_on( + &self, + db: impl Into, + q: Query, + operation: Operation, + ) -> Result { + self.impl_execute_on(Some(db.into()), q, operation).await + } + + /// Executes a query on the provided database and returns a [`DetachedRowStream`] + /// + /// This operation retires the query on certain failures. + /// All errors with the `Transient` error class as well as a few other error classes are considered retryable. + /// This includes errors during a leader election or when the transaction resources on the server (memory, handles, ...) are exhausted. + #[cfg(not(feature = "unstable-bolt-protocol-impl-v2"))] pub async fn execute_on(&self, db: impl Into, q: Query) -> Result { - self.impl_execute_on(Some(db.into()), q).await + self.impl_execute_on(Some(db.into()), q, Operation::Write) + .await } - async fn impl_execute_on(&self, db: Option, q: Query) -> Result { + #[allow(unused_variables)] + async fn impl_execute_on( + &self, + db: Option, + q: Query, + operation: Operation, + ) -> Result { backoff::future::retry_notify( - self.pool.manager().backoff(), + self.pool.backoff(), || { let pool = &self.pool; + let mut query = q.clone(); + let operation = operation.clone(); let fetch_size = self.config.fetch_size; - let query = &q; - let db = db.as_deref(); + if let Some(db) = db.as_deref() { + query = query.extra("db", db); + } + let operation = operation.clone(); + query = query.param( + "mode", + match operation { + Operation::Read => "r", + Operation::Write => "w", + }, + ); async move { - let connection = pool.get().await.map_err(crate::Error::from)?; - query.execute_retryable(db, fetch_size, connection).await + let connection = pool.get(Some(operation)).await.map_err(Error::Permanent)?; // an error when retrieving a connection is considered permanent + query.execute_retryable(fetch_size, connection).await } }, Self::log_retry, diff --git a/lib/src/lib.rs b/lib/src/lib.rs index b62b21de..3d911042 100644 --- a/lib/src/lib.rs +++ b/lib/src/lib.rs @@ -465,6 +465,8 @@ mod messages; mod packstream; mod pool; mod query; +#[cfg(feature = "unstable-bolt-protocol-impl-v2")] +mod routing; mod row; mod stream; #[cfg(feature = "unstable-result-summary")] @@ -493,5 +495,21 @@ pub use crate::types::{ BoltPoint2D, BoltPoint3D, BoltRelation, BoltString, BoltTime, BoltType, BoltUnboundedRelation, }; pub use crate::version::Version; +use std::fmt::Display; pub(crate) use messages::Success; + +#[derive(Debug, PartialEq, Clone)] +pub enum Operation { + Read, + Write, +} + +impl Display for Operation { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Operation::Read => write!(f, "READ"), + Operation::Write => write!(f, "WRITE"), + } + } +} diff --git a/lib/src/messages.rs b/lib/src/messages.rs index af7e8d6e..a47eaade 100644 --- a/lib/src/messages.rs +++ b/lib/src/messages.rs @@ -12,7 +12,6 @@ mod rollback; mod run; mod success; -use crate::messages::ignore::Ignore; use crate::{ errors::{Error, Result}, types::{BoltMap, BoltWireFormat}, @@ -22,6 +21,7 @@ use crate::{ use begin::Begin; use bytes::Bytes; use failure::Failure; +use ignore::Ignore; use record::Record; use run::Run; pub(crate) use success::Success; @@ -135,8 +135,8 @@ impl BoltRequest { BoltRequest::Hello(hello::Hello::new(data)) } - pub fn run(db: Option<&str>, query: &str, params: BoltMap) -> BoltRequest { - BoltRequest::Run(Run::new(db.map(Into::into), query.into(), params)) + pub fn run(query: &str, params: BoltMap, extra: BoltMap) -> BoltRequest { + BoltRequest::Run(Run::new(query.into(), params, extra)) } #[cfg_attr( diff --git a/lib/src/messages/run.rs b/lib/src/messages/run.rs index 25152352..c8564e39 100644 --- a/lib/src/messages/run.rs +++ b/lib/src/messages/run.rs @@ -10,14 +10,11 @@ pub struct Run { } impl Run { - pub fn new(db: Option, query: BoltString, parameters: BoltMap) -> Run { + pub fn new(query: BoltString, parameters: BoltMap, extra: BoltMap) -> Run { Run { query, parameters, - extra: db - .into_iter() - .map(|db| ("db".into(), BoltType::String(db))) - .collect(), + extra, } } } @@ -32,9 +29,9 @@ mod tests { #[test] fn should_serialize_run() { let run = Run::new( - Some("test".into()), "query".into(), vec![("k".into(), "v".into())].into_iter().collect(), + vec![("db".into(), "test".into())].into_iter().collect(), ); let bytes: Bytes = run.into_bytes(Version::V4_1).unwrap(); @@ -70,7 +67,7 @@ mod tests { #[test] fn should_serialize_run_with_no_params() { - let run = Run::new(None, "query".into(), BoltMap::default()); + let run = Run::new("query".into(), BoltMap::default(), BoltMap::default()); let bytes: Bytes = run.into_bytes(Version::V4_1).unwrap(); diff --git a/lib/src/packstream/ser/mod.rs b/lib/src/packstream/ser/mod.rs index 6cc4d77f..c5e362ce 100644 --- a/lib/src/packstream/ser/mod.rs +++ b/lib/src/packstream/ser/mod.rs @@ -539,6 +539,12 @@ impl<'a> ser::SerializeStructVariant for &'a mut Serializer { value.serialize(&mut **self) } + fn skip_field(&mut self, _key: &'static str) -> Result<(), Self::Error> { + self.bytes.reserve(1); + self.bytes.put_u8(0xC0); + Ok(()) + } + fn end(self) -> Result { Ok(()) } diff --git a/lib/src/pool.rs b/lib/src/pool.rs index 9e8652b4..f9ba21d9 100644 --- a/lib/src/pool.rs +++ b/lib/src/pool.rs @@ -8,7 +8,7 @@ use crate::{ }; use backoff::{ExponentialBackoff, ExponentialBackoffBuilder}; use deadpool::managed::{Manager, Metrics, Object, Pool, RecycleResult}; -use log::info; +use log::{info, trace}; pub type ConnectionPool = Pool; pub type ManagedConnection = Object; @@ -45,11 +45,12 @@ impl Manager for ConnectionManager { type Error = Error; async fn create(&self) -> Result { - info!("creating new connection..."); + trace!("creating new connection"); Connection::new(&self.info).await } async fn recycle(&self, obj: &mut Self::Type, _: &Metrics) -> RecycleResult { + trace!("recycling connection"); Ok(obj.reset().await?) } } diff --git a/lib/src/query.rs b/lib/src/query.rs index e5a75682..fa5a99c6 100644 --- a/lib/src/query.rs +++ b/lib/src/query.rs @@ -14,6 +14,7 @@ use crate::{ pub struct Query { query: String, params: BoltMap, + extra: BoltMap, } impl Query { @@ -21,6 +22,7 @@ impl Query { Query { query, params: BoltMap::default(), + extra: BoltMap::default(), } } @@ -29,6 +31,11 @@ impl Query { self } + pub fn extra>(mut self, key: &str, value: T) -> Self { + self.extra.put(key.into(), value.into()); + self + } + pub fn params(mut self, input_params: impl IntoIterator) -> Self where K: Into, @@ -41,16 +48,28 @@ impl Query { self } + pub fn extras(mut self, input_params: impl IntoIterator) -> Self + where + K: Into, + V: Into, + { + for (key, value) in input_params { + self.extra.put(key.into(), value.into()); + } + + self + } + pub fn has_param_key(&self, key: &str) -> bool { self.params.value.contains_key(key) } - pub(crate) async fn run( - self, - db: Option<&str>, - connection: &mut ManagedConnection, - ) -> Result<()> { - let request = BoltRequest::run(db, &self.query, self.params); + pub fn has_extra_key(&self, key: &str) -> bool { + self.extra.value.contains_key(key) + } + + pub(crate) async fn run(self, connection: &mut ManagedConnection) -> Result<()> { + let request = BoltRequest::run(&self.query, self.params, self.extra); Self::try_run(request, connection) .await .map_err(unwrap_backoff) @@ -58,32 +77,29 @@ impl Query { pub(crate) async fn run_retryable( &self, - db: Option<&str>, connection: &mut ManagedConnection, ) -> QueryResult<()> { - let request = BoltRequest::run(db, &self.query, self.params.clone()); + let request = BoltRequest::run(&self.query, self.params.clone(), self.extra.clone()); Self::try_run(request, connection).await } pub(crate) async fn execute_retryable( &self, - db: Option<&str>, fetch_size: usize, mut connection: ManagedConnection, ) -> QueryResult { - let request = BoltRequest::run(db, &self.query, self.params.clone()); + let request = BoltRequest::run(&self.query, self.params.clone(), self.extra.clone()); Self::try_execute(request, fetch_size, &mut connection) .await .map(|stream| DetachedRowStream::new(stream, connection)) } - pub(crate) async fn execute_mut<'conn>( + pub(crate) async fn execute_mut( self, - db: Option<&str>, fetch_size: usize, - connection: &'conn mut ManagedConnection, + connection: &mut ManagedConnection, ) -> Result { - let run = BoltRequest::run(db, &self.query, self.params); + let run = BoltRequest::run(&self.query, self.params, self.extra); Self::try_execute(run, fetch_size, connection) .await .map_err(unwrap_backoff) diff --git a/lib/src/routing/connection_registry.rs b/lib/src/routing/connection_registry.rs new file mode 100644 index 00000000..6d654cb3 --- /dev/null +++ b/lib/src/routing/connection_registry.rs @@ -0,0 +1,206 @@ +use crate::connection::NeoUrl; +use crate::pool::{create_pool, ConnectionPool}; +use crate::routing::{RoutingTable, Server}; +use crate::{Config, Error}; +use dashmap::DashMap; +use futures::lock::Mutex; +use log::debug; +use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::Arc; +use std::time::Instant; + +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub(crate) struct BoltServer { + pub(crate) address: String, + pub(crate) port: u16, + pub(crate) role: String, +} + +impl BoltServer { + pub(crate) fn resolve(server: &Server) -> Vec { + server + .addresses + .iter() + .map(|address| { + let bs = NeoUrl::parse(address) + .map(|addr| BoltServer { + address: addr.host().to_string(), + port: addr.port(), + role: server.role.to_string(), + }) + .unwrap_or_else(|_| panic!("Failed to parse address {}", address)); + debug!("Resolved server: {:?}", bs); + bs + }) + .collect() + } +} + +pub type Registry = DashMap; + +#[derive(Clone)] +pub(crate) struct ConnectionRegistry { + config: Config, + creation_time: Arc>, + ttl: Arc, + pub(crate) connections: Registry, +} + +impl ConnectionRegistry { + pub(crate) fn new(config: &Config) -> Self { + ConnectionRegistry { + config: config.clone(), + creation_time: Arc::new(Mutex::new(Instant::now())), + ttl: Arc::new(AtomicU64::new(0)), + connections: DashMap::new(), + } + } + + pub(crate) async fn update_if_expired(&self, f: F) -> Result<(), Error> + where + F: FnOnce() -> R, + R: std::future::Future>, + { + let now = Instant::now(); + debug!("Checking if routing table is expired..."); + let mut guard = self.creation_time.lock().await; + if self.connections.is_empty() + || now.duration_since(*guard).as_secs() > self.ttl.load(Ordering::Relaxed) + { + debug!("Routing table expired or empty, refreshing..."); + let routing_table = f().await?; + debug!("Routing table refreshed: {:?}", routing_table); + let registry = &self.connections; + let servers = routing_table.resolve(); + let url = NeoUrl::parse(self.config.uri.as_str())?; + // Convert neo4j scheme to bolt scheme to create connection pools. + // We need to use the bolt scheme since we don't want new connections to be routed + let scheme = match url.scheme() { + "neo4j" => "bolt", + "neo4j+s" => "bolt+s", + "neo4j+ssc" => "bolt+ssc", + _ => return Err(Error::UnsupportedScheme(url.scheme().to_string())), + }; + + for server in servers.iter() { + if registry.contains_key(server) { + continue; + } + let uri = format!("{}://{}:{}", scheme, server.address, server.port); + debug!("Creating pool for server: {}", uri); + registry.insert( + server.clone(), + create_pool(&Config { + uri, + ..self.config.clone() + }) + .await?, + ); + } + registry.retain(|k, _| servers.contains(k)); + let _ = self + .ttl + .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |_ttl| { + Some(routing_table.ttl) + }) + .unwrap(); + debug!( + "Registry updated. New size is {} with TTL {}s", + registry.len(), + routing_table.ttl + ); + *guard = now; + } + Ok(()) + } + /// Retrieve the pool for a specific server. + pub fn get_pool(&self, server: &BoltServer) -> Option { + self.connections.get(server).map(|entry| entry.clone()) + } + + pub fn mark_unavailable(&self, server: &BoltServer) { + self.connections.remove(server); + } + + pub fn servers(&self) -> Vec { + self.connections + .iter() + .map(|entry| entry.key().clone()) + .collect() + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::auth::ConnectionTLSConfig; + use crate::routing::load_balancing::LoadBalancingStrategy; + use crate::routing::RoundRobinStrategy; + use crate::routing::Server; + + #[tokio::test] + async fn test_available_servers() { + let readers = vec![ + Server { + addresses: vec!["host1:7687".to_string()], + role: "READ".to_string(), + }, + Server { + addresses: vec!["host2:7688".to_string()], + role: "READ".to_string(), + }, + ]; + let writers = vec![ + Server { + addresses: vec!["host3:7687".to_string()], + role: "WRITE".to_string(), + }, + Server { + addresses: vec!["host4:7688".to_string()], + role: "WRITE".to_string(), + }, + ]; + let routers = vec![Server { + addresses: vec!["host0:7687".to_string()], + role: "ROUTE".to_string(), + }]; + let cluster_routing_table = RoutingTable { + ttl: 0, + db: None, + servers: readers + .clone() + .into_iter() + .chain(writers.clone()) + .chain(routers.clone()) + .collect(), + }; + let config = Config { + uri: "neo4j://localhost:7687".to_string(), + user: "user".to_string(), + password: "password".to_string(), + max_connections: 10, + db: Some("neo4j".into()), + fetch_size: 0, + tls_config: ConnectionTLSConfig::None, + }; + let registry = ConnectionRegistry::new(&config); + registry + .update_if_expired(|| async { Ok(cluster_routing_table) }) + .await + .unwrap(); + assert_eq!(registry.connections.len(), 5); + let strategy = RoundRobinStrategy::default(); + registry.mark_unavailable(BoltServer::resolve(&writers[0]).first().unwrap()); + assert_eq!(registry.connections.len(), 4); + let writer = strategy.select_writer(®istry.servers()).unwrap(); + assert_eq!( + format!("{}:{}", writer.address, writer.port), + writers[1].addresses[0] + ); + + registry.mark_unavailable(BoltServer::resolve(&writers[1]).first().unwrap()); + assert_eq!(registry.connections.len(), 3); + let writer = strategy.select_writer(®istry.servers()); + assert!(writer.is_none()); + } +} diff --git a/lib/src/routing/load_balancing/mod.rs b/lib/src/routing/load_balancing/mod.rs new file mode 100644 index 00000000..a63875b9 --- /dev/null +++ b/lib/src/routing/load_balancing/mod.rs @@ -0,0 +1,8 @@ +pub(crate) mod round_robin_strategy; + +use crate::routing::connection_registry::BoltServer; + +pub trait LoadBalancingStrategy: Sync + Send { + fn select_reader(&self, servers: &[BoltServer]) -> Option; + fn select_writer(&self, servers: &[BoltServer]) -> Option; +} diff --git a/lib/src/routing/load_balancing/round_robin_strategy.rs b/lib/src/routing/load_balancing/round_robin_strategy.rs new file mode 100644 index 00000000..b7a0ade8 --- /dev/null +++ b/lib/src/routing/load_balancing/round_robin_strategy.rs @@ -0,0 +1,112 @@ +use crate::routing::connection_registry::BoltServer; +use crate::routing::load_balancing::LoadBalancingStrategy; +use std::sync::atomic::AtomicUsize; + +#[derive(Default)] +pub struct RoundRobinStrategy { + reader_index: AtomicUsize, + writer_index: AtomicUsize, +} + +impl RoundRobinStrategy { + fn select(servers: &[BoltServer], index: &AtomicUsize) -> Option { + if servers.is_empty() { + return None; + } + + let _ = index.compare_exchange( + 0, + servers.len(), + std::sync::atomic::Ordering::Relaxed, + std::sync::atomic::Ordering::Relaxed, + ); + let i = index.fetch_sub(1, std::sync::atomic::Ordering::Relaxed); + if let Some(server) = servers.get(i - 1) { + Some(server.clone()) + } else { + //reset index + index.store(servers.len(), std::sync::atomic::Ordering::Relaxed); + servers.last().cloned() + } + } +} + +impl LoadBalancingStrategy for RoundRobinStrategy { + fn select_reader(&self, servers: &[BoltServer]) -> Option { + let readers = servers + .iter() + .filter(|s| s.role == "READ") + .cloned() + .collect::>(); + Self::select(&readers, &self.reader_index) + } + + fn select_writer(&self, servers: &[BoltServer]) -> Option { + let writers = servers + .iter() + .filter(|s| s.role == "WRITE") + .cloned() + .collect::>(); + Self::select(&writers, &self.writer_index) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::routing::{RoutingTable, Server}; + + #[test] + fn should_get_next_server() { + let routers = vec![Server { + addresses: vec!["192.168.0.1:7688".to_string()], + role: "WRITE".to_string(), + }]; + let readers = vec![Server { + addresses: vec![ + "192.168.0.2:7687".to_string(), + "192.168.0.3:7687".to_string(), + ], + role: "READ".to_string(), + }]; + let writers = vec![Server { + addresses: vec!["192.168.0.4:7688".to_string()], + role: "WRITE".to_string(), + }]; + + let cluster_routing_table = RoutingTable { + ttl: 300, + db: Some("neo4j".into()), + servers: routers + .clone() + .into_iter() + .chain(readers.clone()) + .chain(writers.clone()) + .collect(), + }; + let all_servers = cluster_routing_table.resolve(); + assert_eq!(all_servers.len(), 4); + let strategy = RoundRobinStrategy::default(); + + let reader = strategy.select_reader(&all_servers).unwrap(); + assert_eq!( + format!("{}:{}", reader.address, reader.port), + readers[0].addresses[1] + ); + let reader = strategy.select_reader(&all_servers).unwrap(); + assert_eq!( + format!("{}:{}", reader.address, reader.port), + readers[0].addresses[0] + ); + let reader = strategy.select_reader(&all_servers).unwrap(); + assert_eq!( + format!("{}:{}", reader.address, reader.port), + readers[0].addresses[1] + ); + let writer = strategy.select_writer(&all_servers).unwrap(); + assert_eq!( + format!("{}:{}", writer.address, writer.port), + writers[0].addresses[0] + ); + } +} diff --git a/lib/src/routing/mod.rs b/lib/src/routing/mod.rs new file mode 100644 index 00000000..89e92302 --- /dev/null +++ b/lib/src/routing/mod.rs @@ -0,0 +1,165 @@ +mod connection_registry; +mod load_balancing; +mod routed_connection_manager; +use std::fmt::{Display, Formatter}; +#[cfg(feature = "unstable-bolt-protocol-impl-v2")] +use {crate::connection::Routing, serde::Deserialize}; + +#[cfg(feature = "unstable-bolt-protocol-impl-v2")] +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum RouteExtra { + V4_3(Option), + V4_4(Extra), +} + +#[cfg(feature = "unstable-bolt-protocol-impl-v2")] +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct Route<'a> { + pub(crate) routing: Routing, + pub(crate) bookmarks: Vec<&'a str>, + pub(crate) extra: RouteExtra, +} + +// NOTE: this structure will be needed in the future when we implement the Bolt protocol v4.4 +#[derive(Debug, Clone, PartialEq, Eq)] +#[cfg(feature = "unstable-bolt-protocol-impl-v2")] +#[allow(dead_code)] +pub struct Extra { + pub(crate) db: Option, + pub(crate) imp_user: Option, +} + +#[derive(Debug, Clone, PartialEq, Eq)] +#[cfg_attr(feature = "unstable-bolt-protocol-impl-v2", derive(Deserialize))] +pub struct RoutingTable { + pub(crate) ttl: u64, + pub(crate) db: Option, + pub(crate) servers: Vec, +} + +impl RoutingTable { + pub(crate) fn resolve(&self) -> Vec { + self.servers + .iter() + .flat_map(BoltServer::resolve) + .collect::>() + } +} + +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +#[cfg_attr(feature = "unstable-bolt-protocol-impl-v2", derive(Deserialize))] +pub struct Server { + pub(crate) addresses: Vec, + pub(crate) role: String, // TODO: use an enum here +} + +#[cfg(feature = "unstable-bolt-protocol-impl-v2")] +pub struct RouteBuilder<'a> { + routing: Routing, + bookmarks: Vec<&'a str>, + db: Option, + imp_user: Option, +} + +#[cfg(feature = "unstable-bolt-protocol-impl-v2")] +impl<'a> RouteBuilder<'a> { + pub fn new(routing: Routing, bookmarks: Vec<&'a str>) -> Self { + Self { + routing, + bookmarks, + db: None, + imp_user: None, + } + } + + pub fn with_db(self, db: Database) -> Self { + Self { + db: Some(db), + ..self + } + } + + #[allow(dead_code)] + pub fn with_imp_user(self, imp_user: &'a str) -> Self { + Self { + imp_user: Some(imp_user.to_string()), + ..self + } + } + + pub fn build(self, version: Version) -> Route<'a> { + match version.cmp(&Version::V4_4) { + std::cmp::Ordering::Equal | std::cmp::Ordering::Greater => Route { + routing: self.routing, + bookmarks: self.bookmarks, + extra: RouteExtra::V4_4(Extra { + db: self.db, + imp_user: self.imp_user, + }), + }, + std::cmp::Ordering::Less => Route { + routing: self.routing, + bookmarks: self.bookmarks, + extra: RouteExtra::V4_3(self.db), + }, + } + } +} + +impl Display for Route<'_> { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + let (db, imp_user) = match self.extra { + RouteExtra::V4_3(ref db) => { + let db = db + .clone() + .map(|d| d.to_string()) + .unwrap_or("null".to_string()); + let imp_user = "null".to_string(); + (db, imp_user) + } + RouteExtra::V4_4(ref extra) => { + let db = extra + .db + .clone() + .map(|d| d.to_string()) + .unwrap_or("null".to_string()); + let imp_user = extra.imp_user.clone().unwrap_or("null".to_string()); + (db, imp_user) + } + }; + + write!( + f, + "ROUTE {{ {} }} [{}] {} {}", + self.routing, + self.bookmarks + .iter() + .map(|b| b.to_string()) + .collect::>() + .join(", "), + db, + imp_user + ) + } +} + +impl Display for RoutingTable { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!( + f, + "RoutingTable {{ ttl: {}, db: {:?}, servers: {} }}", + self.ttl, + self.db.clone(), + self.servers + .iter() + .map(|s| s.addresses.join(", ")) + .collect::>() + .join(", ") + ) + } +} + +use crate::routing::connection_registry::BoltServer; +use crate::{Database, Version}; +pub use load_balancing::round_robin_strategy::RoundRobinStrategy; +pub use routed_connection_manager::RoutedConnectionManager; diff --git a/lib/src/routing/routed_connection_manager.rs b/lib/src/routing/routed_connection_manager.rs new file mode 100644 index 00000000..46174f10 --- /dev/null +++ b/lib/src/routing/routed_connection_manager.rs @@ -0,0 +1,123 @@ +use crate::connection::{Connection, ConnectionInfo}; +use crate::pool::ManagedConnection; +use crate::routing::connection_registry::{BoltServer, ConnectionRegistry}; +use crate::routing::load_balancing::LoadBalancingStrategy; +use crate::routing::RoundRobinStrategy; +#[cfg(feature = "unstable-bolt-protocol-impl-v2")] +use crate::routing::{RouteBuilder, RoutingTable}; +use crate::{Config, Error, Operation}; +use backoff::{ExponentialBackoff, ExponentialBackoffBuilder}; +use futures::lock::Mutex; +use log::{debug, error}; +use std::sync::Arc; +use std::time::Duration; + +#[derive(Clone)] +pub struct RoutedConnectionManager { + load_balancing_strategy: Arc, + connection_registry: Arc, + #[allow(dead_code)] + bookmarks: Arc>>, + backoff: Arc, + config: Config, +} + +impl RoutedConnectionManager { + pub async fn new(config: &Config) -> Result { + let registry = Arc::new(ConnectionRegistry::new(config)); + let backoff = Arc::new( + ExponentialBackoffBuilder::new() + .with_initial_interval(Duration::from_millis(1)) + .with_randomization_factor(0.42) + .with_multiplier(2.0) + .with_max_elapsed_time(Some(Duration::from_secs(60))) + .build(), + ); + + Ok(RoutedConnectionManager { + load_balancing_strategy: Arc::new(RoundRobinStrategy::default()), + connection_registry: registry, + bookmarks: Arc::new(Mutex::new(vec![])), + backoff, + config: config.clone(), + }) + } + + pub async fn refresh_routing_table(&self) -> Result { + let info = ConnectionInfo::new( + &self.config.uri, + &self.config.user, + &self.config.password, + &self.config.tls_config, + )?; + let mut connection = Connection::new(&info).await?; + let mut builder = RouteBuilder::new(info.routing, vec![]); + if let Some(db) = self.config.db.clone() { + builder = builder.with_db(db); + } + let rt = connection + .route(builder.build(connection.version())) + .await?; + debug!("Fetched a new routing table: {:?}", rt); + Ok(rt) + } + + pub(crate) async fn get( + &self, + operation: Option, + ) -> Result { + // We probably need to do this in a more efficient way, since this will block the request of a connection + // while we refresh the routing table. We should probably have a separate thread that refreshes the routing + self.connection_registry + .update_if_expired(|| self.refresh_routing_table()) + .await?; + + let op = operation.unwrap_or(Operation::Write); + while let Some(server) = match op { + Operation::Write => self.select_writer(), + _ => self.select_reader(), + } { + debug!("requesting connection for server: {:?}", server); + if let Some(pool) = self.connection_registry.get_pool(&server) { + match pool.get().await { + Ok(connection) => return Ok(connection), + Err(e) => { + error!( + "Failed to get connection from pool for server `{}`: {}", + server.address, e + ); + self.connection_registry.mark_unavailable(&server); + continue; + } + } + } else { + // We couldn't find a connection manager for the server, it was probably marked unavailable + error!( + "No connection manager available for router `{}` in the registry", + server.address + ); + return Err(Error::ServerUnavailableError(format!( + "No connection manager available for router `{}` in the registry", + server.address + ))); + } + } + Err(Error::ServerUnavailableError(format!( + "No server available for {op} operation" + ))) + } + + pub(crate) fn backoff(&self) -> ExponentialBackoff { + self.backoff.as_ref().clone() + } + + fn select_reader(&self) -> Option { + self.load_balancing_strategy + .select_reader(&self.connection_registry.servers()) + } + + fn select_writer(&self) -> Option { + self.load_balancing_strategy + .select_writer(&self.connection_registry.servers()) + } +} diff --git a/lib/src/txn.rs b/lib/src/txn.rs index f82c9579..7b3e10ea 100644 --- a/lib/src/txn.rs +++ b/lib/src/txn.rs @@ -7,6 +7,7 @@ use crate::{ pool::ManagedConnection, query::Query, stream::RowStream, + Operation, }; /// A handle which is used to control a transaction, created as a result of [`crate::Graph::start_txn`] @@ -17,6 +18,7 @@ pub struct Txn { db: Option, fetch_size: usize, connection: ManagedConnection, + operation: Operation, } impl Txn { @@ -24,6 +26,7 @@ impl Txn { db: Option, fetch_size: usize, mut connection: ManagedConnection, + operation: Operation, ) -> Result { let begin = BoltRequest::begin(db.as_deref()); match connection.send_recv(begin).await? { @@ -31,6 +34,7 @@ impl Txn { db, fetch_size, connection, + operation, }), msg => Err(msg.into_error("BEGIN")), } @@ -49,12 +53,35 @@ impl Txn { /// Runs a single query and discards the stream. pub async fn run(&mut self, q: Query) -> Result<()> { - q.run(self.db.as_deref(), &mut self.connection).await + let mut query = q.clone(); + if let Some(db) = self.db.as_ref() { + query = query.extra("db", db.to_string()); + } + query = query.extra( + "mode", + match self.operation { + Operation::Read => "r", + Operation::Write => "w", + }, + ); + query.run(&mut self.connection).await } /// Executes a query and returns a [`RowStream`] pub async fn execute(&mut self, q: Query) -> Result { - q.execute_mut(self.db.as_deref(), self.fetch_size, &mut self.connection) + let mut query = q.clone(); + if let Some(db) = self.db.as_ref() { + query = query.extra("db", db.to_string()); + } + query = query.extra( + "mode", + match self.operation { + Operation::Read => "r", + Operation::Write => "w", + }, + ); + query + .execute_mut(self.fetch_size, &mut self.connection) .await } diff --git a/lib/src/version.rs b/lib/src/version.rs index b6cca9b1..5e10cfb4 100644 --- a/lib/src/version.rs +++ b/lib/src/version.rs @@ -1,26 +1,30 @@ use crate::errors::{Error, Result}; use bytes::{BufMut, BytesMut}; use std::cmp::PartialEq; -use std::fmt::Debug; +use std::fmt::{Debug, Display, Formatter}; #[derive(Debug, PartialEq, Eq, Clone, Copy, PartialOrd, Ord)] #[non_exhaustive] pub enum Version { V4, V4_1, + V4_3, + V4_4, } impl Version { pub fn add_supported_versions(bytes: &mut BytesMut) { bytes.reserve(16); + bytes.put_u32(0x0404); // V4_4 + bytes.put_u32(0x0304); // V4_3 bytes.put_u32(0x0104); // V4_1 bytes.put_u32(0x0004); // V4 - bytes.put_u32(0); - bytes.put_u32(0); } pub fn parse(version_bytes: [u8; 4]) -> Result { match version_bytes { + [0, 0, 4, 4] => Ok(Version::V4_4), + [0, 0, 3, 4] => Ok(Version::V4_3), [0, 0, 1, 4] => Ok(Version::V4_1), [0, 0, 0, 4] => Ok(Version::V4), [0, 0, minor, major] => Err(Error::UnsupportedVersion(major, minor)), @@ -29,12 +33,25 @@ impl Version { } } +impl Display for Version { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + match self { + Version::V4 => write!(f, "4.0"), + Version::V4_1 => write!(f, "4.1"), + Version::V4_3 => write!(f, "4.3"), + Version::V4_4 => write!(f, "4.4"), + } + } +} + #[cfg(test)] mod tests { use super::*; #[tokio::test] async fn should_parse_version() { + assert_eq!(Version::parse([0, 0, 4, 4]).unwrap(), Version::V4_4); + assert_eq!(Version::parse([0, 0, 3, 4]).unwrap(), Version::V4_3); assert_eq!(Version::parse([0, 0, 1, 4]).unwrap(), Version::V4_1); assert_eq!(Version::parse([0, 0, 0, 4]).unwrap(), Version::V4); } diff --git a/lib/tests/use_default_db.rs b/lib/tests/use_default_db.rs index 501efd67..dbc9fe49 100644 --- a/lib/tests/use_default_db.rs +++ b/lib/tests/use_default_db.rs @@ -25,9 +25,17 @@ async fn use_default_db() { }; let graph = neo4j.graph(); - let default_db = graph + #[cfg(feature = "unstable-bolt-protocol-impl-v2")] + let query_stream = graph + .execute_on("system", query("SHOW DEFAULT DATABASE"), Operation::Read) + .await; + + #[cfg(not(feature = "unstable-bolt-protocol-impl-v2"))] + let query_stream = graph .execute_on("system", query("SHOW DEFAULT DATABASE")) - .await + .await; + + let default_db = query_stream .unwrap() .column_into_stream::("name") .try_fold(None::, |acc, db| async { Ok(acc.or(Some(db))) }) @@ -52,18 +60,30 @@ async fn use_default_db() { .await .unwrap(); - let count = graph + #[cfg(feature = "unstable-bolt-protocol-impl-v2")] + let query_stream = graph .execute_on( dbname.as_str(), query("MATCH (n:Node {uuid: $uuid}) RETURN count(n) AS result") .param("uuid", id.to_string()), + Operation::Read, ) - .await + .await; + + #[cfg(not(feature = "unstable-bolt-protocol-impl-v2"))] + let query_stream = graph + .execute_on( + dbname.as_str(), + query("MATCH (n:Node {uuid: $uuid}) RETURN count(n) AS result") + .param("uuid", id.to_string()), + ) + .await; + + let count = query_stream .unwrap() .column_into_stream::("result") .try_fold(0, |sum, count| async move { Ok(sum + count) }) .await .unwrap(); - assert_eq!(count, 1); }