Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ jobs:
rustup show
shell: bash
# Check build to fail fast
- run: cargo check --target ${{ matrix.target }}
- run: cargo check --tests --target ${{ matrix.target }}
- run: cargo build --target ${{ matrix.target }}
- run: cargo test --target ${{ matrix.target }}
if: ${{ matrix.target != 'wasm32-unknown-unknown' }}
Expand Down
8 changes: 4 additions & 4 deletions src/async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ impl<'a, T> Deref for OwnedOrRef<'a, T> {

fn deref(&self) -> &T {
match self {
OwnedOrRef::Owned(arc) => &arc,
OwnedOrRef::Owned(arc) => arc,
OwnedOrRef::Ref(r) => r,
}
}
Expand All @@ -90,9 +90,9 @@ impl<T> Sender<T> {
///
/// In the current implementation, the returned future will not yield to the async runtime if the
/// channel is unbounded. This may change in later versions.
pub fn send_async(&self, item: T) -> SendFut<T> {
pub fn send_async(&self, item: T) -> SendFut<'_, T> {
SendFut {
sender: OwnedOrRef::Ref(&self),
sender: OwnedOrRef::Ref(self),
hook: Some(SendState::NotYetSent(item)),
}
}
Expand All @@ -117,7 +117,7 @@ impl<T> Sender<T> {
/// channel is unbounded. This may change in later versions.
pub fn sink(&self) -> SendSink<'_, T> {
SendSink(SendFut {
sender: OwnedOrRef::Ref(&self),
sender: OwnedOrRef::Ref(self),
hook: None,
})
}
Expand Down
12 changes: 6 additions & 6 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -400,7 +400,7 @@ impl<T> Hook<T, SyncSignal> {

#[cfg(feature = "spin")]
#[inline]
fn wait_lock<T>(lock: &Spinlock<T>) -> SpinlockGuard<T> {
fn wait_lock<T>(lock: &Spinlock<T>) -> SpinlockGuard<'_, T> {
// Some targets don't support `thread::sleep` (e.g. the `wasm32-unknown-unknown` target when
// running in the main thread of a web browser) so we only use it on targets where we know it
// will work
Expand Down Expand Up @@ -975,20 +975,20 @@ impl<T> Receiver<T> {
/// when all senders have been dropped.
///
/// You can also create a self-owned iterator with [`Receiver::into_iter`].
pub fn iter(&self) -> Iter<T> {
Iter { receiver: &self }
pub fn iter(&self) -> Iter<'_, T> {
Iter { receiver: self }
}

/// A non-blocking iterator over the values received on the channel that finishes iteration
/// when all senders have been dropped or the channel is empty.
pub fn try_iter(&self) -> TryIter<T> {
TryIter { receiver: &self }
pub fn try_iter(&self) -> TryIter<'_, T> {
TryIter { receiver: self }
}

/// Take all msgs currently sitting in the channel and produce an iterator over them. Unlike
/// `try_iter`, the iterator will not attempt to fetch any more values from the channel once
/// the function has been called.
pub fn drain(&self) -> Drain<T> {
pub fn drain(&self) -> Drain<'_, T> {
let mut chan = wait_lock(&self.shared.chan);
chan.pull_pending(false);
let queue = std::mem::take(&mut chan.queue);
Expand Down
8 changes: 4 additions & 4 deletions src/select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ impl<'a, T> Selector<'a, T> {
selections: Vec::new(),
next_poll: 0,
signalled: Arc::default(),
phantom: PhantomData::default(),
phantom: PhantomData,
#[cfg(feature = "eventual-fairness")]
rng: fastrand::Rng::new(),
}
Expand Down Expand Up @@ -180,7 +180,7 @@ impl<'a, T> Selector<'a, T> {
return None;
};

Some((&mut self.mapper)(res))
Some((self.mapper)(res))
}

fn deinit(&mut self) {
Expand Down Expand Up @@ -274,7 +274,7 @@ impl<'a, T> Selector<'a, T> {
return None;
};

Some((&mut self.mapper)(res))
Some((self.mapper)(res))
}

fn deinit(&mut self) {
Expand Down Expand Up @@ -320,7 +320,7 @@ impl<'a, T> Selector<'a, T> {
self.next_poll = self.rng.usize(0..self.selections.len());
}

let res = 'outer: loop {
let res = 'outer: {
// Init signals
for _ in 0..self.selections.len() {
if let Some(val) = self.selections[self.next_poll].init() {
Expand Down
34 changes: 17 additions & 17 deletions tests/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use std::time::Duration;

use crossbeam_utils::thread::scope;
use flume::{bounded, Receiver};
use flume::{RecvError, RecvTimeoutError, TryRecvError};
use flume::{RecvTimeoutError, TryRecvError};
use flume::{SendError, SendTimeoutError, TrySendError};
use rand::{thread_rng, Rng};

Expand Down Expand Up @@ -46,38 +46,38 @@ fn len_empty_full() {
let (s, r) = bounded(2);

assert_eq!(s.len(), 0);
assert_eq!(s.is_empty(), true);
assert_eq!(s.is_full(), false);
assert!(s.is_empty());
assert!(!s.is_full());
assert_eq!(r.len(), 0);
assert_eq!(r.is_empty(), true);
assert_eq!(r.is_full(), false);
assert!(r.is_empty());
assert!(!r.is_full());

s.send(()).unwrap();

assert_eq!(s.len(), 1);
assert_eq!(s.is_empty(), false);
assert_eq!(s.is_full(), false);
assert!(!s.is_empty());
assert!(!s.is_full());
assert_eq!(r.len(), 1);
assert_eq!(r.is_empty(), false);
assert_eq!(r.is_full(), false);
assert!(!r.is_empty());
assert!(!r.is_full());

s.send(()).unwrap();

assert_eq!(s.len(), 2);
assert_eq!(s.is_empty(), false);
assert_eq!(s.is_full(), true);
assert!(!s.is_empty());
assert!(s.is_full());
assert_eq!(r.len(), 2);
assert_eq!(r.is_empty(), false);
assert_eq!(r.is_full(), true);
assert!(!r.is_empty());
assert!(r.is_full());

r.recv().unwrap();

assert_eq!(s.len(), 1);
assert_eq!(s.is_empty(), false);
assert_eq!(s.is_full(), false);
assert!(!s.is_empty());
assert!(!s.is_full());
assert_eq!(r.len(), 1);
assert_eq!(r.is_empty(), false);
assert_eq!(r.is_full(), false);
assert!(!r.is_empty());
assert!(!r.is_full());
}

#[test]
Expand Down
2 changes: 1 addition & 1 deletion tests/async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ fn change_waker() {
self.0.load(Ordering::SeqCst)
}

fn ctx(&self) -> Context {
fn ctx(&self) -> Context<'_> {
Context::from_waker(&self.1)
}
}
Expand Down
10 changes: 5 additions & 5 deletions tests/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ fn hydra() {
for _ in 0..10 {
for tx in &txs {
for _ in 0..msg_num {
tx.send(Default::default()).unwrap();
tx.send(()).unwrap();
}
}

Expand Down Expand Up @@ -314,7 +314,7 @@ fn robin() {
let main_tx = main_tx.clone();
std::thread::spawn(move || {
for _ in 0..msg_num {
main_tx.send(Default::default()).unwrap();
main_tx.send(()).unwrap();
}
});

Expand Down Expand Up @@ -382,7 +382,7 @@ fn std_error_without_debug() {
}

match rx.recv() {
Ok(_) => {}
Ok(MessageWithoutDebug(n)) => assert_eq!(n, 1),
Err(e) => {
let _std_err: &dyn std::error::Error = &e;
}
Expand All @@ -396,7 +396,7 @@ fn std_error_without_debug() {
}

match rx.try_recv() {
Ok(_) => {}
Ok(MessageWithoutDebug(n)) => assert_eq!(n, 2),
Err(e) => {
let _std_err: &dyn std::error::Error = &e;
}
Expand All @@ -410,7 +410,7 @@ fn std_error_without_debug() {
}

match rx.recv_timeout(Duration::from_secs(10000000)) {
Ok(_) => {}
Ok(MessageWithoutDebug(n)) => assert_eq!(n, 3),
Err(e) => {
let _std_err: &dyn std::error::Error = &e;
}
Expand Down
4 changes: 2 additions & 2 deletions tests/iter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ fn recv_into_iter_owned() {

assert_eq!(iter.next().unwrap(), 1);
assert_eq!(iter.next().unwrap(), 2);
assert_eq!(iter.next().is_none(), true);
assert!(iter.next().is_none());
}

#[test]
Expand All @@ -108,5 +108,5 @@ fn recv_into_iter_borrowed() {
let mut iter = (&r).into_iter();
assert_eq!(iter.next().unwrap(), 1);
assert_eq!(iter.next().unwrap(), 2);
assert_eq!(iter.next().is_none(), true);
assert!(iter.next().is_none());
}
26 changes: 13 additions & 13 deletions tests/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use std::time::Duration;

use crossbeam_utils::thread::scope;
use flume::{unbounded, Receiver};
use flume::{RecvError, RecvTimeoutError, TryRecvError};
use flume::{RecvTimeoutError, TryRecvError};
use flume::{SendError, SendTimeoutError, TrySendError};
use rand::{thread_rng, Rng};

Expand Down Expand Up @@ -44,29 +44,29 @@ fn len_empty_full() {
let (s, r) = unbounded();

assert_eq!(s.len(), 0);
assert_eq!(s.is_empty(), true);
assert_eq!(s.is_full(), false);
assert!(s.is_empty());
assert!(!s.is_full());
assert_eq!(r.len(), 0);
assert_eq!(r.is_empty(), true);
assert_eq!(r.is_full(), false);
assert!(r.is_empty());
assert!(!r.is_full());

s.send(()).unwrap();

assert_eq!(s.len(), 1);
assert_eq!(s.is_empty(), false);
assert_eq!(s.is_full(), false);
assert!(!s.is_empty());
assert!(!s.is_full());
assert_eq!(r.len(), 1);
assert_eq!(r.is_empty(), false);
assert_eq!(r.is_full(), false);
assert!(!r.is_empty());
assert!(!r.is_full());

r.recv().unwrap();

assert_eq!(s.len(), 0);
assert_eq!(s.is_empty(), true);
assert_eq!(s.is_full(), false);
assert!(s.is_empty());
assert!(!s.is_full());
assert_eq!(r.len(), 0);
assert_eq!(r.is_empty(), true);
assert_eq!(r.is_full(), false);
assert!(r.is_empty());
assert!(!r.is_full());
}

#[test]
Expand Down
Loading