Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 14 additions & 2 deletions .github/workflows/rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,34 +13,45 @@ jobs:
RUST_BACKTRACE: 1
strategy:
matrix:
build: [linux64, macos, win32, win64]
build: [linux64, macos, win32, win64, wasm32]
include:
- build: linux64
os: ubuntu-latest
channel: stable
toolchain: x86_64-unknown-linux-gnu
target: x86_64-unknown-linux-gnu
#- build: linux32
# os: ubuntu-latest
# channel: stable
# toolchain: i686-unknown-linux-gnu
# target: i686-unknown-linux-gnu
- build: macos
os: macos-latest
channel: stable
toolchain: x86_64-apple-darwin
target: x86_64-apple-darwin
- build: win32
os: windows-latest
channel: stable
toolchain: i686-pc-windows-msvc
target: i686-pc-windows-msvc
- build: win64
os: windows-latest
channel: stable
toolchain: x86_64-pc-windows-msvc
target: x86_64-pc-windows-msvc
- build: wasm32
os: ubuntu-latest
channel: stable
toolchain: x86_64-unknown-linux-gnu
target: wasm32-unknown-unknown
steps:
- uses: actions/checkout@v2
- run: |
TOOLCHAIN=${{ matrix.channel }}-${{ matrix.target }}
TOOLCHAIN=${{ matrix.channel }}-${{ matrix.toolchain }}
rustup toolchain install --no-self-update $TOOLCHAIN
rustup default $TOOLCHAIN
rustup target add ${{ matrix.target }}
shell: bash
- name: Rust version
run: |
Expand All @@ -53,6 +64,7 @@ jobs:
- run: cargo check --target ${{ matrix.target }}
- run: cargo build --target ${{ matrix.target }}
- run: cargo test --target ${{ matrix.target }}
if: ${{ matrix.target != 'wasm32-unknown-unknown' }}
# FIXME(#41): Some timeout/deadline tests make more sense to run in release mode.
#- run: cargo test --release --target ${{ matrix.target }}
- run: cargo build --all-targets --target ${{ matrix.target }}
Expand Down
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Changed

- `WeakSender` is now `Clone`
- `spin` feature no longer uses `std::thread::sleep` for locking except on Unix-like operating systems and Windows

### Fixed

Expand Down
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ async-std = { version = "1.9.0", features = ["attributes", "unstable"] }
futures = { version = "^0.3", features = ["std"] }
waker-fn = "1.1.0"
tokio = { version = "^1.16.1", features = ["rt", "macros"] }
getrandom = { version = "0.2.15", features = ["js"] }

[[bench]]
name = "basic"
Expand Down
4 changes: 2 additions & 2 deletions examples/async.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#[cfg(feature = "async")]
#[cfg(all(feature = "async", not(target_os = "unknown")))]
#[async_std::main]
async fn main() {
let (tx, rx) = flume::bounded(1);
Expand All @@ -17,5 +17,5 @@ async fn main() {
t.await;
}

#[cfg(not(feature = "async"))]
#[cfg(any(not(feature = "async"), target_os = "unknown"))]
fn main() {}
26 changes: 17 additions & 9 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -395,18 +395,26 @@ impl<T> Hook<T, SyncSignal> {
#[cfg(feature = "spin")]
#[inline]
fn wait_lock<T>(lock: &Spinlock<T>) -> SpinlockGuard<T> {
let mut i = 4;
loop {
for _ in 0..10 {
if let Some(guard) = lock.try_lock() {
return guard;
// Some targets don't support `thread::sleep` (e.g. the `wasm32-unknown-unknown` target when
// running in the main thread of a web browser) so we only use it on targets where we know it
// will work
#[cfg(any(target_family = "unix", target_family = "windows"))]
{
let mut i = 4;
loop {
for _ in 0..10 {
if let Some(guard) = lock.try_lock() {
return guard;
}
thread::yield_now();
}
thread::yield_now();
// Sleep for at most ~1 ms
thread::sleep(Duration::from_nanos(1 << i.min(20)));
i += 1;
}
// Sleep for at most ~1 ms
thread::sleep(Duration::from_nanos(1 << i.min(20)));
i += 1;
}
#[cfg(not(any(target_family = "unix", target_family = "windows")))]
lock.lock()
}

#[cfg(not(feature = "spin"))]
Expand Down
20 changes: 10 additions & 10 deletions tests/async.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#[cfg(feature = "async")]
#[cfg(all(feature = "async", not(target_os = "unknown")))]
use {
flume::*,
futures::{stream::FuturesUnordered, StreamExt, TryFutureExt, Future},
Expand All @@ -7,7 +7,7 @@ use {
std::{time::Duration, sync::{atomic::{AtomicUsize, Ordering}, Arc}},
};

#[cfg(feature = "async")]
#[cfg(all(feature = "async", not(target_os = "unknown")))]
#[test]
fn r#async_recv() {
let (tx, rx) = unbounded();
Expand All @@ -24,7 +24,7 @@ fn r#async_recv() {
t.join().unwrap();
}

#[cfg(feature = "async")]
#[cfg(all(feature = "async", not(target_os = "unknown")))]
#[test]
fn r#async_send() {
let (tx, rx) = bounded(1);
Expand All @@ -41,7 +41,7 @@ fn r#async_send() {
t.join().unwrap();
}

#[cfg(feature = "async")]
#[cfg(all(feature = "async", not(target_os = "unknown")))]
#[test]
fn r#async_recv_disconnect() {
let (tx, rx) = bounded::<i32>(0);
Expand All @@ -58,7 +58,7 @@ fn r#async_recv_disconnect() {
t.join().unwrap();
}

#[cfg(feature = "async")]
#[cfg(all(feature = "async", not(target_os = "unknown")))]
#[test]
fn r#async_send_disconnect() {
let (tx, rx) = bounded(0);
Expand All @@ -75,7 +75,7 @@ fn r#async_send_disconnect() {
t.join().unwrap();
}

#[cfg(feature = "async")]
#[cfg(all(feature = "async", not(target_os = "unknown")))]
#[test]
fn r#async_recv_drop_recv() {
let (tx, rx) = bounded::<i32>(10);
Expand Down Expand Up @@ -103,7 +103,7 @@ fn r#async_recv_drop_recv() {
assert_eq!(t.join().unwrap(), Ok(42))
}

#[cfg(feature = "async")]
#[cfg(all(feature = "async", not(target_os = "unknown")))]
#[async_std::test]
async fn r#async_send_1_million_no_drop_or_reorder() {
#[derive(Debug)]
Expand Down Expand Up @@ -137,7 +137,7 @@ async fn r#async_send_1_million_no_drop_or_reorder() {
assert_eq!(count, 1_000_000)
}

#[cfg(feature = "async")]
#[cfg(all(feature = "async", not(target_os = "unknown")))]
#[async_std::test]
async fn parallel_async_receivers() {
let (tx, rx) = flume::unbounded();
Expand Down Expand Up @@ -175,7 +175,7 @@ async fn parallel_async_receivers() {
println!("recv end");
}

#[cfg(feature = "async")]
#[cfg(all(feature = "async", not(target_os = "unknown")))]
#[test]
fn change_waker() {
let (tx, rx) = flume::bounded(1);
Expand Down Expand Up @@ -246,7 +246,7 @@ fn change_waker() {
}
}

#[cfg(feature = "async")]
#[cfg(all(feature = "async", not(target_os = "unknown")))]
#[test]
fn spsc_single_threaded_value_ordering() {
async fn test() {
Expand Down
18 changes: 9 additions & 9 deletions tests/stream.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#[cfg(feature = "async")]
#[cfg(all(feature = "async", not(target_os = "unknown")))]
use {
flume::*,
futures::{stream::FuturesUnordered, StreamExt, TryFutureExt},
Expand All @@ -7,7 +7,7 @@ use {
};
use futures::{stream, Stream};

#[cfg(feature = "async")]
#[cfg(all(feature = "async", not(target_os = "unknown")))]
#[test]
fn stream_recv() {
let (tx, rx) = unbounded();
Expand All @@ -28,7 +28,7 @@ fn stream_recv() {
t.join().unwrap();
}

#[cfg(feature = "async")]
#[cfg(all(feature = "async", not(target_os = "unknown")))]
#[test]
fn stream_recv_disconnect() {
let (tx, rx) = bounded::<i32>(0);
Expand All @@ -48,7 +48,7 @@ fn stream_recv_disconnect() {
t.join().unwrap();
}

#[cfg(feature = "async")]
#[cfg(all(feature = "async", not(target_os = "unknown")))]
#[test]
fn stream_recv_drop_recv() {
let (tx, rx) = bounded::<i32>(10);
Expand Down Expand Up @@ -80,7 +80,7 @@ fn stream_recv_drop_recv() {
assert_eq!(t.join().unwrap(), Some(42))
}

#[cfg(feature = "async")]
#[cfg(all(feature = "async", not(target_os = "unknown")))]
#[test]
fn r#stream_drop_send_disconnect() {
let (tx, rx) = bounded::<i32>(1);
Expand All @@ -98,7 +98,7 @@ fn r#stream_drop_send_disconnect() {
t.join().unwrap();
}

#[cfg(feature = "async")]
#[cfg(all(feature = "async", not(target_os = "unknown")))]
#[async_std::test]
async fn stream_send_1_million_no_drop_or_reorder() {
#[derive(Debug)]
Expand Down Expand Up @@ -133,7 +133,7 @@ async fn stream_send_1_million_no_drop_or_reorder() {
assert_eq!(count, 1_000_000)
}

#[cfg(feature = "async")]
#[cfg(all(feature = "async", not(target_os = "unknown")))]
#[async_std::test]
async fn parallel_streams_and_async_recv() {
let (tx, rx) = flume::unbounded();
Expand Down Expand Up @@ -174,7 +174,7 @@ async fn parallel_streams_and_async_recv() {
.unwrap();
}

#[cfg(feature = "async")]
#[cfg(all(feature = "async", not(target_os = "unknown")))]
#[test]
fn stream_no_double_wake() {
use std::sync::atomic::{AtomicUsize, Ordering};
Expand Down Expand Up @@ -220,7 +220,7 @@ fn stream_no_double_wake() {
assert_eq!(count.load(Ordering::SeqCst), 1);
}

#[cfg(feature = "async")]
#[cfg(all(feature = "async", not(target_os = "unknown")))]
#[async_std::test]
async fn stream_forward_issue_55() { // https://github.com/zesterer/flume/issues/55
fn dummy_stream() -> impl Stream<Item = usize> {
Expand Down