diff --git a/twilight-http-ratelimiting/src/actor.rs b/twilight-http-ratelimiting/src/actor.rs index 5eedd170c2..9a5af4a307 100644 --- a/twilight-http-ratelimiting/src/actor.rs +++ b/twilight-http-ratelimiting/src/actor.rs @@ -76,9 +76,34 @@ struct Queue { impl Queue { /// Whether the queue is exhausted. - const fn is_exhasted(&self) -> bool { + const fn is_exhausted(&self) -> bool { self.remaining == 0 } + + /// Whether the queue is stopped. + const fn is_stopped(&self) -> bool { + !(self.in_flight || (self.is_exhausted() && self.reset.is_some())) + } + + /// Take pending permit requests with the provided endpoint. + fn take(&mut self, endpoint: &Endpoint) -> VecDeque { + let (taken, retained) = mem::take(&mut self.pending) + .into_iter() + .filter(|req| !req.notifier.is_closed()) + .partition(|req| req.endpoint == *endpoint); + self.pending = retained; + + taken + } + + /// Convert the queue into a bucket. + fn to_bucket(&self, f: impl FnOnce(Key) -> Instant) -> Option { + self.reset.map(|key| crate::Bucket { + limit: self.limit, + remaining: self.remaining, + reset_at: f(key).into(), + }) + } } impl From> for Queue { @@ -136,12 +161,9 @@ pub async fn runner( macro_rules! try_pop { ($queue:ident) => { let (mut tx, rx) = oneshot::channel(); - while let Some(req) = $queue - .pending - .front() - .is_some_and(|req| global_remaining != 0 || req.endpoint.is_interaction()) - .then(|| $queue.pending.pop_front()) - .flatten() + while let Some(req) = $queue.pending.front() + && (global_remaining != 0 || req.endpoint.is_interaction()) + && let Some(req) = $queue.pending.pop_front() { match req.notifier.send(tx) { Ok(()) => { @@ -181,10 +203,7 @@ pub async fn runner( } () = &mut global_timer, if global_remaining != global_limit => { global_remaining = global_limit; - // Try resume all stopped queues. - for (_, queue) in queues.iter_mut().filter(|(_, queue)| { - !queue.in_flight && (!queue.is_exhasted() || queue.reset.is_none()) - }) { + for (_, queue) in queues.iter_mut().filter(|(_, queue)| queue.is_stopped()) { try_pop!(queue); } } @@ -195,17 +214,17 @@ pub async fn runner( debug_assert!(!queue.in_flight); queue.reset = None; // Note that non-exhausted queues are not stopped. - if queue.is_exhasted() { + if queue.is_exhausted() { try_pop!(queue); } } Some(Ok((endpoint, headers))) = in_flight.join_next() => { let _span = tracing::info_span!("resp", ?endpoint).entered(); - if let Ok(Some(headers)) = headers { + if let Ok(Some(mut headers)) = headers { tracing::trace!(?headers); let hash = hasher.bucket(&headers.bucket, &endpoint); - let queue = match buckets.entry(endpoint.clone()) { + let queue = match buckets.entry(endpoint) { MapEntry::Occupied(entry) if *entry.get() == headers.bucket => { &mut queues.find_mut(hash, |&(key, _)| key == hash).unwrap().1 } @@ -218,19 +237,16 @@ pub async fn runner( // Retrieve this endpoint's requests. let (_, old_queue) = queues.find_mut(old_hash, |&(key, _)| key == old_hash).unwrap(); + let pending = old_queue.take(old_entry.key()); old_queue.in_flight = false; - let (pending, old_pending) = mem::take(&mut old_queue.pending) - .into_iter() - .partition::, _>(|req| req.endpoint == *old_entry.key()); - old_queue.pending = old_pending; try_pop!(old_queue); match old_entry { MapEntry::Occupied(mut entry) => { - entry.insert(headers.bucket); + entry.insert(mem::take(&mut headers.bucket)); } MapEntry::Vacant(entry) => { - entry.insert(headers.bucket); + entry.insert(mem::take(&mut headers.bucket)); } } // And move them into the new queue. @@ -250,23 +266,19 @@ pub async fn runner( } }; - queue.in_flight = false; queue.limit = headers.limit; queue.remaining = headers.remaining; - if let Some(key) = &queue.reset { - reset.reset_at(key, headers.reset_at.into()); - } else { - queue.reset = Some(reset.insert_at(hash, headers.reset_at.into())); - } - if queue.is_exhasted() { - tracing::info!( - reset_after = ?headers.reset_at.saturating_duration_since(Instant::now().into()), - "exhausted" - ); - continue; + match &queue.reset { + Some(key) => reset.reset_at(key, headers.reset_at.into()), + None => queue.reset = Some(reset.insert_at(hash, headers.reset_at.into())), } - try_pop!(queue); + queue.in_flight = false; + if queue.is_exhausted() { + tracing::info!(reset_after = ?headers.reset_after(), "exhausted"); + } else { + try_pop!(queue); + } } else { if headers.is_err() { tracing::debug!("cancelled"); @@ -284,6 +296,10 @@ pub async fn runner( } } Some((msg, pred)) = rx.recv() => { + if msg.notifier.is_closed() { + continue; + } + if !msg.endpoint.is_valid() { tracing::warn!(path = msg.endpoint.path, "improperly formatted path"); } @@ -301,18 +317,11 @@ pub async fn runner( } }; - let is_cancelled = pred.is_some_and(|p| !p(queue.reset.map(|key| crate::Bucket { - limit: queue.limit, - remaining: queue.remaining, - reset_at: reset.deadline(&key).into(), - }))); - - let queue_active = queue.in_flight || (queue.is_exhasted() && queue.reset.is_some()); - if is_cancelled { + if pred.is_some_and(|p| !p(queue.to_bucket(|key| reset.deadline(&key)))) { drop(msg); - } else if queue_active || (global_remaining == 0 && !msg.endpoint.is_interaction()) { + } else if !queue.is_stopped() || (global_remaining == 0 && !msg.endpoint.is_interaction()) { queue.pending.push_back(msg); - } else if !msg.notifier.is_closed() { + } else { let (tx, rx) = oneshot::channel(); if msg.notifier.send(tx).is_ok() { tracing::debug!(path = msg.endpoint.path, "permitted"); diff --git a/twilight-http-ratelimiting/src/lib.rs b/twilight-http-ratelimiting/src/lib.rs index 3607ed09c1..17d0c493dc 100644 --- a/twilight-http-ratelimiting/src/lib.rs +++ b/twilight-http-ratelimiting/src/lib.rs @@ -191,6 +191,11 @@ impl RateLimitHeaders { reset_at: Instant::now() + Duration::from_secs(retry_after.into()), } } + + /// Duration until the bucket resets. + pub(crate) fn reset_after(&self) -> Duration { + self.reset_at.saturating_duration_since(Instant::now()) + } } /// Permit to send a Discord HTTP API request to the acquired endpoint.