-
Notifications
You must be signed in to change notification settings - Fork 1.2k
refactor(request-response): don't use upgrade infrastructure #3914
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from 17 commits
Commits
Show all changes
34 commits
Select commit
Hold shift + click to select a range
1db7fc3
Never close connections in request-response
thomaseizinger 10508d6
Log error instead
thomaseizinger f4f1e58
Refactor `request-response` to not use upgrade mechanism
thomaseizinger 4afc9d3
Merge branch 'master' into refactor/req-res-on-upgrade
thomaseizinger 30f53b3
Don't panic on timeouts
thomaseizinger 50e6207
Remove unnecessary comment
thomaseizinger aea1027
Use `mpsc` channel for sending inbound requests
thomaseizinger 6e5f76d
Merge branch 'master' into refactor/req-res-on-upgrade
thomaseizinger d41f44b
Fix keep-alive TODO
thomaseizinger 796780e
Merge branch 'master' into refactor/req-res-on-upgrade
thomaseizinger 791220c
Fix MSRV issue
thomaseizinger eaf1d97
Introduce `futures-bounded` for time and space bounded workers
thomaseizinger be64f0d
Add changelog
thomaseizinger 9a9fd79
Merge branch 'master' into refactor/req-res-on-upgrade
thomaseizinger bdd95f7
Add description
thomaseizinger 51ef639
Add test for backpressure
thomaseizinger a250457
Allow configuration of max capacity for worker streams
thomaseizinger 7127463
Merge branch 'master' into refactor/req-res-on-upgrade
thomaseizinger 0723c2f
Merge branch 'master' into refactor/req-res-on-upgrade
thomaseizinger 701d288
Merge branch 'master' into refactor/req-res-on-upgrade
thomaseizinger f8c42de
Fix compile errors
thomaseizinger 7960117
Update swarm/CHANGELOG.md
thomaseizinger 671b90f
Merge branch 'master' into refactor/req-res-on-upgrade
thomaseizinger 3810cac
Merge branch 'master' into refactor/req-res-on-upgrade
thomaseizinger 4b24287
Merge branch 'master' into refactor/req-res-on-upgrade
thomaseizinger 302c920
Move changelog entry
thomaseizinger dfa3a58
Merge branch 'master' into refactor/req-res-on-upgrade
thomaseizinger 53b9abd
Merge branch 'master' into refactor/req-res-on-upgrade
thomaseizinger d812e1c
Merge branch 'refactor/req-res-on-upgrade' of github.com:libp2p/rust-…
thomaseizinger 71354b2
Report IO failures
thomaseizinger feccbc2
fix(request-response): Report failures (#4701)
oblique beb8863
Merge branch 'master' into refactor/req-res-on-upgrade
thomaseizinger c6603a1
Add further changelog entry
thomaseizinger 1599f8f
Merge branch 'master' into refactor/req-res-on-upgrade
mergify[bot] File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Oops, something went wrong.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,3 @@ | ||
| ## 0.1.0 - unreleased | ||
|
|
||
| Initial release. |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,19 @@ | ||
| [package] | ||
| name = "futures-bounded" | ||
| version = "0.1.0" | ||
| edition = "2021" | ||
| rust-version.workspace = true | ||
| license = "MIT" | ||
| repository = "https://github.com/libp2p/rust-libp2p" | ||
| keywords = ["futures", "async", "backpressure"] | ||
| categories = ["data-structures", "asynchronous"] | ||
| description = "Utilities for bounding futures in size and time." | ||
|
|
||
| # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html | ||
|
|
||
| [dependencies] | ||
| futures-util = { version = "0.3.28" } | ||
| futures-timer = "3.0.2" | ||
|
|
||
| [dev-dependencies] | ||
| tokio = { version = "1.29.1", features = ["macros", "rt"] } | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,193 @@ | ||
| use std::future::Future; | ||
| use std::task::{Context, Poll, Waker}; | ||
| use std::time::Duration; | ||
|
|
||
| use futures_timer::Delay; | ||
| use futures_util::future::{select, BoxFuture, Either}; | ||
| use futures_util::stream::FuturesUnordered; | ||
| use futures_util::{ready, FutureExt, StreamExt}; | ||
|
|
||
| /// Represents a set of (Worker)-[Future]s. | ||
| /// | ||
| /// This wraps [FuturesUnordered] but bounds it by time and size. | ||
thomaseizinger marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| /// In other words, each worker must finish within the specified time and the set never outgrows its capacity. | ||
| pub struct WorkerFutures<K, O> { | ||
| timeout: Duration, | ||
| capacity: usize, | ||
| inner: FuturesUnordered<BoxFuture<'static, (K, Result<O, Timeout>)>>, | ||
|
|
||
| empty_waker: Option<Waker>, | ||
| full_waker: Option<Waker>, | ||
| } | ||
|
|
||
| impl<K, O> WorkerFutures<K, O> { | ||
| pub fn new(timeout: Duration, capacity: usize) -> Self { | ||
| Self { | ||
| timeout, | ||
| capacity, | ||
| inner: Default::default(), | ||
| empty_waker: None, | ||
| full_waker: None, | ||
| } | ||
| } | ||
| } | ||
|
|
||
| impl<K, O> WorkerFutures<K, O> | ||
| where | ||
| K: Send + 'static, | ||
| { | ||
| pub fn try_push<F>(&mut self, key: K, worker: F) -> Option<F> | ||
thomaseizinger marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| where | ||
| F: Future<Output = O> + Send + 'static + Unpin, | ||
| { | ||
| if self.inner.len() >= self.capacity { | ||
| return Some(worker); | ||
| } | ||
| let timeout = Delay::new(self.timeout); | ||
|
|
||
| self.inner.push( | ||
| async move { | ||
| match select(worker, timeout).await { | ||
| Either::Left((out, _)) => (key, Ok(out)), | ||
| Either::Right(((), _)) => (key, Err(Timeout::new())), | ||
| } | ||
| } | ||
| .boxed(), | ||
| ); | ||
|
|
||
| if let Some(waker) = self.empty_waker.take() { | ||
| waker.wake(); | ||
| } | ||
|
|
||
| None | ||
| } | ||
|
|
||
| pub fn is_empty(&self) -> bool { | ||
| self.inner.is_empty() | ||
| } | ||
|
|
||
| pub fn poll_ready_unpin(&mut self, cx: &mut Context<'_>) -> Poll<()> { | ||
| if self.inner.len() < self.capacity { | ||
| return Poll::Ready(()); | ||
| } | ||
|
|
||
| self.full_waker = Some(cx.waker().clone()); | ||
| Poll::Pending | ||
| } | ||
|
|
||
| pub fn poll_unpin(&mut self, cx: &mut Context<'_>) -> Poll<(K, Result<O, Timeout>)> { | ||
| match ready!(self.inner.poll_next_unpin(cx)) { | ||
| None => { | ||
| self.empty_waker = Some(cx.waker().clone()); | ||
| Poll::Pending | ||
| } | ||
| Some(result) => { | ||
| if let Some(waker) = self.full_waker.take() { | ||
| waker.wake(); | ||
| } | ||
|
|
||
| Poll::Ready(result) | ||
| } | ||
| } | ||
| } | ||
| } | ||
|
|
||
| #[derive(Debug)] | ||
| pub struct Timeout { | ||
| _priv: (), | ||
| } | ||
|
|
||
| impl Timeout { | ||
| fn new() -> Self { | ||
| Self { _priv: () } | ||
| } | ||
| } | ||
|
|
||
| #[cfg(test)] | ||
| mod tests { | ||
| use super::*; | ||
| use std::future::{pending, poll_fn, ready}; | ||
| use std::pin::Pin; | ||
| use std::time::Instant; | ||
|
|
||
| #[test] | ||
| fn cannot_push_more_than_capacity_tasks() { | ||
| let mut workers = WorkerFutures::new(Duration::from_secs(10), 1); | ||
|
|
||
| assert!(workers.try_push((), ready(())).is_none()); | ||
| assert!(workers.try_push((), ready(())).is_some()); | ||
| } | ||
|
|
||
| #[tokio::test] | ||
| async fn workers_timeout() { | ||
| let mut workers = WorkerFutures::new(Duration::from_millis(100), 1); | ||
|
|
||
| let _ = workers.try_push((), pending::<()>()); | ||
| Delay::new(Duration::from_millis(150)).await; | ||
| let (_, result) = poll_fn(|cx| workers.poll_unpin(cx)).await; | ||
|
|
||
| assert!(result.is_err()) | ||
| } | ||
|
|
||
| // Each worker causes a delay, `Task` only has a capacity of 1, meaning they must be processed in sequence. | ||
| // We stop after NUM_WORKERS tasks, meaning the overall execution must at least take DELAY * NUM_WORKERS. | ||
| #[tokio::test] | ||
| async fn backpressure() { | ||
| const DELAY: Duration = Duration::from_millis(100); | ||
| const NUM_WORKERS: u32 = 10; | ||
|
|
||
| let start = Instant::now(); | ||
| Task::new(DELAY, NUM_WORKERS, 1).await; | ||
| let duration = start.elapsed(); | ||
|
|
||
| assert!(duration >= DELAY * NUM_WORKERS); | ||
| } | ||
|
|
||
| struct Task { | ||
| worker: Duration, | ||
| num_workers: usize, | ||
| num_processed: usize, | ||
| inner: WorkerFutures<(), ()>, | ||
| } | ||
|
|
||
| impl Task { | ||
| fn new(worker: Duration, num_workers: u32, capacity: usize) -> Self { | ||
| Self { | ||
| worker, | ||
| num_workers: num_workers as usize, | ||
| num_processed: 0, | ||
| inner: WorkerFutures::new(Duration::from_secs(60), capacity), | ||
| } | ||
| } | ||
| } | ||
|
|
||
| impl Future for Task { | ||
| type Output = (); | ||
|
|
||
| fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { | ||
| let this = self.get_mut(); | ||
|
|
||
| while this.num_processed < this.num_workers { | ||
| if let Poll::Ready(((), result)) = this.inner.poll_unpin(cx) { | ||
| if result.is_err() { | ||
| panic!("Timeout is great than worker delay") | ||
| } | ||
|
|
||
| this.num_processed += 1; | ||
| continue; | ||
| } | ||
|
|
||
| if let Poll::Ready(()) = this.inner.poll_ready_unpin(cx) { | ||
| let maybe_worker = this.inner.try_push((), Delay::new(this.worker)); | ||
| assert!(maybe_worker.is_none(), "we polled for readiness"); | ||
|
|
||
| continue; | ||
| } | ||
|
|
||
| return Poll::Pending; | ||
| } | ||
|
|
||
| Poll::Ready(()) | ||
| } | ||
| } | ||
| } | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.