Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 52 additions & 43 deletions twilight-http-ratelimiting/src/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,34 @@ struct Queue {

impl Queue {
/// Whether the queue is exhausted.
const fn is_exhasted(&self) -> bool {
const fn is_exhausted(&self) -> bool {
self.remaining == 0
}

/// Whether the queue is stopped.
const fn is_stopped(&self) -> bool {
!(self.in_flight || (self.is_exhausted() && self.reset.is_some()))
}

/// Take pending permit requests with the provided endpoint.
fn take(&mut self, endpoint: &Endpoint) -> VecDeque<Message> {
let (taken, retained) = mem::take(&mut self.pending)
.into_iter()
.filter(|req| !req.notifier.is_closed())
.partition(|req| req.endpoint == *endpoint);
self.pending = retained;

taken
}

/// Convert the queue into a bucket.
fn to_bucket(&self, f: impl FnOnce(Key) -> Instant) -> Option<crate::Bucket> {
self.reset.map(|key| crate::Bucket {
limit: self.limit,
remaining: self.remaining,
reset_at: f(key).into(),
})
}
}

impl From<VecDeque<Message>> for Queue {
Expand Down Expand Up @@ -136,12 +161,9 @@ pub async fn runner(
macro_rules! try_pop {
($queue:ident) => {
let (mut tx, rx) = oneshot::channel();
while let Some(req) = $queue
.pending
.front()
.is_some_and(|req| global_remaining != 0 || req.endpoint.is_interaction())
.then(|| $queue.pending.pop_front())
.flatten()
while let Some(req) = $queue.pending.front()
&& (global_remaining != 0 || req.endpoint.is_interaction())
&& let Some(req) = $queue.pending.pop_front()
{
match req.notifier.send(tx) {
Ok(()) => {
Expand Down Expand Up @@ -181,10 +203,7 @@ pub async fn runner(
}
() = &mut global_timer, if global_remaining != global_limit => {
global_remaining = global_limit;
// Try resume all stopped queues.
for (_, queue) in queues.iter_mut().filter(|(_, queue)| {
!queue.in_flight && (!queue.is_exhasted() || queue.reset.is_none())
}) {
for (_, queue) in queues.iter_mut().filter(|(_, queue)| queue.is_stopped()) {
try_pop!(queue);
}
}
Expand All @@ -195,17 +214,17 @@ pub async fn runner(
debug_assert!(!queue.in_flight);
queue.reset = None;
// Note that non-exhausted queues are not stopped.
if queue.is_exhasted() {
if queue.is_exhausted() {
try_pop!(queue);
}
}
Some(Ok((endpoint, headers))) = in_flight.join_next() => {
let _span = tracing::info_span!("resp", ?endpoint).entered();
if let Ok(Some(headers)) = headers {
if let Ok(Some(mut headers)) = headers {
tracing::trace!(?headers);

let hash = hasher.bucket(&headers.bucket, &endpoint);
let queue = match buckets.entry(endpoint.clone()) {
let queue = match buckets.entry(endpoint) {
MapEntry::Occupied(entry) if *entry.get() == headers.bucket => {
&mut queues.find_mut(hash, |&(key, _)| key == hash).unwrap().1
}
Expand All @@ -218,19 +237,16 @@ pub async fn runner(

// Retrieve this endpoint's requests.
let (_, old_queue) = queues.find_mut(old_hash, |&(key, _)| key == old_hash).unwrap();
let pending = old_queue.take(old_entry.key());
old_queue.in_flight = false;
let (pending, old_pending) = mem::take(&mut old_queue.pending)
.into_iter()
.partition::<VecDeque<_>, _>(|req| req.endpoint == *old_entry.key());
old_queue.pending = old_pending;
try_pop!(old_queue);

match old_entry {
MapEntry::Occupied(mut entry) => {
entry.insert(headers.bucket);
entry.insert(mem::take(&mut headers.bucket));
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cannot call reset_after() if headers is partially moved.

}
MapEntry::Vacant(entry) => {
entry.insert(headers.bucket);
entry.insert(mem::take(&mut headers.bucket));
}
}
// And move them into the new queue.
Expand All @@ -250,23 +266,19 @@ pub async fn runner(
}
};

queue.in_flight = false;
queue.limit = headers.limit;
queue.remaining = headers.remaining;
if let Some(key) = &queue.reset {
reset.reset_at(key, headers.reset_at.into());
} else {
queue.reset = Some(reset.insert_at(hash, headers.reset_at.into()));
}
if queue.is_exhasted() {
tracing::info!(
reset_after = ?headers.reset_at.saturating_duration_since(Instant::now().into()),
"exhausted"
);
continue;
match &queue.reset {
Some(key) => reset.reset_at(key, headers.reset_at.into()),
None => queue.reset = Some(reset.insert_at(hash, headers.reset_at.into())),
}

try_pop!(queue);
queue.in_flight = false;
if queue.is_exhausted() {
tracing::info!(reset_after = ?headers.reset_after(), "exhausted");
} else {
try_pop!(queue);
}
} else {
if headers.is_err() {
tracing::debug!("cancelled");
Expand All @@ -284,6 +296,10 @@ pub async fn runner(
}
}
Some((msg, pred)) = rx.recv() => {
if msg.notifier.is_closed() {
continue;
}

if !msg.endpoint.is_valid() {
tracing::warn!(path = msg.endpoint.path, "improperly formatted path");
}
Expand All @@ -301,18 +317,11 @@ pub async fn runner(
}
};

let is_cancelled = pred.is_some_and(|p| !p(queue.reset.map(|key| crate::Bucket {
limit: queue.limit,
remaining: queue.remaining,
reset_at: reset.deadline(&key).into(),
})));

let queue_active = queue.in_flight || (queue.is_exhasted() && queue.reset.is_some());
if is_cancelled {
if pred.is_some_and(|p| !p(queue.to_bucket(|key| reset.deadline(&key)))) {
drop(msg);
} else if queue_active || (global_remaining == 0 && !msg.endpoint.is_interaction()) {
} else if !queue.is_stopped() || (global_remaining == 0 && !msg.endpoint.is_interaction()) {
queue.pending.push_back(msg);
} else if !msg.notifier.is_closed() {
} else {
let (tx, rx) = oneshot::channel();
if msg.notifier.send(tx).is_ok() {
tracing::debug!(path = msg.endpoint.path, "permitted");
Expand Down
5 changes: 5 additions & 0 deletions twilight-http-ratelimiting/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,11 @@ impl RateLimitHeaders {
reset_at: Instant::now() + Duration::from_secs(retry_after.into()),
}
}

/// Duration until the bucket resets.
pub(crate) fn reset_after(&self) -> Duration {
self.reset_at.saturating_duration_since(Instant::now())
}
}

/// Permit to send a Discord HTTP API request to the acquired endpoint.
Expand Down