From 6511564257658159c9245552c2a1a22ec73bef7c Mon Sep 17 00:00:00 2001 From: Thomas Zahner Date: Mon, 2 Mar 2026 11:28:41 +0100 Subject: [PATCH 1/4] Prevent duplicate requests to the same URLs This is done by locking a Mutex for each Uri. Previously duplicate Uris were sometimes checked, depending on the timing on when the other duplicates were cached. --- lychee-bin/tests/cli.rs | 28 +++++++++++++++++++++++++++ lychee-lib/src/ratelimit/host/host.rs | 25 ++++++++++++++++++------ 2 files changed, 47 insertions(+), 6 deletions(-) diff --git a/lychee-bin/tests/cli.rs b/lychee-bin/tests/cli.rs index 4bde73105a..8c5bb497f5 100644 --- a/lychee-bin/tests/cli.rs +++ b/lychee-bin/tests/cli.rs @@ -1554,6 +1554,34 @@ The config file should contain every possible key for documentation purposes." Ok(()) } + #[tokio::test] + async fn test_no_duplicate_requests() { + let server = wiremock::MockServer::start().await; + let count = 10; // given 10 duplicate URLs + let cached = "90.0%"; // we expect 9 out of 10 to be cached + + wiremock::Mock::given(wiremock::matchers::method("GET")) + .respond_with(|_: &_| { + // Simulate real-world delay. + // Keep the delay to prove how we make use of synchronization + // primitives to prevent duplicate requests. + std::thread::sleep(std::time::Duration::from_secs(1)); + ResponseTemplate::new(200) + }) + .expect(1) + .mount(&server) + .await; + + cargo_bin_cmd!() + .write_stdin(format!("{} ", server.uri()).repeat(count)) + .arg("-") + .arg("--host-stats") + .assert() + .success() + .stdout(contains("100.0% success")) + .stdout(contains(format!("{cached} cached"))); + } + #[tokio::test] async fn test_process_internal_host_caching() -> Result<()> { // Note that this process-internal per-host caching diff --git a/lychee-lib/src/ratelimit/host/host.rs b/lychee-lib/src/ratelimit/host/host.rs index 0a27293a14..149648706a 100644 --- a/lychee-lib/src/ratelimit/host/host.rs +++ b/lychee-lib/src/ratelimit/host/host.rs @@ -12,8 +12,11 @@ use http::StatusCode; use humantime_serde::re::humantime::format_duration; use log::warn; use reqwest::{Client as ReqwestClient, Request, Response as ReqwestResponse}; -use std::time::{Duration, Instant}; use std::{num::NonZeroU32, sync::Mutex}; +use std::{ + sync::Arc, + time::{Duration, Instant}, +}; use tokio::sync::Semaphore; use super::key::HostKey; @@ -63,6 +66,9 @@ pub struct Host { /// Per-host cache to prevent duplicate requests during a single link check invocation. /// Note that this cache has no direct relation to the inter-process persistable [`crate::CacheStatus`]. cache: HostCache, + + /// Keep track of currently active requests, to prevent duplicate concurrent requests + active_requests: DashMap>>, } impl Host { @@ -91,6 +97,7 @@ impl Host { stats: Mutex::new(HostStats::default()), backoff_duration: Mutex::new(Duration::from_millis(0)), cache: DashMap::new(), + active_requests: DashMap::new(), } } @@ -144,11 +151,8 @@ impl Host { let uri = Uri::from(url); let _permit = self.acquire_semaphore().await; - - if let Some(cached) = self.get_cached_status(&uri, needs_body) { - self.record_cache_hit(); - return Ok(cached); - } + let uri_mutex = self.acquire_uri_mutex(&uri); + let _uri_guard = uri_mutex.lock().await; self.await_backoff().await; @@ -209,6 +213,15 @@ impl Host { } } + /// Prevent concurrent requests to identical [`Uri`]s. + fn acquire_uri_mutex(&self, uri: &Uri) -> Arc> { + self.active_requests + .entry(uri.clone()) + .or_insert_with(|| Arc::new(tokio::sync::Mutex::new(()))) + .clone() + } + + /// Enforce the maximum concurrency of this host async fn acquire_semaphore(&self) -> tokio::sync::SemaphorePermit<'_> { self.semaphore .acquire() From d1a91eaffcc26cba57daff7ac58e96526171f919 Mon Sep 17 00:00:00 2001 From: Thomas Zahner Date: Mon, 2 Mar 2026 12:13:58 +0100 Subject: [PATCH 2/4] Check cache earlier This prevents backoff and rate limiting cache hits --- lychee-bin/tests/cli.rs | 6 ++++-- lychee-lib/src/ratelimit/host/host.rs | 10 +++++----- 2 files changed, 9 insertions(+), 7 deletions(-) diff --git a/lychee-bin/tests/cli.rs b/lychee-bin/tests/cli.rs index 8c5bb497f5..cb7a2cf656 100644 --- a/lychee-bin/tests/cli.rs +++ b/lychee-bin/tests/cli.rs @@ -1557,8 +1557,8 @@ The config file should contain every possible key for documentation purposes." #[tokio::test] async fn test_no_duplicate_requests() { let server = wiremock::MockServer::start().await; - let count = 10; // given 10 duplicate URLs - let cached = "90.0%"; // we expect 9 out of 10 to be cached + let count = 100; // given 100 duplicate URLs + let cached = "99.0%"; // we expect 99 out of 100 to be cached wiremock::Mock::given(wiremock::matchers::method("GET")) .respond_with(|_: &_| { @@ -1576,6 +1576,8 @@ The config file should contain every possible key for documentation purposes." .write_stdin(format!("{} ", server.uri()).repeat(count)) .arg("-") .arg("--host-stats") + // the request interval must not have an affect on duplicates + .arg("--host-request-interval=1s") .assert() .success() .stdout(contains("100.0% success")) diff --git a/lychee-lib/src/ratelimit/host/host.rs b/lychee-lib/src/ratelimit/host/host.rs index 149648706a..d19025b524 100644 --- a/lychee-lib/src/ratelimit/host/host.rs +++ b/lychee-lib/src/ratelimit/host/host.rs @@ -154,17 +154,17 @@ impl Host { let uri_mutex = self.acquire_uri_mutex(&uri); let _uri_guard = uri_mutex.lock().await; + if let Some(cached) = self.get_cached_status(&uri, needs_body) { + self.record_cache_hit(); + return Ok(cached); + } + self.await_backoff().await; if let Some(rate_limiter) = &self.rate_limiter { rate_limiter.until_ready().await; } - if let Some(cached) = self.get_cached_status(&uri, needs_body) { - self.record_cache_hit(); - return Ok(cached); - } - self.record_cache_miss(); self.perform_request(request, uri, needs_body).await } From 78634dd01e435fd1b940605b507e614f9176757a Mon Sep 17 00:00:00 2001 From: Thomas Zahner Date: Mon, 2 Mar 2026 14:56:17 +0100 Subject: [PATCH 3/4] Check cache even earlier --- lychee-lib/src/ratelimit/host/host.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/lychee-lib/src/ratelimit/host/host.rs b/lychee-lib/src/ratelimit/host/host.rs index d19025b524..076d9078f9 100644 --- a/lychee-lib/src/ratelimit/host/host.rs +++ b/lychee-lib/src/ratelimit/host/host.rs @@ -150,7 +150,6 @@ impl Host { url.set_fragment(None); let uri = Uri::from(url); - let _permit = self.acquire_semaphore().await; let uri_mutex = self.acquire_uri_mutex(&uri); let _uri_guard = uri_mutex.lock().await; @@ -159,13 +158,15 @@ impl Host { return Ok(cached); } + self.record_cache_miss(); + let _permit = self.acquire_semaphore().await; + self.await_backoff().await; if let Some(rate_limiter) = &self.rate_limiter { rate_limiter.until_ready().await; } - self.record_cache_miss(); self.perform_request(request, uri, needs_body).await } From 86c7f2ad778a0b5d8b470852ce74b7cf2e52d69f Mon Sep 17 00:00:00 2001 From: Thomas Zahner Date: Mon, 2 Mar 2026 15:10:54 +0100 Subject: [PATCH 4/4] Update lock_uri_mutex Thanks to @katrinafyi for the trick with lock_owned() --- lychee-lib/src/ratelimit/host/host.rs | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/lychee-lib/src/ratelimit/host/host.rs b/lychee-lib/src/ratelimit/host/host.rs index 076d9078f9..a7df954d5c 100644 --- a/lychee-lib/src/ratelimit/host/host.rs +++ b/lychee-lib/src/ratelimit/host/host.rs @@ -149,9 +149,7 @@ impl Host { let mut url = request.url().clone(); url.set_fragment(None); let uri = Uri::from(url); - - let uri_mutex = self.acquire_uri_mutex(&uri); - let _uri_guard = uri_mutex.lock().await; + let _uri_guard = self.lock_uri_mutex(uri.clone()).await; if let Some(cached) = self.get_cached_status(&uri, needs_body) { self.record_cache_hit(); @@ -214,12 +212,16 @@ impl Host { } } - /// Prevent concurrent requests to identical [`Uri`]s. - fn acquire_uri_mutex(&self, uri: &Uri) -> Arc> { - self.active_requests - .entry(uri.clone()) + /// Get a [`tokio::sync::OwnedMutexGuard<()>`] + /// to prevent concurrent requests to identical [`Uri`]s. + async fn lock_uri_mutex(&self, uri: Uri) -> tokio::sync::OwnedMutexGuard<()> { + let uri_mutex = self + .active_requests + .entry(uri) .or_insert_with(|| Arc::new(tokio::sync::Mutex::new(()))) - .clone() + .clone(); + + uri_mutex.lock_owned().await } /// Enforce the maximum concurrency of this host